- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试使用 Kafka-Client 库(0.9.0.1)测试生产者、消费者。代理(0.9.0.1)正在服务器上运行,我已经测试了KafkaProducer,没有问题。但是当我测试 KafkaConsumer 进行轮询时,代理会发出错误消息。
[2016-03-18 13:44:19,129] ERROR Closing socket for /172.26.132.149 because of error (kafka.network.Processor) kafka.common.KafkaException: Wrong request type 10 at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:57) at kafka.network.RequestChannel$Request.(RequestChannel.scala:53) at kafka.network.Processor.read(SocketServer.scala:353) at kafka.network.Processor.run(SocketServer.scala:245)
消费者测试代码如下。
class ConsumerRunner implements Runnable{
private KafkaConsumer<String,String> consumer;
private String topic;
public ConsumerRunner(String topic,Properties props){
consumer = new KafkaConsumer<String,String>(props);
this.topic = topic;
consumer.subscribe(Arrays.asList(this.topic));
}
public void run() {
while(true){
ConsumerRecords<String,String> records = consumer.poll(10000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
}
我猜测轮询请求包含错误的请求类型键,但是当我检查Kafka核心源时,我意识到请求类型键“10”被定义为“GroupCoordinatorKey”。我在“kafka.network.RequestChannel.scala”中发现可疑代码
val requestObj =
if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId))
RequestKeys.deserializerForKey(requestId)(buffer)
else
null
测试消费者也显示错误消息
java.io.EOFException: null at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134) at org.apache.kafka.common.network.Selector.poll(Selector.java:286) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:180) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) at com.medialog.mdt.kafka.KafkaTest$ConsumerThread.run(KafkaTest.java:61)
有人有想法吗?是我的问题吗?或者其他?请帮我。谢谢。
最佳答案
明科
不确定您是否只是想为 0.9 Kafka 代码创建一个消费者,或者您的 kafka 消息是否有导致此问题的特定内容,您能否分享更多详细信息。
但是如果您只是想为 0.9 编写一个 Kafka 消费者,那么在 Kafka 0.9 中会有新的消费者 API。如果您愿意使用新的消费者 API,请查看此示例 https://github.com/sdpatil/KafkaAPIClient/blob/master/src/main/java/com/spnotes/kafka/simple/Consumer.java示例。
苏尼尔
关于java - KafkaConsumer 在 KafkaServer 上出错(版本 0.9.0.1),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36077492/
我正在尝试从 Java 启动 Kafka 服务器 具体来说,我该如何翻译this line Scala 的代码转换为 Java 代码? private val server = new Kafka
跟进create `KafkaServer` from Java 我正在从 Java 创建一个 KafkaServer(确实是 Clojure,但是给定一个有效的 Java 示例,它很容易翻译)。 除
我有一个独立的 kafka 代理,我正在尝试为其配置 SASL。配置如下。我正在尝试在代理上设置 SASL_PLAIN 身份验证。 我的理解是,通过 server.properties 中的 list
我对连接到本地 Kafka 实例的应用程序进行了一些集成测试。当测试以类似于此问题的已接受答案的方式运行时,我正在使用 Java KafkaServer API 按需创建本地实例: How can I
我正在尝试使用 Kafka-Client 库(0.9.0.1)测试生产者、消费者。代理(0.9.0.1)正在服务器上运行,我已经测试了KafkaProducer,没有问题。但是当我测试 KafkaCo
我将从头开始。我有 openSuse 13.2,我还有 jdk_1.7.0_51、scala-2.11.4 和 gradle-2.2.1。我已经下载了 kafka-0.8.2-bet-src 的源代码
我们最近从 kafka 0.8.1.1 升级到 0.8.2.0。我们的集成测试失败,因为测试无限期地卡在 kafkaServer.shutdown() 上 这些是我的经纪人设置 Properties
我是一名优秀的程序员,十分优秀!