import agentpy as ap import numpy as np from random import uniform import math from worker import WorkerAgent from firm import FirmAgent # plt.ion() class Env(ap.Model): float_market_size: float # percent_rh: float percent_search: float is_RH_ratio: float is_FH_ratio: float n_worker: int n_firm: int e_revenue: float a_lst_worker: ap.AgentList a_lst_firm: ap.AgentList """ Worker: Mean(s), Gini(s) Firm: Mean((pi_{j,t})), Gini(pi_{j,t}), Mean(s_a_yield) Env: Percent(IsHired) """ out_w_avg_salary: float out_w_gini_salary: float out_f_avg_profit: float out_f_avg_yield: float out_f_gini_profit: float out_w_percent_hired: float def __init__(self, dct_all, _run_id=None): super().__init__() # 工作人员、企业数量、搜寻企业数量赋值 self.n_worker = int(dct_all['n_worker']) self.n_firm = int(dct_all['n_firm']) self.percent_search = float(dct_all['percent_search']) self.is_RH_ratio = float(dct_all['is_RH_ratio']) self.is_FH_ratio = float(dct_all['is_FH_ratio']) # 工人、企业列表 self.a_lst_worker = ap.AgentList(self) self.a_lst_firm = ap.AgentList(self) self.e_revenue = 5815100000000 # 在工人列表中添加工人 for i in range(self.n_worker): # 初始化 worker agent,并把alpha属性传过去 w = WorkerAgent(self, float(dct_all['alpha'])) self.a_lst_worker.append(w) # 在企业列表中添加企业,放入一个is_RH_ratio, 即有多大比例的企业是属于RH类型的 for i in range(self.n_firm): # 对于企业属性true or false 的判断, 影响到firm 板块下, self.s_IsRH = is_RH 语句的判断 f = FirmAgent(self, self.is_RH_ratio >= uniform(0, 1), self.is_FH_ratio >= uniform(0, 1)) self.a_lst_firm.append(f) self.running = True self.t = 0 def update_e_revenue(self): self.e_revenue += 0.0027 * self.e_revenue def step(self): self.update_e_revenue() # 先清空每次的选择列表 self.a_lst_firm.empty_apply() # 一开始worker要去选择很多firm self.a_lst_worker.select_firm() # 第二步, firm 去选 worker self.a_lst_firm.select_worker() self.a_lst_firm.update_yields() self.provide_logit_share() self.a_lst_firm.update_s_profit() self.create_and_destroy_bankrupt_firms() self.a_lst_worker.update_wd_by_is_hired() # self.picture_out() self.update() if self.t >= 200: self.running = False self.stop() else: self.t += 1 # print(f"running the {self.t} step") def update(self): lst_salary = [] n_hired = 0 for w in self.a_lst_worker: lst_salary.append(w.s_salary) if w.s_is_hired: n_hired += 1 n_workers = len(lst_salary) self.out_w_avg_salary = sum(lst_salary) / n_workers self.out_w_gini_salary = self.gini(lst_salary) lst_profit = [] lst_yield = [] # n_w_firm = 0 for f in self.a_lst_firm: lst_profit.append(f.s_profit) lst_yield.append(f.s_a_yield) # if f.s_profit > 0: # n_w_firm += 1 n_firms = len(lst_profit) self.out_f_avg_profit = sum(lst_profit) / n_firms self.out_f_avg_yield = sum(lst_yield) / n_firms self.out_f_gini_profit = self.gini(lst_profit) self.out_w_percent_hired = n_hired / n_workers self.record('out_w_avg_salary') self.record('out_w_gini_salary') self.record('out_f_avg_profit') self.record('out_f_avg_yield') self.record('out_f_gini_profit') self.record('out_w_percent_hired') def create_and_destroy_bankrupt_firms(self): for f in self.a_lst_firm: if f.s_value < 0: # 直接淘汰企业,并新增一个企业 for worker in f.l_senior_workers: worker.s_is_hired = False worker.working_firm = None for worker in f.l_junior_workers: worker.s_is_hired = False worker.working_firm = None self.a_lst_firm.remove(f) del f new_f = FirmAgent(self, self.is_RH_ratio >= uniform(0, 1), self.is_FH_ratio >= uniform(0, 1)) self.a_lst_firm.append(new_f) else: if f.s_profit < 0: f.l_all_w = f.l_junior_workers + f.l_senior_workers min_unit_yield_salary, worst_worker = float('inf'), None for worker in f.l_all_w: unit_yield_salary = worker.s_yield if f.s_IsFH else worker.s_yield * 10000 / worker.s_salary if unit_yield_salary < min_unit_yield_salary: min_unit_yield_salary = unit_yield_salary worst_worker = worker worst_worker.s_is_hired = False worst_worker.working_firm = None assert len(self.a_lst_firm) == self.n_firm, \ f'current num firm {len(self.a_lst_firm)} != expected num firm {self.n_firm}' def provide_lst_random_firms(self, the_worker: WorkerAgent): """ 选择企业数量 = 企业总数*百分比 选择企业的列表 = 随机选择的企业的个数 如果员工处于被雇佣的状态: 如果员工工作的企业在随机选定的企业列表中 打开列表中的企业 移除该企业 返回值:移除后,再重新选择随机选择企业 否则: 返回值:选择企业列表 """ n_select_firms = int(self.percent_search * self.n_firm) a_lst_select_firms = self.a_lst_firm.random(n_select_firms) if the_worker.s_is_hired: if the_worker.working_firm in a_lst_select_firms: # 转换为 list lst_f = list(self.a_lst_firm) lst_f.remove(the_worker.working_firm) return ap.AgentList(self, lst_f).random(n_select_firms) # 假如以上都不满足, 直接返回 return ap.AgentList(self, a_lst_select_firms) def provide_logit_share(self): fen_mu = 0 for f in self.a_lst_firm: fen_mu += math.exp(f.s_a_yield) for f in self.a_lst_firm: f.s_revenue = self.e_revenue * math.exp(f.s_a_yield) / fen_mu # def picture_out(self, the_worker: WorkerAgent): # a = self.t # b = len(the_worker.s_salary) # plt.plot(a, b, 'ro') # return plt.show() @staticmethod def gini(x): x = [ele for ele in x if ele > 0] if len(x) == 0: return 0 if sum(x) == 0: return 0 x = np.array(x) mad = np.abs(np.subtract.outer(x, x)).mean() # Mean absolute difference r_mad = mad / np.mean(x) # Relative mean absolute difference return 0.5 * r_mad if __name__ == '__main__': # dict_para = {'n_worker': 1000, # 'n_firm': 100, # 'percent_search': 0.2, # 'alpha': 0.5, # 'is_RH_ratio': 0.5} # my_model = Env(dict_para) # # print(my_model.gini([10000, 0, 0, 0, 0, 0, 0])) # my_model.run() # # a = range(0, 101) # # b = len(WorkerAgent.s_yield) # # plt.plot(a,b) # # plt.show() # plt.show() parameters = { 'n_worker': 1000, 'n_firm': 100, 'alpha': 0.5, 'percent_search': 0.2, 'is_RH_ratio': 0.5, 'is_FH_ratio': 0.5, } sample = ap.Sample(parameters) # sample = ap.Sample(parameters, n=3) # exp = ap.Experiment(Env, sample, iterations=10, record=True) results = exp.run() results['variables']['Env'].to_excel('env_data.xlsx', engine='openpyxl')