
apache-kafka - Spring Embedded Kafka + Mock Schema Registry: State Store ChangeLog Schema not registered

Reposted | Author: 行者123 | Updated: 2023-12-04 05:26:34

I'm building integration tests for our Kafka system using the Spring Embedded Kafka broker, together with a MockSchemaRegistryClient. I'm writing a test for one of our stream topologies, built with the Streams API (KStreamBuilder). This particular topology has a KStream (stream1) feeding into a KTable (table1).
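For context, a topology of that shape can be sketched roughly as follows. Topic and store names here are hypothetical, and this sketch uses the current StreamsBuilder API rather than the older KStreamBuilder the question mentions:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class TopologySketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // stream1: the input stream under test (topic name is made up)
        KStream<String, String> stream1 = builder.stream("stream1-topic");
        // stream1 feeds the table's topic after some processing
        stream1.mapValues(v -> v.toUpperCase()).to("table1-topic");
        // table1: materialized from its topic; reads and writes go through a
        // KTABLE-SOURCE state store, which is what fails in the error below
        KTable<String, String> table1 = builder.table("table1-topic");
        return builder.build();
    }
}
```

This is only a stand-in for the real processing; the point is that `builder.table(...)` creates a backing state store whose entries are Avro-(de)serialized against the schema registry.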

I hit an error when I feed input into stream1; it originates from table1's KTableProcessor:

Exception in thread "mortgage-kafka-consumers-it-c1dd9185-ce16-415c-ad82-293c1281c897-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streaming.mortgage.application_party, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 6
**Caused by: java.io.IOException: Cannot get schema from schema registry!**
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
at com.sofi.kafka.serialization.AvroDeserializer.deserialize(AvroDeserializer.java:35)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:151)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:135)
at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:62)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:45)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:131)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:188)
at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)

The KTableProcessor is attempting to deserialize an entry from the RocksDB state store, but the schema does not exist in the mock schema registry. The topic whose schema is being requested is: **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog**

As the exception states, the schema has not been registered. However, the topic **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog-key** does have a registered schema (registered when the entry's key is serialized for the query).

Since this is an internal topic, I don't expect to have to register this schema myself, yet the test fails because the schema is absent from the registry. Is there a way to have changelog schemas registered prior to data ingestion? Is there a way to disable state store changelogging with the KStreamBuilder?
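On the second question: with the newer StreamsBuilder API (not the KStreamBuilder used here), changelogging can be turned off per store via `Materialized#withLoggingDisabled`. A rough sketch, with a made-up store name:

```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class NoChangelogTable {
    public static KTable<String, String> build(StreamsBuilder builder) {
        // With logging disabled the store has no changelog topic, so its
        // contents cannot be restored after a failure -- fine for a test,
        // usually not for production.
        return builder.table(
            "streaming.mortgage.application_party",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table1-store")
                        .withLoggingDisabled());
    }
}
```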

Thanks in advance!

Best Answer

I solved this problem, and will now sheepishly recount the cause: when using a KTable (via the Streams API) with an embedded Kafka broker, you need to configure the KafkaStreams object with a state store directory that is unique to each run of the embedded broker (in my case, each test run).

You can control the state store directory via the StreamsConfig.STATE_DIR_CONFIG setting. I made the directory unique by appending a timestamp to the default state store path:

properties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kraken-streams/" + LocalDateTime.now().toString());
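That one-liner can be wrapped in a small test helper, e.g. as below. The class and method names are my own; `"state.dir"` is the literal value of `StreamsConfig.STATE_DIR_CONFIG`, inlined so the sketch doesn't need kafka-streams on the classpath:

```java
import java.time.LocalDateTime;
import java.util.Properties;

public class StateDirConfig {
    // Literal value of StreamsConfig.STATE_DIR_CONFIG
    static final String STATE_DIR_CONFIG = "state.dir";

    // Returns streams properties with a state directory unique to this run,
    // so each embedded-broker test starts from an empty state store.
    static Properties withUniqueStateDir(Properties props) {
        props.put(STATE_DIR_CONFIG, "/tmp/kraken-streams/" + LocalDateTime.now());
        return props;
    }
}
```

One caveat: `LocalDateTime.now().toString()` contains colons, which are invalid in Windows paths; a `UUID.randomUUID()` suffix is a portable alternative.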

The problem was that an old state store existed at the same location every time the embedded Kafka broker was initialized. So when the first record arrived on the KTable's topic, the state store could return a previous value, which triggered an attempt to deserialize a state store record that had never been serialized as far as this schema registry instance was concerned. Schemas are registered only at serialization time, so the deserialization attempt failed due to the missing schema.
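An alternative to a unique directory per run is `KafkaStreams#cleanUp()`, which deletes the instance's local state directory; it must be called while the application is not running (i.e. before `start()` or after `close()`). A hypothetical test helper:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

public class TestStartup {
    // Wipe any local state left over from a previous embedded-broker run,
    // then start the streams instance. Helper name is made up.
    static KafkaStreams startFresh(Topology topology, Properties props) {
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.cleanUp(); // legal only before start(): deletes the local state directory
        streams.start();
        return streams;
    }
}
```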

Regarding apache-kafka - Spring Embedded Kafka + Mock Schema Registry: State Store ChangeLog Schema not registered, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50917589/
