# HTSim/Environment.py

import random
import numpy as np
import pandas as pd
import agentpy as ap
from datetime import datetime
from numpy import random
import json
from Firm import Firm
# import passive agents
from Order import Order
from fake_api import get_plan_by_pp_id, get_bom_by_prd_id
class FMSEnv(ap.Model):
    """Simulation environment that owns the firm and drives it step by step.

    In every step the environment creates one order, hands it to the firm's
    order system (``the_firm.the_os``), lets the firm operate and then
    records every instance attribute whose name starts with ``op_``.

    Put all parameters in the dictionary passed to ``__init__``, not in any
    other place.
    """

    # xv_int_max_order: int
    # ev_n_order_created: int
    the_firm: Firm

    # Recorded data; the values are assigned in ``update``.
    # op_os_n_total_order: int
    # op_os_n_total_order_delayed: int
    op_os_all_delay_time: list
    # op_os_delay_ratio: float
    # op_is_flt_material_room_left: float
    # op_is_flt_product_room_left: float
    op_ps_str_status: str
    op_os_to_dlv: np.ndarray
    op_is_current_product: np.ndarray
    op_is_current_material: np.ndarray
    op_is_trans_material: np.ndarray
    op_ps_back_trans_material: np.ndarray
    op_ps_produced_num: np.ndarray
    op_is_ip_mat_id: np.ndarray
    op_ip_prd_s: np.ndarray
    op_ip_prd_big_s: np.ndarray
    op_ip_prd_est_pfm: int

    def __init__(self, dct_all_para, _run_id=None):
        """Create the firm and read the environment-level parameters.

        Args:
            dct_all_para: Parameter dictionary. This class reads the keys
                ``'time'`` (step at which the simulation stops) and
                ``'xv_dlv_product_para'``; the whole dictionary is also
                handed to ``Firm``.
            _run_id: Run identifier passed in by the caller (for example an
                ``ap.Experiment``); forwarded to ``ap.Model``.

        Raises:
            KeyError: If ``'time'`` or ``'xv_dlv_product_para'`` is missing
                from ``dct_all_para``.
        """
        # Forward the run id instead of silently dropping it.
        # NOTE(review): the parameter dictionary itself is still not passed to
        # ap.Model, exactly as before - confirm whether that is intended.
        super().__init__(_run_id=_run_id)

        # Create agents here.
        self.the_firm = Firm(env=self, dct_all_para=dct_all_para)

        # Get the input parameter values from the dictionary.
        self.int_stop_time = int(dct_all_para['time'])
        # self.xv_int_max_order = int(dct_all_para['xv_int_max_order'])
        self.xv_dlv_product_para = np.asarray(dct_all_para['xv_dlv_product_para'])
        # self.xv_int_dlv_period_lam = int(dct_all_para['xv_int_dlv_period_lam'])
        # self.ev_n_order_created = 0

        self.running = True
        self.t = 0

    def create_order(self):
        """Create the order for the current time step.

        Creation of orders is done in the environment. Exactly one order is
        created per call; its lead time is the maximum of
        ``the_firm.xv_ary_lead_time``.

        Returns:
            The newly created ``Order``.
        """
        # A Poisson-distributed order count and an upper bound on the number
        # of orders were sketched previously but are not active:
        #   xv_int_create_order_num = random.poisson(lam=xv_int_create_order_lam, size=None)
        #   if self.ev_n_order_created < xv_int_max_order: ...
        return Order(
            model=self,
            time_created=self.t,
            xv_dlv_product_para=self.xv_dlv_product_para,
            leadtime=max(self.the_firm.xv_ary_lead_time),
        )

    def step(self):
        """Execute the interactions of one time step of the simulation.

        Order of operations: create an order, let the order system accept it,
        let the firm operate, record the outputs, then either stop the run
        (once ``self.t`` has reached ``int_stop_time``) or print progress.
        """
        new_order = self.create_order()
        self.the_firm.the_os.accept_order(new_order)
        self.the_firm.operating()
        self.update()

        if self.t >= self.int_stop_time:
            self.running = False
            self.stop()
        else:
            print(f"running the {self.t} step")

    def update(self):
        """Copy the firm's current state into ``op_*`` attributes and record them."""
        the_os = self.the_firm.the_os
        the_ps = self.the_firm.the_ps

        self.op_os_n_total_order = len(the_os.a_lst_order)
        # self.op_os_n_total_order_delayed = len([e for e in self.the_firm.the_os.a_lst_order if e.xv_dlv_t < self.t])
        self.op_os_to_dlv = the_os.ev_ary_to_dlv
        # self.op_os_all_delay_time = self.the_firm.the_os.ev_lst_all_delay_time
        self.op_ps_produced_num = the_ps.ev_ary_produced_num
        self.op_ps_str_status = the_ps.ev_str_status
        # self.op_is_current_product = self.the_firm.the_is.ev_ary_current_product
        # self.op_is_current_material = self.the_firm.the_is.ev_ary_current_material
        # self.op_is_trans_material = self.the_firm.the_is.ev_lst_trans_material
        # Accessed through self.the_firm like every other line in this method
        # (previously went through self.model.the_firm).
        self.op_ps_back_trans_material = the_ps.ev_lst_backtrans_material

        # Record every instance attribute that follows the ``op_`` naming scheme.
        self.record([att for att in self.__dict__.keys() if att.startswith('op_')])
