- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
尝试用以下代码构造一个kafka消费者
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
// set up consumer
final Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-tutorial");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
// transactional API
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// consumer --from-beginning
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put("zookeeper.connect", CLUSTER.zookeeperConnect());
consumerProps.put("schema.registry.url", CLUSTER.schemaRegistryUrl());
final KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<GenericRecord, GenericRecord>(consumerProps);
consumer.subscribe(Collections.singletonList(inputTopic));
但因错误而失败
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:765)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
at com.telefonica.app.test_consumer.KafkaETLConsumerTest.testRunConsumer(KafkaETLConsumerTest.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at junit.framework.TestCase.runTest(TestCase.java:176)
at junit.framework.TestCase.runBare(TestCase.java:141)
at junit.framework.TestResult$1.protect(TestResult.java:122)
at junit.framework.TestResult.runProtected(TestResult.java:142)
at junit.framework.TestResult.run(TestResult.java:125)
at junit.framework.TestCase.run(TestCase.java:129)
at junit.framework.TestSuite.runTest(TestSuite.java:252)
at junit.framework.TestSuite.run(TestSuite.java:247)
at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:86)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:539)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:761)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:461)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:207)
Caused by: org.apache.kafka.common.KafkaException: io.confluent.kafka.serializers.KafkaAvroSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:673)
... 22 more
融合版本 3.0.0
最佳答案
问题出在这里:
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
请注意,您将 Serializer 类传递给 Deserializer 配置。这正是异常所说的:
io.confluent.kafka.serializers.KafkaAvroSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
序列化器不是反序列化器。
试试:
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
关于apache-kafka - org.apache.kafka.common.KafkaException : io. confluent.kafka.serializers.KafkaAvroSerializer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47720884/
所以,我试图在我的 Flink Kafka 流作业中启用 EXACTLY_ONCE 语义以及检查点。 但是我没有让它工作,所以我尝试从 Github 下载测试示例代码: https://github.
我手动启动 Zookeeper,然后是 Kafka 服务器,最后是带有各自属性文件的 Kafka-Rest 服务器。接下来,我将在 tomcat 上部署我的 Spring Boot 应用程序 在 To
简而言之,我启动了 Kafka,成功创建了一个主题,启动了一个启用了 key 的生产者。到目前为止,一切都很好。我发送了一条简单的消息,我得到了 root@kafka:/# kafka-console
我是 Kafka Streams 和 Spring Cloud Stream 的新手,但在将集成相关代码移动到属性文件方面已经阅读了有关它的好东西,因此开发人员可以主要专注于事物的业务逻辑方面。 这里
环境 3 节点 Kafka 集群 亚马逊 MSK v2.3 1 主题 6个分区 1 个消费者组,2 个消费者 在 Kubernetes 中运行 汇合 .NET SDK 1.2.2 除了 bootstr
尝试用以下代码构造一个kafka消费者 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka
我已经设置了 aws MSK,我正在尝试将记录从 MSK 接收到 Elasticsearch 。 我能够将数据插入 MSK 为 json 格式。 我想沉迷于 Elasticsearch 。 我能够正确
长话短说: 我非常简单的 Spark Streaming 应用程序在驱动程序中失败,并显示“KafkaException:字符串超出最大大小”。我在执行程序中看到了相同的异常,但我还在执行程序日志的某
我正在尝试使用 SSL 连接到 Kafka 3.0,但在加载 SSL keystore 时遇到问题 我尝试了许多可能的值,但没有帮助 我尝试更改位置,更改位置的值,但仍然没有帮助 package uk
我正在开发 Spring Boot + Apache Kafka例子。发送时 TOPIC卡夫卡出现错误。 错误: java.lang.IllegalStateException: Failed to
我是一名优秀的程序员,十分优秀!