
java - Flink does not checkpoint when the source is generated from a Collection, and BucketingSink leaves files in the pending state

Reposted. Author: 行者123. Updated: 2023-11-30 06:13:53

I am trying to generate some test data from a collection and write that data to S3. When I do this, Flink does not seem to perform any checkpointing at all, but when the source reads from S3 instead, it does checkpoint.

For example, this does checkpoint and moves the output files to the completed state:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

val lines: DataStream[String] = {
  val path = "s3a://my_bucket/simple_job/in"
  env
    .readFile(
      inputFormat = new TextInputFormat(new Path(path)),
      filePath = path,
      watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
      interval = 5000L
    )
}

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()

Meanwhile, this does not checkpoint, and leaves the files in the .pending state even after the job has finished:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString))

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()

Best Answer

It turns out this is caused by this ticket: https://issues.apache.org/jira/browse/FLINK-2646. It happens because the stream built from a collection finishes before the application has had time to take a single checkpoint.
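A common workaround (until that ticket is resolved) is to wrap the collection in a custom source whose run() method blocks after emitting all elements, so the job stays in the RUNNING state long enough for the checkpoint interval to fire and the BucketingSink to finalize its files. The sketch below is plain Scala that models that keep-alive pattern without the Flink dependencies; `KeepAliveSource`, the `cancelled` flag, and the `emit` callback are illustrative names, not Flink API:

```scala
// Sketch of the keep-alive pattern: emit every element, then block
// instead of returning, so the enclosing job keeps running and
// checkpoints have a chance to fire. In a real Flink job this logic
// would live inside a SourceFunction's run()/cancel() methods.
class KeepAliveSource[T](elements: Seq[T]) {
  @volatile private var cancelled = false
  private val lock = new Object

  // Emit all elements, then park until cancel() is called.
  def run(emit: T => Unit): Unit = {
    elements.foreach(emit)
    lock.synchronized {
      while (!cancelled) lock.wait(100L) // stay alive, don't finish
    }
  }

  // Called on job teardown; unblocks run().
  def cancel(): Unit = {
    cancelled = true
    lock.synchronized(lock.notifyAll())
  }
}
```

With the real Flink API, the same shape would be achieved by implementing SourceFunction[T] and passing it to env.addSource(...) instead of using env.fromCollection(...).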

Regarding "java - Flink does not checkpoint when the source is generated from a Collection, and BucketingSink leaves files in the pending state", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49655460/
