- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我使用的是 Kafka JDK 客户端版本 0.10.2.1
。我能够向 Kafka 生成简单的消息以进行“心跳”测试,但我无法使用 sdk 使用来自同一主题的消息。当我进入 Kafka CLI 时,我能够使用该消息,因此我已经确认该消息存在。这是我用来从我的 Kafka 服务器消费的函数,带有 props - 只有在我确实确认 product()
成功之后,我才将生成的消息传递给主题,我可以发布如果需要,稍后可以使用该功能:
private def consumeFromKafka(topic: String, expectedMessage: String): Boolean = {
val props: Properties = initProps("consumer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJava)
var readExpectedRecord = false
try {
val records = {
val firstPollRecs = consumer.poll(MAX_POLLTIME_MS)
// increase timeout and try again if nothing comes back the first time in case system is busy
if (firstPollRecs.count() == 0) firstPollRecs else {
logger.info("KafkaHeartBeat: First poll had 0 records- trying again - doubling timeout to "
+ (MAX_POLLTIME_MS * 2)/1000 + " sec.")
consumer.poll(MAX_POLLTIME_MS * 2)
}
}
records.forEach(rec => {
if (rec.value() == expectedMessage) readExpectedRecord = true
})
} catch {
case e: Throwable => //log error
} finally {
consumer.close()
}
readExpectedRecord
}
private def initProps(propsType: String): Properties = {
val prop = new Properties()
prop.put("bootstrap.servers", kafkaServer + ":" + kafkaPort)
propsType match {
case "producer" => {
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
prop.put("acks", "1")
prop.put("producer.type", "sync")
prop.put("retries", "3")
prop.put("linger.ms", "5")
}
case "consumer" => {
prop.put("group.id", groupId)
prop.put("enable.auto.commit", "false")
prop.put("auto.commit.interval.ms", "1000")
prop.put("session.timeout.ms", "30000")
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// poll just once, should only be one record for the heartbeat
prop.put("max.poll.records", "1")
}
}
prop
}
现在,当我运行代码时,它在控制台中输出如下:
13:04:21 - Discovered coordinator serverName:9092 (id: 2147483647
rack: null) for group 0b8947e1-eb68-4af3-ac7b-be3f7c02e76e. 13:04:23
INFO o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned
partitions [] for group 0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:24
INFO o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group
0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:25 INFO
o.a.k.c.c.i.AbstractCoordinator - Successfully joined group
0b8947e1-eb68-4af3-ac7b-be3f7c02e76e with generation 1 13:04:26 INFO
o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions
[HeartBeat_Topic.Service_5.2018-08-03.13_04_10.377-0] for group
0b8947e1-eb68-4af3-ac7b-be3f7c02e76e 13:04:27 INFO
c.p.p.l.util.KafkaHeartBeatUtil - KafkaHeartBeat: First poll had 0
records- trying again - doubling timeout to 60 sec.
然后没有其他任何事情,没有抛出任何错误 - 所以没有记录被轮询。有谁知道是什么阻止了“消费”的发生?订阅者似乎成功了,因为我能够成功调用 listTopics 并列出分区,没有问题。
最佳答案
您的代码有一个错误。看来你的台词是:
if (firstPollRecs.count() == 0)
应该这样说
if (firstPollRecs.count() > 0)
否则,您将传入一个空的 firstPollRecs
,然后对其进行迭代,这显然不会返回任何内容。
关于java - 可以向Kafka生产但不能消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51678093/
我在一个网站上工作,该网站在生产中只有 aspx 文件和 bin 目录和文件。任何人都知道这个网站是如何部署的,我通常有我的网站,我也会提交代码。 我的问题 2. 如何在同一台服务器上创建测试网站?我
您好,我认为这应该是一个相当简单的问题,但我对管理 git 不太熟悉。 我使用的是非常流行的 http://nvie.com/posts/a-successful-git-branching-mode
目前我的网站(生产服务器)已经有很多代码了。现在我想开始在我的项目中使用 Git 并为我的团队设置一个暂存服务器。谁能给我任何建议? 这是我脑海中的画面: Production
我目前正在学习 Erlang SO 用户能否提供有关他们的任何 Erlang 应用程序部署的有趣示例? 我想深入了解 Erlang 在过去的电信中的常见用途,以及 Erlang 在开发/部署过程中带来
我关注了Ryan's screencast并部署到 VPS。所以我使用 Unicorn + nginx + github + Ubuntu 12.04 LTS + capistrano。我也使用 i1
我想在 Azure 中维护临时环境和生产环境。每个都应该有自己的 blob 存储和 sql 存储。实现这一目标的最佳方法是什么?设置临时和生产 SQL Server 以及两个 Blob 存储帐户? 最
我无法使用 Electron 打包程序在内置的 Electron 应用程序中打开chrome开发工具。 我已经尝试过mainWindow.webContents.openDevTools(),但这没有
我有一个 Azure 应用程序服务环境。 可以在同一个 ASE 中运行多个应用服务计划(开发、测试和生产)吗? 基本上,我知道他们会共享前端池,我认为这很好,因为那里没有运行应用程序代码,并且它“..
我是 Maven 新手,有 Rails 背景。在较高级别上,如果我正在运行测试、在本地运行应用程序以及在部署到生产环境时,我希望连接到不同的数据库。 这就是我的想法。当我运行 mvn test 时,它
我有一个 Azure 应用程序服务环境。 可以在同一个 ASE 中运行多个应用服务计划(开发、测试和生产)吗? 基本上,我知道他们会共享前端池,我认为这很好,因为那里没有运行应用程序代码,并且它“..
我正在使用 faSTLane\produce 脚本制作一个新应用程序,我收到以下错误消息: in `parse_response': {"data"=>nil, "messages"=>{"warn"
使开发人员能够构建包含私有(private)数据的系统的当前做法是什么?谁能指出这类事情的“最佳实践”指南? 我们这里有一个 Catch-22,因为开发人员需要编写与具有被认为是“私有(private
我有一个连接 Azure SQL Server 的 Azure 云服务。当我第一次设置这个时,我真的不太了解自己在做什么,只是想熟悉 Azure。所以现在我想利用我所拥有的东西并将其转变为可靠的部署结
我是 Cordova 的新手。抱歉,如果这些是业余问题。我想详细了解典型手机应用程序的设置和架构。 我有一个本地版本的 Meteor Cordova 正在运行,它通过 Modulus 连接到远程服务器
我一直在寻找一些在一些 POS(销售点)设备和服务器之间同步数据的选项。 SymmetricDS似乎是具有商业友好许可证的选项之一。作为一个 Codehaus 项目确实保证了一定程度的质量,所以我同意
在 PHP 开发中,可以通过服务器的“环境”变量确定应用程序是在生产环境还是开发环境中运行。 在 tomcat 服务器上是否有类似的变量可用,或者是否有更好的方法将应用程序用于生产和开发? 最佳答案
我正在做一个项目,我需要使用 TwitterAPI 检索 Twitter 消息,处理它们并将它们存储在数据库中。我正在使用 Producer/Consumer BlockingQueue,其中元素的作
这个问题类似于:iPhone development - what is the difference between a development and distribution provision
我正在尝试根据 URL 在 Drupal 中设置环境。例如,如果我访问 mysite.local,它将使用 localdb 并将站点名称更改为“Local Mysite”;如果我转到 mysite.c
我今天一直在阅读 Magento 中的数据库同步。 我目前正在努力解决的一件事是在开发期间和上传到生产期间需要同步什么。现在假设一批更改将包含对数据库和类似代码的更改,下面是我对模型工作流的理解(我目
我是一名优秀的程序员,十分优秀!