gpt4 book ai didi

python - 使用 Python 多处理/线程解决数据不一致问题

转载 作者:行者123 更新时间:2023-12-01 04:52:53 24 4
gpt4 key购买 nike

TL;DR:使用线程、多处理和单线程运行代码后得到不同的结果。需要故障排除指导。

您好,如果这可能有点过于笼统,我提前表示歉意,但我需要一些帮助来解决问题,并且我不确定如何最好地继续。

故事是这样的;我有一堆数据索引到 Solr 集合(约 2.5 亿项)中,该集合中的所有项都有一个 sessionid。有些项目可以共享相同的 session ID。我正在梳理该集合,以提取具有相同 session 的所有项目,对数据进行一些处理,然后生成另一个 JSON 文件以供稍后索引。

该代码有两个主要功能:proc_day - 接受一天并处理当天的所有 session 和proc_session - 完成单个 session 需要发生的所有事情。

多处理是在proc_day上实现的,因此每一天都会由一个单独的进程处理,proc_session函数可以用线程运行。下面是我用于线程/多重处理的代码。它接受一个函数、一个参数列表和线程/多进程的数量。然后它将根据输入参数创建一个队列,然后创建进程/线程并让它们通过它。我没有发布实际的代码,因为它通常在单线程下运行良好,没有任何问题,但如果需要,可以发布它。

autoprocs.py

import sys
import logging
from multiprocessing import Process, Queue,JoinableQueue
import time
import multiprocessing
import os

def proc_proc(func,data,threads,delay=10):
if threads < 0:
return
q = JoinableQueue()
procs = []

for i in range(threads):
thread = Process(target=proc_exec,args=(func,q))
thread.daemon = True;
thread.start()
procs.append(thread)

for item in data:
q.put(item)

logging.debug(str(os.getpid()) + ' *** Processes started and data loaded into queue waiting')

s = q.qsize()
while s > 0:
logging.info(str(os.getpid()) + " - Proc Queue Size is:" + str(s))
s = q.qsize()
time.sleep(delay)

for p in procs:
logging.debug(str(os.getpid()) + " - Joining Process {}".format(p))
p.join(1)

logging.debug(str(os.getpid()) + ' - *** Main Proc waiting')
q.join()
logging.debug(str(os.getpid()) + ' - *** Done')

def proc_exec(func,q):
p = multiprocessing.current_process()
logging.debug(str(os.getpid()) + ' - Starting:{},{}'.format(p.name, p.pid))
while True:
d = q.get()
try:
logging.debug(str(os.getpid()) + " - Starting to Process {}".format(d))
func(d)
sys.stdout.flush()
logging.debug(str(os.getpid()) + " - Marking Task as Done")
q.task_done()
except:
logging.error(str(os.getpid()) + " - Exception in subprocess execution")
logging.error(sys.exc_info()[0])
logging.debug(str(os.getpid()) + 'Ending:{},{}'.format(p.name, p.pid))

自动线程.py:

import threading
import logging
import time
from queue import Queue

def thread_proc(func,data,threads):
if threads < 0:
return "Thead Count not specified"
q = Queue()

for i in range(threads):
thread = threading.Thread(target=thread_exec,args=(func,q))
thread.daemon = True
thread.start()

for item in data:
q.put(item)

logging.debug('*** Main thread waiting')
s = q.qsize()
while s > 0:
logging.debug("Queue Size is:" + str(s))
s = q.qsize()
time.sleep(1)
logging.debug('*** Main thread waiting')
q.join()
logging.debug('*** Done')

def thread_exec(func,q):
while True:
d = q.get()
#logging.debug("Working...")
try:
func(d)
except:
pass
q.task_done()

在不同的多处理/线程配置下运行 python 后,我在验证数据时遇到了问题。有很多数据,所以我真的需要让多重处理工作。这是我昨天的测试结果。

Only with multiprocessing - 10 procs: 
Days Processed 30
Sessions Found 3,507,475
Sessions Processed 3,514,496
Files 162,140
Data Output: 1.9G

multiprocessing and multithreading - 10 procs 10 threads
Days Processed 30
Sessions Found 3,356,362
Sessions Processed 3,272,402
Files 424,005
Data Output: 2.2GB

just threading - 10 threads
Days Processed 31
Sessions Found 3,595,263
Sessions Processed 3,595,263
Files 733,664
Data Output: 3.3GB

Single process/ no threading
Days Processed 31
Sessions Found 3,595,263
Sessions Processed 3,595,263
Files 162,190
Data Output: 1.9GB

这些计数是通过 grep 和日志文件中的县条目收集的(每个主进程 1 个)。首先出现的问题是处理的日期不匹配。但是,我手动检查了日志文件,看起来好像缺少一个日志条目,有后续的日志条目表明该天已实际处理。我不知道为什么它被省略了。

我真的不想编写更多代码来验证此代码,这看起来非常浪费时间,还有其他选择吗?

最佳答案

我在上面的评论中给出了一些一般性提示。我认为您的方法在不同的抽象层次上存在多个问题。您也没有显示所有相关代码。

问题很可能是

  1. 在您用来从 solr 读取数据的方法中或在将读取数据提供给工作人员之前准备读取数据的方法中。
  2. 在您提出的用于在多个进程之间分配工作的架构中。
  3. 在您的日志基础设施中(正如您自己指出的那样)。
  4. 您的分析方法。

必须经历所有这些要点,并且由于问题的复杂性,这里肯定没有人能够为您确定确切的问题。

关于第 (3) 点和第 (4) 点:

如果您不确定日志文件的完整性,您应该根据处理引擎的有效负载输出执行分析。我想说的是:日志文件可能只是数据处理的副产品。主要产品是您应该分析的东西。当然,正确记录日志也很重要。但这两个问题应该分开对待。

我对上面列表中第 (2) 点的贡献:

基于多处理的解决方案特别值得怀疑的是您等待工作人员完成的方式。您似乎不确定应该通过哪种方法等待您的 worker ,因此您应用了三种不同的方法:

首先,您在 while 循环中监视队列的大小并等待它变为 0。这是一种非规范方法,但实际上可能有效。

其次,您以一种奇怪的方式join()您的流程:

for p in procs:
logging.debug(str(os.getpid()) + " - Joining Process {}".format(p))
p.join(1)

为什么在这里定义一秒的超时而不响应进程是否在该时间范围内实际终止?您应该真正加入一个进程,即等待它终止,或者指定一个超时,如果该超时在进程完成之前到期,请特别对待这种情况。您的代码无法区分这些情况,因此 p.join(1) 就像编写 time.sleep(1) 一样。

第三,您加入队列。

那么,在确保q.qsize()返回0并再等待一秒钟后,您真的认为加入队列很重要吗?有什么区别吗?这些方法中的一个就足够了,您需要考虑哪些标准对您的问题最重要。也就是说,这些条件之一应该确定性地暗示其他两个条件。

所有这些看起来像是对多处理解决方案的快速而肮脏的黑客攻击,而您自己并不确定该解决方案应该如何表现。我在研究并发架构时获得的最重要的见解之一:作为架构师,您必须 100% 了解系统中的通信和控制流如何工作。未正确监视和控制工作进程的状态很可能是您所观察到的问题的根源。

关于python - 使用 Python 多处理/线程解决数据不一致问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28069845/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com