Source code for tests.model.test_base_organization

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""test_base_organization."""

import datetime
import os

from pDESy.model.base_facility import BaseFacility, BaseFacilityState
from pDESy.model.base_organization import BaseOrganization
from pDESy.model.base_team import BaseTeam
from pDESy.model.base_worker import BaseWorker, BaseWorkerState
from pDESy.model.base_workplace import BaseWorkplace

import pytest


@pytest.fixture(scope="function")
def dummy_organization():
    """Build the organization shared by the tests in this module.

    The fixture scope is passed to the ``pytest.fixture`` decorator; a
    ``scope`` default on the function signature is not read by pytest as a
    fixture scope.

    Layout:
        * team ``c1`` with workers ``w11`` (cost 10.0) and ``w12`` (cost 5.0)
        * team ``c2`` with worker ``w2`` (cost 5.0), whose parent team is ``c1``
        * workplace ``workplace`` with facility ``f`` (cost 20.0), whose
          parent workplace is ``dummy``
        * workplace ``dummy`` created with only a name

    Every worker and the facility get the same six-step state record:
    WORKING, WORKING, FREE, WORKING, FREE, FREE.

    Returns:
        BaseOrganization: organization holding both teams and both workplaces.
    """

    def worker_record():
        # A new list on every call so that no two workers share one record.
        return [
            BaseWorkerState.WORKING,
            BaseWorkerState.WORKING,
            BaseWorkerState.FREE,
            BaseWorkerState.WORKING,
            BaseWorkerState.FREE,
            BaseWorkerState.FREE,
        ]

    # First team: two workers with different cost_per_time.
    c1 = BaseTeam("c1")
    w11 = BaseWorker("w11", cost_per_time=10.0)
    w12 = BaseWorker("w12", cost_per_time=5.0)
    w11.state_record_list = worker_record()
    w12.state_record_list = worker_record()
    c1.worker_list = [w11, w12]

    # Second team: one worker, attached below c1.
    c2 = BaseTeam("c2")
    w2 = BaseWorker("w2", cost_per_time=5.0)
    w2.state_record_list = worker_record()
    c2.worker_list = [w2]
    c2.parent_team = c1

    # One facility with the same WORKING/FREE pattern as the workers.
    f = BaseFacility("f", cost_per_time=20.0)
    f.state_record_list = [
        BaseFacilityState.WORKING,
        BaseFacilityState.WORKING,
        BaseFacilityState.FREE,
        BaseFacilityState.WORKING,
        BaseFacilityState.FREE,
        BaseFacilityState.FREE,
    ]

    # The real workplace hangs below a second workplace created with only a name.
    workplace = BaseWorkplace("workplace", facility_list=[f])
    dummy_workplace = BaseWorkplace("dummy")
    workplace.parent_workplace = dummy_workplace

    organization = BaseOrganization(
        team_list=[c1, c2], workplace_list=[workplace, dummy_workplace]
    )
    return organization
def test_init(dummy_organization):
    """Check the team and workplace names stored on the organization."""
    team_names = [member.name for member in dummy_organization.team_list]
    assert team_names == ["c1", "c2"]

    workplace_names = [place.name for place in dummy_organization.workplace_list]
    assert workplace_names == ["workplace", "dummy"]
def test_initialize(dummy_organization):
    """Check that initialize() empties the cost lists seeded below."""
    first_team = dummy_organization.team_list[0]
    first_workplace = dummy_organization.workplace_list[0]

    # Seed each cost_list with a non-empty value and confirm it is in place.
    first_team.cost_list = [4.0]
    first_workplace.cost_list = [4.0]
    dummy_organization.cost_list = [8.0]
    seeded = (
        (first_team, [4.0]),
        (first_workplace, [4.0]),
        (dummy_organization, [8.0]),
    )
    for holder, expected in seeded:
        assert holder.cost_list == expected

    dummy_organization.initialize()

    # After initialize() all three cost lists are expected to be empty.
    for holder, _ in seeded:
        assert holder.cost_list == []
def test_str(dummy_organization):
    """Smoke test: printing the organization's string form must not raise."""
    rendered = str(dummy_organization)
    print(rendered)
def test_get_team_list(dummy_organization):
    """Call get_team_list with the criteria below and expect no result."""
    # TODO if we have enough time for setting test case...
    criteria = {
        "name": "test",
        "ID": "test",
        "worker_list": [],
        "targeted_task_list": [],
        "parent_team": [],
        "cost_list": [],
    }
    found = dummy_organization.get_team_list(**criteria)
    assert len(found) == 0
def test_get_workplace_list(dummy_organization):
    """Call get_workplace_list with the criteria below and expect no result."""
    # TODO if we have enough time for setting test case...
    criteria = {
        "name": "test",
        "ID": "test",
        "facility_list": [],
        "targeted_task_list": [],
        "parent_workplace": [],
        "max_space_size": 99876,
        "cost_list": [],
        "placed_component_list": [],
        "placed_component_id_record": [],
    }
    found = dummy_organization.get_workplace_list(**criteria)
    assert len(found) == 0
