- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
TL;DR:使用线程、多处理和单线程运行代码后得到不同的结果。需要故障排除指导。
您好,如果这可能有点过于笼统,我提前表示歉意,但我需要一些帮助来解决问题,并且我不确定如何最好地继续。
故事是这样的;我有一堆数据索引到 Solr 集合(约 2.5 亿项)中,该集合中的所有项都有一个 sessionid。有些项目可以共享相同的 session ID。我正在梳理该集合,以提取具有相同 session 的所有项目,对数据进行一些处理,然后生成另一个 JSON 文件以供稍后索引。
该代码有两个主要功能:proc_day - 接受一天并处理当天的所有 session 和proc_session - 完成单个 session 需要发生的所有事情。
多处理是在proc_day上实现的,因此每一天都会由一个单独的进程处理,proc_session函数可以用线程运行。下面是我用于线程/多重处理的代码。它接受一个函数、一个参数列表和线程/多进程的数量。然后它将根据输入参数创建一个队列,然后创建进程/线程并让它们通过它。我没有发布实际的代码,因为它通常在单线程下运行良好,没有任何问题,但如果需要,可以发布它。
autoprocs.py
import sys
import logging
from multiprocessing import Process, Queue,JoinableQueue
import time
import multiprocessing
import os
def proc_proc(func,data,threads,delay=10):
if threads < 0:
return
q = JoinableQueue()
procs = []
for i in range(threads):
thread = Process(target=proc_exec,args=(func,q))
thread.daemon = True;
thread.start()
procs.append(thread)
for item in data:
q.put(item)
logging.debug(str(os.getpid()) + ' *** Processes started and data loaded into queue waiting')
s = q.qsize()
while s > 0:
logging.info(str(os.getpid()) + " - Proc Queue Size is:" + str(s))
s = q.qsize()
time.sleep(delay)
for p in procs:
logging.debug(str(os.getpid()) + " - Joining Process {}".format(p))
p.join(1)
logging.debug(str(os.getpid()) + ' - *** Main Proc waiting')
q.join()
logging.debug(str(os.getpid()) + ' - *** Done')
def proc_exec(func,q):
p = multiprocessing.current_process()
logging.debug(str(os.getpid()) + ' - Starting:{},{}'.format(p.name, p.pid))
while True:
d = q.get()
try:
logging.debug(str(os.getpid()) + " - Starting to Process {}".format(d))
func(d)
sys.stdout.flush()
logging.debug(str(os.getpid()) + " - Marking Task as Done")
q.task_done()
except:
logging.error(str(os.getpid()) + " - Exception in subprocess execution")
logging.error(sys.exc_info()[0])
logging.debug(str(os.getpid()) + 'Ending:{},{}'.format(p.name, p.pid))
自动线程.py:
import threading
import logging
import time
from queue import Queue
def thread_proc(func,data,threads):
if threads < 0:
return "Thead Count not specified"
q = Queue()
for i in range(threads):
thread = threading.Thread(target=thread_exec,args=(func,q))
thread.daemon = True
thread.start()
for item in data:
q.put(item)
logging.debug('*** Main thread waiting')
s = q.qsize()
while s > 0:
logging.debug("Queue Size is:" + str(s))
s = q.qsize()
time.sleep(1)
logging.debug('*** Main thread waiting')
q.join()
logging.debug('*** Done')
def thread_exec(func,q):
while True:
d = q.get()
#logging.debug("Working...")
try:
func(d)
except:
pass
q.task_done()
在不同的多处理/线程配置下运行 python 后,我在验证数据时遇到了问题。有很多数据,所以我真的需要让多重处理工作。这是我昨天的测试结果。
Only with multiprocessing - 10 procs:
Days Processed 30
Sessions Found 3,507,475
Sessions Processed 3,514,496
Files 162,140
Data Output: 1.9G
multiprocessing and multithreading - 10 procs 10 threads
Days Processed 30
Sessions Found 3,356,362
Sessions Processed 3,272,402
Files 424,005
Data Output: 2.2GB
just threading - 10 threads
Days Processed 31
Sessions Found 3,595,263
Sessions Processed 3,595,263
Files 733,664
Data Output: 3.3GB
Single process/ no threading
Days Processed 31
Sessions Found 3,595,263
Sessions Processed 3,595,263
Files 162,190
Data Output: 1.9GB
这些计数是通过 grep 和日志文件中的县条目收集的(每个主进程 1 个)。首先出现的问题是处理的日期不匹配。但是,我手动检查了日志文件,看起来好像缺少一个日志条目,有后续的日志条目表明该天已实际处理。我不知道为什么它被省略了。
我真的不想编写更多代码来验证此代码,这看起来非常浪费时间,还有其他选择吗?
最佳答案
我在上面的评论中给出了一些一般性提示。我认为您的方法在不同的抽象层次上存在多个问题。您也没有显示所有相关代码。
问题很可能是
您必须经历所有这些要点,并且由于问题的复杂性,这里肯定没有人能够为您确定确切的问题。
如果您不确定日志文件的完整性,您应该根据处理引擎的有效负载输出执行分析。我想说的是:日志文件可能只是数据处理的副产品。主要产品是您应该分析的东西。当然,正确记录日志也很重要。但这两个问题应该分开对待。
基于多处理
的解决方案特别值得怀疑的是您等待工作人员完成的方式。您似乎不确定应该通过哪种方法等待您的 worker ,因此您应用了三种不同的方法:
首先,您在 while 循环中监视队列的大小并等待它变为 0。这是一种非规范方法,但实际上可能有效。
其次,您以一种奇怪的方式join()
您的流程:
for p in procs:
logging.debug(str(os.getpid()) + " - Joining Process {}".format(p))
p.join(1)
为什么在这里定义一秒的超时而不响应进程是否在该时间范围内实际终止?您应该真正加入一个进程,即等待它终止,或者指定一个超时,如果该超时在进程完成之前到期,请特别对待这种情况。您的代码无法区分这些情况,因此 p.join(1)
就像编写 time.sleep(1)
一样。
第三,您加入队列。
那么,在确保q.qsize()
返回0并再等待一秒钟后,您真的认为加入队列很重要吗?有什么区别吗?这些方法中的一个就足够了,您需要考虑哪些标准对您的问题最重要。也就是说,这些条件之一应该确定性地暗示其他两个条件。
所有这些看起来像是对多处理解决方案的快速而肮脏的黑客攻击,而您自己并不确定该解决方案应该如何表现。我在研究并发架构时获得的最重要的见解之一:作为架构师,您必须 100% 了解系统中的通信和控制流如何工作。未正确监视和控制工作进程的状态很可能是您所观察到的问题的根源。
关于python - 使用 Python 多处理/线程解决数据不一致问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28069845/
初学者 android 问题。好的,我已经成功写入文件。例如。 //获取文件名 String filename = getResources().getString(R.string.filename
我已经将相同的图像保存到/data/data/mypackage/img/中,现在我想显示这个全屏,我曾尝试使用 ACTION_VIEW 来显示 android 标准程序,但它不是从/data/dat
我正在使用Xcode 9,Swift 4。 我正在尝试使用以下代码从URL在ImageView中显示图像: func getImageFromUrl(sourceUrl: String) -> UII
我的 Ubuntu 安装 genymotion 有问题。主要是我无法调试我的数据库,因为通过 eclipse 中的 DBMS 和 shell 中的 adb 我无法查看/data/文件夹的内容。没有显示
我正在尝试用 PHP 发布一些 JSON 数据。但是出了点问题。 这是我的 html -- {% for x in sets %}
我观察到两种方法的结果不同。为什么是这样?我知道 lm 上发生了什么,但无法弄清楚 tslm 上发生了什么。 > library(forecast) > set.seed(2) > tts lm(t
我不确定为什么会这样!我有一个由 spring data elasticsearch 和 spring data jpa 使用的类,但是当我尝试运行我的应用程序时出现错误。 Error creatin
在 this vega 图表,如果我下载并转换 flare-dependencies.json使用以下 jq 到 csv命令, jq -r '(map(keys) | add | unique) as
我正在提交一个项目,我必须在其中创建一个带有表的 mysql 数据库。一切都在我这边进行,所以我只想检查如何将我所有的压缩文件发送给使用不同计算机的人。基本上,我如何为另一台计算机创建我的数据库文件,
我有一个应用程序可以将文本文件写入内部存储。我想仔细看看我的电脑。 我运行了 Toast.makeText 来显示路径,它说:/数据/数据/我的包 但是当我转到 Android Studio 的 An
我喜欢使用 Genymotion 模拟器以如此出色的速度加载 Android。它有非常好的速度,但仍然有一些不稳定的性能。 如何从 Eclipse 中的文件资源管理器访问 Genymotion 模拟器
我需要更改 Silverlight 中文本框的格式。数据通过 MVVM 绑定(bind)。 例如,有一个 int 属性,我将 1 添加到 setter 中的值并调用 OnPropertyChanged
我想向 Youtube Data API 提出请求,但我不需要访问任何用户信息。我只想浏览公共(public)视频并根据搜索词显示视频。 我可以在未经授权的情况下这样做吗? 最佳答案 YouTube
我已经设置了一个 Twilio 应用程序,我想向人们发送更新,但我不想回复单个文本。我只是想让他们在有问题时打电话。我一切正常,但我想在发送文本时显示传入文本,以确保我不会错过任何问题。我正在使用 p
我有一个带有表单的网站(目前它是纯 HTML,但我们正在切换到 JQuery)。流程是这样的: 接受用户的输入 --- 5 个整数 通过 REST 调用网络服务 在服务器端运行一些计算...并生成一个
假设我们有一个名为 configuration.js 的文件,当我们查看内部时,我们会看到: 'use strict'; var profile = { "project": "%Projec
这部分是对 Previous Question 的扩展我的: 我现在可以从我的 CI Controller 成功返回 JSON 数据,它返回: {"results":[{"id":"1","Sourc
有什么有效的方法可以删除 ios 中 CBL 的所有文档存储?我对此有疑问,或者,如果有人知道如何从本质上使该应用程序像刚刚安装一样,那也会非常有帮助。我们正在努力确保我们的注销实际上将应用程序设置为
我有一个 Rails 应用程序,它与其他 Rails 应用程序通信以进行数据插入。我使用 jQuery $.post 方法进行数据插入。对于插入,我的其他 Rails 应用程序显示 200 OK。但在
我正在为服务于发布请求的 API 调用运行单元测试。我正在传递请求正文,并且必须将响应作为帐户数据返回。但我只收到断言错误 注意:数据是从 Azure 中获取的 spec.js const accou
我是一名优秀的程序员,十分优秀!