gpt4 book ai didi

java - 有没有办法在 Beam 的 ParDo 转换中创建 SpecificRecord 列表以写入 Parquet 文件?

转载 作者:行者123 更新时间:2023-12-02 09:20:38 27 4
gpt4 key购买 nike

我正在尝试在 Beam/Java 中编写一个数据流作业来处理来自 Pub/Sub 并写入 Parquet 的一系列事件。 Pub/Sub 中的事件采用 JSON 格式,每个事件可以生成一行或多行。我能够编写一个非常简单的示例,编写仅返回 1 条记录的 ParDo 转换。 ParDo 看起来像这样

    static class GenerateRecords extends DoFn<String, GenericRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
final GenericData.Record record = new GenericData.Record(schema);
String msg = context.element();

com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);


context.output(pRecord);
}
}

以及管道的写入部分

                .apply("Write to file",
FileIO.<GenericRecord>
write()
.via(
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
)
.to(options.getOutputDirectory())
.withNumShards(options.getNumShards())
.withSuffix("pfile")
);

我的问题是,如何概括此 ParDo 转换以返回记录列表?我尝试了 List,但不起作用,ParquetIO.sink(schema) 发出“无法通过以下方式解析方法”的警告。

最佳答案

您可以根据需要多次在 DoFn 中调用 context.output()。因此,如果您知道在哪种情况下需要发出多条记录的业务逻辑,那么您只需为每个输出记录调用 context.output(record) 即可。它应该比拥有容器的 PCollection 更简单。

PS:顺便说一句,我有一个 simple example如何使用 ParquetIOAvroCoder 编写 GenericRecord 可能会有所帮助。

关于java - 有没有办法在 Beam 的 ParDo 转换中创建 SpecificRecord 列表以写入 Parquet 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58721495/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com