gpt4 book ai didi

python - 按元组顺序展平 PCollection

转载 作者:行者123 更新时间:2023-12-01 01:56:54 25 4
gpt4 key购买 nike

我正在尝试使用 Apache Beam 中的 Flatten 函数添加 header 。但是,似乎没有办法根据文档设置顺序:https://beam.apache.org/documentation/sdks/pydoc/2.4.0/apache_beam.transforms.core.html?highlight=flatten#apache_beam.transforms.core.Flatten

有时标题位于数据的末尾,而其他则位于顶部。有没有办法设置顺序?想知道我是否遗漏了一些东西。

with beam.Pipeline(options=options) as p:


header = [
('name', 'number'),
]
phones_list = [
('amy', '111-222-3333'),
('james', '222-333-4444'),
('amy', '333-444-5555'),
('carl', '444-555-6666'),
]

header = p | 'Header' >> beam.Create(header)
phones = p | 'CreatePhones' >> beam.Create(phones_list)

merged = ((phones,header)
| 'MergedPColl' >> beam.Flatten())

output = merged

output | 'Write' >> beam.io.WriteToText('./_output')

输出1:

('amy', '111-222-3333')
('james', '222-333-4444')
('amy', '333-444-5555')
('carl', '444-555-6666')
('name', 'number')

输出2:

('name', 'number')
('amy', '111-222-3333')
('james', '222-333-4444')
('amy', '333-444-5555')
('carl', '444-555-6666')

最佳答案

Flatten 是一个适用于 PCollection 的转换器。为了使合并工作并行进行,我认为他们不能保证顺序被保留;这与生成的 PCollection 的无序性质是一致的。

但是,如果您的唯一目的是在顶部添加 header ,则可以使用 textio.WriteToText()header 参数。 .

> header (str): String to write at beginning of file as a header. If not :data:None and append_trailing_newlines is set, `\n`` will be added.

phones | 'Write' >> beam.io.WriteToText(
# Feel free to make your own header format.
'./_output', header="('name', 'number')")

更一般地,为了保留原始输入的序列,我会使用序列号来扩充输入数据。在beam的并行变换(携带每个元素的序列号)之后,您始终可以通过对该序列号进行排序作为后处理步骤(在非并行模式下)来“恢复”原始顺序。

关于python - 按元组顺序展平 PCollection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50063091/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com