- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们正在将 Spring 与 Kafka 消费者和生产者一起使用。我们正在生成大小为 905 字节的消息。我们正在序列化消息并尝试将其反序列化以供下一个消费者使用。
消息有效负载类示例:
{
"FILE_LIST":[
{
"KEY1": "Large String Value",
"KEY2": "Large String Value",
"Key3": "Large String Value",
"Key4": "Large String Value"
}
]
}
有效负载类别
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@NoArgsConstructor
@AllArgsConstructor
public class InnerModel {
@JsonProperty("KEY1")
@Getter
@Setter
private String key1;
@JsonProperty("KEY2")
@Getter
@Setter
private String key2;
@JsonProperty("KEY3")
@Getter
@Setter
private String key3;
@JsonProperty("KEY4")
@Getter
@Setter
private String key4;
}
CustomModel.java
package com.consumer.model;
public class CustomModel {
public CustomModel(List<InnerModel> filesList) {
super();
this.filesList = filesList;
}
@JsonProperty("FILE_LIST")
@NonNull
@Getter
@Setter
List<InnerModel> filesList;
}
消费者代码
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@KafkaListener(topics = "customtopic", groupId = "customgroup")
public class CustomModelConsumer {
@KafkaHandler(isDefault = true)
private void consumeCustomModel(CustomModel model) {
System.out.println("Model Consumer");
System.out.println(model);
}
}
application.properties
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.compression-type=gzip
spring.kafka.consumer.properties.spring.json.trusted.packages=com.consumer.model
当我们接受字符串格式的消息有效负载时,消费者工作得很好,但是当我们将消费者中的有效负载反序列化为对象时,我们会遇到问题。同样会抛出以下错误
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition customtopic-0 at offset 70. If needed, please seek past the record to continue consumption. Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 73, 78, 80, 85, 84, 95, 70, 73, 76, 69, 95, 76, 73, 83, 84, 34, 58, 91, 123, 34, 79, 82, 73, 71, 73, 78, 65, 76, 95, 67, 76, 73, 80, 95, 67, 85, 82, 82, 69, 78, 84, 34, 58, 34, 115, 51, 58, 47, 47, 98, 45, 97, 111, 45, 112, 114, 111, 100, 117, 99, 116, 45, 109, 111, 99, 107, 47, 55, 52, 49, 48, 57, 102, 98, 98, 45, 54, 102, 52, 101, 45, 52, 48, 54, 50, 45, 97, 98, 48, 102, 45, 100, 49, 102, 53, 98, 98, 102, 55, 49, 97, 56, 49, 47, 99, 108, 105, 112, 115, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 47, 49, 48, 48, 48, 95, 111, 114, 105, 103, 105, 110, 97, 108, 99, 108, 105, 112, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 46, 109, 112, 52, 34, 44, 34, 79, 82, 73, 71, 73, 78, 65, 76, 95, 67, 76, 73, 80, 95, 71, 76, 79, 66, 65, 76, 95, 84, 73, 77, 69, 83, 84, 65, 77, 80, 95, 67, 85, 82, 82, 69, 78, 84, 34, 58, 34, 115, 51, 58, 47, 47, 98, 45, 97, 111, 45, 112, 114, 111, 100, 117, 99, 116, 45, 109, 111, 99, 107, 47, 55, 52, 49, 48, 57, 102, 98, 98, 45, 54, 102, 52, 101, 45, 52, 48, 54, 50, 45, 97, 98, 48, 102, 45, 100, 49, 102, 53, 98, 98, 102, 55, 49, 97, 56, 49, 47, 99, 108, 105, 112, 115, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 47, 49, 48, 48, 48, 95, 111, 114, 105, 103, 105, 110, 97, 108, 99, 108, 105, 112, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 95, 116, 115, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 95, 116, 115, 46, 106, 115, 111, 110, 34, 44, 34, 79, 82, 73, 71, 73, 78, 65, 76, 95, 67, 76, 73, 80, 95, 78, 69, 88, 84, 34, 58, 34, 115, 51, 58, 47, 47, 98, 45, 97, 111, 45, 112, 114, 111, 100, 117, 99, 116, 45, 109, 111, 99, 107, 47, 55, 52, 49, 48, 57, 102, 98, 98, 45, 54, 102, 52, 101, 45, 52, 48, 54, 50, 45, 97, 98, 48, 102, 45, 100, 49, 102, 53, 98, 98, 102, 55, 49, 97, 56, 49, 47, 99, 108, 105, 112, 115, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 47, 49, 48, 48, 48, 95, 111, 114, 105, 103, 105, 110, 97, 108, 99, 108, 105, 112, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 46, 109, 112, 52, 34, 44, 34, 79, 82, 73, 71, 73, 78, 65, 76, 95, 67, 76, 73, 80, 95, 71, 76, 79, 66, 65, 76, 95, 84, 73, 77, 69, 83, 84, 65, 77, 80, 95, 78, 69, 88, 84, 34, 58, 34, 115, 51, 58, 47, 47, 98, 45, 97, 111, 45, 112, 114, 111, 100, 117, 99, 116, 45, 109, 111, 99, 107, 47, 55, 52, 49, 48, 57, 102, 98, 98, 45, 54, 102, 52, 101, 45, 52, 48, 54, 50, 45, 97, 98, 48, 102, 45, 100, 49, 102, 53, 98, 98, 102, 55, 49, 97, 56, 49, 47, 99, 108, 105, 112, 115, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 47, 49, 48, 48, 48, 95, 111, 114, 105, 103, 105, 110, 97, 108, 99, 108, 105, 112, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 95, 116, 115, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 95, 116, 115, 46, 106, 115, 111, 110, 34, 125, 93, 125]] from topic [customtopic] Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of
com.consumer.model.CustomModel
(no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator) at [Source: (byte[])"{"FILE_LIST":[{"KEY1":"Large String Value","KEY2":"Large String Value","Key3":"Large String Value"[truncated 405 bytes]; line: 1, column: 2] at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1451) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1027) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1290) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.5.jar:2.9.5] at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:198) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE] at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103) ~[kafka-clients-1.0.0.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
最佳答案
正如@Naveen Kumar所述,我缺少默认或全参数构造函数。更改代码如下所示,使其工作
package com.consumer.model;
@NoArgsConstructor
@AllArgsConstructor
public class CustomModel {
@JsonProperty("FILE_LIST")
@NonNull
@Getter
@Setter
List<InnerModel> filesList;
}
关于java - Kafka Consumer 无法为 JsonDeserializer 消费 905 字节的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49721019/
我在 Windows 机器上启动 Kafka-Server 时出现以下错误。我已经从以下链接下载了 Scala 2.11 - kafka_2.11-2.1.0.tgz:https://kafka.ap
关于Apache-Kafka messaging queue . 我已经从 Kafka 下载页面下载了 Apache Kafka。我已将其提取到 /opt/apache/installed/kafka
假设我有 Kafka 主题 cars。 我还有一个消费者组 cars-consumers 订阅了 cars 主题。 cars-consumers 消费者组当前位于偏移量 89。 当我现在删除 cars
我想知道什么最适合我:Kafka 流或 Kafka 消费者 api 或 Kafka 连接? 我想从主题中读取数据,然后进行一些处理并写入数据库。所以我编写了消费者,但我觉得我可以编写 Kafka 流应
我曾研究过一些 Kafka 流应用程序和 Kafka 消费者应用程序。最后,Kafka流不过是消费来自Kafka的实时事件的消费者。因此,我无法弄清楚何时使用 Kafka 流或为什么我们应该使用
Kafka Acknowledgement 和 Kafka 消费者 commitSync() 有什么区别 两者都用于手动偏移管理,并希望两者同步工作。 请协助 最佳答案 使用 spring-kafka
如何在 Kafka 代理上代理 Apache Kafka 生产者请求,并重定向到单独的 Kafka 集群? 在我的特定情况下,无法更新写入此集群的客户端。这意味着,执行以下操作是不可行的: 更新客户端
我需要在 Kafka 10 中命名我的消费者,就像我在 Kafka 8 中所做的一样,因为我有脚本可以嗅出并进一步使用这些信息。 显然,consumer.id 的默认命名已更改(并且现在还单独显示了
1.概述 我们会看到zk的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点 这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现
我正在使用以下命令: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test.topic --property
我很难理解 Java Spring Boot 中的一些 Kafka 概念。我想针对在服务器上运行的真实 Kafka 代理测试消费者,该服务器有一些生产者已将数据写入/已经将数据写入各种主题。我想与服务
我的场景是我使用了很多共享前缀的 Kafka 主题(例如 house.door, house.room ) 并使用 Kafka 流正则表达式主题模式 API 使用所有主题。 一切看起来都不错,我得到了
有没有办法以编程方式获取kafka集群的版本?例如,使用AdminClient应用程序接口(interface)。 我想在消费者/生产者应用程序中识别 kafka 集群的版本。 最佳答案 目前无法检索
每当我尝试重新启动 kafka 时,它都会出现以下错误。一旦我删除/tmp/kafka-logs 它就会得到解决,但它也会删除我的主题。 有办法解决吗? ERROR Error while
我是 Apache Kafka 的新用户,我仍在了解内部结构。 在我的用例中,我需要从 Kafka Producer 客户端动态增加主题的分区数。 我发现了其他类似的 questions关于增加分区大
正如 Kafka 文档所示,一种方法是通过 kafka.tools.MirrorMaker 来实现这一点。但是,我需要将一个主题(比如 测试 带 1 个分区)(其内容和元数据)从生产环境复制到没有连接
我已经在集群中配置了 3 个 kafka,我正在尝试与 spring-kafka 一起使用。 但是在我杀死 kafka 领导者之后,我无法将其他消息发送到队列中。 我将 spring.kafka.bo
我的 kafka sink 连接器从多个主题(配置了 10 个任务)读取,并处理来自所有主题的 300 条记录。根据每个记录中保存的信息,连接器可以执行某些操作。 以下是触发器记录中键值对的示例: "
我有以下 kafka 流代码 public class KafkaStreamHandler implements Processor{ private ProcessorConte
当 kafka-streams 应用程序正在运行并且 Kafka 突然关闭时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当 Kafka 回来时,一切都应该(理论上)去恢复正常
我是一名优秀的程序员,十分优秀!