gpt4 book ai didi

python - Apache Beam 将每个标记的输出写入单独的文件

转载 作者:行者123 更新时间:2023-12-05 05:15:48 25 4
gpt4 key购买 nike

我根据输入数据元素之一(日期)标记输入元素。

class TagElementsWithDate(beam.DoFn):
def process(self, element):
dt = element['date'].replace('-', '')[:6]
yield pvalue.TaggedOutput(dt, element)

input_data = p | 'Read Input' >> beam.io.Read(beam.io.BigQuerySource(query='select id, date from `project.dataset.tablename`', use_standard_sql=True))

tagged_data = input_data | 'tag data' >> beam.ParDo(TagElementsWithDate()).with_outputs()

tagged_data 是 DoOutputsTuple。我希望对此进行迭代并将每个标记数据写入一个单独的文件。

最佳答案

您需要编写自己的 DoFn。有点像

from apache_beam.io.textio import _TextSink


class WriteEachKeyToText(beam.DoFn):
def __init__(self, file_path_prefix=str):
super().__init__()
self.file_path_prefix = file_path_prefix

def process(self, kv):
key = kv[0]
elements = kv[1]
sink = _TextSink(self.file_path_prefix, file_name_suffix=f"{key}.json")

writer = sink.open_writer("prefix", self.file_path_prefix)
for e in elements: # values
writer.write(e)

然后你可以像这样使用它:

output_path = "/some/path/"
tagged_data | beam.ParDo(WriteEachKeyToText(output_path))

关于python - Apache Beam 将每个标记的输出写入单独的文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51254057/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com