import random import numpy as np import pandas as pd import agentpy as ap from datetime import datetime from numpy import random import json from Firm import Firm # import passive agents from Order import Order from fake_api import get_plan_by_pp_id, get_bom_by_prd_id class FMSEnv(ap.Model): # put all parameters here, not in any other places # xv_int_max_order: int # ev_n_order_created: int the_firm: Firm # Firm类 # record data, define below # op_os_n_total_order: int # op_os_n_total_order_delayed: int op_os_all_delay_time: list # op_os_delay_ratio: float # op_is_flt_material_room_left: float # op_is_flt_product_room_left: float op_os_int_status: int op_os_to_dlv: np.ndarray op_is_current_product: np.ndarray op_is_current_material: np.ndarray op_is_trans_material: np.ndarray op_ps_back_trans_material: np.ndarray op_ps_produced_num: np.ndarray op_is_ip_mat_id: np.ndarray op_ip_prd_s: np.ndarray op_ip_prd_big_s: np.ndarray op_ip_prd_est_pfm: int op_os_n_total_order: int def __init__(self, dct_all_para, _run_id=None): super().__init__() # create agents here self.the_firm = Firm(env=self, dct_all_para=dct_all_para) # get the input parameter values from the dictionary self.int_stop_time = int(dct_all_para['time']) # 停止接单时间 # self.xv_int_max_order = int(dct_all_para['xv_int_max_order']) # self.xv_dlv_product_para = np.asarray(dct_all_para['xv_dlv_product_para']) # self.xv_int_dlv_period_lam = int(dct_all_para['xv_int_dlv_period_lam']) # self.ev_n_order_created = 0 self.op_os_n_total_order = 0 self.op_os_int_status = 0 self.running = True self.t = 0 # Creation of orders should be done in the environment def create_order(self): # Check if maximum number of orders has been reached xv_int_create_order_num = 1 # xv_int_create_order_num = random.poisson(lam=xv_int_create_order_lam, size=None) # if self.ev_n_order_created < xv_int_max_order: # for i in range(xv_int_create_order_num): new_order = Order(model=self, time_created=self.t) return new_order # return None # Execute the interactions of each time step in the simulation. def step(self): # organize the interactions of agents in each time step here new_order = self.create_order() # 接收创建的订单 self.the_firm.the_os.accept_order(new_order=new_order) self.the_firm.operating() self.update() if self.t >= self.int_stop_time: self.running = False self.stop() else: print(f"running the {self.t} step") print(self.the_firm.the_os.ev_ave_delay_time) # Record data after each simulation def update(self): # ? self.op_os_n_total_order = len(self.the_firm.the_os.a_lst_order) # 订单个数 # self.op_os_n_total_order_delayed = len([e for e in self.the_firm.the_os.a_lst_order if e.xv_dlv_t < self.t]) self.op_os_to_dlv = self.the_firm.the_os.ev_ary_to_dlv # 当期及之前未满足需求总和 # self.op_os_all_delay_time = self.the_firm.the_os.ev_lst_all_delay_time self.op_ps_produced_num = self.the_firm.the_ps.ev_ary_produced_num # 当期产品生产数量 self.op_ps_str_status = self.the_firm.the_os.ev_int_produce_type # 当期生产状态 # self.op_is_current_product = self.model.the_firm.the_is.ev_ary_current_product # # self.op_is_current_material = self.model.the_firm.the_is.ev_ary_current_material # # self.op_is_trans_material= self.model.the_firm.the_is.ev_lst_trans_material self.op_ps_back_trans_material = self.model.the_firm.the_ps.ev_lst_backtrans_material self.record([att for att in self.__dict__.keys() if att.startswith('op_')]) # ? # pass if __name__ == '__main__': dct_para = { 'time': 60, # 进行总时间数 # 'xv_int_max_order': random.randint(30, 50), # 'xv_dlv_product_para': tuple([(30, 100), (30, 50)]), # 'xv_dlv_product_para': tuple([30,40,30,20]), # 读取生产率 np.read. # 'xv_int_dlv_period_lam': 8.5, # 'xv_int_create_order_lam': 2, # 'xv_ary_price_product': tuple([0.3,0.2,0.5,1]), # 'xv_ary_cost_material_per': tuple([0.1,0.1,0.2,0.4]), # 'xv_ary_volume_material': tuple([1.0, 1.5]), # 'xv_ary_volume_product': tuple([3.0, 5.0]), # 'xv_array_lead_time': 2, # 读取原材料表格 np.read, 暂时不读 变量代表的含义 # 'xv_int_lead_time_c': 3, # 'xv_int_lead_time_d': 1, 'xv_ary_product_id': pd.read_excel("initial_product.xlsx").to_numpy()[:, 0], # 产成品id顺序 'xv_ary_material_id': pd.read_excel("initial_material.xlsx").to_numpy()[:, 0], # 原材料id顺序 'xv_product_num': len(pd.read_excel("initial_product.xlsx").to_numpy()), # 产成品个数 'xv_material_num': len(pd.read_excel("initial_material.xlsx").to_numpy()), # 原材料个数 'xv_ary_initial_product_num': pd.read_excel("initial_product.xlsx").to_numpy(), # 初始产成品库存 23x2 'xv_ary_initial_material_num': pd.read_excel("initial_material.xlsx").to_numpy(), # 初始原材料库存 115x2 'xv_ary_bom': pd.read_excel("bom23.xlsx").to_numpy(), # bom表 'xv_ary_plan': pd.read_excel("plan.xlsx").to_numpy(), # plan表 'xv_ary_s': pd.read_excel("rawmaterial - s.xlsx").to_numpy(), # s 'xv_ary_S': pd.read_excel("rawmaterialS.xlsx").to_numpy(), # S # 应读取遗传算法中随机生成的s,暂写为'1' 创建两个excel分别存储产品和原材料的库存 每个excel中存系统代码和库存 # 'xv_flt_initial_cash': 50000.0, # 'dct_status_info': json.dumps({ #需要引入生产状态表 # "0": {"xv_flt_produce_rate": tuple([0.0, 0.0]), # "xv_ary_mat_material": tuple([0.0, 0.0]), # "xv_flt_broken_rate": 0, # "xv_flt_run_cost": 0.0, # "name": "wait" # }, # "1": {"xv_flt_produce_rate": tuple([90.0, 0.0]), # "xv_ary_mat_material": tuple([4.0, 1.0]), # "xv_flt_broken_rate": 0.03, # "xv_flt_run_cost": 40.0, # "name": "produceA" # }, # "2": {"xv_flt_produce_rate": tuple([0.0, 60.0]), # "xv_ary_mat_material": tuple([1.5, 5.0]), # "xv_flt_broken_rate": 0.05, # "xv_flt_run_cost": 50.0, # "name": "produceB" # }, # "3": {"xv_flt_produce_rate": tuple([55.0, 30.0]), # "xv_ary_mat_material": tuple([2.0, 1.5]), # "xv_flt_broken_rate": 0.07, # "xv_flt_run_cost": 60.0, # "name": "produceAB" # }, # "-1": {"xv_flt_produce_rate": 0.0, # "xv_ary_mat_material": tuple([0.0, 0.0]), # "xv_flt_broken_rate": 0.1, # "xv_flt_run_cost": 100.0, # "name": "failed" # } # }) } sample = ap.Sample(dct_para) exp = ap.Experiment(FMSEnv, sample, iterations=1, record=True) results = exp.run() # results['variables']['FMSEnv'].to_excel(f"simulation-results-{datetime.today().strftime('%Y-%m-%d-%H-%M-%S')}.xlsx", # engine='openpyxl')