- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
目标:
当前状态:
因此有了使用多处理的想法;我希望能够同时写入输出,而不是 CPU 绑定(bind)而是 I/O 绑定(bind)。
撇开背景不谈,这里是问题(本质上是一个设计问题)- multiprocessing library通过 pickle 对象然后将数据管道传输到其他生成的进程来工作;但是我尝试在 WriteWorker Process 中使用的 ResultProxy 对象和共享队列不可挑选,这导致以下消息(不是逐字记录,但足够接近):
pickle.PicklingError: Can't pickle object in WriteWorker.start()
所以对你们有用的人的问题是,关于可以避免此问题的潜在设计模式或方法的任何想法?这似乎是一个简单、经典的生产者-消费者问题,我想出了解决方案很简单,我只是想多了
感谢任何帮助或反馈!谢谢:)
编辑:这里有一些相关的代码片段,如果我可以提供任何其他上下文,请告诉我
来自父类:
#init manager and queues
self.manager = multiprocessing.Manager()
self.query_queue = self.manager.Queue()
self.write_queue = self.manager.Queue()
def _get_data(self):
#spawn a pool of query processes, and pass them query queue instance
for i in xrange(self.NUM_QUERY_THREADS):
qt = QueryWorker.QueryWorker(self.query_queue, self.write_queue, self.config_values, self.args)
qt.daemon = True
# qt.setDaemon(True)
qt.start()
#populate query queue
self.parse_sql_queries()
#spawn a pool of writer processes, and pass them output queue instance
for i in range(self.NUM_WRITE_THREADS):
wt = WriteWorker.WriteWorker(self.write_queue, self.output_path, self.WRITE_BUFFER, self.output_dict)
wt.daemon = True
# wt.setDaemon(True)
wt.start()
#wait on the queues until everything has been processed
self.query_queue.join()
self.write_queue.join()
来自 QueryWorker 类:
def run(self):
while True:
#grabs host from query queue
query_tupe = self.query_queue.get()
table = query_tupe[0]
query = query_tupe[1]
query_num = query_tupe[2]
if query and table:
#grab connection from pool, run the query
connection = self.engine.connect()
print 'Running query #' + str(query_num) + ': ' + table
try:
result = connection.execute(query)
except:
print 'Error while running query #' + str(query_num) + ': \n\t' + str(query) + '\nError: ' + str(sys.exc_info()[1])
#place result handle tuple into out queue
self.out_queue.put((table, result))
#signals to queue job is done
self.query_queue.task_done()
最佳答案
简单的答案是避免直接使用 ResultsProxy。而是使用 cursor.fetchall() 或 cursor.fetchmany(number_to_fetch) 从 ResultsProxy 获取数据,然后将数据传递到多处理队列。
关于Python 多处理与 unpicklable 对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13594934/
我有一个对象,它会在第一次使用后被缓存。我将使用 cPickle 模块执行此操作。如果模块已经被缓存,当我下次尝试实例化对象时(在另一个进程中)我想使用缓存的对象。以下是我的基本结构: import
下面的例子 pickles 很好,但是我得到一个编译错误,指出无法生成 unpickler。这是一个简单的测试用例来重现这一点: import scala.pickling._ import json
我有一个文件,其中包含一些我在主脚本中不需要的字典和列表(大约 900 行)。然后我执行以下操作。 myDicts = [DictOne, DictTwo, ListOne, ListTwo] pic
这个问题在这里已经有了答案: Is there a way to view cPickle or Pickle file contents without loading Python in Win
我正在尝试使用 pickle 来保存自定义类;非常类似于下面的代码(尽管在类上定义了一些方法,还有一些用于数据的指令等)。然而,当我运行这个程序时,pickle 然后 unpickle,我丢失了类中的
是否有一种好方法来加载表示为字符串的字节对象,以便可以对其进行 unpickled? 基本示例 这是一个愚蠢的例子: import pickle mydict = { 'a': 1111, 'b':
全局变量 Agree 是在所有函数外部定义的命名元组: Agree = collections.namedtuple('Agree', ['kappa', 'alpha','avg_ao'], ver
我正在使用 python 请求库并尝试保持 session 。 由于我的主机上有多个 IP,我创建了以下方法以使 session 绑定(bind)到特定 IP。 class SourceAddress
我一直在开发一个 python 应用程序,其中客户端向服务器发送时钟信号,而服务器以音频信号响应。 我有两个按钮,一个用于启动时钟,一个用于暂停轨道。 主类 # function I call whe
import pickle class ABError(Exception): def __init__(self, a, b): super(ABError, self)._
我目前正在开发一个 Django 项目,希望通过网络对视频文件进行一些转换。为了对视频进行转换,我使用了 opencv 的 python API,我还使用 Dajax 来执行 ajax 请求。 在 a
当我尝试解开 cifar-10 数据集时,出现以下错误。我需要训练一个模型,但我什至无法获取操作数据。我该如何解决这个问题 dict=cPickle.load(fo) UnpicklingError:
我想实现一个类(最好是单例),在初始化阶段应使用 cPickle 机制恢复其状态。为此,我编写了以下代码片段: import cPickle import collections class Test
我在重命名模块后通过 numpy.load 加载对象时遇到问题。这是一个显示问题的简单示例。 假设在 mymodule.py 中定义了一个类: class MyClass(object): a
As stated in the pickle documentation ,类通常以这样一种方式进行 pickle ,即它们要求完全相同的类出现在接收端的模块中。但是,我确实注意到类还有一些 __g
我有一个我想打开的 pickled 对象,但闲置时返回错误 TypeError: file must have 'read' and 'readline' attributes 这是我的代码 open
我正在编写脚本来通过反复取消对象直到 EOF 来处理(非常大的)文件。我想对文件进行分区并让单独的进程(在云中)解开并处理单独的部分。 但是我的分区器并不智能,它不知道文件中 pickle 对象之间的
目标: 使用带有线程或进程的 SQLAlchemy 在数据库中运行约 40 个巨大的查询,将相应的 SQLA ResultProxies在 Queue.Queue 中(由 multiprocessin
我正在尝试加载 pickle 并通过 Flask 应用程序显示一些数据。我遵循的结构如下。 package1.py class myclass: #do something m = mycla
使用多处理时出现以下错误: Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/pyth
我是一名优秀的程序员,十分优秀!