if __name__ == '__main__':
    # Initial stock tables. Two Excel files store the product and the
    # raw-material inventory respectively; each holds the system code and the
    # stock level. Eventually these should be the s values randomly generated
    # by the genetic algorithm (temporarily written as '1').
    ary_initial_product = pd.read_excel("initial_product.xlsx").to_numpy()
    ary_initial_material = pd.read_excel("initial_material.xlsx").to_numpy()

    # NOTE(review): FMSEnv.__init__ reads dct_all_para['xv_dlv_product_para'],
    # but both candidate entries below are commented out. Unless the key is
    # added somewhere else, that lookup raises KeyError - confirm which value
    # is intended and re-enable it.
    dct_para = {
        'time': 60,
        # 'xv_int_max_order': random.randint(30, 50),
        # 'xv_dlv_product_para': tuple([(30, 100), (30, 50)]),
        # 'xv_dlv_product_para': tuple([30,40,30,20]),  # read the production rates (np.read.)
        # 'xv_int_dlv_period_lam': 8.5,
        # 'xv_int_create_order_lam': 2,
        # 'xv_ary_price_product': tuple([0.3,0.2,0.5,1]),
        # 'xv_ary_cost_material_per': tuple([0.1,0.1,0.2,0.4]),
        # 'xv_ary_volume_material': tuple([1.0, 1.5]),
        # 'xv_ary_volume_product': tuple([3.0, 5.0]),
        # Should be read from the raw-material table (np.read.); not read for now.
        # NOTE(review): FMSEnv.create_order calls max() on
        # the_firm.xv_ary_lead_time - confirm that Firm turns this scalar
        # (and this key spelling) into a sequence.
        'xv_array_lead_time': 2,
        # 'xv_int_lead_time_c': 3,
        # 'xv_int_lead_time_d': 1,
        'xv_ary_initial_product_num': ary_initial_product,
        'xv_ary_initial_material_num': ary_initial_material,
        # 'xv_flt_initial_cash': 50000.0,
        # The production status table still needs to be brought in:
        # 'dct_status_info': json.dumps({
        #     "0": {"xv_flt_produce_rate": tuple([0.0, 0.0]),
        #           "xv_ary_mat_material": tuple([0.0, 0.0]),
        #           "xv_flt_broken_rate": 0,
        #           "xv_flt_run_cost": 0.0,
        #           "name": "wait"
        #           },
        #     "1": {"xv_flt_produce_rate": tuple([90.0, 0.0]),
        #           "xv_ary_mat_material": tuple([4.0, 1.0]),
        #           "xv_flt_broken_rate": 0.03,
        #           "xv_flt_run_cost": 40.0,
        #           "name": "produceA"
        #           },
        #     "2": {"xv_flt_produce_rate": tuple([0.0, 60.0]),
        #           "xv_ary_mat_material": tuple([1.5, 5.0]),
        #           "xv_flt_broken_rate": 0.05,
        #           "xv_flt_run_cost": 50.0,
        #           "name": "produceB"
        #           },
        #     "3": {"xv_flt_produce_rate": tuple([55.0, 30.0]),
        #           "xv_ary_mat_material": tuple([2.0, 1.5]),
        #           "xv_flt_broken_rate": 0.07,
        #           "xv_flt_run_cost": 60.0,
        #           "name": "produceAB"
        #           },
        #     "-1": {"xv_flt_produce_rate": 0.0,
        #            "xv_ary_mat_material": tuple([0.0, 0.0]),
        #            "xv_flt_broken_rate": 0.1,
        #            "xv_flt_run_cost": 100.0,
        #            "name": "failed"
        #            }
        # })
    }

    # Run a single iteration of the model over the one-point parameter sample.
    sample = ap.Sample(dct_para)
    exp = ap.Experiment(FMSEnv, sample, iterations=1, record=True)
    results = exp.run()

    # Optional export of the recorded variables:
    # results['variables']['FMSEnv'].to_excel(f"simulation-results-{datetime.today().strftime('%Y-%m-%d-%H-%M-%S')}.xlsx",
    #                                         engine='openpyxl')