I'm new to Kafka, so I apologize if I'm missing something obvious.
I'm trying to consume messages from an existing topic.
I took the code from this link to the Spring Integration basic Kafka example.
My code currently looks like this:
@SpringBootApplication
public class Application {

    @Value("${kafka.topic}")
    private String topic;

    @Value("${kafka.messageKey}")
    private String messageKey;

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Value("${kafka.zookeeper.connect}")
    private String zookeeperConnect;

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context
                = new SpringApplicationBuilder(Application.class)
                        .web(false)
                        .run(args);
        /*MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class);
        for (int i = 0; i < 1; i++) {
            toKafka.send(new GenericMessage<String>("foo" + i));
        }*/
        PollableChannel fromKafka = context.getBean("received", PollableChannel.class);
        Message<?> received = fromKafka.receive();
        while (received != null) {
            System.out.println(received);
            received = fromKafka.receive();
        }
        context.close();
        System.exit(0);
    }

    /*@ServiceActivator(inputChannel = "toKafka")
    @Bean
    public MessageHandler handler() throws Exception {
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setTopicExpression(new LiteralExpression(this.topic));
        handler.setMessageKeyExpression(new LiteralExpression(this.messageKey));
        return handler;
    }*/

    /*@Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }*/

    /*@Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }*/

    @Bean
    public KafkaMessageListenerContainer<String, String> container() throws Exception {
        return new KafkaMessageListenerContainer<>(consumerFactory(), new TopicPartition(this.topic, 0));
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "siTestGroup");
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
                new KafkaMessageDrivenChannelAdapter<>(container);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
        return kafkaMessageDrivenChannelAdapter;
    }

    @Bean
    public PollableChannel received() {
        return new QueueChannel();
    }

    /*@Bean
    public TopicCreator topicCreator() {
        return new TopicCreator(this.topic, this.zookeeperConnect);
    }*/

    /*public static class TopicCreator implements SmartLifecycle {

        private final String topic;

        private final String zkConnect;

        private volatile boolean running;

        public TopicCreator(String topic, String zkConnect) {
            this.topic = topic;
            this.zkConnect = zkConnect;
        }

        @Override
        public void start() {
            ZkUtils zkUtils = new ZkUtils(new ZkClient(this.zkConnect, 6000, 6000,
                    ZKStringSerializer$.MODULE$), null, false);
            try {
                if (!AdminUtils.topicExists(zkUtils, topic))
                    AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
            }
            catch (TopicExistsException e) {
                // no-op
            }
            this.running = true;
        }

        @Override
        public void stop() {
        }

        @Override
        public boolean isRunning() {
            return this.running;
        }

        @Override
        public int getPhase() {
            return Integer.MIN_VALUE;
        }

        @Override
        public boolean isAutoStartup() {
            return true;
        }

        @Override
        public void stop(Runnable callback) {
            callback.run();
        }

    }*/

}
I get the following error:
16:22:59.725 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
at java.lang.Thread.run(Unknown Source)
(the same stack trace then repeats roughly every 500 ms)
I can't get anything useful out of this log.
Thanks for your help :)
EDIT:
Adding the pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <version>1.4.0.BUILD-SNAPSHOT</version>
    </parent>

    <groupId>org.springframework.integration.samples</groupId>
    <artifactId>kafka</artifactId>
    <version>4.3.0.BUILD-SNAPSHOT</version>
    <name>Apache Kafka Sample</name>
    <description>Apache Kafka Sample</description>
    <url>http://projects.spring.io/spring-integration</url>

    <organization>
        <name>SpringIO</name>
        <url>https://spring.io</url>
    </organization>

    <licenses>
        <license>
            <name>The Apache Software License, Version 2.0</name>
            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
            <distribution>repo</distribution>
        </license>
    </licenses>

    <developers>
        <developer>
            <id>garyrussell</id>
            <name>Gary Russell</name>
            <email>grussell@pivotal.io</email>
            <roles>
                <role>project lead</role>
            </roles>
        </developer>
        <developer>
            <id>markfisher</id>
            <name>Mark Fisher</name>
            <email>mfisher@pivotal.io</email>
            <roles>
                <role>project founder and lead emeritus</role>
            </roles>
        </developer>
        <developer>
            <id>ghillert</id>
            <name>Gunnar Hillert</name>
            <email>ghillert@pivotal.io</email>
        </developer>
        <developer>
            <id>abilan</id>
            <name>Artem Bilan</name>
            <email>abilan@pivotal.io</email>
        </developer>
    </developers>

    <scm>
        <connection>scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git</connection>
        <developerConnection>scm:git:scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git</developerConnection>
        <url>https://github.com/spring-projects/spring-integration-samples</url>
    </scm>

    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.0.M1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.2.5.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.0.0.M1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>1.0.0.M2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>repo.spring.io.milestone</id>
            <name>Spring Framework Maven Milestone Repository</name>
            <url>https://repo.spring.io/libs-milestone</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
Best answer
That version of the sample application requires a 0.9 broker - see this question and its link to the Kafka docs.
Earlier versions of the sample app worked with a 0.8 broker.
You need the version before this commit; I don't think we have a tag for it in GitHub, but this is the previous version, which works with 0.8.
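If you decide to stay on an 0.8 broker, a minimal pom sketch would swap the spring-integration-kafka 2.0.0.M1 milestone shown above for the 0.8-compatible 1.x line; 1.3.0.RELEASE is an assumed version here, so verify it against the earlier sample's own pom:
<!-- Sketch only: replaces the spring-integration-kafka 2.0.0.M1 dependency in the
     question's pom; 1.3.0.RELEASE is an assumed 0.8-compatible version. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>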
EDIT

With the 0.8 client version, you need to change this code ...

BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(
        BrokerAddress.fromAddress(this.brokerAddress));

... to ...

BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(
        BrokerAddress.fromAddress(this.firstBrokerAddress),
        BrokerAddress.fromAddress(this.secondBrokerAddress));

i.e., provide an array of BrokerAddress objects.

With the 0.9 client, you can simply use a comma-delimited list of host:port pairs:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
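For example, here is a minimal sketch of a 0.9-style consumer factory that takes the comma-delimited list directly; it is a drop-in for the question's consumerFactory() bean (same imports as the question's Application class), and the broker addresses and group id below are illustrative placeholders rather than values from the question:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // With the 0.9 client, bootstrap.servers is just "host1:port1,host2:port2,...";
    // no BrokerAddress objects are needed. These addresses are placeholders.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "siTestGroup");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}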
Regarding java - Spring Integration Kafka consumer, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36851538/