gpt4 book ai didi

python - 蒙特卡罗模拟 : Underlying distribution parameter changes during simulation run based on conditions

转载 作者:行者123 更新时间:2023-12-05 05:32:45 27 4
gpt4 key购买 nike

我正在尝试模拟两条生产线(A 线和 B 线)的运行。它们的故障时间遵循 Weibull 分布(形状 = 0.7,比例 = 12(分钟))。 A 线和 B 线均以 100 件产品/分钟的速度生产。从现实世界的场景来看,如果任何一条线出现故障,我想提高其他线的速率(比如 - 120 产品/分钟)直到故障线没有修复的时间。

挑战:随着速率的增加,失败的机会增加,因此,尺度参数发生变化(例如,对于 120 产品/分钟的速率,尺度参数从 12 分钟变为 10 分钟)。我想在仿真中对这种分布参数的变化进行建模。

示例:

  • A 线和 B 线在时间 0 开始运行。
  • 使用威 bool 分布为其故障时间 (TTF) 生成随机数 - A 行:15 分钟,B 行:20 分钟。
  • A 线在 15 分钟时出现故障,并在 20 分钟时得到修复。在这 5 分钟内,B 线加速运行。
  • 在 t= 15 分钟时,使用 weibull(更改参数)生成一个随机数以获得下一个 TTF。重复该过程直到模拟时间。

目前我正在使用 Python Simpy 编写逻辑代码,但找不到对此建模的方法。任何帮助或引用都会非常有帮助。这是我的尝试,但我肯定遗漏了一些东西。

import simpy
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import weibull_min


class Machine(object):

def __init__(self, env, name,scale_parameter,shape_parameter, mean_repair_time,std_repair_time,increased_rate):
self.env = env
self.name = name
self.scale_parameter = scale_parameter
self.shape_parameter = shape_parameter
self.mean_repair_time = mean_repair_time
self.std_repair_time = std_repair_time
self.increased_rate = increased_rate
self.broken = False
self.processing_list = [0,]
self.machine_status = 1


self.process = env.process(self.run())
# Start the failure process
env.process(self.check_machines_status())
def run(self):
"""
Run as long as the simulation runs.
"""
while True:
try:

yield self.env.timeout(self.mean_time_to_failure())
self.processing_list.append(self.env.now)
print(f'{self.env.now:.2f} {self.name} is in failure.')
trial_resource.get(1)
yield self.env.timeout(self.mean_time_to_repair())
print(f'{self.env.now:.2f} {self.name} is repaired.')
self.processing_list.append(env.now)
trial_resource.put(1)



except simpy.Interrupt:
self.machine_status = 0
yield self.env.timeout(self.updated_mean_time_to_failure())
print(f'{self.env.now:.2f} {self.name} is in updated failure.')
#trial_resource.get(1)
self.broken = True
yield self.env.timeout(self.mean_time_to_repair())
print(f'{self.env.now:.2f} {self.name} is in updated repaired.')
trial_resource.put(1)
self.machine_status =1


def check_machines_status(self):
"""Periodically check the status of running machines. If any machine fails
interrupt the process"""
while True:
print(self.env.now,trial_resource.level)
print(self.name)

if trial_resource.level < trial_resource.capacity and self.broken == False and self.machine_status == 1:
# Only break the machine if it is currently working.
self.process.interrupt()
print('Machine running process interrupted %d' % env.now)

yield env.timeout(1)


def mean_time_to_failure(self):
x = int(weibull_min.rvs(self.shape_parameter, loc=0, scale= self.scale_parameter, size=1).tolist()[0])
if x == 0:
x = 1

return x

def updated_mean_time_to_failure(self):
correction_factor = (1-self.increased_rate)/100
x = int(weibull_min.rvs(self.shape_parameter*correction_factor, loc=0, scale= self.scale_parameter, size=1).tolist()[0])
if x == 0:
x = 1

return x


def mean_time_to_repair(self):
x = int(np.random.lognormal(self.mean_repair_time,self.std_repair_time))
if x ==0:
x =1
return x


env = simpy.Environment()
trial_resource = simpy.Container(env,init=3,capacity=3)
machine_1 = Machine(env, 'M1', 12, 0.65, 0.51,1,10)
machine_2 = Machine(env, 'M2', 14, 0.65, 0.51,1,10)
machine_3 = Machine(env, 'M3', 8, 0.65, 0.51,1,10)

env.run(until = 12)
print(machine_1.processing_list)
print(machine_2.processing_list)
print(machine_3.processing_list)

