gpt4 book ai didi

java - 为什么 Kafka 消费者忽略了我在 auto.offset.reset 参数中的 "earliest"指令,因此没有从绝对第一个事件中读取我的主题?

转载 作者:搜寻专家 更新时间:2023-11-01 01:20:12 27 4
gpt4 key购买 nike

我有一个 Kafka 主题,我想从最早的事件开始阅读。

我想要做的是从一个主题(从时间上绝对最早的事件开始)获取所有数据,直到某个日期的事件。

每个事件的结构都有一个名为 dateCliente 的字段,我将其用作过滤事件的阈值。至此,我已经成功完成了读写。我正在写入一个临时 Parquet 文件,我将其用作 Hive 表的分区。这工作正常,但是,即使我在 auto.offset.reset 参数中指定最早,它也不是从一开始就读取数据。

每当我运行我的代码时,我都会得到从这个日期开始的所有事件。每次我再次执行代码时,它都会继续读取我在上一次代码执行中读取的最后一个事件之后的 Kafka 事件。

我用来配置 Kafka 消费者和订阅主题的代码如下:

  // Configurations for kafka consumer
val conf = ConfigFactory.parseResources("properties.conf")
val brokersip = conf.getString("enrichment.brokers.value")
val topics_in = conf.getString("enrichment.topics_in.value")
//

// Crea la sesion de Spark
val spark = SparkSession
.builder()
.master("yarn")
.appName("XY")
.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._


val properties = new Properties
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("bootstrap.servers", brokersip)
properties.put("auto.offset.reset", "earliest")
properties.put("group.id", "XY")

val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe( util.Collections.singletonList("geoevents") )

但是,每当我从命令行创建一个消费者以从主题中读取数据时,我都会得到前几天的所有事件。我运行的命令行命令是:

kafka-console-consumer --new-consumer --topic geoevents --from-beginning --bootstrap-server xx.yy.zz.xx 

关于为什么我的代码表现如此并忽略 auto.offset.reset 中的"earliest" 有什么想法吗?

最佳答案

这是因为 auto.offset.reset 仅在组没有提交的偏移量时应用。

查看消费者配置 documentation :

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server

如果你想从头开始,你可以:

关于java - 为什么 Kafka 消费者忽略了我在 auto.offset.reset 参数中的 "earliest"指令,因此没有从绝对第一个事件中读取我的主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49945450/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com