gpt4 book ai didi

apache-kafka - 使用spark-streaming时如何自己保存多个分区的Kafka偏移量

转载 作者:行者123 更新时间:2023-12-04 04:08:47 24 4
gpt4 key购买 nike

我使用spark-streaming读取kafka数据,并处理每一行

我在下面使用创建一个流:

lines = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics,kafkaParams)
);

然后我用这段代码处理来自kafka的数据

    lines.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
OffsetRange[] range = new OffsetRange[1];
range[0] = o;

rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> {
// get kafka offset
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
// to cache line data
List<String> jsonData = new ArrayList<>();
// to read all line data
while (partitionOfRecords.hasNext()) {
ConsumerRecord<String, String> line = partitionOfRecords.next();
jsonData.add(line.value());
}
// TODO do my own bussiness from jsonData
.......
// HOW can I commit kafka Offset Here??
// this is a method to commit offset
((CanCommitOffsets) lines.inputDStream()).commitAsync(range)
});
});

我试了很多次,发现有问题:

  1. 如果我的数据处理成功而其他分区失败,它如何工作?这意味着我所有的数据处理应该回来?因为kafka offset有commit;

  2. 我已经运行了这段代码,然后我发现它真正执行提交操作是在下次这个rdd执行器运行时,这意味着如果进度OOM或被杀死,下次我从Kafka读取一些数据时双?

最佳答案

How does it work if my data process success when other partition failed? it means all my data process should come back? Because kafka offset has commit

如果特定任务失败,Spark 将尝试根据 spark.task.maxFailures 设置重新执行它。如果该数字已通过,则整个作业将失败。您需要确保如果 commitAsync 之前的部分失败,您不会提交偏移量。

I have run this code, then I found it really execute commit operate is when the next time this rdd executor run,it means if the progress oom or be killed , the next time some data I read from Kafka will double ?

是的。如果作业在下一次批处理迭代之前被终止,Spark 将尝试重新读取已经处理过的数据。

关于apache-kafka - 使用spark-streaming时如何自己保存多个分区的Kafka偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48004360/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com