gpt4 book ai didi

apache-kafka - Kafkastreams.allMetadata() 方法返回空列表

转载 作者:行者123 更新时间:2023-12-04 14:55:05 26 4
gpt4 key购买 nike

所以我试图让交互式查询与 Kafka 流一起工作。我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上)。我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和 Kafka。

我已经设置了这样的主题

kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-submit-topic
kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-output-topic

阅读我围绕这个问题所做的工作

我已阅读此文档页面: http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

我还在这里阅读了 Java 示例: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java

并阅读这篇类似的帖子,最初听起来与我的问题相同: Cannot access KTable from a different app as StateStore

这就是我的设置。那么问题是什么?

因此,正如我所说,我正在尝试创建自己的应用程序,它允许使用自定义 Akka Http REST Api(推荐的 RPC 调用)进行交互式查询,以允许我查询我的 KTable .实际的流处理似乎按预期进行,我可以打印 KTable 的结果并且它们与针对该主题产生的内容相匹配。

所以事情的存储方面似乎正在发挥作用

尝试使用 Streams.allMetadata() 时似乎会出现此问题。方法,它返回一个空列表。

我在用
  • 列表项
  • 斯卡拉 2.12
  • SBT
  • Akka.Http 10.9 用于 REST Api
  • 卡夫卡 11.0

  • 生产者代码

    这是我的制作人的代码
    package Processing.Ratings {

    import java.util.concurrent.TimeUnit

    import Entities.Ranking
    import Serialization.JSONSerde
    import Topics.RatingsTopics

    import scala.util.Random
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.common.serialization.Serdes
    import Utils.Settings
    import org.apache.kafka.clients.producer.ProducerConfig

    object RatingsProducerApp extends App {

    run()

    private def run(): Unit = {

    val jSONSerde = new JSONSerde[Ranking]
    val random = new Random
    val producerProps = Settings.createBasicProducerProperties
    val rankingList = List(
    Ranking("jarden@here.com","sacha@here.com", 1.5f),
    Ranking("miro@here.com","mary@here.com", 1.5f),
    Ranking("anne@here.com","margeret@here.com", 3.5f),
    Ranking("frank@here.com","bert@here.com", 2.5f),
    Ranking("morgan@here.com","ruth@here.com", 1.5f))

    producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

    System.out.println("Connecting to Kafka cluster via bootstrap servers " +
    s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

    // send a random string from List event every 100 milliseconds
    val rankingProducer = new KafkaProducer[String, Array[Byte]](
    producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

    //while (true) {
    for (i <- 0 to 10) {
    val ranking = rankingList(random.nextInt(rankingList.size))
    val rankingBytes = jSONSerde.serializer().serialize("", ranking)
    System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
    rankingProducer.send(new ProducerRecord[String, Array[Byte]](
    RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
    Thread.sleep(100)
    }

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
    rankingProducer.close(10, TimeUnit.SECONDS)
    }))
    }
    }
    }

    流代码

    这是流代码
    def createRatingStreamsProperties() : Properties = {
    val props = createBasicStreamProperties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application")
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client")
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    props
    }

    private def createBasicStreamProperties() : Properties = {
    val props = new Properties()
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    // Records should be flushed every 10 seconds. This is less than the default
    // in order to keep this example interactive.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
    // For illustrative purposes we disable record caches
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
    props
    }

    和实际代码
    import java.util.Properties
    import java.util.concurrent.TimeUnit
    import org.apache.kafka.common.serialization._
    import org.apache.kafka.streams._
    import org.apache.kafka.streams.kstream._
    import Entities.Ranking
    import Serialization.JSONSerde
    import Topics.RatingsTopics
    import Utils.Settings

    package Processing.Ratings {

    import Stores.StateStores
    import org.apache.kafka.streams.state.HostInfo


    class DummyRankingReducer extends Reducer[Ranking] {
    override def apply(value1: Ranking, value2: Ranking): Ranking = {
    value2
    }
    }

    class RankingByEmailInitializer extends Initializer[List[Ranking]] {
    override def apply(): List[Ranking] = List[Ranking]()
    }

    class RankingByEmailAggregator extends Aggregator[String, Ranking,List[Ranking]] {
    override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
    value :: aggregate
    }
    }


    object RatingStreamProcessingApp extends App {

    run()

    private def run() : Unit = {
    val stringSerde = Serdes.String
    val rankingSerde = new JSONSerde[Ranking]
    val listRankingSerde = new JSONSerde[List[Ranking]]
    val builder: KStreamBuilder = new KStreamBuilder
    val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)

    val rankingTable = rankings.groupByKey(stringSerde,rankingSerde)
    .aggregate(
    new RankingByEmailInitializer(),
    new RankingByEmailAggregator(),
    listRankingSerde,
    StateStores.RANKINGS_BY_EMAIL_STORE
    )

    rankingTable.toStream.print()

    val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
    val restEndpoint:HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
    System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
    System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

    // Always (and unconditionally) clean local state prior to starting the processing topology.
    // We opt for this unconditional call here because this will make it easier for you to play around with the example
    // when resetting the application for doing a re-run (via the Application Reset Tool,
    // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
    //
    // The drawback of cleaning up local state prior is that your app must rebuilt its local state from scratch, which
    // will take time and will require reading all the state-relevant data from the Kafka cluster over the network.
    // Thus in a production scenario you typically do not want to clean up always as we do here but rather only when it
    // is truly needed, i.e., only under certain conditions (e.g., the presence of a command line flag for your app).
    // See `ApplicationResetExample.java` for a production-like example.
    //streams.cleanUp();
    streams.start()
    val restService = new RatingRestService(streams, restEndpoint)
    restService.start()


    //****************************************************************
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    //****************************************************************


    val SIZE = streams.allMetadata.size()
    val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

    import org.apache.kafka.streams.state.KeyValueIterator
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

    val range = keyValueStore.all
    val HASNEXT = range.hasNext
    import org.apache.kafka.streams.KeyValue
    while (range.hasNext ) {
    val next = range.next
    System.out.println(String.format("key: %s | value: %s", next.key, next.value))
    }

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
    streams.close(10, TimeUnit.SECONDS)
    restService.stop
    }))

    //return unit
    ()
    }
    }

    }

    我在哪里有这个配置
    kafka {
    bootStrapServers = "localhost:9092"
    zooKeepers = "zookeeper:2181"
    schemaRegistryUrl = "http://localhost:8081"
    partition = 0,
    restApiDefaultHostName = "localhost",
    restApiDefaultPort = "8080"
    }

    REST 服务的东西

    示例文件的 Scala 端口: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/MetadataService.java
    package Processing.Ratings

    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.StreamsMetadata
    import java.util.stream.Collectors
    import Entities.HostStoreInfo
    import org.apache.kafka.common.serialization.Serializer
    import org.apache.kafka.connect.errors.NotFoundException
    import scala.collection.JavaConverters._


    /**
    * Looks up StreamsMetadata from KafkaStreams
    */
    class MetadataService(val streams: KafkaStreams) {


    /**
    * Get the metadata for all of the instances of this Kafka Streams application
    *
    * @return List of { @link HostStoreInfo}
    */
    def streamsMetadata() : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    return mapInstancesToHostStoreInfo(metadata)
    }


    /**
    * Get the metadata for all instances of this Kafka Streams application that currently
    * has the provided store.
    *
    * @param store The store to locate
    * @return List of { @link HostStoreInfo}
    */
    def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    return mapInstancesToHostStoreInfo(metadata)
    }


    /**
    * Find the metadata for the instance of this Kafka Streams Application that has the given
    * store and would have the given key if it exists.
    *
    * @param store Store to find
    * @param key The key to find
    * @return { @link HostStoreInfo}
    */
    def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
    throw new NotFoundException(
    s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

    return new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
    }




    def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {

    metadatas.stream.map[HostStoreInfo](metadata =>
    HostStoreInfo(
    metadata.host(),
    metadata.port,
    metadata.stateStoreNames.asScala.toList))
    .collect(Collectors.toList())
    .asScala.toList
    }



    }

    这是 REST 服务(我目前只尝试让“实例”路由工作)。
    package Processing.Ratings

    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.HostInfo
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import spray.json.DefaultJsonProtocol._
    import Entities.AkkaHttpEntitiesJsonFormats._
    import Entities._
    import akka.http.scaladsl.marshalling.ToResponseMarshallable

    import scala.concurrent.Future


    object RestService {
    val DEFAULT_REST_ENDPOINT_HOSTNAME = "localhost"
    }


    class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

    val metadataService = new MetadataService(streams)
    var bindingFuture: Future[Http.ServerBinding] = null

    implicit val system = ActorSystem("rating-system")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher


    def start() : Unit = {
    val emailRegexPattern = """\w+""".r


    val route =
    path("ratingByEmail" / emailRegexPattern) { email =>
    get {

    //TODO : This would come from Kafka store, either local or remote

    complete(ToResponseMarshallable.apply(List[Ranking](
    Ranking("fred@here.com", "sacha@there.com", 4.0f),
    Ranking("sam@here.com", "sacha@there.com", 2.0f)))
    )
    }
    } ~
    path("instances") {
    get {
    val x = metadataService.streamsMetadata
    complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
    }
    }


    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
    bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.terminate()) // and shutdown when done
    }))
    }


    def stop() : Unit = {
    bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.terminate()) // and shutdown when done
    }

    def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
    hostStoreInfo.port == hostInfo.port
    }
    }

    这是商店中有数据的证明

    生产者运行
    enter image description here

    正在运行的流
    enter image description here

    这是我第一次运行生产者,然后是流,然后再次运行生产者(另一次运行)。

    KTable 看看结果如何正在显示,然后我启动了生产者并通过流获取的主题推送了更多消息

    但是当我查询我的 REST 端点以尝试使用 localhost:8080/instances 获取元数据时, 我得到的只是一个空列表 []
    enter image description here

    我本来希望上面流代码中的这些行返回一些元数据,商店中显然有一些东西,那么为什么没有元数据
    val SIZE = streams.allMetadata.size()
    val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

    这两个都返回 0,同时使用此代码遍历商店中的项目
    import org.apache.kafka.streams.state.KeyValueIterator
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

    val range = keyValueStore.all
    val HASNEXT = range.hasNext
    import org.apache.kafka.streams.KeyValue
    while (range.hasNext ) {
    val next = range.next
    System.out.println(String.format("key: %s | value: %s", next.key, next.value))
    }

    从商店产生数据

    enter image description here

    我知道 REST api 工作正常,因为硬编码的测试路线工作正常

    enter image description here

    我究竟做错了什么???

    最佳答案

    所以我想通了,原来是由于缺少这个配置值

    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,  "localhost:8080")

    一旦我添加了 Akka Htpp REST API http://localhost:8080/instance开始工作。但后来我开始收到这个奇怪的异常
    org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-key-value-store, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)

    所以在阅读了这里的这篇文章后: http://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

    我决定我需要做的是执行一些重试逻辑,我是这样做的:

    重试

    我从这里借来的: https://gist.github.com/Mortimerp9/5430595
    package Utils

    import scala.concurrent._
    import scala.concurrent.duration._


    object Retry {

    /**
    * exponential back off for retry
    */
    def exponentialBackoff(r: Int): Duration = scala.math.pow(2, r).round * 500 milliseconds

    def noIgnore(t: Throwable): Boolean = false

    /**
    * retry a particular block that can fail
    *
    * @param maxRetry how many times to retry before to giveup
    * @param deadline how long to retry before giving up; default None
    * @param backoff a back-off function that returns a Duration after which to retry. default is an exponential backoff at 100 milliseconds steps
    * @param ignoreThrowable if you want to stop retrying on a particular exception
    * @param block a block of code to retry
    * @param ctx an execution context where to execute the block
    * @returns an eventual Future succeeded with the value computed or failed with one of:
    * `TooManyRetriesException` if there were too many retries without an exception being caught. Probably impossible if you pass decent parameters
    * `DeadlineExceededException` if the retry didn't succeed before the provided deadline
    * `TimeoutException` if you provide a deadline and the block takes too long to execute
    * `Throwable` the last encountered exception
    */
    def retry[T](maxRetry: Int,
    deadline: Option[Deadline] = None,
    backoff: (Int) => Duration = exponentialBackoff,
    ignoreThrowable: Throwable => Boolean = noIgnore)(block: => T)(implicit ctx: ExecutionContext): Future[T] = {

    class TooManyRetriesException extends Exception("too many retries without exception")
    class DeadlineExceededException extends Exception("deadline exceded")

    val p = Promise[T]

    def recursiveRetry(retryCnt: Int, exception: Option[Throwable])(f: () => T): Option[T] = {
    if (maxRetry == retryCnt
    || deadline.isDefined && deadline.get.isOverdue) {
    exception match {
    case Some(t) =>
    p failure t
    case None if deadline.isDefined && deadline.get.isOverdue =>
    p failure (new DeadlineExceededException)
    case None =>
    p failure (new TooManyRetriesException)
    }
    None
    } else {
    val success = try {
    val rez = if (deadline.isDefined) {
    Await.result(future(f()), deadline.get.timeLeft)
    } else {
    f()
    }
    Some(rez)
    } catch {
    case t: Throwable if !ignoreThrowable(t) =>
    blocking {
    val interval = backoff(retryCnt).toMillis
    Thread.sleep(interval)
    }
    recursiveRetry(retryCnt + 1, Some(t))(f)
    case t: Throwable =>
    p failure t
    None
    }
    success match {
    case Some(v) =>
    p success v
    Some(v)
    case None => None
    }
    }
    }

    def doBlock() = block

    Future {
    recursiveRetry(0, None)(doBlock)
    }

    p.future
    }

    }

    我这样称呼
    def printStoreMetaData(streams:KafkaStreams) : Unit = {

    import org.apache.kafka.streams.state.KeyValueIterator
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

    val keyValueStoreTry = waitUntilStoreIsQueryable(
    StateStores.RANKINGS_BY_EMAIL_STORE,
    QueryableStoreTypes.keyValueStore[String,List[Ranking]](),
    streams
    ) match {
    case Success(keyValueStore) => {
    val SIZE = streams.allMetadata.size()
    val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()
    val range = keyValueStore.all
    val HASNEXT = range.hasNext
    import org.apache.kafka.streams.KeyValue
    while (range.hasNext ) {
    val next = range.next
    System.out.println(String.format("key: %s | value: %s", next.key, next.value))
    }
    }
    case Failure(f) => println(f)
    }

    }

    这样做之后,对我来说都是快乐的日子。

    关于apache-kafka - Kafkastreams.allMetadata() 方法返回空列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46054763/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com