- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个包含 bz2 压缩文件的 tarfile。我想将函数 clean_file
应用于每个 bz2 文件,并整理结果。在系列中,这很容易用一个循环:
import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import Pool
def clean_file(member):
if '.bz2' in str(member):
f = tr.extractfile(member)
with bz2.open(f, "rt") as bzinput:
dicts = []
for i, line in enumerate(bzinput):
line = line.replace('"name"}', '"name":" "}')
dat = json.loads(line)
dicts.append(dat)
bzinput.close()
f.close()
del f, bzinput
processed = dicts[0]
return processed
else:
pass
# Open tar file and get contents (members)
tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)
# Apply the clean_file function in series
i=0
processed_files = []
for m in members:
processed_files.append(clean_file(m))
i+=1
print('done '+str(i)+'/'+str(num_files))
但是,我需要能够并行执行此操作。我正在尝试使用 Pool
的方法,如下所示:
# Apply the clean_file function in parallel
if __name__ == '__main__':
with Pool(2) as p:
processed_files = list(p.map(clean_file, members))
但这会返回一个 OSError:
Traceback (most recent call last):
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "parse_data.py", line 19, in clean_file
for i, line in enumerate(bzinput):
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/bz2.py", line 195, in read1
return self._buffer.read1(size)
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 68, in readinto
data = self.read(len(byte_view))
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 103, in read
data = self._decompressor.decompress(rawblock, size)
OSError: Invalid data stream
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "parse_data.py", line 53, in <module>
processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/site-packages/tqdm/std.py", line 1167, in __iter__
for obj in iterable:
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 735, in next
raise value
OSError: Invalid data stream
所以我猜想这种方式无法正确访问 data.tar 或其他文件中的文件。如何并行应用函数?
我猜这将适用于任何包含 bz2 文件的 tar 存档,但这是我重现错误的数据: https://github.com/johnf1004/reproduce_tar_error
最佳答案
你没有指定你在什么平台上运行,但我怀疑它是 Windows,因为你有 ...
if __name__ == '__main__':
main()
... 这对于在使用 OS 函数 spawn
创建新进程的平台上创建进程的代码是必需的。但这也意味着当创建一个新进程时(例如,您正在创建的进程池中的所有进程),每个进程都从程序的最顶部重新执行源程序开始。这意味着每个池进程正在执行以下代码:
tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)
但是,我不明白为什么这本身会导致错误,但我不能确定。然而,问题可能是,这是在调用辅助函数之后执行的,正在调用 clean_file
,因此 tr
尚未设置。如果此代码位于 clean_file
之前,它可能会起作用,但这只是一个猜测。当然,在每个池进程中使用 members = tr.getmembers()
提取成员是一种浪费。 每个进程都需要打开 tar 文件,最好只打开一次。
但很明显,您发布的堆栈跟踪与您的代码不匹配。你显示:
Traceback (most recent call last):
File "parse_data.py", line 53, in <module>
processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))
但是您的代码没有任何对 tqdm
的引用或使用方法 imap
。现在,当您发布的代码与产生异常的代码不完全匹配时,分析您的问题到底是什么变得更加困难。
如果您在 Mac 上运行,可能会使用 fork
来创建新进程,当主进程创建了多个线程(您没有创建)时,这可能会出现问题必须看到,也许通过 tarfile
模块)然后你创建一个新进程,我已经指定代码以确保 spawn
用于创建新进程。无论如何,下面的代码应该 可以工作。它还介绍了一些优化。如果没有,请发布一个新的堆栈跟踪。
import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import get_context
def open_tar():
# open once for each process in the pool
global tr
tr = tarfile.open('data.tar')
def clean_file(member):
f = tr.extractfile(member)
with bz2.open(f, "rt") as bzinput:
for line in bzinput:
line = line.replace('"name"}', '"name":" "}')
dat = json.loads(line)
# since you are returning just the first occurrence:
return dat
def main():
with tarfile.open('data.tar') as tr:
members = tr.getmembers()
# just pick members where '.bz2' is in member:
filtered_members = filter(lambda member: '.bz2' in str(member), members)
ctx = get_context('spawn')
# open tar file just once for each process in the pool:
with ctx.Pool(initializer=open_tar) as pool:
processed_files = pool.map(clean_file, filtered_members)
print(processed_files)
# required for when processes are created using spawn:
if __name__ == '__main__':
main()
关于python - 无法将函数并行映射到 tarfile 成员,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67047533/
test = (function(){var key = 200; return {getKey : function(){return key} }; })(); test.
如果这个问题可能一直被问到,我很抱歉,但我进行了搜索,但找不到足够的答案。 如果公共(public)成员/方法正在访问私有(private)成员/字段,如何禁用它们的继承? 所以考虑一下: publi
重要的澄清:一些评论者似乎认为我是从 union 复制的。仔细查看 memcpy,它从一个普通的旧 uint32_t 地址复制而来,该地址不包含在 union 中。另外,我正在(通过 memcpy)复
spinner 通常只显示一个字符串,在我想分配 IDpersonne 和 Name 的情况下,旋转器必须告诉我名字。当我得到选定的项目时,我必须得到 ID。我该怎么做? 最佳答案 我假设您已将项目排
A 类的实例是 B 类的公共(public)成员。B 类的实例也是 A 的公共(public)成员。在什么情况下可能需要这种实现?我的意思是是否有一个或多个标准场景需要这种实现方式?更具体的细节:我有
我如何设置我的 web.config 以使用表单例份验证,将成员身份提供程序设置为 ActiveDirectoryMembershipProvider 并使用内置登录控件。这样我就可以使用有效的事件目
这个问题已经有答案了: Should methods in a Java interface be declared with or without a public access modifier?
因此根据定义,类中的私有(private)数字在序列化时以类名作为前缀。这对我来说是一个问题,我希望能够序列化/保存/反序列化一个确切的对象,但是 php 所做的是给我另一个 classname+va
我实现了一个成员? clojure 中的函数如下: (defn member? [item seq] (cond (empty? seq) false (= item (first
我在这里的问题似乎总是与使用函数有关。它仍然让我困惑!在本教科书练习中,我被要求按值传递结构,然后调整它并按引用传递。最初我设计的代码是在 main 中完成所有工作。现在我正在传递值。所以我添加了新函
所以我有这些变量 List files, images = new List(); string rootStr; 还有这个线程函数 private static int[] thread_searc
我对 C++ 模板和尝试弄清楚部分模板特化还比较陌生。我正在使用模板实现几个相关的数据结构:用于概率存在/不存在查询的布隆过滤器(基于位数组),以及用于丰度查询的计数布隆过滤器(带有整数数组)。我从以
例如在 java 中,我在外部类和内部类中声明并初始化了一个 JButton,我决定在某些情况下将其隐藏,这是一种安全的编程实践吗? 最佳答案 内部类的全部目的是它们可以访问到环绕内部类的外部类。 所
我有一个使用库进行通信的类: class Topic { Topic( Type T, String name ); }; class Reader { Reader (Topic, Stri
我在两个单独的文件中有以下代码。 package animal; public class Frog { protected void ribbit() { Syste
我有一个分数列表。使用这些,我需要从 redis 排序集中提取值。 我知道我可以使用 zrangebyscore - 但如果我提供的列表中的分数不连续怎么办?在这种情况下,我不能依赖 zrangeby
过去几年我一直被 C# 编码宠坏了,现在我又回到了 C++ 并发现我在处理本应很简单的东西时遇到了麻烦。我正在为 gamedev 使用名为 DarkGDK 的第三方库(任何以 db 为前缀的命令),但
我正在关注 Brian Harvey 从 2011 年开始在 UC Berkeley site 上的 SICP 讲座。 .他正在使用 STk interpreter教这门课,我正在使用带有 DrRac
在这段代码中,为什么在运算符重载中无法访问我的类的私有(private)字段? (请注意,这只是一个 MRE,不是完整代码) template class Frac template Frac o
在命名命名空间类中,我将一个类(位于全局命名空间中)声明为友元。 但是,后一个类不能访问前一个类的私有(private)成员。为什么是这样?有什么办法可以解决吗? Bob.h namespace AB
我是一名优秀的程序员,十分优秀!