gpt4 book ai didi

apache-spark - Apache Spark 流 - kafka - 阅读旧消息

转载 作者:行者123 更新时间:2023-12-04 17:47:55 26 4
gpt4 key购买 nike

我正在尝试使用 Spark 流读取来自 Kafka 的旧消息。但是,我只能检索实时发送的消息(即,如果我填充新消息,而我的 spark 程序正在运行 - 然后我会收到这些消息)。

我正在更改我的 groupID 和 consumerID,以确保 zookeeper 不只是不提供它知道我的程序以前见过的消息。

假设 spark 将 zookeeper 中的偏移量视为 -1,它不应该读取队列中的所有旧消息吗?我只是误解了 kafka 队列的使用方式吗?我对 spark 和 kafka 很陌生,所以我不能排除我只是误解了一些东西。

package com.kibblesandbits

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import net.liftweb.json._

object KafkaStreamingTest {

val cfg = new ConfigLoader().load
val zookeeperHost = cfg.zookeeper.host
val zookeeperPort = cfg.zookeeper.port
val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot

implicit val formats = DefaultFormats

def parser(json: String): String = {
return json
}

def main(args : Array[String]) {
val zkQuorum = "test-spark02:9092"

val group = "myGroup99"
val topic = Map("testtopic" -> 1)
val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
val ssc = new StreamingContext(sparkContext, Seconds(3))
val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
var gp = json_stream.map(_._2).map(parser)

gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
ssc.start()
}

运行此程序时,我将看到以下消息。所以我相信它不仅仅是因为设置了偏移量而没有看到消息。

14/12/05 13:34:08 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] Added fetcher for partitions ArrayBuffer([[testtopic,0], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,1], initOffset -1 to broker i d:1,host:test-spark02.vpc,port:9092] , [[testtopic,2], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,4], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] )



然后,如果我填充 1000 条新消息——我可以看到这 1000 条消息保存在我的临时目录中。但我不知道如何阅读现有的消息,这些消息(此时)应该有数万条。

最佳答案

KafkaUtils 上使用替代工厂方法这让您可以向 Kafka 消费者提供配置:

def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)]

然后使用您的 kafka 配置构建 map ,并将参数 'kafka.auto.offset.reset' 设置为 'smallest':
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000",
"kafka.auto.offset.reset" -> "smallest"
)

将该配置提供给上面的工厂方法。 "kafka.auto.offset.reset"-> "smallest"告诉消费者从主题中的最小偏移量开始。

关于apache-spark - Apache Spark 流 - kafka - 阅读旧消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27323725/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com