- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我无法记录到使用 multprocess.Pool.apply_async 的单个文件。我正在努力适应this Logging Cookbook 中的示例,但它仅适用于 multiprocessing.Process
。将日志队列传递到 apply_async
似乎没有效果。我想使用一个池,以便我可以轻松管理并发线程的数量。
下面的 multiprocessing.Process 改编示例对我来说效果很好,只是我没有从主进程获取日志消息,并且我认为当我有 100 个大型作业时它不会很好地工作。
import logging
import logging.handlers
import numpy as np
import time
import multiprocessing
import pandas as pd
log_file = 'PATH_TO_FILE/log_file.log'
def listener_configurer():
root = logging.getLogger()
h = logging.FileHandler(log_file)
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None: # We send this as a sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handle(record) # No level or filter logic applied - just do it!
except Exception:
import sys, traceback
print('Whoops! Problem:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
def worker_configurer(queue):
h = logging.handlers.QueueHandler(queue) # Just the one handler needed
root = logging.getLogger()
root.addHandler(h)
# send all messages, for demo; no other level or filter logic applied.
root.setLevel(logging.DEBUG)
# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_function(sleep_time, name, queue, configurer):
configurer(queue)
start_message = 'Worker {} started and will now sleep for {}s'.format(name, sleep_time)
logging.info(start_message)
time.sleep(sleep_time)
success_message = 'Worker {} has finished sleeping for {}s'.format(name, sleep_time)
logging.info(success_message)
def main_with_process():
start_time = time.time()
single_thread_time = 0.
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
workers = []
for i in range(10):
name = str(i)
sleep_time = np.random.randint(10) / 2
single_thread_time += sleep_time
worker = multiprocessing.Process(target=worker_function,
args=(sleep_time, name, queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
queue.put_nowait(None)
listener.join()
end_time = time.time()
final_message = "Script execution time was {}s, but single-thread time was {}s".format(
(end_time - start_time),
single_thread_time
)
print(final_message)
if __name__ == "__main__":
main_with_process()
但是我无法让以下适应工作:
def main_with_pool():
start_time = time.time()
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
pool = multiprocessing.Pool(processes=3)
job_list = [np.random.randint(10) / 2 for i in range(10)]
single_thread_time = np.sum(job_list)
for i, sleep_time in enumerate(job_list):
name = str(i)
pool.apply_async(worker_function,
args=(sleep_time, name, queue, worker_configurer))
queue.put_nowait(None)
listener.join()
end_time = time.time()
print("Script execution time was {}s, but single-thread time was {}s".format(
(end_time - start_time),
single_thread_time
))
if __name__ == "__main__":
main_with_pool()
我使用 multiprocessing.Manager、multiprocessing.Queue、multiprocessing.get_logger、apply_async.get() 尝试了许多细微的变化,但没有任何效果。
我认为对此会有一个现成的解决方案。我应该尝试 celery 吗?
谢谢
最佳答案
这里实际上有两个独立的问题,它们相互交织:
multiprocessing.Queue()
对象作为参数传递给基于池的函数(您可以将其传递给直接启动的工作线程,但不能传递给任何“进一步的”,因为它是)。None
发送到监听器进程。要修复第一个,请替换:
queue = multiprocessing.Queue(-1)
与:
queue = multiprocessing.Manager().Queue(-1)
作为管理器管理的Queue()
实例可以被传递。
要解决第二个问题,可以收集每个异步调用的每个结果,或者关闭池并等待它,例如:
pool.close()
pool.join()
queue.put_nowait(None)
或更复杂的:
getters = []
for i, sleep_time in enumerate(job_list):
name = str(i)
getters.append(
pool.apply_async(worker_function,
args=(sleep_time, name, queue, worker_configurer))
)
while len(getters):
getters.pop().get()
# optionally, close and join pool here (generally a good idea anyway)
queue.put_nowait(None)
(您还应该考虑将 put_nowait
替换为 put
的等待版本,而不是使用无限长度的队列。)
关于python - 如何使用 multiprocessing.Pool.apply_async 记录到单个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48045978/
我有一个网站。 必须登录才能看到里面的内容。 但是,我使用此代码登录。 doc = Jsoup.connect("http://46.137.207.181/Account/Login.aspx")
我正在尝试为我的域创建一个 SPF 记录并使我的邮件服务器能够对其进行评估。我在邮件服务器上使用 Postfix 并使用 policyd-spf (Python) 来评估记录。目前,我通过我的私有(p
我需要为负载平衡的 AWS 站点 mywebsite.com 添加 CName 记录。记录应该是: @ CNAME mywebsite.us-east-1.elb.amazon
我目前正在开发一个相当大的多层应用程序,该应用程序将部署在海外。虽然我希望它在解聚后不会折叠或爆炸,但我不能 100% 确定这一点。因此,如果我知道我可以请求日志文件,以准确找出问题所在以及原因,那就
我使用以下命令从我的网络摄像头录制音频和视频 gst-launch-0.10 v4l2src ! video/x-raw-yuv,width=640,height=480,framerate=30/1
我刚刚开始使用 ffmpeg 将视频分割成图像。我想知道是否可以将控制台输出信息保存到日志文件中。我试过“-v 10”参数,也试过“-loglevel”参数。我在另一个 SO 帖子上看到使用 ffmp
我想针对两个日期查询我的表并检索其中的记录。 我这样声明我的变量; DECLARE @StartDate datetime; DECLARE @EndDate datetime; 并像这样设置我的变量
在 javascript 中,我可以使用简单的 for 循环访问对象的每个属性,如下所示 var myObj = {x:1, y:2}; var i, sum=0; for(i in myObj) s
最近加入了一个需要处理大量代码的项目,我想开始记录和可视化调用图的一些流程,让我更好地理解一切是如何组合在一起的。这是我希望在我的理想工具中看到的: 每个节点都是一个函数/方法 如果一个函数可以调用另
如何使用反射在F#中创建记录类型?谢谢 最佳答案 您可以使用 FSharpValue.MakeRecord [MSDN]创建一个记录实例,但是我认为F#中没有任何定义记录类型的东西。但是,记录会编译为
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 3年前关闭。 Improve thi
我是 Sequelize 的新手并且遇到了一些语法问题。我制作了以下模型: // User sequelize.define('user', { name: { type: DataTyp
${student.name} Notify 这是我的output.jsp。请注意,我已经放置了一个链接“Notify”以将其转发到 display.jsp 上。但我不确定如何将 Stud
例如,这是我要做的查询: server:"xxx.xxx.com" AND request_url:"/xxx/xxx/xxx" AND http_X_Forwarded_Proto:(https O
我一直在开发大量 Java、PHP 和 Python。所有这些都提供了很棒的日志记录包(分别是 Log4J、Log 或logging)。这在调试应用程序时有很大帮助。特别是当应用程序 headless
在我的Grails应用程序中,我异步运行一些批处理过程,并希望该过程记录各种状态消息,以便管理员以后可以检查它们。 我考虑过将log4j JDBC附加程序用作最简单的解决方案,但是据我所知,它不使用D
我想将进入 MQ 队列的消息记录到数据库/文件或其他日志队列,并且我无法修改现有代码。是否有任何方法可以实现某种类似于 HTTP 嗅探器的消息记录实用程序?或者也许 MQ 有一些内置的功能来记录消息?
如果我有一条包含通用字段的记录,在更改通用字段时是否有任何方法可以模仿方便的 with 语法? 即如果我有 type User = // 'photo can be Bitmap or Url {
假设我有一个名为 Car 的自定义对象。其中的所有字段都是私有(private)的。 public class Car { private String mName; private
当记录具有特定字段时,我需要返回 true 的函数,反之亦然。示例: -record(robot, {name, type=industrial, ho
我是一名优秀的程序员,十分优秀!