
python - Apache Beam pipeline step not running in parallel? (Python)

Reposted. Author: 行者123. Updated: 2023-12-04 04:05:39

I am using a slightly modified version of the wordcount example ( https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py ), in which I replaced the process function with the following:

  # Replaces WordExtractingDoFn.process in wordcount.py; relies on that module's
  # `logging` and `re` imports and on the metrics created in the DoFn's __init__.
  def process(self, element):
    """Returns an iterator over the words of this element.

    The element is a line of text. If the line is blank, note that, too.

    Args:
      element: the element being processed

    Returns:
      The processed element.
    """
    import random
    import time

    # Random tag so the START and END messages of one element can be paired.
    n = random.randint(0, 1000)
    time.sleep(5)
    logging.getLogger().warning('PARALLEL START? ' + str(n))
    time.sleep(5)

    text_line = element.strip()
    if not text_line:
      self.empty_line_counter.inc(1)
    words = re.findall(r'[\w\']+', text_line, re.UNICODE)
    for w in words:
      self.words_counter.inc()
      self.word_lengths_counter.inc(len(w))
      self.word_lengths_dist.update(len(w))

    time.sleep(5)
    logging.getLogger().warning('PARALLEL END? ' + str(n))
    time.sleep(5)

    return words

The idea is to check whether this step runs in parallel. If it did, I would expect interleaved output such as:

PARALLEL START? 447
PARALLEL START? 994
PARALLEL END? 447
PARALLEL START? 351
PARALLEL START? 723
PARALLEL END? 994
PARALLEL END? 351
PARALLEL END? 723

However, the actual output looks like this, which suggests that the step is not running in parallel:

PARALLEL START? 447
PARALLEL END? 447
PARALLEL START? 994
PARALLEL END? 994
PARALLEL START? 351
PARALLEL END? 351
PARALLEL START? 723
PARALLEL END? 723
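Rather than comparing the two logs by eye, the START/END markers can be checked mechanically: the step overlapped if, at some point, more than one element had logged START but not yet END. A minimal sketch, assuming the log has already been reduced to message lines like the ones above (`max_concurrency` is a hypothetical helper, not part of Beam):

```python
import re


def max_concurrency(log_lines):
    """Return the largest number of elements that were between their
    'PARALLEL START?' and 'PARALLEL END?' markers at the same time.

    Note: n is drawn from 0..1000, so two elements can occasionally share
    an ID; in that case this sketch under-counts."""
    pattern = re.compile(r"PARALLEL (START|END)\? (\d+)")
    active = set()
    peak = 0
    for line in log_lines:
        match = pattern.search(line)
        if not match:
            continue  # ignore unrelated log lines
        marker, element_id = match.groups()
        if marker == "START":
            active.add(element_id)
            peak = max(peak, len(active))
        else:
            active.discard(element_id)
    return peak


expected_log = """PARALLEL START? 447
PARALLEL START? 994
PARALLEL END? 447
PARALLEL START? 351
PARALLEL START? 723
PARALLEL END? 994
PARALLEL END? 351
PARALLEL END? 723""".splitlines()

actual_log = """PARALLEL START? 447
PARALLEL END? 447
PARALLEL START? 994
PARALLEL END? 994
PARALLEL START? 351
PARALLEL END? 351
PARALLEL START? 723
PARALLEL END? 723""".splitlines()

print(max_concurrency(expected_log))  # 3
print(max_concurrency(actual_log))    # 1
```

A peak of 1 means every element finished before the next one started, i.e. no overlap was observed in that worker's log.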

I have tried the local runner with direct_num_workers set manually, as well as the DataflowRunner with multiple workers, to no avail. How can I make sure this step actually runs in parallel?

Update: I found a multi-processing mode that looks promising. However, when I use it from the Windows command line, I get the following error:

Exception in thread run_worker:
Traceback (most recent call last):
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\threading.py", line 926, in _bootstrap_inner
    self.run()
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\portability\local_job_service.py", line 218, in run
    p = subprocess.Popen(self._worker_command_line, shell=True, env=env_dict)
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\subprocess.py", line 775, in __init__
    restore_signals, start_new_session)
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\subprocess.py", line 1119, in _execute_child
    args = list2cmdline(args)
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\subprocess.py", line 530, in list2cmdline
    needquote = (" " in arg) or ("\t" in arg) or not arg
TypeError: argument of type 'int' is not iterable
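The last frames show `subprocess.list2cmdline` (which `Popen` on Windows under Python 3.7 calls when the command is not a `str`) iterating over the worker command and meeting an `int`. Iterating over a `bytes` object yields ints, so a bytes command line produces exactly this kind of failure. The stdlib-only sketch below reproduces it; that Beam's worker command line was bytes in the version used here is an assumption, not something this thread confirms. `list2cmdline` is an internal, undocumented helper, and `reproduce_list2cmdline_error` is a hypothetical name.

```python
import subprocess


def reproduce_list2cmdline_error(command):
    """Call subprocess.list2cmdline on `command` and return the TypeError
    message it raises, or None if it succeeds."""
    try:
        subprocess.list2cmdline(command)
    except TypeError as exc:
        return str(exc)
    return None


# A list of str arguments is joined into one command line without error.
print(reproduce_list2cmdline_error(["python", "-m", "some_module"]))  # None

# A bytes command line is iterated byte by byte, and each byte is an int,
# so list2cmdline raises a TypeError (the exact message varies by Python version).
print(reproduce_list2cmdline_error(b"python -m some_module"))
```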

Best answer

The standard Apache Beam examples use very small inputs: gs://dataflow-samples/shakespeare/kinglear.txt is only a few KB, so the work cannot be split up well.

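Apache Beam parallelizes work by splitting the input data. For example, if you have many files, each file can be consumed in parallel. If you have one very large file, Beam can split it into segments that are consumed in parallel.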
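To make the splitting idea concrete, here is a simplified sketch of dividing a line-oriented text file into byte ranges that separate workers could read independently. This is not Beam's actual TextIO splitting code; `byte_ranges`, `read_lines_in_range` and the split sizes are hypothetical. Each range owns the lines whose first byte falls inside it, so every line is read exactly once, and a file smaller than the split size yields a single range, i.e. nothing to parallelize.

```python
import os
import tempfile


def byte_ranges(file_size, desired_split_size):
    """Divide [0, file_size) into contiguous ranges of at most desired_split_size bytes."""
    return [(start, min(start + desired_split_size, file_size))
            for start in range(0, file_size, desired_split_size)]


def read_lines_in_range(path, start, end):
    """Return the lines whose first byte lies in [start, end).

    A line that straddles a boundary belongs to the range it starts in,
    so every line is read by exactly one range."""
    lines = []
    with open(path, "rb") as f:
        if start > 0:
            # Skip the tail of a line that began before `start`.
            f.seek(start - 1)
            f.readline()
        while f.tell() < end:
            line = f.readline()
            if not line:
                break  # end of file
            lines.append(line.rstrip(b"\n").decode("utf-8"))
    return lines


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "input.txt")
    with open(path, "w", encoding="utf-8", newline="\n") as f:
        for i in range(10):
            f.write(f"line {i}\n")
    size = os.path.getsize(path)  # 10 lines x 7 bytes = 70 bytes

    # Five 16-byte ranges: each could be handed to a different worker.
    parts = [read_lines_in_range(path, s, e) for s, e in byte_ranges(size, 16)]
    recombined = [line for part in parts for line in part]

    # A file smaller than the split size yields one range, i.e. one worker.
    small_file_ranges = byte_ranges(size, 1024)

print(len(parts), recombined == [f"line {i}" for i in range(10)])  # 5 True
print(len(small_file_ranges))  # 1
```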

Your code should eventually show parallelism correctly, but try it with a (significantly) larger input.
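One cheap way to follow that advice is to replicate a text many times across several files, then point wordcount's `--input` at a glob such as `<out_dir>/input-*.txt` (the example passes `--input` to `ReadFromText`, which accepts file patterns). A sketch under those assumptions; `write_shards`, the shard naming scheme and the sizes are illustrative, and whether a given runner actually spreads the read over several workers still depends on that runner.

```python
import os
import tempfile


def write_shards(text, out_dir, num_shards, copies_per_shard):
    """Write num_shards files to out_dir, each holding copies_per_shard copies of text.

    Returns the written paths. More (and larger) files give the runner more
    opportunities to split the read across workers."""
    if not text.endswith("\n"):
        text += "\n"  # keep copies on separate lines
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for shard in range(num_shards):
        name = f"input-{shard:05d}-of-{num_shards:05d}.txt"
        path = os.path.join(out_dir, name)
        with open(path, "w", encoding="utf-8", newline="\n") as f:
            f.write(text * copies_per_shard)
        paths.append(path)
    return paths


with tempfile.TemporaryDirectory() as tmp:
    # In practice `text` would be the contents of a local copy of kinglear.txt.
    shard_paths = write_shards("the quick brown fox\n", tmp,
                               num_shards=4, copies_per_shard=1000)
    shard_sizes = [os.path.getsize(p) for p in shard_paths]

print(len(shard_paths), shard_sizes)  # 4 [20000, 20000, 20000, 20000]
```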

A similar question, "python - Apache Beam pipeline step not running in parallel? (Python)", can be found on Stack Overflow: https://stackoverflow.com/questions/62563137/
