import salabim as sim from numpy.random import normal from faults.wear_and_tear import WearAndTear from faults.retry_delay import RetryDelay class ItemGenerator(sim.Component): def animation_objects(self, id): if id == 'Item': ao0 = sim.AnimateText(text=self.name(), textcolor='fg', text_anchor='nw') return 0, 10, ao0 else: ao0 = sim.AnimateRectangle((-20, 0, 20, 20), text=self.name(), fillcolor=id, textcolor='white', arg=self) return 45, 0, ao0 def process(self): for i in range(30): # 设定为 30 items # Clock() Item() print("item" + str(i) + " generated——————————") yield self.hold(sim.Uniform(1, 2).sample()) class Item(sim.Component): def process(self): self.enter(q) yield self.request(crane) Crane('Crane1', 30, logger) yield self.request(Manual_Step) ManualStep("MANUAL_INSPECTION", 37, logger).spawn() yield self.request(conveyor) Conveyor("CONVEYOR1", 30, logger).spawn() yield self.request(Bowl_Feeder) BowlFeeder("Bowl1", 30, logger) ManualStep("MANUAL_ADD_COMPONENTS1", 21, logger) Conveyor("CONVEYOR2", 30, logger) # ————————添加retry delay故障—————————— # self.bowl2 = BowlFeeder("BOWL2", 10, logger).add_fault(RetryDelay(BowlFeeder("BOWL2", 10, logger))) BowlFeeder("BOWL2", 10, logger) ManualStep("MANUAL_ADD_COMPONENTS2", 34, logger) Conveyor("CONVEYOR3", 30, logger) Crane("CRANE_INPUT_SUBASSEMBLY_A", 10, logger) ManualStep("MANUAL_COMBINE_SUBASSEMBLY_A", 34, logger) Conveyor("CONVEYOR4", 30, logger) Conveyor("CONVEYOR_INPUT_SUBASSEMBLY_B", 10, logger) ManualStep("MANUAL_COMBINE_SUBASSEMBLY_B", 35, logger) # ———————— 添加磨损故障 ———————————— # Conveyor("CONVEYOR5", 30, logger).add_fault(WearAndTear(Conveyor("CONVEYOR5", 30, logger))) Conveyor("CONVEYOR5", 30, logger) BowlFeeder("BOWL3", 5, logger) Conveyor("CONVEYOR6", 10, logger) ManualStep("MANUAL_ADD_COVER_AND_BOLTS", 76, logger) Conveyor("CONVEYOR7", 30, logger) ManualStep("MANUAL_TIGHTEN_BOLTS1", 28, logger) Conveyor("CONVEYOR8", 30, logger) Conveyor("CONVEYOR_INPUT_SUBASSEMBLY_C", 10, logger) ManualStep("MANUAL_COMBINE_SUBASSEMBLY_C", 60, logger) Conveyor("CONVEYOR9", 21, logger) ManualStep("MANUAL_TIGHTEN_BOLTS2", 16, logger) Conveyor("CONVEYOR10", 21, logger) BowlFeeder("BOWL4", 5, logger) ManualStep("MANUAL_ADD_COMPONENTS3", 11, logger) Conveyor("CONVEYOR11", 21, logger) ManualStep("MANUAL_TIGHTEN_BOLTS3", 32, logger) self.leave(q) # 添加leave语句时,与上述步骤同时开始执行,queue始终为0,不增;(不添加leave语句只增不减) class BowlFeeder(sim.Component): def __init__(self, name, duration, logger, debug=True): super(BowlFeeder, self).__init__() self.name = name self.duration = duration self.faults = [] self.logger = logger self.debug = debug def add_fault(self, fault): self.faults.append(fault) def process(self): if self.debug: sim.waiting = "waiting" waiting.set() if self.debug: print(self.name + ": give") giving.set() giving.trigger(max=1) print(self.name + ": given") yield self.hold(delay(self.duration, 1)) self.logger.addMessage(self.name + " GIVEN") self.release() def spawn(self): # 尝试修改return的结果后调用spawn 未发现明显变化 self.process() return self.passivate() def get_events(self): return [self.name + " GIVEN"] class ManualStep(sim.Component): def __init__(self, name, duration, logger, debug=True): super(ManualStep, self).__init__() self.debug = debug self.duration = duration self.queue = 0 self.logger = logger self.name = name def process(self): if self.debug: print(self.name + ": input") self.queue = self.queue + 1 if (self.queue >= 5): self.logger.addMessage(self.name + " QUEUE_ALARM") if self.debug: print(self.name + ": process") running.set() self.queue = self.queue - 1 if self.debug: print(self.name + ": ok") if self.debug: print(self.name + ": wait") waiting.set() yield self.hold(delay(self.duration, 5)) self.logger.addMessage(self.name + " OK") self.release() def spawn(self): return self.process() def get_events(self): return [self.name + " QUEUE_ALARM", self.name + " OK"] class Crane(sim.Component): def __init__(self, name, duration, logger, debug=True): super(Crane, self).__init__() self.debug = debug self.duration = duration self.queue = 0 self.logger = logger self.name = name def process(self): if self.debug: print(self.name + ": input") # yield self.request(crane) self.queue = self.queue + 1 if self.debug: print(self.name + ": go_forward") self.logger.addMessage(self.name + " FORWARD") yield self.hold(delay(self.duration, 1)) self.release() self.queue = self.queue - 1 running.set() if self.debug: print(self.name + ": wait") waiting.set() if self.debug: print(self.name + ": item_taken") running.set() print(self.name + ": go_back") self.logger.addMessage(self.name + " BACKWARD") yield self.hold(delay(self.duration, 1)) print(self.name + ": stop") self.logger.addMessage(self.name + " STOP") self.passivate() def spawn(self): return self.process() def get_events(self): return [self.name + " FORWARD", self.name + " BACKWARD", self.name + " STOP"] class Conveyor(sim.Component): def __init__(self, name, duration, logger, debug= True): super(Conveyor, self).__init__() self.name = name self.duration = duration self.faults = [] self.logger = logger self.debug = debug def add_fault(self, fault): self.faults.append(fault) def process(self): if self.debug: waiting.set() print(self.name + ": input") self.logger.addMessage(self.name + " CONVEYOR_GATE") giving.set() giving.trigger(max=1) if self.debug: print(self.name + ": to_next_step") # self.item = waitingline.pop() # self.state = "giving" yield delay(self.duration, 1) for fault in self.faults: yield fault.spawn() def spawn(self): # 没有找到合适的函数替代原本spawn函数 return self.activate(process='process') def get_events(self): return [self.name + " CONVEYOR_GATE"] class Clock():# 未调用 def __init__(self, env, logger, debug=True): self.debug = debug self.env= env self.logger = logger def process(self): print("Tick") self.logger.addMessage("TICK") yield 10 def spawn(self): return self.process() def get_events(self): return ["TICK"] def delay(duration, percentage_variation): stdev = percentage_variation / 100.0 * duration random_additive_noise = normal(0, stdev) return max(0, int(duration + random_additive_noise)) class Logger: """ This class logs all the messages. """ def __init__(self,env): self.env = env self.loglines = [] def addMessage(self, type, metadata = ""): logline = LogLine(self.env.now(), type, metadata) self.loglines.append(logline) def indentLine(self, lineStr): return " " + lineStr def indent(self, linesStr): lines = linesStr.split("\n") return "\n".join(map(self.indentLine, lines)) def getLoglines(self): return "[\n" + (",\n".join(map(self.indent, map(str, self.loglines))) + "\n]\n") class LogLine: def __init__(self, timestamp, msgtype, metadata): self.timestamp = timestamp self.msgtype = msgtype self.metadata = metadata def __str__(self): return "{\n \"timestamp\": " + str(self.timestamp) + ",\n \"type\": \"" + self.msgtype + "\"\n}" env = sim.Environment() # env = sim.Environment(trace= True) giving = sim.State("giving") waiting = sim.State("waiting") running = sim.State("running") logger = Logger(env) ItemGenerator() # clock = Clock(env, logger) # clock.spawn() # clock函数不显示 q = sim.Queue('queue') env.background_color('20%gray') Bowl_Feeder = sim.Resource("Bowl_Feeder", 1) conveyor = sim.Resource("conveyor", 1) crane = sim.Resource("crane", 1) Manual_Step= sim.Resource("Manual_Step",1) # 直接运行并存储 —————————————————————————————————————————————— # env.run() # print() # q.print_statistics() # f = open("output.json", "w") # f.write(logger.getLoglines()) # f.close() # 可视化 ———————————————————————————————————————————————————— qa0 = sim.AnimateQueue(q, x=100, y=250, title='queue, normal', direction='e', id='blue') qa1 = sim.AnimateQueue(q, x=100, y=350, title='queue, maximum 6 components', direction='e', max_length=6, id='red') qa2 = sim.AnimateQueue(q, x=100, y=150, title='queue, reversed', direction='e', reverse=True, id='green') sim.AnimateMonitor(q.length, x=10, y=450, width=480, height=100, horizontal_scale=3, vertical_scale=3) sim.AnimateMonitor(q.length_of_stay, x=10, y=570, width=480, height=100, horizontal_scale=5, vertical_scale=5) sim.AnimateText(text=lambda: q.length.print_histogram(as_str=True), x=500, y=750, text_anchor='nw', font='narrow', fontsize=10) sim.AnimateText(text=lambda: q.print_info(as_str=True), x=500, y=340, text_anchor='nw', font='narrow', fontsize=10) env.animate(True) env.modelname('Demo queue animation') env.run()