最佳答案

此解决方案可以处理超过 2 台机器。当一台机器发生故障时,它会向所有其他机器发送一条消息,通知它们发生故障。机器在修复时也会发送通知。每台机器都会跟踪有多少台机器坏了,如果有一台或多台机器坏了,就会加快生产速度。失败前时间 (ttf) 也加快了。当速率发生变化时,当前正在进行的部分的进度将被纳入计算新完成时间的因素。中断既用于触发故障,也用于在正在进行的工作完成时重置。另请查看文档中的机械车间示例。我没时间做这件事,所以它可能并不完美

"""
Machine shop example where machines change their
production rate and break down rates in response
to other machines breaking down or getting fixed

If one machine breaks down, the remaining machines
will speed up until all machines are fixed

speeding up the production rate also speeds up the
time till failure ttf

rates can change in the middle of making a part

Programmmer: Michael R. Gibbs
"""

import simpy
import random

class Machine():
"""
Machine that makes parts.
Machine breaks down at ramdom time
The machine has a normal mode, and a fast mode

Machine speeds up when it receives a message that
another machine has broken down and slows back down
when it receives messages that all broken machines are fixed

env: simulation environment
id: name of the machine
rate: number of parts per time unit (normal mode)
fast_rate: number of parts per time unit (fast mode)
ttf_gen: zero param func to generate ttf (normal mode)
fast_ttf_gen: zero param func to generate ttf (fast mode)
"""

def __init__(self, env, id, rate, fast_rate, ttf_gen, fast_ttf_gen):

self.id = id
self.env = env
self.rate = rate
self.fast_rate = fast_rate
self.ttf_gen = ttf_gen
self.fast_ttf_gen = fast_ttf_gen
self.broken = False
self.fast_mode = False
self.remaining_time = 0
self.current_rate = self.rate

self.mach_list = []
self.broke_cnt = 0

# start the part making, an the count down till failure
self.make_parts_proc = self.env.process(self.make_parts())
self.breakdown_proc = self.env.process(self.gen_breakdown())

def make_parts(self):
"""
Main loop to manufacture parts

interupts are used to trigger updates
when rates change and when breakdowns occure
"""

while True:

if self.remaining_time <= 0:
# starting a new part
print(f'{self.env.now:.2f}: mach: {self.id} has started a part')

# need to store times so other methods can upate
# the procesing state
self.total_part_time = 1 / self.current_rate
self.remaining_time = self.total_part_time

while self.remaining_time > 0:
# need to loop incase get inturrupted
# while in the middle of making a part

try:
self.part_start_time = self.env.now # used to track progress
yield self.env.timeout(self.remaining_time)

# got here without being interupted, done with part
self.remaining_time = 0

except simpy.Interrupt:
# can get inturpted because processing rate changed
# or a breakdown has happend
# if rate changed, we are using the inturput to update
# the processing timeout

if self.broken:
# processing is interuped, fix machine

# update processing progress
self.remaining_time -= (self.env.now - self.part_start_time)

print(f'{self.env.now:.2f}: mach: {self.id} has broken down')

# notify other machines that this machine has broke
for m in self.mach_list:
m.someone_broke(self)

# time out for fixing
yield self.env.timeout(5)

# notify other machines that this machine is fixed
for m in self.mach_list:
m.someone_fixed(self)

print(f'{self.env.now:.2f}: mach: {self.id} has been fixed')

# start a new breakdown count down
self.breakdown_proc = self.env.process(self.gen_breakdown())
self.broken = False

print(f'{self.env.now:.2f}: mach: {self.id} has finish a part')

def gen_breakdown(self):
"""
counts down to failure and uses
a interrupt to stop processing
and start repairs

using class properties instead of local
variables so other methods can update
the countdown state of ttf
"""

if not self.broken:
# get breakdown based on current fast mode
if self.fast_mode:
self.total_ttf = self.fast_ttf_gen()
else:
self.total_ttf = self.ttf_gen()

self.remaining_ttf = self.total_ttf

while self.remaining_ttf > 0:
self.ttf_start = self.env.now
print(f'{self.env.now:.2f}: mach: {self.id} has {self.remaining_ttf} till breakdown')
try:
yield self.env.timeout(self.remaining_ttf)

# failure has orrured
self.broken = True
self.make_parts_proc.interrupt()
self.remaining_ttf = 0

