- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 php-rdkafka作为 php kafka 客户端。我使用 test
成功生成了我的测试消息group.and 使用以下代码使用消息,
$kafkaConsumer = new RdKafka\Consumer();
$kafkaConsumer->addBrokers("127.0.0.1:9292");
$topic = $kafkaConsumer->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
$msg = $topic->consume(0, 1000);
if($msg){
if ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $msg->payload, "\n";
}
}
}
test
中设置消息时组并尝试为
test
消费消息组然后我收到旧消息和新消息。所以我只想知道如何确认旧消息,这样我只能收到新消息而不是旧消息?有人可以对此大放异彩吗?
0.11.0.1
最佳答案
在 Kafka 中确认消费消息的方法是提交其偏移量。这样,在重新启动消费者时,它可以检索上次提交的偏移量并从停止的地方重新启动。
正如评论中所建议的,您需要使用 RD_KAFKA_OFFSET_STORED
指示消费者检索存储的偏移量。
但是您还需要通过设置 group.id
来提供组名。配置:
<?php
$conf = new RdKafka\Conf();
// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1:9292");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("test", $topicConf);
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 120*10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>
关于php - 如何使用 php-rdkafka 在 kafka 中确认消费消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46730967/
我正在开发 C++ kafka 客户端 librdkafka。查看示例 https://github.com/edenhill/librdkafka/blob/master/src-cpp/rdkaf
我正在使用来自 C++ rdkafka .问题是关于 RdKafka::KafkaConsumer。如何设置消费者从主题的开头开始? 附言 链接中的消费者示例是基于 RdKafka::Consumer
假设我无权访问在感兴趣的分区上提交的一组生产者,但只能控制一堆 C++ 消费者。由于我在一个复杂的程序上运行基准测试,我想知道我的消费者获取的偏移量与存储在分区中的总偏移量之间的差值。 例如,>> 在
我试图在 linux 服务器中托管的 drupal 9 站点上安装 rdkafka,我尝试了这个命令 sudo apt install php-pear sudo apt-get install -y
我正在使用基于 RD Kafka C 的客户端来消费/生成发送给 Kafka Broker 的消息。它在很长一段时间内工作正常,但突然我看到以下错误: %3|1540186099.148|ERROR
有没有办法使用 javascript 解码主题 __consumer_offsets 中的数据? 现在我有这样的东西: consumer.on('data: ', function(data) {
在 pkg-config 搜索路径中找不到包 rdkafka。 Confluent go package 抛出这样的错误 # pkg-config --cflags -- rdkafka Packa
我正在使用 php-rdkafka作为 php kafka 客户端。我使用 test 成功生成了我的测试消息group.and 使用以下代码使用消息, $kafkaConsumer = new RdK
我正在为我的一个应用程序使用node-rdkafka。我想使用 groupid 来使用来自多个主题的消息。(groupid 和主题对)但我无法在 node-rdkafka 文档中找到任何详细信息。所以
我正在尝试设置生产者以使用 node-rdkafka 将消息发送到 IBM Cloud 中的 Event Stream Service,但是我无法从服务器接收回“就绪”事件。 我花了一天时间试图解决这
我正在为我的一个应用程序使用node-rdkafka库。因此,我有一个要求,即在生成消息时将 tenantId 与 header 一起传递。我检查了文档,但没有得到我需要的东西,或者可能是我遗漏了一些
我使用 PECL 安装了 PHP 7.4 和 RdKafka,但是模块没有启动,这是 php -v 的输出 PHP Warning: PHP Startup: rdkafka: Unable to i
我要安装rdkafka Upon running the command as per the instructions: sudo pecl install rdkafka 我已经使用以下命令安装了
问题:RdKafka 安装程序未在 Visual Studio 15 中查找/识别 librdkafka 上下文: 为了在 visual studio 15 中开始使用 RdKafka,我运行了通用的
这是我的回溯: 我正在尝试运行一个使用 apache-kafka 作为简单消息传递的项目队列,具有单个生产者-经纪人-消费者。但是,我有 kafka_2.12-0.11.0.0 版本。 Error:
我在一些本地 docker 容器中运行了 kafka 和 zookeeper。 我有一个 node.js 代码库,它使用 node-rdkafka 作为消费者连接到 kafka。我们将此代码库称为“消
我想将 Kafka 与我的 Node.JS 服务一起用作消息代理,并在 kafka-node 之间进行辩论。和 node-rdkafka图书馆。 我开始测试 kafka-node 的功能(该库似乎更受
这是我在 StackOverflow 上的第一个问题。通常我会从别人的问题中找到解决方案,但这次互联网似乎没有太多答案。 因此,在使用 go get 之后以及每次尝试编译和运行我的应用程序时,我都会收
我对 Web 前端开发非常陌生,在加载 kafka 客户端库时我对 JS/Node/Angular 世界有点迷失。我考虑了两种访问我的 kafka 集群的选项:node-rdkafka 和 kafka
我正在尝试让 php 在 docker 容器中连接到 kafka。 kafka php 库 - https://github.com/arnaud-lb/php-rdkafka/ kafka dock
我是一名优秀的程序员,十分优秀!