- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 spring-kafka-test 2.6.3 EmbeddedKafka
和 Junit 5 为使用 avro 消息的拓扑编写 spring boot 集成测试。在测试中我使用的是 MockSchemaReigstryClient
我正在注册模拟模式客户端并按照 PR 中的建议配置主题那现在已经关闭了。但是我收到并收到以下错误:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 1
Caused by: java.io.IOException: Cannot get schema from schema registry!
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:176) ~[kafka-schema-registry-client-6.0.1.jar:na]
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndId(MockSchemaRegistryClient.java:232) ~[kafka-schema-registry-client-6.0.1.jar:na]
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaById(MockSchemaRegistryClient.java:215) ~[kafka-schema-registry-client-6.0.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:279) ~[kafka-avro-serializer-6.0.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:98) ~[kafka-avro-serializer-6.0.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:77) ~[kafka-avro-serializer-6.0.1.jar:na]
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-6.0.1.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66) ~[kafka-streams-avro-serde-6.0.1.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38) ~[kafka-streams-avro-serde-6.0.1.jar:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:55) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:865) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:938) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:640) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) ~[kafka-streams-2.6.0.jar:na]
生产代码运行良好。所以,似乎我在测试设置中遗漏了一些东西。任何指针将不胜感激。这是 gist代码。
更新:我正在使用 kafka-schema-registry-client-6.0.1 maven 依赖项。
最佳答案
您的 StreamsConfig 仍然使用默认的 SpecificAvroSerde
,它(是)使用 MockSchemRegistryClient
因为您配置了 schemaRegistryUrl : mock://
但它没有 serde 所需的模式。此 MockSchemRegistryClient
与您在测试中为生产者和消费者配置的实例不同。
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.class.getName());
考虑以下流程:(1) 测试生产者 --> (2) IN_TOPIC -> (3) StreamApp -> (4) OUT_TOPIC -> (5) 测试消费者。
您已经在 (1) 处为生产者配置了 MockSchemaRegistryClient
,在 (5) 处为消费者配置了 MockSchemaRegistryClient
,甚至还为 (3) 处的一个(主应用程序)注册了所需的模式。因此错误:
Caused by: java.io.IOException: Cannot get schema from schema registry!
解决此问题的一种方法是使用指向具有测试模式的注册表的 MockSpecificAvroSerde
:
application.yaml
:
spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
specific.avro.reader: true
default:
value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
但是对于application-test.yaml
:
spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
schema.registry.url: mock://localtest
specific.avro.reader: true
default:
value.serde: your.custom.serde.that.uses.a.mock.schema.regiter.client.bydefault.MockSpecificAvroSerde
关于apache-kafka - MockSchemaRegistryClient 未注册 avro 模式 : Cannot get schema from schema registry,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65652102/
我在名为 commonSourceMetadata.avsc 的 json 文件中定义了一个名为 "some.package.SourceMetadata" 的 Avro 类型: { "type"
我很想了解在 Avro 中编码两种非常特定类型的数据的最佳实践:时间戳和 IP 地址。 我遇到了时间戳 ( https://issues.apache.org/jira/browse/AVRO-739
如何在 Avro Schema 生成中为数据类型设置最大大小/长度限制。例如:在模式中,我想指定一个字段,该字段采用最大 len 25 的字符串。 最佳答案 我相信您可以使用“固定”avro 类型并指
即是否可以使字段需要类似于 ProtoBuf: 消息搜索请求{ 需要 字符串查询 = 1; } 最佳答案 默认情况下,Avro 中的所有字段都是必需的。照原样 mentioned在官方文档中,如果你想
我有用户编写 AVRO 文件,我想使用 Flume 将所有这些文件移动到使用 Flume 的 HDFS 中。所以我以后可以使用 Hive 或 Pig 来查询/分析数据。 在客户端我安装了 flume
我正在为似乎具有多个对象数组的 JSON 有效负载创建 avro 模式。我不确定如何在模式中表示这一点。有问题的关键是 content: { "id": "channel-id", "name
似乎没有任何方法可以将数据附加到现有的 Avro 序列化文件中。我想让多个进程写入一个 avro 文件,但看起来每次打开它时,我都会从头开始。我不想读入所有数据,然后再将其写回。 使用 ruby
我试图定义一个不太平凡的 Avro 模式,但收效甚微;当它不会抛出架构语法错误时,它不会生成我试图在架构中定义的所有类型。 是否有 avsc 定义的可能内容的完整规范?我一直根据我从 Doc 规范中理
我正在尝试使用 avro-tools-1.7.4.jar create schema 命令创建两个 Avro 模式。 我有两个 JSON 模式,如下所示: { "name": "TestAvro",
首先,我创建了一个如下所示的 avro hive 表。 CREATE EXTERNAL TABLE user STORED AS AVRO LOCATION '/work/user' TBLPROPE
我正在读一本书 Hadoop application architectures,这本书很老但很有趣,在阅读时,我注意到 Avro 被认为是数据序列化框架,而 Parquet 被认为是列数据格式。 我
我一直在四处寻找,看到了 jira https://issues.apache.org/jira/browse/AVRO-739对于这个问题,但我对用户文档中的日期时间的 avro 支持没有更好的了解
我尝试在安装了 Spark 2.4.8 的 Cloud Dataproc 集群 1.4 上运行我的 Spark/Scala 代码 2.3.0。我在读取 avro 文件时遇到错误。这是我的代码: spa
我正在处理 JSON 格式的服务器日志,我想以 Parquet 格式将我的日志存储在 AWS S3 上(并且 Parquet 需要 Avro 模式)。首先,所有日志都有一组共同的字段,其次,所有日志都
这是来自教程点的解串器。 public class Deserialize { public static void main(String args[]) throws Exception{
我正在使用 avro-maven-plugin 1.8.1 从 schema 生成 java 代码,所有字段都是公共(public)的且已弃用,如下所示: public class data_el
一个简单的例子说明了我的问题。 本质上,我正在处理一个跨多个存储库拆分代码的大型项目。在 repo 1 中,在 .avdl 文件中定义了一个 Avro 模式“S1”,该文件被编译到其 Avro 生成的
通过套接字发送avro(avro c)编码数据我正在尝试将 avro 编码数据转换为字节数组(使用 memcpy)后通过套接字发送。我所做的如下所示 /客户端:client.c/ avro_datum
我的问题是这样的。我有一个 2GB 的压缩 avro 文件,HDFS 上存储了大约 1000 条 avro 记录。我知道我可以编写代码来“打开这个 avro 文件”并打印出每条 avro 记录。我的问
我看到以下错误 exception Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Do
我是一名优秀的程序员,十分优秀!