def test_add_labor_cost(dummy_organization):
    """Check the organization cost_list after three add_labor_cost calls."""
    first_team = dummy_organization.team_list[0]
    second_team = dummy_organization.team_list[1]
    w11 = first_team.worker_list[0]
    w12 = first_team.worker_list[1]
    w21 = second_team.worker_list[0]
    facility = dummy_organization.workplace_list[0].facility_list[0]

    # Only w12 is left FREE; the other workers and the facility are WORKING.
    w11.state = BaseWorkerState.WORKING
    w12.state = BaseWorkerState.FREE
    w21.state = BaseWorkerState.WORKING
    facility.state = BaseFacilityState.WORKING

    # Each entry: keyword arguments for one call and the cost_list expected
    # afterwards (entries accumulate from call to call).
    scenarios = (
        (
            {
                "only_working": False,
                "add_zero_to_all_workers": False,
                "add_zero_to_all_facilities": False,
            },
            [40.0],
        ),
        (
            {
                "only_working": False,
                "add_zero_to_all_workers": True,
                "add_zero_to_all_facilities": True,
            },
            [40.0, 0.0],
        ),
        (
            {
                "only_working": True,
                "add_zero_to_all_workers": False,
                "add_zero_to_all_facilities": False,
            },
            [40.0, 0.0, 35.0],
        ),
    )
    for options, expected_costs in scenarios:
        dummy_organization.add_labor_cost(**options)
        assert dummy_organization.cost_list == expected_costs
def test_plot_simple_gantt(dummy_organization, tmpdir):
    """Smoke test: call plot_simple_gantt with a PNG path under tmpdir."""
    output_dir = str(tmpdir)
    for extension in ("png",):
        target_path = os.path.join(output_dir, "test." + extension)
        dummy_organization.plot_simple_gantt(save_fig_path=target_path)
def test_create_data_for_gantt_plotly(dummy_organization):
    """Smoke test: create_data_for_gantt_plotly runs with a start and step."""
    start = datetime.datetime(2020, 4, 1, 8, 0, 0)
    one_day = datetime.timedelta(days=1)
    dummy_organization.create_data_for_gantt_plotly(start, one_day)
def test_create_gantt_plotly(dummy_organization, tmpdir):
    """Smoke test: call create_gantt_plotly once per output extension."""
    start = datetime.datetime(2020, 4, 1, 8, 0, 0)
    one_day = datetime.timedelta(days=1)
    output_dir = str(tmpdir)
    for extension in ("png", "html", "json"):
        target_path = os.path.join(output_dir, "test." + extension)
        dummy_organization.create_gantt_plotly(
            start, one_day, save_fig_path=target_path
        )
def test_create_data_for_cost_history_plotly(dummy_organization):
    """Check the first trace returned by create_data_for_cost_history_plotly."""
    start = datetime.datetime(2020, 4, 1, 8, 0, 0)
    one_day = datetime.timedelta(days=1)
    traces = dummy_organization.create_data_for_cost_history_plotly(start, one_day)

    # One formatted timestamp per entry of the organization's cost_list.
    step_count = len(dummy_organization.cost_list)
    expected_x = tuple(
        (start + step * one_day).strftime("%Y-%m-%d %H:%M:%S")
        for step in range(step_count)
    )

    # The first trace is compared against the first team of the organization.
    first_team = dummy_organization.team_list[0]
    first_trace = traces[0]
    assert first_trace.name == first_team.name
    assert first_trace.x == expected_x
    assert first_trace.y == tuple(first_team.cost_list)
def test_create_cost_history_plotly(dummy_organization, tmpdir):
    """Smoke test: create_cost_history_plotly with and without a save path."""
    start = datetime.datetime(2020, 4, 1, 8, 0, 0)
    one_day = datetime.timedelta(days=1)

    # First call passes only the start time and the step.
    dummy_organization.create_cost_history_plotly(start, one_day)

    # Then one call per output extension, with a title and a save path.
    output_dir = str(tmpdir)
    for extension in ("png", "html", "json"):
        target_path = os.path.join(output_dir, "test." + extension)
        dummy_organization.create_cost_history_plotly(
            start, one_day, title="bbbbbbb", save_fig_path=target_path
        )
def test_get_networkx_graph(dummy_organization):
    """Smoke test: get_networkx_graph with default and explicit view flags."""
    option_sets = (
        {},
        {"view_workers": True, "view_facilities": True},
    )
    for options in option_sets:
        dummy_organization.get_networkx_graph(**options)
