- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试从我的 Kafka 0.8.1 集群中检索数据。我创建了一个 ZookeeperConsumerConnector
实例,然后尝试对其调用 createMessageStreams
。然而,无论我做什么,似乎 createMessageStreams
只是挂起并且永远不会返回,即使这是我对 Kafka 所做的唯一事情。
阅读邮件列表似乎有时会出于某些原因而发生这种情况,但据我所知我没有做过任何这些事情。
此外,我要指出,我实际上是在 Clojure 中使用 clj-kafka 执行此操作,但我怀疑 clj-kafka 不是问题所在,因为即使我运行这段代码也会遇到问题:
(.createMessageStreams
(clj-kafka.consumer.zk/consumer {"zookeeper.connect" "127.0.0.1:2181"
"group.id" "my.consumer"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
{"mytopic" (int 1)})
和clj-kafka.consumer.zk/consumer
只是使用 Consumer.createJavaConsumerConnector
来创建一个 ZookeeperConsumerConnector
而没有做任何太花哨的事情。
此外,“mytopic”中肯定有消息,因为从命令行我可以运行以下命令并取回我已经发送到主题的所有内容:
% kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic mytopic --from-beginning
所以也不是说话题是空的。
此时此刻,我感到难过。想法?
预计到达时间:“挂起”我想我真正的意思是它似乎启动了一个线程,然后一直卡在其中,什么也不做。如果我从 REPL 运行这段代码,我可以通过点击 control-c 退出它,然后我得到这个错误:
IllegalMonitorStateException java.util.concurrent.locks.ReentrantLock$Sync.tryRelease (ReentrantLock.java:155)
最佳答案
我在中断 REPL 时遇到了同样的问题和同样的异常。它挂起的原因是 consumer.zk 命名空间中的惰性迭代函数。从中读取消息的队列是一个 LinkedBlockingQueue,在惰性迭代函数中调用 .hasNext 会调用此队列上的 .take。这会在队列上创建一个读锁,并将阻塞并等待直到有可用的东西从队列中取出。这意味着惰性迭代函数永远不会真正返回。 lazy-iterate 由“消息”函数调用,如果你不做类似的事情
(take 2 (messages "mytopic" some-consumer))
那么消息函数将永远不会返回并无限期地挂起。我认为这是 clj-kafka 中的错误(或设计缺陷)。为了说明这确实是正在发生的事情,请尝试在您的消费者配置中设置“consumer.timeout.ms”“0”。它将抛出 TimeoutExpection 并将控制权返回给 REPL。
这进一步导致了“with-resource”宏的问题。宏接受一个消费者的绑定(bind)、一个关闭函数和一个主体;它调用 body,然后调用 shutdown fn。如果在主体内部调用“消息”,主体将永远不会返回,因此永远不会调用关闭函数。如果调用 shutdown,消息函数将返回,因为 shutdown 会在队列中放置一条消息,指示消费者清理其资源和线程以为 GC 做准备。该宏将应用程序置于一种状态,在该状态下,退出主循环的唯一方法是终止应用程序(或调用它的线程)本身。在为生产环境做好准备之前,该库当然还有很长的路要走。
关于clojure - Kafka 的 ZookeeperConsumerConnector.createMessageStreams 永远不会返回,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23255907/
本文整理了Java中kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams()方法的一些代码示例,展示了Zooke
我正在尝试从我的 Kafka 0.8.1 集群中检索数据。我创建了一个 ZookeeperConsumerConnector 实例,然后尝试对其调用 createMessageStreams。然而,无
我是一名优秀的程序员,十分优秀!