#!/usr/bin/python
# -*- coding: utf-8 -*-
"""test_base_workflow."""
import datetime
import os
from pDESy.model.base_component import BaseComponent
from pDESy.model.base_facility import BaseFacility
from pDESy.model.base_task import BaseTask, BaseTaskDependency, BaseTaskState
from pDESy.model.base_worker import BaseWorker
from pDESy.model.base_workflow import BaseWorkflow
import pytest
[docs]
@pytest.fixture
def dummy_workflow(scope="function"):
"""dummy_workflow."""
task1 = BaseTask("task1")
task2 = BaseTask("task2")
task3 = BaseTask("task3")
task4 = BaseTask("task4")
task5 = BaseTask("task5")
task3.extend_input_task_list([task1, task2])
task5.extend_input_task_list([task3, task4])
w = BaseWorkflow(task_list=[task1, task2, task3, task4, task5])
return w
[docs]
@pytest.fixture
def SS_workflow(scope="function"):
"""SS_workflow.
Simple workflow including StartToStart dependency.
"""
task1 = BaseTask("task1")
task2 = BaseTask("task2")
task3 = BaseTask("task3")
task2.append_input_task(task1, task_dependency_mode=BaseTaskDependency.SS)
task3.append_input_task(task1, task_dependency_mode=BaseTaskDependency.FS)
task3.append_input_task(task2, task_dependency_mode=BaseTaskDependency.SS)
return BaseWorkflow(task_list=[task1, task2, task3])
[docs]
@pytest.fixture
def FF_workflow(scope="function"):
"""FF_workflow.
Simple workflow including FinishToFinish dependency.
"""
task1 = BaseTask("task1")
task2 = BaseTask("task2")
task3 = BaseTask("task3")
task2.append_input_task(task1, task_dependency_mode=BaseTaskDependency.FF)
task3.append_input_task(task1, task_dependency_mode=BaseTaskDependency.FS)
task3.append_input_task(task2, task_dependency_mode=BaseTaskDependency.SS)
return BaseWorkflow(task_list=[task1, task2, task3])
[docs]
@pytest.fixture
def SF_workflow(scope="function"):
"""SF_workflow.
Simple workflow including StartToFinish dependency.
"""
task1 = BaseTask("task1")
task2 = BaseTask("task2")
task3 = BaseTask("task3")
task2.append_input_task(task1, task_dependency_mode=BaseTaskDependency.SF)
task3.append_input_task(task1, task_dependency_mode=BaseTaskDependency.FS)
task3.append_input_task(task2, task_dependency_mode=BaseTaskDependency.SS)
return BaseWorkflow(task_list=[task1, task2, task3])
[docs]
def test_reverse_dependencies(dummy_workflow):
"""test_reverse_dependencies."""
assert dummy_workflow.task_list[0].input_task_list == []
assert dummy_workflow.task_list[0].output_task_list == [
[dummy_workflow.task_list[2], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[1].input_task_list == []
assert dummy_workflow.task_list[1].output_task_list == [
[dummy_workflow.task_list[2], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[2].input_task_list == [
[dummy_workflow.task_list[0], BaseTaskDependency.FS],
[dummy_workflow.task_list[1], BaseTaskDependency.FS],
]
assert dummy_workflow.task_list[2].output_task_list == [
[dummy_workflow.task_list[4], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[3].input_task_list == []
assert dummy_workflow.task_list[3].output_task_list == [
[dummy_workflow.task_list[4], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[4].input_task_list == [
[dummy_workflow.task_list[2], BaseTaskDependency.FS],
[dummy_workflow.task_list[3], BaseTaskDependency.FS],
]
assert dummy_workflow.task_list[4].output_task_list == []
dummy_workflow.reverse_dependencies()
assert dummy_workflow.task_list[0].output_task_list == []
assert dummy_workflow.task_list[0].input_task_list == [
[dummy_workflow.task_list[2], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[1].output_task_list == []
assert dummy_workflow.task_list[1].input_task_list == [
[dummy_workflow.task_list[2], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[2].output_task_list == [
[dummy_workflow.task_list[0], BaseTaskDependency.FS],
[dummy_workflow.task_list[1], BaseTaskDependency.FS],
]
assert dummy_workflow.task_list[2].input_task_list == [
[dummy_workflow.task_list[4], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[3].output_task_list == []
assert dummy_workflow.task_list[3].input_task_list == [
[dummy_workflow.task_list[4], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[4].output_task_list == [
[dummy_workflow.task_list[2], BaseTaskDependency.FS],
[dummy_workflow.task_list[3], BaseTaskDependency.FS],
]
assert dummy_workflow.task_list[4].input_task_list == []
dummy_workflow.reverse_dependencies()
assert dummy_workflow.task_list[0].input_task_list == []
assert dummy_workflow.task_list[0].output_task_list == [
[dummy_workflow.task_list[2], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[1].input_task_list == []
assert dummy_workflow.task_list[1].output_task_list == [
[dummy_workflow.task_list[2], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[2].input_task_list == [
[dummy_workflow.task_list[0], BaseTaskDependency.FS],
[dummy_workflow.task_list[1], BaseTaskDependency.FS],
]
assert dummy_workflow.task_list[2].output_task_list == [
[dummy_workflow.task_list[4], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[3].input_task_list == []
assert dummy_workflow.task_list[3].output_task_list == [
[dummy_workflow.task_list[4], BaseTaskDependency.FS]
]
assert dummy_workflow.task_list[4].input_task_list == [
[dummy_workflow.task_list[2], BaseTaskDependency.FS],
[dummy_workflow.task_list[3], BaseTaskDependency.FS],
]
assert dummy_workflow.task_list[4].output_task_list == []
[docs]
def test_init():
"""test_init."""
task1 = BaseTask("task1")
task2 = BaseTask("task2")
task2.append_input_task(task1)
w = BaseWorkflow(task_list=[task1, task2])
assert w.task_list == [task1, task2]
assert w.critical_path_length == 0.0
[docs]
def test_str():
"""test_str."""
print(BaseWorkflow(task_list=[]))
[docs]
def test_get_task_list(dummy_workflow):
"""test_get_task_list."""
# TODO if we have enough time for setting test case...
assert (
len(
dummy_workflow.get_task_list(
name="test",
ID="test",
default_work_amount=0,
input_task_list=[],
output_task_list=[],
allocated_team_list=[],
allocated_workplace_list=[],
need_facility=False,
target_component="test",
default_progress=0.85,
due_time=99,
auto_task=False,
fixing_allocating_worker_id_list=[],
fixing_allocating_facility_id_list=[],
# search param
est=1,
eft=2,
lst=3,
lft=4,
remaining_work_amount=999,
state=BaseTaskState.READY,
allocated_worker_list=[],
allocated_worker_id_record=[],
allocated_facility_list=[],
allocated_facility_id_record=[],
)
)
== 0
)
[docs]
def test_initialize():
"""test_initialize."""
task = BaseTask("task")
task.est = 2.0
task.eft = 10.0
task.lst = 3.0
task.lft = 11.0
task.remaining_work_amount = 7
task.actual_work_amount = 6
task.state = BaseTaskState.FINISHED
task.additional_task_flag = True
task.allocated_worker_list = [BaseWorker("w1")]
task_after1 = BaseTask("task_after1")
task_after2 = BaseTask("task_after2", default_work_amount=5.0)
task_after1.append_input_task(task)
task_after2.append_input_task(task)
w = BaseWorkflow(task_list=[task, task_after1, task_after2])
w.critical_path_length = 100.0
w.initialize()
assert w.critical_path_length == 20.0
assert w.task_list[0].est == 0.0
assert w.task_list[0].eft == 10.0
assert w.task_list[0].lst == 0.0
assert w.task_list[0].lft == 10.0
assert w.task_list[0].state == BaseTaskState.READY
assert w.task_list[1].est == 10.0
assert w.task_list[1].eft == 20.0
assert w.task_list[1].lst == 10.0
assert w.task_list[1].lft == 20.0
assert w.task_list[1].state == BaseTaskState.NONE
assert w.task_list[2].est == 10.0
assert w.task_list[2].eft == 15.0
assert w.task_list[2].lst == 15.0
assert w.task_list[2].lft == 20.0
assert w.task_list[2].state == BaseTaskState.NONE
[docs]
def test_update_PERT_data(SS_workflow, FF_workflow, SF_workflow):
"""test_update_PERT_data."""
SS_workflow.update_PERT_data(0)
assert (SS_workflow.task_list[0].est, SS_workflow.task_list[0].eft) == (0, 10)
assert (SS_workflow.task_list[1].est, SS_workflow.task_list[1].eft) == (0, 10)
assert (SS_workflow.task_list[2].est, SS_workflow.task_list[2].eft) == (10, 20)
assert (SS_workflow.task_list[0].lst, SS_workflow.task_list[0].lft) == (0, 10)
assert (SS_workflow.task_list[1].lst, SS_workflow.task_list[1].lft) == (10, 20)
assert (SS_workflow.task_list[2].lst, SS_workflow.task_list[2].lft) == (10, 20)
FF_workflow.update_PERT_data(0)
assert (FF_workflow.task_list[0].est, FF_workflow.task_list[0].eft) == (0, 10)
assert (FF_workflow.task_list[1].est, FF_workflow.task_list[1].eft) == (0, 10)
assert (FF_workflow.task_list[2].est, FF_workflow.task_list[2].eft) == (10, 20)
assert (FF_workflow.task_list[0].lst, FF_workflow.task_list[0].lft) == (0, 10)
assert (FF_workflow.task_list[1].lst, FF_workflow.task_list[1].lft) == (10, 20)
assert (FF_workflow.task_list[2].lst, FF_workflow.task_list[2].lft) == (10, 20)
SF_workflow.update_PERT_data(0)
assert (SF_workflow.task_list[0].est, SF_workflow.task_list[0].eft) == (0, 10)
assert (SF_workflow.task_list[1].est, SF_workflow.task_list[1].eft) == (0, 10)
assert (SF_workflow.task_list[2].est, SF_workflow.task_list[2].eft) == (10, 20)
assert (SF_workflow.task_list[0].lst, SF_workflow.task_list[0].lft) == (0, 10)
assert (SF_workflow.task_list[1].lst, SF_workflow.task_list[1].lft) == (10, 20)
assert (SF_workflow.task_list[2].lst, SF_workflow.task_list[2].lft) == (10, 20)
[docs]
def test_check_state():
"""test_check_state."""
task1 = BaseTask("task1")
task2 = BaseTask("task2")
task3 = BaseTask("task3")
task4 = BaseTask("task4")
task5 = BaseTask("task5")
task3.extend_input_task_list([task1, task2])
task5.extend_input_task_list([task3, task4])
w = BaseWorkflow(task_list=[task1, task2, task3, task4, task5])
w1 = BaseWorker("w1", assigned_task_list=[task1])
# __check_ready test
task1.state = BaseTaskState.FINISHED
task2.state = BaseTaskState.FINISHED
task3.state = BaseTaskState.NONE
task4.state = BaseTaskState.NONE
task5.state = BaseTaskState.NONE
w.check_state(2, BaseTaskState.READY)
assert task1.state == BaseTaskState.FINISHED
assert task2.state == BaseTaskState.FINISHED
assert task3.state == BaseTaskState.READY
assert task4.state == BaseTaskState.READY
assert task5.state == BaseTaskState.NONE
# __check_working test
task1.state = BaseTaskState.READY
task2.state = BaseTaskState.READY
task2.allocated_worker_list = [w1]
task3.state = BaseTaskState.NONE
task4.state = BaseTaskState.NONE
task5.state = BaseTaskState.NONE
w.check_state(2, BaseTaskState.WORKING)
assert task1.state == BaseTaskState.READY
assert task2.state == BaseTaskState.WORKING
assert task3.state == BaseTaskState.NONE
assert task4.state == BaseTaskState.NONE
assert task5.state == BaseTaskState.NONE
task1.state = BaseTaskState.WORKING
task1.need_facility = True
w2 = BaseWorker("w2", assigned_task_list=[task1])
f2 = BaseFacility("f2", assigned_task_list=[task1])
task1.allocated_worker_list = [w2]
task1.allocated_facility_list = [f2]
w.check_state(2, BaseTaskState.WORKING)
# __check_finished test
task1.state = BaseTaskState.WORKING
task1.allocated_worker_list = [w1]
task1.remaining_work_amount = 0.0
task2.state = BaseTaskState.FINISHED
task3.state = BaseTaskState.NONE
task4.state = BaseTaskState.NONE
task5.state = BaseTaskState.NONE
w.check_state(2, BaseTaskState.FINISHED)
assert task1.state == BaseTaskState.FINISHED
assert task2.state == BaseTaskState.FINISHED
assert task3.state == BaseTaskState.NONE
assert task4.state == BaseTaskState.NONE
assert task5.state == BaseTaskState.NONE
[docs]
def test___check_ready(SS_workflow, SF_workflow, FF_workflow):
"""test___check_ready."""
# For SS_workflow
SS_workflow.check_state(-1, BaseTaskState.READY)
assert SS_workflow.task_list[0].state == BaseTaskState.READY
assert SS_workflow.task_list[1].state == BaseTaskState.NONE
assert SS_workflow.task_list[2].state == BaseTaskState.NONE
SS_workflow.task_list[0].state = BaseTaskState.WORKING
SS_workflow.check_state(0, BaseTaskState.READY)
assert SS_workflow.task_list[0].state == BaseTaskState.WORKING
assert SS_workflow.task_list[1].state == BaseTaskState.READY
assert SS_workflow.task_list[2].state == BaseTaskState.NONE
SS_workflow.task_list[1].state = BaseTaskState.WORKING
SS_workflow.check_state(1, BaseTaskState.READY)
assert SS_workflow.task_list[0].state == BaseTaskState.WORKING
assert SS_workflow.task_list[1].state == BaseTaskState.WORKING
assert SS_workflow.task_list[2].state == BaseTaskState.NONE
SS_workflow.task_list[0].state = BaseTaskState.FINISHED
SS_workflow.check_state(2, BaseTaskState.READY)
assert SS_workflow.task_list[0].state == BaseTaskState.FINISHED
assert SS_workflow.task_list[1].state == BaseTaskState.WORKING
assert SS_workflow.task_list[2].state == BaseTaskState.READY
# For FF_workflow
FF_workflow.check_state(-1, BaseTaskState.READY)
assert FF_workflow.task_list[0].state == BaseTaskState.READY
assert FF_workflow.task_list[1].state == BaseTaskState.READY
assert FF_workflow.task_list[2].state == BaseTaskState.NONE
# For SF_workflow
SF_workflow.check_state(-1, BaseTaskState.READY)
assert SF_workflow.task_list[0].state == BaseTaskState.READY
assert SF_workflow.task_list[1].state == BaseTaskState.READY
assert SF_workflow.task_list[2].state == BaseTaskState.NONE
[docs]
def test___check_working():
"""test___check_working."""
# this method is tested in test_check_state()
pass
[docs]
def test___check_finished(SF_workflow, FF_workflow):
"""test___check_finished."""
# For SF_workflow
SF_workflow.check_state(-1, BaseTaskState.READY)
assert SF_workflow.task_list[0].state == BaseTaskState.READY
assert SF_workflow.task_list[1].state == BaseTaskState.READY
assert SF_workflow.task_list[2].state == BaseTaskState.NONE
SF_workflow.task_list[1].state = BaseTaskState.WORKING
SF_workflow.task_list[1].remaining_work_amount = 0
SF_workflow.check_state(0, BaseTaskState.FINISHED)
assert SF_workflow.task_list[0].state == BaseTaskState.READY
assert SF_workflow.task_list[1].state == BaseTaskState.WORKING
assert SF_workflow.task_list[2].state == BaseTaskState.NONE
SF_workflow.task_list[0].state = BaseTaskState.WORKING
SF_workflow.check_state(0, BaseTaskState.FINISHED)
assert SF_workflow.task_list[0].state == BaseTaskState.WORKING
assert SF_workflow.task_list[1].state == BaseTaskState.FINISHED
assert SF_workflow.task_list[2].state == BaseTaskState.NONE
# For FF_workflow
FF_workflow.check_state(-1, BaseTaskState.READY)
FF_workflow.task_list[1].state = BaseTaskState.WORKING
FF_workflow.task_list[1].remaining_work_amount = 0
FF_workflow.check_state(0, BaseTaskState.FINISHED)
assert FF_workflow.task_list[0].state == BaseTaskState.READY
assert FF_workflow.task_list[1].state == BaseTaskState.WORKING
assert FF_workflow.task_list[2].state == BaseTaskState.NONE
FF_workflow.task_list[0].state = BaseTaskState.WORKING
FF_workflow.check_state(1, BaseTaskState.FINISHED)
assert FF_workflow.task_list[0].state == BaseTaskState.WORKING
assert FF_workflow.task_list[1].state == BaseTaskState.WORKING
assert FF_workflow.task_list[2].state == BaseTaskState.NONE
FF_workflow.task_list[0].remaining_work_amount = 0
FF_workflow.check_state(2, BaseTaskState.FINISHED)
assert FF_workflow.task_list[0].state == BaseTaskState.FINISHED
assert FF_workflow.task_list[2].state == BaseTaskState.NONE
[docs]
def test___set_est_eft_data():
"""test___set_est_eft_data."""
# this method is tested in test_initialize()
pass
[docs]
def test___set_lst_lft_criticalpath_data():
"""test___set_lst_lft_criticalpath_data."""
# this method is tested in test_initialize()
pass
[docs]
def test_plot_simple_gantt(tmpdir):
"""test_plot_simple_gantt."""
task0 = BaseTask("auto", auto_task=True)
task0.state_record_list = [
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.WORKING,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
]
task1 = BaseTask("task1")
task1.state_record_list = [
BaseTaskState.WORKING,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2 = BaseTask("task2")
task2.state_record_list = [
BaseTaskState.WORKING,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
w = BaseWorkflow(task_list=[task1, task2, task0])
w.plot_simple_gantt(finish_margin=1.0, view_auto_task=True, view_ready=False)
for ext in ["png"]:
save_fig_path = os.path.join(str(tmpdir), "test." + ext)
w.plot_simple_gantt(
view_ready=True, view_auto_task=True, save_fig_path=save_fig_path
)
[docs]
def test_create_data_for_gantt_plotly():
"""test_create_data_for_gantt_plotly."""
task1 = BaseTask("task1")
task1.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2 = BaseTask("task2")
task2.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2.append_input_task(task1)
w = BaseWorkflow(task_list=[task1, task2])
init_datetime = datetime.datetime(2020, 4, 1, 8, 0, 0)
timedelta = datetime.timedelta(days=1)
w.create_data_for_gantt_plotly(init_datetime, timedelta, view_ready=True)
[docs]
def test_create_gantt_plotly(tmpdir):
"""test_create_gantt_plotly."""
task1 = BaseTask("task1")
task1.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2 = BaseTask("task2")
task2.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2.append_input_task(task1)
w = BaseWorkflow(task_list=[task1, task2])
init_datetime = datetime.datetime(2020, 4, 1, 8, 0, 0)
timedelta = datetime.timedelta(days=1)
for ext in ["png", "html", "json"]:
save_fig_path = os.path.join(str(tmpdir), "test." + ext)
w.create_gantt_plotly(init_datetime, timedelta, save_fig_path=save_fig_path)
[docs]
def test_get_networkx_graph():
"""test_get_networkx_graph."""
task1 = BaseTask("task1")
task1.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2 = BaseTask("task2")
task2.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2.append_input_task(task1)
w = BaseWorkflow(task_list=[task1, task2])
w.get_networkx_graph()
[docs]
def test_draw_networkx(tmpdir):
"""test_draw_networkx."""
task0 = BaseTask("auto", auto_task=True)
task1 = BaseTask("task1")
task1.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2 = BaseTask("task2")
task2.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2.append_input_task(task1)
w = BaseWorkflow(task_list=[task1, task2, task0])
for ext in ["png"]:
save_fig_path = os.path.join(str(tmpdir), "test." + ext)
w.draw_networkx(save_fig_path=save_fig_path)
[docs]
def test_get_node_and_edge_trace_for_plotly_network():
"""test_get_node_and_edge_trace_for_plotly_network."""
task1 = BaseTask("task1")
task1.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2 = BaseTask("task2")
task2.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2.append_input_task(task1)
w = BaseWorkflow(task_list=[task1, task2])
(
task_node_trace,
auto_task_node_trace,
edge_trace,
) = w.get_node_and_edge_trace_for_plotly_network()
(
task_node_trace,
auto_task_node_trace,
edge_trace,
) = w.get_node_and_edge_trace_for_plotly_network()
[docs]
def test_draw_plotly_network(tmpdir):
"""test_draw_plotly_network."""
task0 = BaseTask("auto", auto_task=True)
task1 = BaseTask("task1")
task1.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2 = BaseTask("task2")
task2.state_record_list = [
BaseTaskState.READY,
BaseTaskState.READY,
BaseTaskState.WORKING,
BaseTaskState.FINISHED,
BaseTaskState.FINISHED,
]
task2.append_input_task(task1)
w = BaseWorkflow(task_list=[task1, task2, task0])
for ext in ["png", "html", "json"]:
save_fig_path = os.path.join(str(tmpdir), "test." + ext)
w.draw_plotly_network(save_fig_path=save_fig_path)
[docs]
def test_remove_insert_absence_time_list():
"""test_remove_insert_absence_time_list."""
w1 = BaseTask("w1", "----")
w1.remaining_work_amount_record_list = [3, 2, 1, 1, 1, 0]
w1.allocated_worker_id_record = ["aa", "bb", "cc", "dd", "ee", "ff"]
w1.allocated_facility_id_record = ["aa", "bb", "cc", "dd", "ee", "ff"]
w1.state_record_list = [0, 1, 2, 3, 4, 5]
w2 = BaseTask("w2", "----")
w2.remaining_work_amount_record_list = [3, 2, 1, 1, 1, 0]
w2.allocated_worker_id_record = ["aa", "bb", "cc", "dd", "ee", "ff"]
w2.allocated_facility_id_record = ["aa", "bb", "cc", "dd", "ee", "ff"]
w2.state_record_list = [0, 1, 2, 3, 4, 5]
w2.append_input_task(w1)
workflow = BaseWorkflow(task_list=[w1, w2])
absence_time_list = [3, 4]
workflow.remove_absence_time_list(absence_time_list)
assert w1.allocated_worker_id_record == ["aa", "bb", "cc", "ff"]
assert w1.allocated_facility_id_record == ["aa", "bb", "cc", "ff"]
assert w1.state_record_list == [0, 1, 2, 5]
assert w1.remaining_work_amount_record_list == [3, 2, 1, 0]
assert w2.allocated_worker_id_record == ["aa", "bb", "cc", "ff"]
assert w2.allocated_facility_id_record == ["aa", "bb", "cc", "ff"]
assert w2.state_record_list == [0, 1, 2, 5]
assert w2.remaining_work_amount_record_list == [3, 2, 1, 0]
workflow.insert_absence_time_list(absence_time_list)
assert w1.allocated_worker_id_record == ["aa", "bb", "cc", "cc", "cc", "ff"]
assert w1.allocated_facility_id_record == ["aa", "bb", "cc", "cc", "cc", "ff"]
assert w1.state_record_list == [0, 1, 2, 1, 1, 5]
assert w1.remaining_work_amount_record_list == [3, 2, 1, 1, 1, 0]
assert w2.allocated_worker_id_record == ["aa", "bb", "cc", "cc", "cc", "ff"]
assert w2.allocated_facility_id_record == ["aa", "bb", "cc", "cc", "cc", "ff"]
assert w2.state_record_list == [0, 1, 2, 1, 1, 5]
assert w2.remaining_work_amount_record_list == [3, 2, 1, 1, 1, 0]
[docs]
def test_print_mermaid_diagram(dummy_workflow):
"""test_print_mermaid_diagram."""
dummy_workflow.print_mermaid_diagram(
orientations="LR",
print_work_amount_info=True,
print_dependency_type=False,
)
dummy_workflow.print_mermaid_diagram(
orientations="LR",
print_work_amount_info=False,
print_dependency_type=True,
subgraph=True,
)