FAS_reproduce/fas_instance.py

313 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import salabim as sim
from numpy.random import normal
from faults.wear_and_tear import WearAndTear
from faults.retry_delay import RetryDelay
class ItemGenerator(sim.Component):
def animation_objects(self, id):
if id == 'Item':
ao0 = sim.AnimateText(text=self.name(), textcolor='fg', text_anchor='nw')
return 0, 10, ao0
else:
ao0 = sim.AnimateRectangle((-20, 0, 20, 20),
text=self.name(), fillcolor=id, textcolor='white', arg=self)
return 45, 0, ao0
def process(self):
for i in range(30):
# 设定为 30 items
# Clock()
Item()
print("item" + str(i) + " generated——————————")
yield self.hold(sim.Uniform(1, 2).sample())
class Item(sim.Component):
def process(self):
self.enter(q)
yield self.request(crane)
Crane('Crane1', 30, logger)
yield self.request(Manual_Step)
ManualStep("MANUAL_INSPECTION", 37, logger).spawn()
yield self.request(conveyor)
Conveyor("CONVEYOR1", 30, logger).spawn()
yield self.request(Bowl_Feeder)
BowlFeeder("Bowl1", 30, logger)
ManualStep("MANUAL_ADD_COMPONENTS1", 21, logger)
Conveyor("CONVEYOR2", 30, logger)
# ————————添加retry delay故障——————————
# self.bowl2 = BowlFeeder("BOWL2", 10, logger).add_fault(RetryDelay(BowlFeeder("BOWL2", 10, logger)))
BowlFeeder("BOWL2", 10, logger)
ManualStep("MANUAL_ADD_COMPONENTS2", 34, logger)
Conveyor("CONVEYOR3", 30, logger)
Crane("CRANE_INPUT_SUBASSEMBLY_A", 10, logger)
ManualStep("MANUAL_COMBINE_SUBASSEMBLY_A", 34, logger)
Conveyor("CONVEYOR4", 30, logger)
Conveyor("CONVEYOR_INPUT_SUBASSEMBLY_B", 10, logger)
ManualStep("MANUAL_COMBINE_SUBASSEMBLY_B", 35, logger)
# ———————— 添加磨损故障 ————————————
# Conveyor("CONVEYOR5", 30, logger).add_fault(WearAndTear(Conveyor("CONVEYOR5", 30, logger)))
Conveyor("CONVEYOR5", 30, logger)
BowlFeeder("BOWL3", 5, logger)
Conveyor("CONVEYOR6", 10, logger)
ManualStep("MANUAL_ADD_COVER_AND_BOLTS", 76, logger)
Conveyor("CONVEYOR7", 30, logger)
ManualStep("MANUAL_TIGHTEN_BOLTS1", 28, logger)
Conveyor("CONVEYOR8", 30, logger)
Conveyor("CONVEYOR_INPUT_SUBASSEMBLY_C", 10, logger)
ManualStep("MANUAL_COMBINE_SUBASSEMBLY_C", 60, logger)
Conveyor("CONVEYOR9", 21, logger)
ManualStep("MANUAL_TIGHTEN_BOLTS2", 16, logger)
Conveyor("CONVEYOR10", 21, logger)
BowlFeeder("BOWL4", 5, logger)
ManualStep("MANUAL_ADD_COMPONENTS3", 11, logger)
Conveyor("CONVEYOR11", 21, logger)
ManualStep("MANUAL_TIGHTEN_BOLTS3", 32, logger)
self.leave(q)
# 添加leave语句时与上述步骤同时开始执行queue始终为0不增不添加leave语句只增不减
class BowlFeeder(sim.Component):
def __init__(self, name, duration, logger, debug=True):
super(BowlFeeder, self).__init__()
self.name = name
self.duration = duration
self.faults = []
self.logger = logger
self.debug = debug
def add_fault(self, fault):
self.faults.append(fault)
def process(self):
if self.debug:
sim.waiting = "waiting"
waiting.set()
if self.debug:
print(self.name + ": give")
giving.set()
giving.trigger(max=1)
print(self.name + ": given")
yield self.hold(delay(self.duration, 1))
self.logger.addMessage(self.name + " GIVEN")
self.release()
def spawn(self): # 尝试修改return的结果后调用spawn 未发现明显变化
self.process()
return self.passivate()
def get_events(self):
return [self.name + " GIVEN"]
class ManualStep(sim.Component):
def __init__(self, name, duration, logger, debug=True):
super(ManualStep, self).__init__()
self.debug = debug
self.duration = duration
self.queue = 0
self.logger = logger
self.name = name
def process(self):
if self.debug:
print(self.name + ": input")
self.queue = self.queue + 1
if (self.queue >= 5):
self.logger.addMessage(self.name + " QUEUE_ALARM")
if self.debug:
print(self.name + ": process")
running.set()
self.queue = self.queue - 1
if self.debug:
print(self.name + ": ok")
if self.debug:
print(self.name + ": wait")
waiting.set()
yield self.hold(delay(self.duration, 5))
self.logger.addMessage(self.name + " OK")
self.release()
def spawn(self):
return self.process()
def get_events(self):
return [self.name + " QUEUE_ALARM", self.name + " OK"]
class Crane(sim.Component):
def __init__(self, name, duration, logger, debug=True):
super(Crane, self).__init__()
self.debug = debug
self.duration = duration
self.queue = 0
self.logger = logger
self.name = name
def process(self):
if self.debug:
print(self.name + ": input")
# yield self.request(crane)
self.queue = self.queue + 1
if self.debug:
print(self.name + ": go_forward")
self.logger.addMessage(self.name + " FORWARD")
yield self.hold(delay(self.duration, 1))
self.release()
self.queue = self.queue - 1
running.set()
if self.debug:
print(self.name + ": wait")
waiting.set()
if self.debug:
print(self.name + ": item_taken")
running.set()
print(self.name + ": go_back")
self.logger.addMessage(self.name + " BACKWARD")
yield self.hold(delay(self.duration, 1))
print(self.name + ": stop")
self.logger.addMessage(self.name + " STOP")
self.passivate()
def spawn(self):
return self.process()
def get_events(self):
return [self.name + " FORWARD", self.name + " BACKWARD",
self.name + " STOP"]
class Conveyor(sim.Component):
def __init__(self, name, duration, logger, debug= True):
super(Conveyor, self).__init__()
self.name = name
self.duration = duration
self.faults = []
self.logger = logger
self.debug = debug
def add_fault(self, fault):
self.faults.append(fault)
def process(self):
if self.debug:
waiting.set()
print(self.name + ": input")
self.logger.addMessage(self.name + " CONVEYOR_GATE")
giving.set()
giving.trigger(max=1)
if self.debug:
print(self.name + ": to_next_step")
# self.item = waitingline.pop()
# self.state = "giving"
yield delay(self.duration, 1)
for fault in self.faults:
yield fault.spawn()
def spawn(self): # 没有找到合适的函数替代原本spawn函数
return self.activate(process='process')
def get_events(self):
return [self.name + " CONVEYOR_GATE"]
class Clock():# 未调用
def __init__(self, env, logger, debug=True):
self.debug = debug
self.env= env
self.logger = logger
def process(self):
print("Tick")
self.logger.addMessage("TICK")
yield 10
def spawn(self):
return self.process()
def get_events(self):
return ["TICK"]
def delay(duration, percentage_variation):
stdev = percentage_variation / 100.0 * duration
random_additive_noise = normal(0, stdev)
return max(0, int(duration + random_additive_noise))
class Logger:
""" This class logs all the messages. """
def __init__(self,env):
self.env = env
self.loglines = []
def addMessage(self, type, metadata = ""):
logline = LogLine(self.env.now(), type, metadata)
self.loglines.append(logline)
def indentLine(self, lineStr):
return " " + lineStr
def indent(self, linesStr):
lines = linesStr.split("\n")
return "\n".join(map(self.indentLine, lines))
def getLoglines(self):
return "[\n" + (",\n".join(map(self.indent, map(str, self.loglines))) + "\n]\n")
class LogLine:
def __init__(self, timestamp, msgtype, metadata):
self.timestamp = timestamp
self.msgtype = msgtype
self.metadata = metadata
def __str__(self):
return "{\n \"timestamp\": " + str(self.timestamp) + ",\n \"type\": \"" + self.msgtype + "\"\n}"
env = sim.Environment()
# env = sim.Environment(trace= True)
giving = sim.State("giving")
waiting = sim.State("waiting")
running = sim.State("running")
logger = Logger(env)
ItemGenerator()
# clock = Clock(env, logger)
# clock.spawn() # clock函数不显示
q = sim.Queue('queue')
env.background_color('20%gray')
Bowl_Feeder = sim.Resource("Bowl_Feeder", 1)
conveyor = sim.Resource("conveyor", 1)
crane = sim.Resource("crane", 1)
Manual_Step= sim.Resource("Manual_Step",1)
# 直接运行并存储 ——————————————————————————————————————————————
# env.run()
# print()
# q.print_statistics()
# f = open("output.json", "w")
# f.write(logger.getLoglines())
# f.close()
# 可视化 ————————————————————————————————————————————————————
qa0 = sim.AnimateQueue(q, x=100, y=250, title='queue, normal', direction='e', id='blue')
qa1 = sim.AnimateQueue(q, x=100, y=350, title='queue, maximum 6 components', direction='e', max_length=6, id='red')
qa2 = sim.AnimateQueue(q, x=100, y=150, title='queue, reversed', direction='e', reverse=True, id='green')
sim.AnimateMonitor(q.length, x=10, y=450, width=480, height=100, horizontal_scale=3, vertical_scale=3)
sim.AnimateMonitor(q.length_of_stay, x=10, y=570, width=480, height=100, horizontal_scale=5, vertical_scale=5)
sim.AnimateText(text=lambda: q.length.print_histogram(as_str=True), x=500, y=750,
text_anchor='nw', font='narrow', fontsize=10)
sim.AnimateText(text=lambda: q.print_info(as_str=True), x=500, y=340,
text_anchor='nw', font='narrow', fontsize=10)
env.animate(True)
env.modelname('Demo queue animation')
env.run()