import math from typing import Union, Any import agentpy as ap from random import uniform, randint from typing import TYPE_CHECKING import numpy as np if TYPE_CHECKING: from worker import WorkerAgent class FirmAgent(ap.Agent): c_incentive: float s_IsRH: bool s_avg_senior_yield: float s_avg_junior_yield: float s_a_yield: float # s_salary: float initial_f_salary: float s_revenue: float s_profit: float s_value: float firing_worker: 'WorkerAgent' l_senior_workers: list l_junior_workers: list l_applied_workers: list # def __init__(self, model, *args, **kwargs): # super().__init__(model, args, kwargs) # self.l_all_workers = None def setup(self, is_RH): self.c_incentive = uniform(0, 1) # self.s_profit = randint(10, 20) self.l_senior_workers, self.l_junior_workers = [], [] self.l_all_workers = [] self.l_applied_workers = [] self.s_IsRH = is_RH self.initial_f_salary = randint(8000, 10000) self.s_profit = 0 self.s_value = 0 def apply(self, the_worker): self.l_applied_workers.append(the_worker) def empty_apply(self): self.l_applied_workers = [] def select_worker(self): ''' 企业找到最想招聘的员工 :return: ''' n_workers = len(self.l_applied_workers) if n_workers > 0: selected_worker = None if n_workers > 1: # 对员工产出进行排队和比较 # 先判断是什么选择方式 if self.s_IsRH: max_s_yield, best_worker = 0.0, None for the_worker in self.l_applied_workers: if the_worker.s_yield > max_s_yield: max_s_yield = the_worker.s_yield best_worker = the_worker selected_worker = best_worker else: # selected_worker = self.l_applied_workers[0] # TODO # 加入worker进入到某个企业,企业更新了自己的利润,利润差值作为该名员工能带来的单位利润,排序利润 # 但是代码中对于员工产出是(0,1)值,需要先更新员工所在列表,然后更新企业产出、利润、工资总额,再用假设利润-上期利润 # 问题是,当期的估算不能考虑到有员工会在下一期离指的问题,或许考虑换成?max_单位产出成本? max_unit_yield_salary, p_salary, best_worker= 0.0,0.0, None for the_worker in self.l_applied_workers: if the_worker.s_salary == 0: the_worker.p_salary = self.initial_f_salary else: the_worker.p_salary = the_worker.s_salary*(1+self.c_incentive) the_worker.unit_yield_salary = the_worker.s_yield * 10000 / the_worker.p_salary if the_worker.unit_yield_salary > max_unit_yield_salary: max_unit_yield_salary = the_worker.unit_yield_salary best_worker = the_worker selected_worker = best_worker # print(f'{self}: my best firm is {best_firm} from {n_firms} firms with utility {max_utility}') # return best_worker # 当企业是想要利用产出最高的员工时,从申请的员工中选出产出最高的员工 # if self.s_IsRH : # # 计算该名员工的工资水平: 原有工资水平* (1+incentive) # # bw_salary = self.best_worker.salary() # # # 将该名员工的产出与公司原有员工的产出进行对比,如果高于senior列表中的最后一名的产出,就进入senior_list, 否则进入junior_list # self.l_senior_workers.append(best_worker) # else: # self.l_junior_workers.append(best_worker) # # best_worker.apply(self) # else: # # 计算由于改名员工下一期的薪资总数(根据薪资函数更新)以及企业下一期的利润的差值,并选出最大值 else: selected_worker = self.l_applied_workers[0] selected_worker.update_working_firm_is_hired(self) self.update_two_worker_list(selected_worker) def update_two_worker_list(self, new_worker): lst_all_worker = self.l_senior_workers + self.l_junior_workers + [new_worker] lst_all_worker.sort(key=lambda x: x['s_yield'], reverse=True) # from highest yield to lowest n_all_worker = len(lst_all_worker) # 向上取整数值 n_senior_worker = math.ceil(n_all_worker * 0.2) n_junior_worker = n_all_worker - n_senior_worker self.l_senior_workers = lst_all_worker[:n_senior_worker] if n_junior_worker == 0: self.l_junior_workers = [] else: self.l_junior_workers = lst_all_worker[n_senior_worker:] def update_yields(self): n_sw, n_jw = len(self.l_senior_workers), len(self.l_junior_workers) # acc_all_yield = 0 if n_sw == 0: self.s_avg_senior_yield = 0 else: s_acc_senior_yield = 0 for sw in self.l_senior_workers: # 加入工人主体的产出属性 s_acc_senior_yield += sw.s_yield # acc_all_yield += sw.s_yield self.s_avg_senior_yield = s_acc_senior_yield / n_sw if n_jw == 0: self.s_avg_junior_yield = 0 else: s_acc_junior_yield = 0 for jw in self.l_junior_workers: # 加入工人主体的产出属性 s_acc_junior_yield += jw.s_yield self.s_avg_junior_yield = s_acc_junior_yield / n_jw self.s_a_yield = 0.8 * self.s_avg_senior_yield + 0.2 * self.s_avg_junior_yield def get_sum_salary(self): ''' 计算某公司整体的薪金水平 ''' l_all_workers = self.l_senior_workers + self.l_junior_workers sum_salary = 0.0 for the_worker in l_all_workers: sum_salary += the_worker.s_salary return sum_salary def update_s_profit(self): self.s_profit = self.s_revenue - self.get_sum_salary() self.s_value += self.s_profit def step(self): pass