from mesa.agent import Agent class ProductionLineAgent(Agent): """A production line agent with basic production scheduling.""" def __init__(self, model, line_id, region, factory, is_new_factory, schedule, ramp_ranges, line_factor: float): super().__init__(model) self.line_id = line_id self.region = region self.factory = factory self.is_new_factory = is_new_factory self.line_factor = float(line_factor) # List of dicts: {"product": str, "start_month": int, "end_month": int} self.schedule = schedule # ramp_ranges: {product: {1: value, 2: value, 3: value, 4: value}} self.ramp_ranges = ramp_ranges self.blade_stock = {} # 临时叶片库存 {product: blades not yet assembled} self.unit_stock = {} # 叶片机组库存 {product: assembled units} def _sample_cycle_hours(self, product, month_index): idx = min(max(month_index, 1), 4) prod_key = str(product).strip() base_by_month = self.ramp_ranges.get(prod_key) if base_by_month is None: raise KeyError(f"未找到产品 {prod_key} 的生产效率。") base = base_by_month.get(idx, base_by_month.get(4)) if base is None: raise KeyError(f"产品 {prod_key} 在 month{idx} 缺少生产效率。") factor = self.model.get_factory_factor(self.factory, self.line_id, self.line_factor) return base * factor def step(self): month = self.model.current_month available_hours = self.model.get_available_hours(month) active = [ entry for entry in self.schedule if entry["start_month"] <= month <= entry["end_month"] ] if not active: return # If multiple entries overlap, take the first one in the list. task = active[0] product = task["product"] if not self.is_new_factory: cycle_hours = self._sample_cycle_hours(product, 4) else: if task["start_month"] == 1: month_index = 4 else: month_index = month - task["start_month"] + 1 cycle_hours = self._sample_cycle_hours(product, month_index) produced_blades = available_hours / cycle_hours self.model.record_blade_production( line_id=self.line_id, factory=self.factory, region=self.region, month=month, product=product, blades=produced_blades, ) # Update blade and unit inventories current_blades = self.blade_stock.get(product, 0) + produced_blades units_to_add = int(current_blades // 3) remaining_blades = current_blades - units_to_add * 3 if units_to_add > 0: self.unit_stock[product] = self.unit_stock.get(product, 0) + units_to_add self.blade_stock[product] = remaining_blades self.model.record_blade_stock( line_id=self.line_id, factory=self.factory, region=self.region, month=month, product=product, blades_stock=remaining_blades, ) self.model.record_production( line_id=self.line_id, factory=self.factory, region=self.region, month=month, product=product, units=units_to_add, )