gpt4 book ai didi

java - Spring 集成卡夫卡消费者

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:54:03 24 4
gpt4 key购买 nike

我是 kafka 的新手,如果我遗漏了什么,我深表歉意。

我正在尝试使用来自现有主题的消息。

我从this得到了Spring Integration Basic Kafka Example的代码链接。

我的代码目前看起来像这样:

@SpringBootApplication
public class Application {

@Value("${kafka.topic}")
private String topic;

@Value("${kafka.messageKey}")
private String messageKey;

@Value("${kafka.broker.address}")
private String brokerAddress;

@Value("${kafka.zookeeper.connect}")
private String zookeeperConnect;

public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context
= new SpringApplicationBuilder(Application.class)
.web(false)
.run(args);
/*MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class);
for (int i = 0; i < 1; i++) {
toKafka.send(new GenericMessage<String>("foo" + i));
}*/
PollableChannel fromKafka = context.getBean("received", PollableChannel.class);
Message<?> received = fromKafka.receive();
while (received != null) {
System.out.println(received);
received = fromKafka.receive();
}
context.close();
System.exit(0);
}

/*@ServiceActivator(inputChannel = "toKafka")
@Bean
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression(this.topic));
handler.setMessageKeyExpression(new LiteralExpression(this.messageKey));
return handler;
}*/

/*@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}*/

/*@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
//props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}*/

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
return new KafkaMessageListenerContainer<>(consumerFactory(), new TopicPartition(this.topic, 0));
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
//props.put(ConsumerConfig.GROUP_ID_CONFIG, "siTestGroup");
//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
return kafkaMessageDrivenChannelAdapter;
}

@Bean
public PollableChannel received() {
return new QueueChannel();
}

/*@Bean
public TopicCreator topicCreator() {
return new TopicCreator(this.topic, this.zookeeperConnect);
}*/

/*public static class TopicCreator implements SmartLifecycle {

private final String topic;

private final String zkConnect;

private volatile boolean running;

public TopicCreator(String topic, String zkConnect) {
this.topic = topic;
this.zkConnect = zkConnect;
}

@Override
public void start() {
ZkUtils zkUtils = new ZkUtils(new ZkClient(this.zkConnect, 6000, 6000,
ZKStringSerializer$.MODULE$), null, false);
try {
if (!AdminUtils.topicExists(zkUtils, topic))
AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
}
catch (TopicExistsException e) {
// no-op
}
this.running = true;
}

@Override
public void stop() {
}

@Override
public boolean isRunning() {
return this.running;
}

@Override
public int getPhase() {
return Integer.MIN_VALUE;
}

@Override
public boolean isAutoStartup() {
return true;
}

@Override
public void stop(Runnable callback) {
callback.run();
}

}*/

}

我收到以下错误:

16:22:59.725 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
at java.lang.Thread.run(Unknown Source)
16:23:00.229 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
at java.lang.Thread.run(Unknown Source)
16:23:00.732 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
at java.lang.Thread.run(Unknown Source)
16:23:01.235 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
at java.lang.Thread.run(Unknown Source)
16:23:01.739 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
at java.lang.Thread.run(Unknown Source)
16:23:02.243 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
at java.lang.Thread.run(Unknown Source)

我没有从日志中获取信息。

感谢您的帮助:)

编辑:

添加pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>1.4.0.BUILD-SNAPSHOT</version>
</parent>
<groupId>org.springframework.integration.samples</groupId>
<artifactId>kafka</artifactId>
<version>4.3.0.BUILD-SNAPSHOT</version>
<name>Apache Kafka Sample</name>
<description>Apache Kafka Sample</description>
<url>http://projects.spring.io/spring-integration</url>
<organization>
<name>SpringIO</name>
<url>https://spring.io</url>
</organization>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<developers>
<developer>
<id>garyrussell</id>
<name>Gary Russell</name>
<email>grussell@pivotal.io</email>
<roles>
<role>project lead</role>
</roles>
</developer>
<developer>
<id>markfisher</id>
<name>Mark Fisher</name>
<email>mfisher@pivotal.io</email>
<roles>
<role>project founder and lead emeritus</role>
</roles>
</developer>
<developer>
<id>ghillert</id>
<name>Gunnar Hillert</name>
<email>ghillert@pivotal.io</email>
</developer>
<developer>
<id>abilan</id>
<name>Artem Bilan</name>
<email>abilan@pivotal.io</email>
</developer>
</developers>
<scm>
<connection>scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git</connection>
<developerConnection>scm:git:scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git</developerConnection>
<url>https://github.com/spring-projects/spring-integration-samples</url>
</scm>
<dependencies>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>4.3.0.M1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.2.5.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.0.M1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>1.0.0.M2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>repo.spring.io.milestone</id>
<name>Spring Framework Maven Milestone Repository</name>
<url>https://repo.spring.io/libs-milestone</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

最佳答案

该版本的示例应用程序需要 0.9 代理 - 请参阅 this question及其指向 Kafka docs 的链接.

该示例应用的早期版本适用于 0.8 代理。

您需要 this commit 之前的版本我认为我们在 github 中没有标签,但是 this is the previous version适用于 0.8。

编辑

使用0.8客户端版本,需要改一下这段代码...

BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(
BrokerAddress.fromAddress(this.brokerAddress));

...到...

BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(
BrokerAddress.fromAddress(this.firstBrokerAddress),
BrokerAddress.fromAddress(this.secondBrokerAddress));

即提供一组 BrokerAddress 对象。

对于 0.9 客户端,可以使用一个简单的以逗号分隔的 host:port 对列表

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);

关于java - Spring 集成卡夫卡消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36851538/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com