def test_draw_networkx(dummy_organization, tmpdir):
    """Smoke test: call draw_networkx with a PNG path under tmpdir."""
    output_dir = str(tmpdir)
    for extension in ("png",):
        target_path = os.path.join(output_dir, "test." + extension)
        dummy_organization.draw_networkx(
            view_workers=True, view_facilities=True, save_fig_path=target_path
        )
def test_get_node_and_edge_trace_for_plotly_network(dummy_organization):
    """Check that the trace getter yields five values for both option sets."""
    option_sets = (
        {},
        {"view_workers": True, "view_facilities": True},
    )
    for options in option_sets:
        result = dummy_organization.get_node_and_edge_trace_for_plotly_network(
            **options
        )
        # Unpacking into exactly five names fails if the result has another size.
        (
            team_nodes,
            worker_nodes,
            workplace_nodes,
            facility_nodes,
            edges,
        ) = result
def test_draw_plotly_network(dummy_organization, tmpdir):
    """Smoke test: call draw_plotly_network once per output extension."""
    output_dir = str(tmpdir)
    for extension in ("png", "html", "json"):
        target_path = os.path.join(output_dir, "test." + extension)
        dummy_organization.draw_plotly_network(save_fig_path=target_path)
def test_remove_insert_absence_time_list():
    """Check records after remove_absence_time_list and insert_absence_time_list.

    Two facilities in one workplace and two workers in one team are given
    six-step records; steps 1, 3 and 4 are removed and then inserted again,
    and the records are compared with the expected values after each call.
    """

    def with_records(resource):
        # New list objects on every call so no record is shared.
        resource.cost_list = [1.0, 0.0, 1.0, 0.0, 0.0, 1.0]
        resource.assigned_task_id_record = ["aa", "bb", "cc", "dd", "ee", "ff"]
        resource.state_record_list = [2, 1, 2, 1, 1, 2]
        return resource

    # Workplace side.
    f1 = with_records(BaseFacility("w1", "----"))
    f2 = with_records(BaseFacility("w1", "----"))
    workplace = BaseWorkplace("aa", facility_list=[f1, f2])
    workplace.cost_list = [2.0, 0.0, 2.0, 0.0, 0.0, 2.0]

    # Team side.
    w1 = with_records(BaseWorker("w1", "----"))
    w2 = with_records(BaseWorker("w1", "----"))
    team = BaseTeam("aa", worker_list=[w1, w2])
    team.cost_list = [2.0, 0.0, 2.0, 0.0, 0.0, 2.0]

    organization = BaseOrganization(team_list=[team], workplace_list=[workplace])
    organization.cost_list = [4.0, 0.0, 4.0, 0.0, 0.0, 4.0]

    absent_steps = [1, 3, 4]

    # Removal: only the entries at steps 0, 2 and 5 are expected to remain.
    organization.remove_absence_time_list(absent_steps)
    assert organization.cost_list == [4.0, 4.0, 4.0]
    assert workplace.cost_list == [2.0, 2.0, 2.0]
    for facility in (f1, f2):
        assert facility.cost_list == [1.0, 1.0, 1.0]
        assert facility.assigned_task_id_record == ["aa", "cc", "ff"]
        assert facility.state_record_list == [2, 2, 2]
    assert team.cost_list == [2.0, 2.0, 2.0]
    for worker in (w1, w2):
        assert worker.cost_list == [1.0, 1.0, 1.0]
        assert worker.assigned_task_id_record == ["aa", "cc", "ff"]
        assert worker.state_record_list == [2, 2, 2]

    # Re-insertion: the expected values at the absent steps are cost 0.0,
    # state 0 and the task id of the preceding step.
    organization.insert_absence_time_list(absent_steps)
    assert organization.cost_list == [4.0, 0.0, 4.0, 0.0, 0.0, 4.0]
    assert team.cost_list == [2.0, 0.0, 2.0, 0.0, 0.0, 2.0]
    for worker in (w1, w2):
        assert worker.cost_list == [1.0, 0.0, 1.0, 0.0, 0.0, 1.0]
        assert worker.assigned_task_id_record == ["aa", "aa", "cc", "cc", "cc", "ff"]
        assert worker.state_record_list == [2, 0, 2, 0, 0, 2]
    assert workplace.cost_list == [2.0, 0.0, 2.0, 0.0, 0.0, 2.0]
    for facility in (f1, f2):
        assert facility.cost_list == [1.0, 0.0, 1.0, 0.0, 0.0, 1.0]
        assert facility.assigned_task_id_record == ["aa", "aa", "cc", "cc", "cc", "ff"]
        assert facility.state_record_list == [2, 0, 2, 0, 0, 2]
def test_print_mermaid_diagram(dummy_organization):
    """Smoke test: print_mermaid_diagram with "LR" orientation and subgraph on."""
    diagram_options = {"orientations": "LR", "subgraph": True}
    dummy_organization.print_mermaid_diagram(**diagram_options)