except simpy.Interrupt:
# the state has been updated
# the timeout has been interupted
# so it can be restarted with new state
print(f'{self.env.now:.2f}: mach: {self.id} updating ttf {self.remaining_ttf}')
print(f'{self.env.now:.2f}: mach: {self.id} ttf gen exit')

def someone_broke(self, mach):
"""
Another machine is notifing this machine that it has broken
and this machine needs to change to fast mode, if not already
in fast mode
"""

self.broke_cnt += 1

print(f'{self.env.now:.2f}: mach: {self.id} received mess that mach: {mach.id} has broke, broke cnt: {self.broke_cnt}')

if not self.fast_mode:
self.fast_mode = True

# update the ttf based on the fast mode ttf func
# keep the same progress so if we were 90% of of the
# old ttf, then set things so we are still 90% of new ttf

# update with the last bit of progress
self.remaining_ttf -= self.env.now - self.ttf_start
per_ttf_left = (self.remaining_ttf/self.total_ttf)

# update based on fast ttf
self.total_ttf = self.fast_ttf_gen()
self.remaining_ttf = per_ttf_left * self.total_ttf
if self.remaining_ttf <= 0:
# special case when notification happens at same time as breakdown
self.remaining_ttf = 0.001

# update the part processing state

# update the last bit of processing progress
self.remaining_time -= self.env.now - self.part_start_time

# update state based on new processing fast rate
# like ttf keep the same perenct of progress
self.current_rate = self.fast_rate
old_total = self.total_part_time
self.total_part_time = 1 / self.current_rate
per_left = self.remaining_time / old_total
self.remaining_time = self.total_part_time * per_left

if not self.broken:
# if broken nothing to interrupt
# new states will used when machine
# is fixed and processing starts up again
self.breakdown_proc.interrupt()
self.make_parts_proc.interrupt()



def someone_fixed(self, mach):
"""
Another machine is notifing this machine that it has been fixed
and this machine needs to change to normal mode, if there are
no more broken machines
"""

self.broke_cnt -= 1

print(f'{self.env.now:.2f}: mach: {self.id} received mess that mach: {mach.id} is fixed, broke cnt: {self.broke_cnt}')

# only change if all machines are fixed and broke cnt is 0
if self.broke_cnt <= 0:
self.broke_cnt = 0

if self.fast_mode:
self.fast_mode = False

# update the ttf based on the normal mode ttf func
# keep the same progress so if we were 90% of of the
# old ttf, then set things so we are still 90% of new ttf

# update with the last bit of progress
self.remaining_ttf -= self.env.now - self.ttf_start
per_ttf_left = (self.remaining_ttf/self.total_ttf)

self.total_ttf = self.ttf_gen()
self.remaining_ttf = per_ttf_left * self.total_ttf

if self.remaining_ttf <= 0:
# special case when notifcation happens at breakdown time
self.remaining_ttf = 0.001

# update state based on new processing normal rate
# like ttf keep the same perenct of progress
self.remaining_time -= self.env.now - self.part_start_time
self.current_rate = self.rate
old_total = self.total_part_time
self.total_part_time = 1 / self.current_rate
per_left = self.remaining_time / old_total
self.remaining_time = self.total_part_time * per_left

if not self.broken:
# if broken nothing to interrupt
# new states will be used when machine
# is fixed and processing starts up again
self.breakdown_proc.interrupt()
self.make_parts_proc.interrupt()



def set_mach_list(self, mach_list):
"""
set the list of machines to be notified if this machine
breaks down, or is fixed
"""
self.mach_list = mach_list

# ttf generator function
#
# by wrapping a dist in a lambda I
# created create a 0 param function
# that I can pass to the Machine class
# To change the dist I just need to
# update the lambda, no hard codeing
# of dist func parameters in the Machine
# class code
ttf = lambda : random.randint(8,10)
fast_ttf = lambda: random.randint(5,7)

# create sim
env = simpy.Environment()

mach_list = []
machines_cnt = 2 # can be more then 2
for i in range(1, machines_cnt + 1):
m = Machine(env, i, 5, 8, ttf, fast_ttf)
mach_list.append(m)

# build list of machines to notify
# when a machine breaks or gets fixed
for m in mach_list:
# filter out the current machine
# don't want to send to self
other_m = mach_list.copy()
other_m.remove(m)

m.set_mach_list(other_m)

env.run(until = 50)

print("end of simulation")

关于python - 蒙特卡罗模拟 : Underlying distribution parameter changes during simulation run based on conditions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74010378/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com