diff --git a/env.py b/env.py index 40c3e87..b92e811 100644 --- a/env.py +++ b/env.py @@ -1,77 +1,91 @@ -import agentpy as ap - -from random import uniform - -from worker import WorkerAgent -from firm import FirmAgent - - -class Env(ap.Model): - - float_market_size: float - percent_rh: float - percent_search: float - n_worker: int - n_firm: int - - a_lst_worker: ap.AgentList - a_lst_firm: ap.AgentList - - def setup(self): - # 工作人员、企业数量、搜寻企业数量赋值 - self.n_worker = self.p.n_worker - self.n_firm = self.p.n_firm - self.percent_search = self.p.percent_search - # 工人、企业列表 - self.a_lst_worker = ap.AgentList(self) - self.a_lst_firm = ap.AgentList(self) - - # 在工人列表中添加工人 - for i in range(self.n_worker): - # 初始化 workeragent,并把alpha属性传过去 - w = WorkerAgent(self, self.p.alpha) - self.a_lst_worker.append(w) - - # 在企业列表中添加企业 - for i in range(self.n_firm): - f = FirmAgent(self) - self.a_lst_firm.append(f) - - def step(self): - self.a_lst_worker.select_firm() - - if self.t == 100: - self.stop() - pass - - def provide_lst_random_firms(self, the_worker: WorkerAgent): - '''选择企业数量 = 企业总数*百分比 - 选择企业的列表 = 随机选择的企业的个数 - 如果员工处于被雇佣的状态: - 如果员工工作的企业在随机选定的企业列表中: - 打开列表中的企业 - 移除该企业 - 返回值:移除后,再重新选择随机选择企业 - 否则: - 返回值:选择企业列表 - ''' - n_select_firms = int(self.percent_search * self.n_firm) - a_lst_select_firms = self.a_lst_firm.random(n_select_firms) - if the_worker.s_is_hired: - if the_worker.working_firm in a_lst_select_firms: - lst_f = self.a_lst_firm.to_list() - lst_f.remove(the_worker.working_firm) - return ap.AgentList(self, lst_f).random(n_select_firms) - else: - return ap.AgentList(self, a_lst_select_firms) - - -if __name__ == '__main__': - dict_para = {'n_worker': 100, - 'n_firm': 10, - 'percent_search': 0.2, - 'alpha': 0.5} - my_model = Env(dict_para) - my_model.run() - - +import agentpy as ap + +from random import uniform + +from worker import WorkerAgent +from firm import FirmAgent + + +class Env(ap.Model): + + float_market_size: float + percent_rh: float + percent_search: float + n_worker: int + n_firm: int + e_revenue: float + + a_lst_worker: ap.AgentList + a_lst_firm: ap.AgentList + + def setup(self): + # 工作人员、企业数量、搜寻企业数量赋值 + self.n_worker = self.p.n_worker + self.n_firm = self.p.n_firm + self.percent_search = self.p.percent_search + # 工人、企业列表 + self.a_lst_worker = ap.AgentList(self) + self.a_lst_firm = ap.AgentList(self) + self.e_revenue = 119.3 + + # 在工人列表中添加工人 + for i in range(self.n_worker): + # 初始化 workeragent,并把alpha属性传过去 + w = WorkerAgent(self, self.p.alpha) + self.a_lst_worker.append(w) + + # 在企业列表中添加企业,放入一个is_RH_ratio, 即有多大比例的企业是属于RH类型的 + for i in range(self.n_firm): + # 对于企业属性true or false 的判断, 影响到firm 板块下, self.s_IsRH = is_RH 语句的判断 + f = FirmAgent(self, self.p.is_RH_ratio >= uniform(0, 1)) + self.a_lst_firm.append(f) + + def update_e_revenue(self): + self.e_revenue += 0.01 * self.e_revenue + + def step(self): + self.update_e_revenue() + # 先清空每次的选择列表 + self.a_lst_firm.empty_apply() + # 一开始worker要去选择很多firm + self.a_lst_worker.select_firm() + # 第二步, firm 去选 worker + self.a_lst_firm.select_worker() + + if self.t == 100: + self.stop() + pass + + def provide_lst_random_firms(self, the_worker: WorkerAgent): + '''选择企业数量 = 企业总数*百分比 + 选择企业的列表 = 随机选择的企业的个数 + 如果员工处于被雇佣的状态: + 如果员工工作的企业在随机选定的企业列表中: + 打开列表中的企业 + 移除该企业 + 返回值:移除后,再重新选择随机选择企业 + 否则: + 返回值:选择企业列表 + ''' + n_select_firms = int(self.percent_search * self.n_firm) + a_lst_select_firms = self.a_lst_firm.random(n_select_firms) + if the_worker.s_is_hired: + if the_worker.working_firm in a_lst_select_firms: + # 转换为 list + lst_f = list(self.a_lst_firm) + lst_f.remove(the_worker.working_firm) + return ap.AgentList(self, lst_f).random(n_select_firms) + # 假如以上都不满足, 直接返回 + return ap.AgentList(self, a_lst_select_firms) + + +if __name__ == '__main__': + dict_para = {'n_worker': 100, + 'n_firm': 20, + 'percent_search': 0.2, + 'alpha': 0.5, + 'is_RH_ratio': 0.5} + my_model = Env(dict_para) + my_model.run() + + diff --git a/firm.py b/firm.py index 3121928..c535e9c 100644 --- a/firm.py +++ b/firm.py @@ -1,105 +1,138 @@ -from typing import Union, Any - -import agentpy as ap -from random import uniform, randint - -class FirmAgent(ap.Agent): - c_incentive: float - s_IsRH: bool - s_a_senior_yield: float - s_a_junior_yield: float - s_a_yield: float - s_salary: float - s_revenue: float - s_profit: float - l_senior_workers: list - l_junior_workers: list - l_applied_workers: list - - def setup(self): - self.c_incentive = uniform(0, 1) - self.s_profit = randint(10, 20) - self.l_applied_workers = [] - - def apply(self, the_worker): - self.l_applied_workers.append(the_worker) - - def select_worker(self): - ''' - 企业找到最想招聘的员工 - :return: - ''' - n_a_firms = len(self.l_applied_workers) - # 对员工产出进行排队和比较 - max_s_yield: float - max_s_yield, best_worker = 0, None - for i in self.l_applied_workers: - y = max_s_yield - if y > max_s_yield: - max_s_yield = y - best_worker = i - # print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}') - # return best_worker - # 当企业是想要利用产出最高的员工时,从申请的员工中选出产出最高的员工 - '''if self.s_IsRH : - # 计算该名员工的工资水平: 原有工资水平* (1+incentive) - bw_salary = self.best_worker.salary() - - # 将该名员工的产出与公司原有员工的产出进行对比,如果高于senior列表中的最后一名的产出,就进入senior_list, 否则进入junior_list - self.l_senior_workers.append(best_worker) - else: - self.l_junior_workers.append(best_worker) - # best_worker.apply(self) - else: - # 计算由于改名员工下一期的薪资总数(根据薪资函数更新)以及企业下一期的利润的差值,并选出最大值 - ''' - - def S_a_Yield(self): - # 更新总产出 - # s_total_senior_yield = self.s_a_senior_yield * len(self.l_senior_workers) - # s_total_junior_yield = self.s_a_junior_yield * len(self.l_senior_workers) - l_s_workers = self.l_senior_workers - l_j_workers = self.l_junior_workers - # s_a_yield = s_total_yield / len(self.l_senior_workers + self.l_junior_workers) - s_total_senior_yield, s_total_junior_yield = 0, 0 - '''for i in range(len(self.l_senior_workers)): - # 加入工人主体的产出属性 - s_total_senior_yield += '工人的yield' - for i in range (len(self.l_junior_workers)): - s_total_junior_yield += '工人的yield' - s_a_senior_yield = s_total_senior_yield / len(l_s_workers) - s_a_junior_yield = s_total_junior_yield / len(l_j_workers) - s_a_yield = 0.8*s_a_senior_yield+0.2*s_a_junior_yield - return s_a_yield - ''' - return - - def LogitShare(self): - ''' - self.logitshare = exp(S_a_Yield)/ np.sum(exp(S_a_Yield)) - :return: - ''' - return - - def Sum_salary(self, l_senior_workers, l_junior_workers): - ''' - 计算某公司整体的薪金水平 - ''' - l_all_workers = l_senior_workers + l_junior_workers - for i in l_all_workers: - #self.sum_salary = np.sum() - return - - def S_Profit(self): - # self.s_profit =self.LogitShare() * EnvironmentAgent.s_e_Irevenue - self.Sum_salary() - return - - def step(self): - ''' - 更新三个集合: - 首先,判断员工i在企业j第t时期的l_applied_workers列表里; - 第二步,判断企业j 的s_IsRH是0或者是1; - 如果是1,则将应聘员工中s_w_yield最大的员工从l_applied_workers列表转移到l_senior_workers或l_junior_workers列表; - 否则(即如果是0),则计算每名员工如果进入企业,下一期企业利润的提升max的员工, - 从l_applied_workers列表转移到l_senior_workers或l_junior_workers列表。 - ''' +import math +from typing import Union, Any + +import agentpy as ap +from random import uniform, randint + +from typing import TYPE_CHECKING + +import numpy as np + +if TYPE_CHECKING: + from worker import WorkerAgent + + +class FirmAgent(ap.Agent): + c_incentive: float + s_IsRH: bool + s_avg_senior_yield: float + s_avg_junior_yield: float + s_a_yield: float + # s_salary: float + initial_f_salary: float + s_revenue: float + s_profit: float + l_senior_workers: list + l_junior_workers: list + l_applied_workers: list + + def setup(self, is_RH): + self.c_incentive = uniform(0, 1) + # self.s_profit = randint(10, 20) + self.l_applied_workers = [] + self.s_IsRH = is_RH + self.initial_f_salary = randint(8000, 10000) + + def apply(self, the_worker): + self.l_applied_workers.append(the_worker) + + def empty_apply(self): + self.l_applied_workers = [] + + def select_worker(self): + ''' + 企业找到最想招聘的员工 + :return: + ''' + n_workers = len(self.l_applied_workers) + if n_workers > 0: + selected_worker = None + if n_workers > 1: + # 对员工产出进行排队和比较 + # 先判断是什么选择方式 + if self.s_IsRH: + max_s_yield, best_worker = 0.0, None + for the_worker in self.l_applied_workers: + if the_worker.s_yield > max_s_yield: + max_s_yield = the_worker.s_yield + best_worker = the_worker + selected_worker = best_worker + else: + selected_worker = self.l_applied_workers[0] # TODO + # print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}') + # return best_worker + # 当企业是想要利用产出最高的员工时,从申请的员工中选出产出最高的员工 + # if self.s_IsRH : + # # 计算该名员工的工资水平: 原有工资水平* (1+incentive) + # # bw_salary = self.best_worker.salary() + # + # # 将该名员工的产出与公司原有员工的产出进行对比,如果高于senior列表中的最后一名的产出,就进入senior_list, 否则进入junior_list + # self.l_senior_workers.append(best_worker) + # else: + # self.l_junior_workers.append(best_worker) + # # best_worker.apply(self) + # else: + # # 计算由于改名员工下一期的薪资总数(根据薪资函数更新)以及企业下一期的利润的差值,并选出最大值 + else: + selected_worker = self.l_applied_workers[0] + selected_worker.update_working_firm_is_hired(self) + self.update_two_worker_list(selected_worker) + + def update_two_worker_list(self, new_worker): + lst_all_worker = self.l_senior_workers + self.l_junior_workers + [new_worker] + lst_sorted = lst_all_worker.sort(key=lambda x: x['s_yield'], reverse=True) # from highest yield to lowest + n_all_worker = len(lst_all_worker) + # 向上取整数值 + n_senior_worker = math.ceil(n_all_worker * 0.2) + n_junior_worker = n_all_worker - n_senior_worker + self.l_senior_workers = lst_sorted[:n_senior_worker] + if n_junior_worker == 0: + self.l_junior_workers = [] + else: + self.l_junior_workers = lst_sorted[n_senior_worker:] + + def update_yields(self): # 需要改 + n_sw, n_jw = len(self.l_senior_workers), len(self.l_junior_workers) + # acc_all_yield = 0 + + if n_sw == 0: + self.s_avg_senior_yield = 0 + else: + s_acc_senior_yield = 0 + for sw in self.l_senior_workers: + # 加入工人主体的产出属性 + s_acc_senior_yield += sw.s_yield + # acc_all_yield += sw.s_yield + self.s_avg_senior_yield = s_acc_senior_yield / n_sw + + if n_jw == 0: + self.s_avg_junior_yield = 0 + else: + s_acc_junior_yield = 0 + for jw in self.l_junior_workers: + # 加入工人主体的产出属性 + s_acc_junior_yield += jw.s_yield + self.s_avg_junior_yield = s_acc_junior_yield / n_jw + self.s_a_yield = 0.8 * self.s_avg_senior_yield + 0.2 * self.s_avg_junior_yield + + def logit_share(self): + logit_share = math.exp(self.s_a_yield) / np.sum(math.exp(self.s_a_yield)) + return logit_share + + def sum_salary(self, l_senior_workers, l_junior_workers): + ''' + 计算某公司整体的薪金水平 + ''' + l_all_workers = l_senior_workers + l_junior_workers + sum_salary = self.salary + for the_worker in l_all_workers: + sum_salary += the_worker.salary + return + + def s_profit(self): + # ? + self.s_profit = self.logit_share() * self.p.e_revenue - self.sum_salary() + return + + def step(self): + pass diff --git a/main.py b/main.py index 5596b44..bf8b9e5 100644 --- a/main.py +++ b/main.py @@ -1,16 +1,16 @@ -# This is a sample Python script. - -# Press Shift+F10 to execute it or replace it with your code. -# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings. - - -def print_hi(name): - # Use a breakpoint in the code line below to debug your script. - print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint. - - -# Press the green button in the gutter to run the script. -if __name__ == '__main__': - print_hi('PyCharm') - -# See PyCharm help at https://www.jetbrains.com/help/pycharm/ +# This is a sample Python script. + +# Press Shift+F10 to execute it or replace it with your code. +# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings. + + +def print_hi(name): + # Use a breakpoint in the code line below to debug your script. + print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint. + + +# Press the green button in the gutter to run the script. +if __name__ == '__main__': + print_hi('PyCharm') + +# See PyCharm help at https://www.jetbrains.com/help/pycharm/ diff --git a/worker.py b/worker.py index e05fc0f..7a5d89b 100644 --- a/worker.py +++ b/worker.py @@ -1,77 +1,93 @@ -import math - -import agentpy as ap -from random import uniform, randint -from firm import FirmAgent - - -class WorkerAgent(ap.Agent): - select_firm: object - c_effort: float - s_is_hired: bool - c_work_months: int - s_work_duration: int - working_firm: FirmAgent - s_salary: float - s_yield: float - # s_w_applied: bool - c_alpha: float - - def setup(self, alpha): - # super().__init__(unique_id, model) - # self.num_workers = 10000 - self.c_effort = uniform(0, 1) - self.c_work_months = randint(0, 60) - self.s_is_hired = False - self.c_alpha = alpha - # return - - # def A_Utility(self, c_w_weight=0.5): - # a = np.exp(FirmAgent.c_f_incentive, c_w_weight) - # b = np.exp(FirmAgent.s_f_profit/max(FirmAgent.s_f_profit), 1-c_w_weight) - # self.select_firm = a * b - - def select_firm(self): - ''' - 挑选出来的企业列表 - 数量:列表的长度 - ''' - lst_firms = self.model.provide_lst_random_firms(self) - n_firms = len(lst_firms) - # find the max incentive and profit among all firms - max_incentive, max_profit = 0, 0 - for f in lst_firms: - if f.c_incentive > max_incentive: - max_incentive = f.c_incentive - if f.s_profit > max_profit: - max_profit = f.s_profit - # computer the utility for each firm - max_utility, best_firm = 0, None - for f in lst_firms: - u = math.pow(f.c_incentive / max_incentive, self.c_alpha) * math.pow(f.s_profit/max_profit, 1-self.c_alpha) - if u > max_utility: - max_utility = u - best_firm = f - # print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}') - # 选出能够给自己带来最好的效用的企业,并输出/返回 - best_firm.apply(self) - return best_firm - - def salary(self, the_firm: FirmAgent): - ''' - 如果员工首次受到雇佣,薪资 = 某公司初始薪资 - 如果员工更换了公司, 薪资 = 原薪资 * (1+c_incentive) - ? 换公司 - :return: - ''' - pass - - - - def step(self): - if self.s_is_hired == 1: - self.s_work_duration +=1 - self.s_yield = 2 / (1 + math.exp(-0.01 * self.s_work_duration * self.c_effort)) - 1 - ''' - 是否更换公司成功的状态转换? +import math + +import agentpy as ap +from random import uniform, randint + +# 编程可以自动补充一些东西,减少报错 +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from firm import FirmAgent + + +class WorkerAgent(ap.Agent): + select_firm: object + c_effort: float + s_is_hired: bool + # c_work_months: int + s_work_duration: int + working_firm: 'FirmAgent' + s_salary: float + s_yield: float + # s_w_applied: bool + c_alpha: float + + def setup(self, alpha): + # super().__init__(unique_id, model) + # self.num_workers = 10000 + self.c_effort = uniform(0, 1) + # self.c_work_months = randint(0, 60) + self.s_is_hired = False + self.s_work_duration = randint(0, 60) + self.c_alpha = alpha + self.update_yield() + self.s_salary = 0 + + # return + + # def A_Utility(self, c_w_weight=0.5): + # a = np.exp(FirmAgent.c_f_incentive, c_w_weight) + # b = np.exp(FirmAgent.s_f_profit/max(FirmAgent.s_f_profit), 1-c_w_weight) + # self.select_firm = a * b + + def select_firm(self): + ''' + 挑选出来的企业列表 + 数量:列表的长度 + ''' + lst_firms = self.model.provide_lst_random_firms(self) + n_firms = len(lst_firms) + # find the max incentive and profit among all firms + max_incentive, max_profit = 0, 0 + for f in lst_firms: + if f.c_incentive > max_incentive: + max_incentive = f.c_incentive + if f.s_profit > max_profit: + max_profit = f.s_profit + # computer the utility for each firm + max_utility, best_firm = 0, None + for f in lst_firms: + u = math.pow(f.c_incentive / max_incentive, self.c_alpha) * math.pow(f.s_profit/max_profit, 1-self.c_alpha) + if u > max_utility: + max_utility = u + best_firm = f + # print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}') + # 选出能够给自己带来最好的效用的企业,并输出/返回 + best_firm.apply(self) + return best_firm + + def update_wd_by_is_hired(self): + if self.s_is_hired == 1: + self.s_work_duration += 1 + self.update_yield() + self.update_salary() + + def update_salary(self, the_firm: 'FirmAgent'): + if self.s_salary == 0: + self.s_salary = the_firm.initial_f_salary + pass + else: + self.s_salary = self.s_salary * (1 + the_firm.c_incentive) + pass + + def update_yield(self): + self.s_yield = 2 / (1 + math.exp(-0.01 * self.s_work_duration * self.c_effort)) - 1 + + def update_working_firm_is_hired(self, f: 'FirmAgent'): + self.s_is_hired = True + self.working_firm = f + + def step(self): + self.update_wd_by_is_hired() + ''' + 是否更换公司成功的状态转换? ''' \ No newline at end of file