import math
from random import uniform

import agentpy as ap
import numpy as np
# NOTE(review): this looks like an accidental IDE auto-import; nothing visible
# in this file uses it. Kept so the module's import surface is unchanged --
# confirm and remove.
from traits.trait_types import self  # noqa: F401

from worker import WorkerAgent
from firm import FirmAgent


class Env(ap.Model):
    """Labour-market environment that matches workers and firms every step.

    Parameters read from ``self.p``: ``n_worker``, ``n_firm``,
    ``percent_search``, ``alpha``, ``is_RH_ratio`` and ``is_FH_ratio``.

    In each step the total revenue grows, workers apply to firms, firms pick
    workers, the revenue is split between firms by logit shares of their
    ``s_a_yield``, profits are updated and firms in trouble dismiss workers.
    Summary statistics are recorded by :meth:`update`.
    """

    # Simulation step at which the model stops itself.
    MAX_STEPS = 200
    # Total revenue available to all firms at the start of a run.
    INITIAL_REVENUE = 5_815_100_000_000
    # Per-step growth rate applied to the total revenue.
    REVENUE_GROWTH_RATE = 0.01

    float_market_size: float
    percent_rh: float
    percent_search: float
    n_worker: int
    n_firm: int
    e_revenue: float

    a_lst_worker: ap.AgentList
    a_lst_firm: ap.AgentList

    # Recorded outputs:
    #   Worker: Mean(s), Gini(s)
    #   Firm:   Mean(pi_{j,t}), Gini(pi_{j,t})
    #   Env:    Percent(IsHired)
    out_w_avg_salary: float
    out_w_gini_salary: float
    out_f_avg_profit: float
    out_f_gini_profit: float
    out_w_percent_hired: float

    def setup(self):
        """Read the parameters and create the worker and firm populations."""
        # Number of workers, number of firms and share of firms a worker
        # searches through.
        self.n_worker = self.p.n_worker
        self.n_firm = self.p.n_firm
        self.percent_search = self.p.percent_search

        # Worker and firm lists.
        self.a_lst_worker = ap.AgentList(self)
        self.a_lst_firm = ap.AgentList(self)
        self.e_revenue = self.INITIAL_REVENUE

        # Every worker is initialised with the same ``alpha`` parameter.
        for _ in range(self.n_worker):
            self.a_lst_worker.append(WorkerAgent(self, self.p.alpha))

        # Each firm is flagged RH / FH with probability ``is_RH_ratio`` /
        # ``is_FH_ratio`` (two independent uniform draws per firm).
        # NOTE(review): ``uniform`` is the global ``random`` module function,
        # so these draws do not follow the model's own seed -- confirm whether
        # that is intended.
        for _ in range(self.n_firm):
            firm = FirmAgent(
                self,
                self.p.is_RH_ratio >= uniform(0, 1),
                self.p.is_FH_ratio >= uniform(0, 1),
            )
            self.a_lst_firm.append(firm)

    def update_e_revenue(self):
        """Grow the total revenue by ``REVENUE_GROWTH_RATE``."""
        self.e_revenue += self.REVENUE_GROWTH_RATE * self.e_revenue

    def step(self):
        """Run one matching round and stop the model at ``MAX_STEPS``."""
        self.update_e_revenue()

        # Clear the per-round selection lists first.
        self.a_lst_firm.empty_apply()
        # Workers choose among several firms.
        self.a_lst_worker.select_firm()
        # Then the firms choose their workers.
        self.a_lst_firm.select_worker()

        self.a_lst_firm.update_yields()
        self.provide_logit_share()
        self.a_lst_firm.update_s_profit()

        self.create_and_destroy_bankrupt_firms()

        # ``update`` is not called here: agentpy invokes ``Model.update``
        # itself after every step, so an explicit call would compute the
        # O(n^2) Gini coefficients twice per step for identical records.
        if self.t == self.MAX_STEPS:
            self.stop()

    def update(self):
        """Compute and record salary, profit and employment statistics.

        Averages and the hired share fall back to ``0.0`` when the
        corresponding population is empty instead of dividing by zero.
        """
        lst_salary = []
        n_hired = 0
        for worker in self.a_lst_worker:
            lst_salary.append(worker.s_salary)
            if worker.s_is_hired:
                n_hired += 1
        n_workers = len(lst_salary)

        self.out_w_avg_salary = (
            sum(lst_salary) / n_workers if n_workers else 0.0)
        self.out_w_gini_salary = self.gini(lst_salary)

        lst_profit = [firm.s_profit for firm in self.a_lst_firm]
        n_firms = len(lst_profit)

        self.out_f_avg_profit = sum(lst_profit) / n_firms if n_firms else 0.0
        self.out_f_gini_profit = self.gini(lst_profit)
        self.out_w_percent_hired = n_hired / n_workers if n_workers else 0.0

        self.record('out_w_avg_salary')
        self.record('out_w_gini_salary')
        self.record('out_f_avg_profit')
        self.record('out_f_gini_profit')
        self.record('out_w_percent_hired')

    def create_and_destroy_bankrupt_firms(self):
        """Reset bankrupt firms and let loss-making firms dismiss one worker.

        * ``s_value < 0``: the firm is treated as bankrupt; all of its workers
          are marked as not hired and its profit and value are reset to 0.
        * otherwise, ``s_profit < 0``: the firm dismisses its weakest worker,
          chosen by ``s_yield`` among junior workers for FH firms and by
          yield per unit of salary among all workers for other firms.

        Firms are never removed from ``a_lst_firm``; the final assertion
        checks that the population size still equals ``n_firm``.
        """
        for firm in self.a_lst_firm:
            if firm.s_value < 0:
                self._reset_bankrupt_firm(firm)
            elif firm.s_profit < 0:
                if firm.s_IsFH:
                    self._dismiss_lowest_yield_junior(firm)
                else:
                    self._dismiss_lowest_unit_yield_worker(firm)

        assert len(self.a_lst_firm) == self.n_firm, \
            f'current num firm {len(self.a_lst_firm)} != expected num firm {self.n_firm}'

    @staticmethod
    def _reset_bankrupt_firm(firm):
        """Mark every worker of ``firm`` as not hired and zero its books."""
        # The firm object is reused rather than replaced, which stands in for
        # "a new firm enters the market".
        # NOTE(review): the firm's worker lists are not emptied here --
        # confirm that FirmAgent handles this elsewhere.
        for work in firm.l_senior_workers:
            work.s_is_hired = False
        for work in firm.l_junior_workers:
            work.s_is_hired = False
        firm.s_profit = 0
        firm.s_value = 0

    @staticmethod
    def _dismiss_lowest_yield_junior(firm):
        """Dismiss the junior worker of ``firm`` with the lowest ``s_yield``."""
        # Sorted in place, highest yield first, so the last entry is the
        # lowest-yield junior worker.
        firm.l_junior_workers.sort(key=lambda x: x['s_yield'], reverse=True)
        if firm.l_junior_workers:
            firm.l_junior_workers[-1].s_is_hired = False

    @staticmethod
    def _dismiss_lowest_unit_yield_worker(firm):
        """Dismiss the worker of ``firm`` with the lowest yield per salary."""
        firm.l_all_w = firm.l_junior_workers + firm.l_senior_workers
        # Score every worker before sorting so the sort key exists on all of
        # them. The factor 10000 is a positive constant and does not affect
        # the ranking.
        for work in firm.l_all_w:
            if work.s_salary != 0:
                work.unit_yield_salary = (
                    work.s_yield * 10000 / work.s_salary)
            else:
                # An unpaid worker would otherwise raise ZeroDivisionError;
                # rank them as the last candidate for dismissal.
                work.unit_yield_salary = math.inf
        firm.l_all_w.sort(
            key=lambda x: x['unit_yield_salary'], reverse=True)
        if firm.l_all_w:
            firm.l_all_w[-1].s_is_hired = False

    def provide_lst_random_firms(self, the_worker: WorkerAgent):
        """Return a random selection of firms for ``the_worker`` to consider.

        The selection size is ``int(percent_search * n_firm)``. If the worker
        is hired and their current firm is in the first draw, a fresh
        selection is drawn from all firms except that one. The sample size is
        capped at the number of available firms so that sampling cannot raise
        ``ValueError``.

        Args:
            the_worker: Worker asking for firms; ``s_is_hired`` is read, and
                ``working_firm`` is read when the worker is hired.

        Returns:
            An ``ap.AgentList`` with the selected firms.
        """
        n_select_firms = min(
            int(self.percent_search * self.n_firm), len(self.a_lst_firm))
        a_lst_select_firms = self.a_lst_firm.random(n_select_firms)

        if (the_worker.s_is_hired
                and the_worker.working_firm in a_lst_select_firms):
            # Draw again from every firm except the current employer.
            lst_f = list(self.a_lst_firm)
            lst_f.remove(the_worker.working_firm)
            n_resample = min(n_select_firms, len(lst_f))
            return ap.AgentList(self, lst_f).random(n_resample)

        return ap.AgentList(self, a_lst_select_firms)

    def provide_logit_share(self):
        """Split ``e_revenue`` between firms by logit shares of ``s_a_yield``.

        Each firm receives ``e_revenue * exp(y_j) / sum_k exp(y_k)``. The
        largest yield is subtracted from every exponent first, which leaves
        the shares mathematically unchanged but keeps ``math.exp`` from
        overflowing for large yields.
        """
        if len(self.a_lst_firm) == 0:
            return

        max_yield = max(f.s_a_yield for f in self.a_lst_firm)
        weights = [math.exp(f.s_a_yield - max_yield) for f in self.a_lst_firm]
        # At least one weight equals exp(0) == 1, so the denominator is >= 1.
        fen_mu = sum(weights)

        for f, weight in zip(self.a_lst_firm, weights):
            f.s_revenue = self.e_revenue * weight / fen_mu

    @staticmethod
    def gini(x):
        """Return the Gini coefficient of the values in ``x``.

        Computed as half the relative mean absolute difference over all
        pairs. Returns 0 when ``x`` is empty or sums to zero.
        """
        if len(x) == 0 or sum(x) == 0:
            return 0
        values = np.array(x)
        # Mean absolute difference over all ordered pairs.
        mad = np.abs(np.subtract.outer(values, values)).mean()
        # Relative mean absolute difference.
        r_mad = mad / np.mean(values)
        return 0.5 * r_mad


def main():
    """Run the experiment and export the recorded model variables to Excel."""
    # To sweep a parameter, use e.g. ``'alpha': ap.Range(0, 1, 0.5)`` together
    # with ``ap.Sample(parameters, n=3)``.
    parameters = {
        'n_worker': 1000,
        'n_firm': 100,
        'percent_search': 0.2,
        'alpha': 0.5,
        'is_RH_ratio': 0.5,
        'is_FH_ratio': 0.5,
    }
    sample = ap.Sample(parameters)

    exp = ap.Experiment(Env, sample, iterations=10, record=True)
    results = exp.run()
    results['variables']['Env'].to_excel('env_data.xlsx', engine='openpyxl')


if __name__ == '__main__':
    main()