gpt4 book ai didi

java - 将 GenericRecords 的 pCollection 写入 Parquet 文件的数据流

转载 作者:行者123 更新时间:2023-12-01 18:45:23 26 4
gpt4 key购买 nike

在 apache beam 步骤中,我有一个 PCollection KV<String, Iterable<KV<Long, GenericRecord>>>> 。我想将可迭代中的所有记录写入同一个 Parquet 文件中。我的代码片段如下

p.apply(ParDo.of(new MapWithAvroSchemaAndConvertToGenericRecord())) // PCollection<GenericRecord>
.apply(ParDo.of(new MapKafkaGenericRecordValue(formatter, options.getFileNameDelimiter()))) //PCollection<KV<String, KV<Long, GenericRecord>>>
.apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>

现在我想将Iterable中的所有记录写入同一个parquet文件中(通过KV键导出文件名)。

最佳答案

我找到了问题的解决方案。在步骤 -

apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>

我将应用另一个转换,该转换将仅返回 Iterable 作为输出 pCollection。`.apply(ParDo.of(new GetIterable()))//PCollection>>其中 key 是我必须写入的文件的名称。那么剩下的片段是

.apply(Flatten.iterables())
.apply(
FileIO.<String, KV<String, GenericRecord>>writeDynamic()
.by((SerializableFunction<KV<String, GenericRecord>, String>) KV::getKey)
.via(
Contextful.fn(
(SerializableFunction<KV<String, GenericRecord>, GenericRecord>) KV::getValue
),
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)


)

.withTempDirectory("/tmp/temp-beam")
.to(options.getGCSBucketUrl())
.withNumShards(1)
.withDestinationCoder(StringUtf8Coder.of())
)

关于java - 将 GenericRecords 的 pCollection 写入 Parquet 文件的数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59851726/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com