gpt4 book ai didi

scala - 从kafka消息中获取kafka记录时间戳

转载 作者:行者123 更新时间:2023-12-02 02:45:03 30 4
gpt4 key购买 nike

我想要生产者将消息插入 kafka 主题的时间戳。

在 kafka 消费者端,我想提取该时间戳。


class Producer {
def main(args: Array[String]): Unit = {
writeToKafka("quick-start")
}
def writeToKafka(topic: String): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9094")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String](topic, "key", "value")
producer.send(record)
producer.close()
}
}



class Consumer {
def main(args: Array[String]): Unit = {
consumeFromKafka("quick-start")
}
def consumeFromKafka(topic: String) = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9094")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
while (true) {
val record = consumer.poll(1000).asScala
for (data <- record.iterator)
println(data.value())
}
}
}

kafka提供了一种方法吗?否则我将不得不从生产者向主题发送一个额外的字段。

最佳答案

Kafka从v0.10开始提供了一种方式

从该版本开始,您的所有消息都在 data.timestamp 中提供时间戳信息,并且内部信息的类型由代理上的配置“message.timestamp.type”决定。该值应为 CreateTimeLogAppendTime

在此版本之前,您必须手动实现它,通常是通过修改数据结构。

关于scala - 从kafka消息中获取kafka记录时间戳,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62933554/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com