salary02/env.py

240 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import agentpy as ap
import numpy as np
from random import uniform
import math
from worker import WorkerAgent
from firm import FirmAgent
# plt.ion()
class Env(ap.Model):
float_market_size: float
# percent_rh: float
percent_search: float
is_RH_ratio: float
is_FH_ratio: float
n_worker: int
n_firm: int
e_revenue: float
a_lst_worker: ap.AgentList
a_lst_firm: ap.AgentList
"""
Worker: Mean(s), Gini(s)
Firm: Mean((pi_{j,t})), Gini(pi_{j,t}), Mean(s_a_yield)
Env: Percent(IsHired)
"""
out_w_avg_salary: float
out_w_gini_salary: float
out_f_avg_profit: float
out_f_avg_yield: float
out_f_gini_profit: float
out_w_percent_hired: float
def __init__(self, dct_all, _run_id=None):
super().__init__()
# 工作人员、企业数量、搜寻企业数量赋值
self.n_worker = int(dct_all['n_worker'])
self.n_firm = int(dct_all['n_firm'])
self.percent_search = float(dct_all['percent_search'])
self.is_RH_ratio = float(dct_all['is_RH_ratio'])
self.is_FH_ratio = float(dct_all['is_FH_ratio'])
# 工人、企业列表
self.a_lst_worker = ap.AgentList(self)
self.a_lst_firm = ap.AgentList(self)
self.e_revenue = 5815100000000
# 在工人列表中添加工人
for i in range(self.n_worker):
# 初始化 worker agent并把alpha属性传过去
w = WorkerAgent(self, float(dct_all['alpha']))
self.a_lst_worker.append(w)
# 在企业列表中添加企业放入一个is_RH_ratio, 即有多大比例的企业是属于RH类型的
for i in range(self.n_firm):
# 对于企业属性true or false 的判断, 影响到firm 板块下, self.s_IsRH = is_RH 语句的判断
f = FirmAgent(self, self.is_RH_ratio >= uniform(0, 1), self.is_FH_ratio >= uniform(0, 1))
self.a_lst_firm.append(f)
self.running = True
self.t = 0
def update_e_revenue(self):
self.e_revenue += 0.0027 * self.e_revenue
def step(self):
self.update_e_revenue()
# 先清空每次的选择列表
self.a_lst_firm.empty_apply()
# 一开始worker要去选择很多firm
self.a_lst_worker.select_firm()
# 第二步, firm 去选 worker
self.a_lst_firm.select_worker()
self.a_lst_firm.update_yields()
self.provide_logit_share()
self.a_lst_firm.update_s_profit()
self.create_and_destroy_bankrupt_firms()
self.a_lst_worker.update_wd_by_is_hired()
# self.picture_out()
self.update()
if self.t >= 200:
self.running = False
self.stop()
else:
self.t += 1
# print(f"running the {self.t} step")
def update(self):
lst_salary = []
n_hired = 0
for w in self.a_lst_worker:
lst_salary.append(w.s_salary)
if w.s_is_hired:
n_hired += 1
n_workers = len(lst_salary)
self.out_w_avg_salary = sum(lst_salary) / n_workers
self.out_w_gini_salary = self.gini(lst_salary)
lst_profit = []
lst_yield = []
# n_w_firm = 0
for f in self.a_lst_firm:
lst_profit.append(f.s_profit)
lst_yield.append(f.s_a_yield)
# if f.s_profit > 0:
# n_w_firm += 1
n_firms = len(lst_profit)
self.out_f_avg_profit = sum(lst_profit) / n_firms
self.out_f_avg_yield = sum(lst_yield) / n_firms
self.out_f_gini_profit = self.gini(lst_profit)
self.out_w_percent_hired = n_hired / n_workers
self.record('out_w_avg_salary')
self.record('out_w_gini_salary')
self.record('out_f_avg_profit')
self.record('out_f_avg_yield')
self.record('out_f_gini_profit')
self.record('out_w_percent_hired')
def create_and_destroy_bankrupt_firms(self):
for f in self.a_lst_firm:
if f.s_value < 0:
# 直接淘汰企业,并新增一个企业
for worker in f.l_senior_workers:
worker.s_is_hired = False
worker.working_firm = None
for worker in f.l_junior_workers:
worker.s_is_hired = False
worker.working_firm = None
self.a_lst_firm.remove(f)
del f
new_f = FirmAgent(self, self.is_RH_ratio >= uniform(0, 1), self.is_FH_ratio >= uniform(0, 1))
self.a_lst_firm.append(new_f)
else:
if f.s_profit < 0:
f.l_all_w = f.l_junior_workers + f.l_senior_workers
min_unit_yield_salary, worst_worker = float('inf'), None
for worker in f.l_all_w:
unit_yield_salary = worker.s_yield if f.s_IsFH else worker.s_yield * 10000 / worker.s_salary
if unit_yield_salary < min_unit_yield_salary:
min_unit_yield_salary = unit_yield_salary
worst_worker = worker
worst_worker.s_is_hired = False
worst_worker.working_firm = None
assert len(self.a_lst_firm) == self.n_firm, \
f'current num firm {len(self.a_lst_firm)} != expected num firm {self.n_firm}'
def provide_lst_random_firms(self, the_worker: WorkerAgent):
"""
选择企业数量 = 企业总数*百分比
选择企业的列表 = 随机选择的企业的个数
如果员工处于被雇佣的状态:
如果员工工作的企业在随机选定的企业列表中
打开列表中的企业
移除该企业
返回值:移除后,再重新选择随机选择企业
否则:
返回值:选择企业列表
"""
n_select_firms = int(self.percent_search * self.n_firm)
a_lst_select_firms = self.a_lst_firm.random(n_select_firms)
if the_worker.s_is_hired:
if the_worker.working_firm in a_lst_select_firms:
# 转换为 list
lst_f = list(self.a_lst_firm)
lst_f.remove(the_worker.working_firm)
return ap.AgentList(self, lst_f).random(n_select_firms)
# 假如以上都不满足, 直接返回
return ap.AgentList(self, a_lst_select_firms)
def provide_logit_share(self):
fen_mu = 0
for f in self.a_lst_firm:
fen_mu += math.exp(f.s_a_yield)
for f in self.a_lst_firm:
f.s_revenue = self.e_revenue * math.exp(f.s_a_yield) / fen_mu
# def picture_out(self, the_worker: WorkerAgent):
# a = self.t
# b = len(the_worker.s_salary)
# plt.plot(a, b, 'ro')
# return plt.show()
@staticmethod
def gini(x):
x = [ele for ele in x if ele > 0]
if len(x) == 0:
return 0
if sum(x) == 0:
return 0
x = np.array(x)
mad = np.abs(np.subtract.outer(x, x)).mean() # Mean absolute difference
r_mad = mad / np.mean(x) # Relative mean absolute difference
return 0.5 * r_mad
if __name__ == '__main__':
# dict_para = {'n_worker': 1000,
# 'n_firm': 100,
# 'percent_search': 0.2,
# 'alpha': 0.5,
# 'is_RH_ratio': 0.5}
# my_model = Env(dict_para)
#
# print(my_model.gini([10000, 0, 0, 0, 0, 0, 0]))
# my_model.run()
# # a = range(0, 101)
# # b = len(WorkerAgent.s_yield)
# # plt.plot(a,b)
# # plt.show()
# plt.show()
parameters = {
'n_worker': 1000,
'n_firm': 100,
'alpha': 0.5,
'percent_search': 0.2,
'is_RH_ratio': 0.5,
'is_FH_ratio': 0.5,
}
sample = ap.Sample(parameters)
# sample = ap.Sample(parameters, n=3)
#
exp = ap.Experiment(Env, sample, iterations=10, record=True)
results = exp.run()
results['variables']['Env'].to_excel('env_data.xlsx', engine='openpyxl')