FAS_reproduce/fas_instance.py

313 lines
10 KiB
Python
Raw Permalink Normal View History

2023-01-30 14:26:29 +08:00
import salabim as sim
from numpy.random import normal
from faults.wear_and_tear import WearAndTear
from faults.retry_delay import RetryDelay
class ItemGenerator(sim.Component):
def animation_objects(self, id):
if id == 'Item':
ao0 = sim.AnimateText(text=self.name(), textcolor='fg', text_anchor='nw')
return 0, 10, ao0
else:
ao0 = sim.AnimateRectangle((-20, 0, 20, 20),
text=self.name(), fillcolor=id, textcolor='white', arg=self)
return 45, 0, ao0
def process(self):
for i in range(30):
# 设定为 30 items
# Clock()
Item()
print("item" + str(i) + " generated——————————")
yield self.hold(sim.Uniform(1, 2).sample())
class Item(sim.Component):
def process(self):
self.enter(q)
yield self.request(crane)
Crane('Crane1', 30, logger)
yield self.request(Manual_Step)
ManualStep("MANUAL_INSPECTION", 37, logger).spawn()
yield self.request(conveyor)
Conveyor("CONVEYOR1", 30, logger).spawn()
yield self.request(Bowl_Feeder)
BowlFeeder("Bowl1", 30, logger)
ManualStep("MANUAL_ADD_COMPONENTS1", 21, logger)
Conveyor("CONVEYOR2", 30, logger)
# ————————添加retry delay故障——————————
# self.bowl2 = BowlFeeder("BOWL2", 10, logger).add_fault(RetryDelay(BowlFeeder("BOWL2", 10, logger)))
BowlFeeder("BOWL2", 10, logger)
ManualStep("MANUAL_ADD_COMPONENTS2", 34, logger)
Conveyor("CONVEYOR3", 30, logger)
Crane("CRANE_INPUT_SUBASSEMBLY_A", 10, logger)
ManualStep("MANUAL_COMBINE_SUBASSEMBLY_A", 34, logger)
Conveyor("CONVEYOR4", 30, logger)
Conveyor("CONVEYOR_INPUT_SUBASSEMBLY_B", 10, logger)
ManualStep("MANUAL_COMBINE_SUBASSEMBLY_B", 35, logger)
# ———————— 添加磨损故障 ————————————
# Conveyor("CONVEYOR5", 30, logger).add_fault(WearAndTear(Conveyor("CONVEYOR5", 30, logger)))
Conveyor("CONVEYOR5", 30, logger)
BowlFeeder("BOWL3", 5, logger)
Conveyor("CONVEYOR6", 10, logger)
ManualStep("MANUAL_ADD_COVER_AND_BOLTS", 76, logger)
Conveyor("CONVEYOR7", 30, logger)
ManualStep("MANUAL_TIGHTEN_BOLTS1", 28, logger)
Conveyor("CONVEYOR8", 30, logger)
Conveyor("CONVEYOR_INPUT_SUBASSEMBLY_C", 10, logger)
ManualStep("MANUAL_COMBINE_SUBASSEMBLY_C", 60, logger)
Conveyor("CONVEYOR9", 21, logger)
ManualStep("MANUAL_TIGHTEN_BOLTS2", 16, logger)
Conveyor("CONVEYOR10", 21, logger)
BowlFeeder("BOWL4", 5, logger)
ManualStep("MANUAL_ADD_COMPONENTS3", 11, logger)
Conveyor("CONVEYOR11", 21, logger)
ManualStep("MANUAL_TIGHTEN_BOLTS3", 32, logger)
self.leave(q)
# 添加leave语句时与上述步骤同时开始执行queue始终为0不增不添加leave语句只增不减
class BowlFeeder(sim.Component):
def __init__(self, name, duration, logger, debug=True):
super(BowlFeeder, self).__init__()
self.name = name
self.duration = duration
self.faults = []
self.logger = logger
self.debug = debug
def add_fault(self, fault):
self.faults.append(fault)
def process(self):
if self.debug:
sim.waiting = "waiting"
waiting.set()
if self.debug:
print(self.name + ": give")
giving.set()
giving.trigger(max=1)
print(self.name + ": given")
yield self.hold(delay(self.duration, 1))
self.logger.addMessage(self.name + " GIVEN")
self.release()
def spawn(self): # 尝试修改return的结果后调用spawn 未发现明显变化
self.process()
return self.passivate()
def get_events(self):
return [self.name + " GIVEN"]
class ManualStep(sim.Component):
def __init__(self, name, duration, logger, debug=True):
super(ManualStep, self).__init__()
self.debug = debug
self.duration = duration
self.queue = 0
self.logger = logger
self.name = name
def process(self):
if self.debug:
print(self.name + ": input")
self.queue = self.queue + 1
if (self.queue >= 5):
self.logger.addMessage(self.name + " QUEUE_ALARM")
if self.debug:
print(self.name + ": process")
running.set()
self.queue = self.queue - 1
if self.debug:
print(self.name + ": ok")
if self.debug:
print(self.name + ": wait")
waiting.set()
yield self.hold(delay(self.duration, 5))
self.logger.addMessage(self.name + " OK")
self.release()
def spawn(self):
return self.process()
def get_events(self):
return [self.name + " QUEUE_ALARM", self.name + " OK"]
class Crane(sim.Component):
def __init__(self, name, duration, logger, debug=True):
super(Crane, self).__init__()
self.debug = debug
self.duration = duration
self.queue = 0
self.logger = logger
self.name = name
def process(self):
if self.debug:
print(self.name + ": input")
# yield self.request(crane)
self.queue = self.queue + 1
if self.debug:
print(self.name + ": go_forward")
self.logger.addMessage(self.name + " FORWARD")
yield self.hold(delay(self.duration, 1))
self.release()
self.queue = self.queue - 1
running.set()
if self.debug:
print(self.name + ": wait")
waiting.set()
if self.debug:
print(self.name + ": item_taken")
running.set()
print(self.name + ": go_back")
self.logger.addMessage(self.name + " BACKWARD")
yield self.hold(delay(self.duration, 1))
print(self.name + ": stop")
self.logger.addMessage(self.name + " STOP")
self.passivate()
def spawn(self):
return self.process()
def get_events(self):
return [self.name + " FORWARD", self.name + " BACKWARD",
self.name + " STOP"]
class Conveyor(sim.Component):
def __init__(self, name, duration, logger, debug= True):
super(Conveyor, self).__init__()
self.name = name
self.duration = duration
self.faults = []
self.logger = logger
self.debug = debug
def add_fault(self, fault):
self.faults.append(fault)
def process(self):
if self.debug:
waiting.set()
print(self.name + ": input")
self.logger.addMessage(self.name + " CONVEYOR_GATE")
giving.set()
giving.trigger(max=1)
if self.debug:
print(self.name + ": to_next_step")
# self.item = waitingline.pop()
# self.state = "giving"
yield delay(self.duration, 1)
for fault in self.faults:
yield fault.spawn()
def spawn(self): # 没有找到合适的函数替代原本spawn函数
return self.activate(process='process')
def get_events(self):
return [self.name + " CONVEYOR_GATE"]
class Clock():# 未调用
def __init__(self, env, logger, debug=True):
self.debug = debug
self.env= env
self.logger = logger
def process(self):
print("Tick")
self.logger.addMessage("TICK")
yield 10
def spawn(self):
return self.process()
def get_events(self):
return ["TICK"]
def delay(duration, percentage_variation):
stdev = percentage_variation / 100.0 * duration
random_additive_noise = normal(0, stdev)
return max(0, int(duration + random_additive_noise))
class Logger:
""" This class logs all the messages. """
def __init__(self,env):
self.env = env
self.loglines = []
def addMessage(self, type, metadata = ""):
logline = LogLine(self.env.now(), type, metadata)
self.loglines.append(logline)
def indentLine(self, lineStr):
return " " + lineStr
def indent(self, linesStr):
lines = linesStr.split("\n")
return "\n".join(map(self.indentLine, lines))
def getLoglines(self):
return "[\n" + (",\n".join(map(self.indent, map(str, self.loglines))) + "\n]\n")
class LogLine:
def __init__(self, timestamp, msgtype, metadata):
self.timestamp = timestamp
self.msgtype = msgtype
self.metadata = metadata
def __str__(self):
return "{\n \"timestamp\": " + str(self.timestamp) + ",\n \"type\": \"" + self.msgtype + "\"\n}"
env = sim.Environment()
# env = sim.Environment(trace= True)
giving = sim.State("giving")
waiting = sim.State("waiting")
running = sim.State("running")
logger = Logger(env)
ItemGenerator()
# clock = Clock(env, logger)
# clock.spawn() # clock函数不显示
q = sim.Queue('queue')
env.background_color('20%gray')
Bowl_Feeder = sim.Resource("Bowl_Feeder", 1)
conveyor = sim.Resource("conveyor", 1)
crane = sim.Resource("crane", 1)
Manual_Step= sim.Resource("Manual_Step",1)
# 直接运行并存储 ——————————————————————————————————————————————
# env.run()
# print()
# q.print_statistics()
# f = open("output.json", "w")
# f.write(logger.getLoglines())
# f.close()
# 可视化 ————————————————————————————————————————————————————
qa0 = sim.AnimateQueue(q, x=100, y=250, title='queue, normal', direction='e', id='blue')
qa1 = sim.AnimateQueue(q, x=100, y=350, title='queue, maximum 6 components', direction='e', max_length=6, id='red')
qa2 = sim.AnimateQueue(q, x=100, y=150, title='queue, reversed', direction='e', reverse=True, id='green')
sim.AnimateMonitor(q.length, x=10, y=450, width=480, height=100, horizontal_scale=3, vertical_scale=3)
sim.AnimateMonitor(q.length_of_stay, x=10, y=570, width=480, height=100, horizontal_scale=5, vertical_scale=5)
sim.AnimateText(text=lambda: q.length.print_histogram(as_str=True), x=500, y=750,
text_anchor='nw', font='narrow', fontsize=10)
sim.AnimateText(text=lambda: q.print_info(as_str=True), x=500, y=340,
text_anchor='nw', font='narrow', fontsize=10)
env.animate(True)
env.modelname('Demo queue animation')
env.run()