gpt4 book ai didi

java - Flink 流式传输 - 使用 StreamingFileSink 时更改部分文件名?

转载 作者:行者123 更新时间:2023-12-05 07:23:05 29 4
gpt4 key购买 nike

我正在尝试使用 Flink 流来使用 Kafka 主题消息并创建(定期)将保存在 s3 上的 Parquet 文件。
在使用批量格式的流文件接收器时,有没有办法将创建的部分文件名(或添加后缀/前缀)更改为比 part-0-0 或 part-1-3 更独特?

StreamingFileSink<> sink = StreamingFileSink.forBulkFormat(new Path("s3://test-bucket/"),               ParquetAvroFactory.getParquetWriter(schema,  CompressionCodec.UNCOMPRESSED.name()))
.withBucketAssigner(new PartitionBucketAssigner(partitionColumns))
.build();

最佳答案

您可以覆盖 BucketAssigner 上的 getBucketId 方法(参见 https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html ),这将影响路径,但显然不会影响部分文件名(参见下面的评论)。

部分文件名是在 org.apache.flink.streaming.api.functions.sink.filesystem.Bucket 中的这段代码中建立的:

private Path assembleNewPartPath() {
return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
}

这似乎不是为定制而设计的。

关于java - Flink 流式传输 - 使用 StreamingFileSink 时更改部分文件名?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56224800/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com