gpt4 book ai didi

apache-flink - Flink 如何在 S3 中将 DataSet 写成 Parquet 文件?

转载 作者:行者123 更新时间:2023-12-04 10:53:47 32 4
gpt4 key购买 nike

如何使用 Flink 将 DataSet 作为 Parquet 文件写入 s3 bucket。是否有像 spark 这样的直接函数:DF.write.parquet("write in parquet")

请帮助我如何以 parquet 格式编写 flink 数据集。

我在尝试将我的 DataSet 转换为 (Void,GenericRecord) 时卡住了

    DataSet<Tuple2<Void,GenericRecord>> df = allEvents.flatMap(new FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<Void, GenericRecord>>() {
@Override
public void flatMap(Tuple2<LongWritable, Text> longWritableTextTuple2, Collector<Tuple2<Void, GenericRecord>> collector) throws Exception {
JsonAvroConverter converter = new JsonAvroConverter();
Schema schema = new Schema.Parser().parse(new File("test.avsc"));
try {
GenericRecord record = converter.convertToGenericDataRecord(longWritableTextTuple2.f1.toString().getBytes(), schema);
collector.collect( new Tuple2<Void,GenericRecord>(null,record));
}
catch (Exception e) {
System.out.println("error in converting to avro")
}
}
});
Job job = Job.getInstance();
HadoopOutputFormat parquetFormat = new HadoopOutputFormat<Void, GenericRecord>(new AvroParquetOutputFormat(), job);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
df.output(parquetFormat);
env.execute();

请帮我解决我做错的地方。我得到异常和这个代码不工作。

最佳答案

它比 Spark 稍微复杂一些。我能够在 Flink 中读写 Parquet 数据的唯一方法是通过 Hadoop 和 MapReduce 兼容性。您的依赖项中需要 hadoop-mapreduce-client-coreflink-hadoop-compatibility。然后您需要创建一个适当的 HadoopOutoutFormat。你需要做这样的事情:

val job = Job.getInstance()
val hadoopOutFormat = new hadoop.mapreduce.HadoopOutputFormat[Void, SomeType](new AvroParquetOutputFormat(), job)
FileOutputFormat.setOutputPath(job, [somePath])

然后你可以做:

dataStream.writeUsingOutputFormat(hadoopOutFormat)

关于apache-flink - Flink 如何在 S3 中将 DataSet 写成 Parquet 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59330502/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com