
python - Beam: ReadAllFromText receive string or list from DoFn different behavior?

Reposted. Author: 行者123. Updated: 2023-12-01 07:23:04

I have a pipeline that reads files from GCS, triggered through Pub/Sub:

import json
import logging

import apache_beam as beam

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name

class LogFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return [element]

class LogPassThroughFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return element

...

(
    p
    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
    | 'Log Results' >> beam.ParDo(LogFn())
    # | 'Log Results' >> beam.ParDo(LogPassThroughFn())
    | "Read File from GCS" >> beam.io.ReadAllFromText()
)
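To make the file name expression in ExtractFileNameFn concrete, here is a small sketch in plain Python. The sample message is hypothetical, and it assumes the `id` field has the form bucket/object-path/generation, so that dropping the last segment leaves the bucket and object path. The helper name `extract_file_name` is also only for illustration.

```python
# Hypothetical payload; the id layout (bucket/object/generation) is an assumption.
message = {"id": "my-bucket/data/input.txt/1566300000000000"}

def extract_file_name(element):
    # Drop the trailing segment and prepend the gs:// scheme,
    # mirroring the expression used in ExtractFileNameFn.
    return "gs://" + "/".join(element["id"].split("/")[:-1])

print(extract_file_name(message))  # gs://my-bucket/data/input.txt
```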

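The difference between LogFn and LogPassThroughFn is the type of the return value: one returns a list, the other a string. LogFn works fine in my test code, but LogPassThroughFn makes the pipeline fail to run. Per the answer to this issue: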

Beam Python SDK still tries to interpret the output of that ParDo as if it was a collection of elements. And it does so by interpreting the string you emitted as collection of characters.

So we know that LogFn should work correctly.
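The quoted behavior can be illustrated without Beam. The sketch below uses a hypothetical helper, `emit_all`, as a simplified stand-in for how a runner treats whatever `process` returns as an iterable of output elements. It is not Beam's actual implementation.

```python
def emit_all(process_result):
    # Simplified stand-in for the runner: every item of the iterable
    # returned by process() becomes one output element.
    return list(process_result)

file_name = "gs://b/f.txt"

# Like LogFn (return [element]): one output element, the whole string.
print(emit_all([file_name]))

# Like LogPassThroughFn (return element): the string itself is iterated,
# so each character becomes a separate output element.
print(emit_all(file_name))
```

With the second variant, the downstream transform would see single characters such as "g" and "s" instead of one file name.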

However, I noticed that ExtractFileNameFn yields a string rather than a list. Is that correct? So I tested the following variant, ExtractFileNameFn1, which yields a list:

class ExtractFileNameFn1(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield [file_name]

...

(
    p
    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn1())
    | "Read File from GCS" >> beam.io.ReadAllFromText()
)

Now the pipeline fails to run...

My question is: what is the difference between returning a string and returning a list in a DoFn? Why can ReadAllFromText receive a string from ExtractFileNameFn, but a list from LogFn?

Beam version: 2.14.0

Best answer

The documentation for ParDo says:

Note that the DoFn must return an iterable for each element of the input PCollection. An easy way to do this is to use the yield keyword in the process method.

https://beam.apache.org/releases/pydoc/2.6.0/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo

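The reason for returning an iterable is that your input elements may not map 1-to-1 to output elements. A single input may produce multiple outputs.

You can yield the outputs as you go, or you can collect them into a list and return them at the end.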
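As a minimal sketch of the one-to-many case, the two plain Python functions below stand in for `process` methods (their names are hypothetical and no Beam is involved). One yields outputs as it goes, the other collects them in a list, and both produce the same sequence of output elements for each input.

```python
def split_with_yield(line):
    # Emit each word as soon as it is found.
    for word in line.split():
        yield word

def split_with_return(line):
    # Collect all words first, then return them together.
    words = []
    for word in line.split():
        words.append(word)
    return words

# One input can produce many outputs, exactly one, or none at all.
for line in ["a b c", "single", ""]:
    assert list(split_with_yield(line)) == list(split_with_return(line))
    print(repr(line), "->", list(split_with_yield(line)))
```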

So this:

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name

is the same as this:

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        return [file_name]

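In both cases the output elements are strings, and each output element is a file name.

When you yield [file_name], each output element is actually a list containing a string.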
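The three variants can be compared side by side in plain Python. This is only a sketch: the functions are hypothetical stand-ins for the `process` methods, and `list(...)` plays the role of the runner collecting output elements.

```python
def yield_name(name):
    yield name        # one output element: the string

def return_list(name):
    return [name]     # the same single output element

def yield_list(name):
    yield [name]      # one output element, but it is a list

name = "gs://b/f.txt"
for fn in (yield_name, return_list, yield_list):
    outputs = list(fn(name))
    # Print the variant, its output elements, and the type of the first one.
    print(fn.__name__, outputs, type(outputs[0]).__name__)
```

Only the first two hand a string to the next step. Assuming ReadAllFromText expects each input element to be a file name or pattern string, a list element from the third variant would not fit, which is consistent with the failure described in the question.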

For python - Beam: ReadAllFromText receive string or list from DoFn different behavior?, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57591856/
