- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如何在不自动提交的情况下消费 Kafka 消息,长时间(4-60 分钟)处理它,并在不经历重新平衡,分区重新分配或阻止其他组消费者消费其他消息的情况下提交它。
我正在使用 Python 3.8 Kafka 消费者来:
from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition
def consume_one_message_at_a_time(conf):
conf.models_dir = f'{conf.project_root}/{conf.models_dir}'
group_id = conf.group_id
group_conf = conf.consumer_groups[group_id]
kafka_brokers = conf.KAFKA_BROKERS
topic = group_conf.subscribe[0].name
print(f'KAFKA_BROKERS: {kafka_brokers}\n Topic {topic}\n group id: {group_id}')
consumer = KafkaConsumer(
topic,
bootstrap_servers=kafka_brokers,
group_id=group_id,
enable_auto_commit=False,
max_poll_records=1,
max_poll_interval_ms=1800000,
# session_timeout_ms=1800000,
# request_timeout_ms=1800002,
# connections_max_idle_ms=1800003
# heartbeat_interval_ms=1800000,
)
print(f'bootstrap_servers: {kafka_brokers} subscribing to {topic}')
consumer.subscribe([topic])
for message in consumer:
print(f"message is of type: {type(message)}")
if not group_conf.use_cmd:
do_something_time_consuming(message)
else:
if group_id == 'bots' and check_bot_id(message):
bot_action(conf, group_conf, message)
else:
print(f'no action for group_id: {group_id}')
print(f'key : {message.key}')
print(f'value: {message.value}')
meta = consumer.partitions_for_topic(message.topic)
partition = TopicPartition(message.topic, message.partition)
offsets = OffsetAndMetadata(message.offset + 1, meta)
options = {partition: offsets}
print(f'\noptions: {options}\n')
response = consumer.commit(offsets=options)
Traceback (most recent call last):
File "./consumer_one_at_a_time.py", line 148, in <module>
consume_one_message_at_a_time(_conf)
File "./consumer_one_at_a_time.py", line 141, in consume_one_message_at_a_time
response = consumer.commit(offsets=options)
File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 526, in commit
self._coordinator.commit_offsets_sync(offsets)
File "/usr/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 518, in commit_offsets_sync
raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll()
was longer than the configured max_poll_interval_ms, which
typically implies that the poll loop is spending too much
time message processing. You can address this either by
increasing the rebalance timeout with max_poll_interval_ms,
or by reducing the maximum size of batches returned in poll()
with max_poll_records.
session_timeout_ms=1800000,
request_timeout_ms=1800002,
connections_max_idle_ms=1800003
# heartbeat_interval_ms=1800000,
最佳答案
Is there a way to send a heartbeat without polling?
max.poll.interval.ms
中再次轮询(在您的情况下为 30 分钟)因为长时间运行。要解决这个问题:
max.poll.interval.ms
.但它可能会导致重新平衡时间过长。因为 rebalance.timeout = max.poll.interval.ms
.重新平衡开始后,消费者组中的所有消费者都被撤销,Kafka 等待所有仍在向 poll() 发送心跳的消费者(通过轮询消费者在那时发送 joinGroupRequest),直到重新平衡超时到期,即 max.poll.interval.ms
.假设您设置了 max.poll.interval.ms
到 60 分钟,您的过程需要 50 分钟才能完成。如果在你漫长的过程的第 5 分钟因为我上面提到的任何原因开始重新平衡,那么 Kafka 将等待你的消费者轮询 45 分钟。在此期间,所有消费者都将被撤销。 (对于这个消费者群体,消费将完全停止)所以恕我直言,这不是一个好主意。 (当然这取决于您的需求)关于python-3.x - 如何在不自动提交的情况下长时间(4-60 分钟)处理 Kafka 消息,并在不遭受重新平衡的情况下提交它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61207633/
这个问题在这里已经有了答案: How does Scala's apply() method magic work? (3 个回答) 9年前关闭。 假设我在 scala 中有一个 MyList 类,其
这个问题在这里已经有了答案: What is a non-capturing group in regular expressions? (18 个回答) Reference - What does
这个问题是针对嵌入式系统的! 我有以下选项来初始化一个对象: Object* o = new Object(arg); 这会将对象放入堆中并返回指向它的指针。我不喜欢在嵌入式软件中使用动态分配。 Ob
我自己搜索过,没能成功的正则表达式。 我有一个 html 文件,其中包含 [] 之间的变量我想把每一个字都写进去。 [client_name][client_company] [cl
我是 Python 新手。我不明白为什么这段代码不起作用: reOptions = re.search( "[\s+@twitter\s+(?P\w+):(?P.*?)\s+]", d
在过去 7 个月左右的时间里,我几乎一直在使用 .NET C# 进行编程。在那之前,我的大部分编程都是用 C++(从学校里学的)。在工作中,我可能需要在接下来的几个月里做一大堆 C 语言。我对 C 的
我是 RE 的新手,我正在尝试获取歌词并分离出歌词标题、和声和主唱: 下面是一些歌词的例子: [Intro] D.A. got that dope! [Chorus: Travis Scott] Ic
这可能是不可能的,但我想检查是否可以用一种简单的方式表达这样的事情: // obviously doesn't work class Foo : IFoo where T: Bar {
我们的应用程序中有“user”和“study”实体,存储在它们各自的表中。一项研究代表一种研究和已收集的数据。它们是多对多的关系,所以我们需要一个链接表:studies_users。 我们为用户分配角
将测试条件添加到 Visual Studio 2010 数据库单元测试(对于 SQL Server 2008)时,这些条件称为例如rowCountCondition1、rowCountConditio
在模拟器上,我可以从设置中卸载 SD 卡。 然后我可以将它安装到我的操作系统上,然后正常卸载它。 我一直无法弄清楚如何在模拟器上重新安装它(无需重新启动)。 提示: adb 命令 remount 是无
假设在一个分支上执行了一系列提交,但该分支尚未与主干重新同步。是否可以从提交中生成全局补丁?是否可以从一系列提交中生成“分组”补丁?如果是,如何? 最佳答案 svn diff -rXXX:YYY UR
在某些情况下,我想在我的应用程序中锁定调整大小功能,为此我尝试对属性进行数据绑定(bind),并且不允许在某些情况下更改它,但没有成功。 有没有办法这样做? 这是我不成功的尝试: XAML: Vie
当我的计算机连接多个显示器时,我可以检测它们,并根据从获取的值设置位置来向它们绘制图形 get(0, 'MonitorPositions') 但是,当我在 MATLAB 运行时断开监视器时,此属性不会
我们有一个grails应用程序,该应用程序在grails数据库中存储了各种域对象。该应用程序连接到第二个数据库,运行一些原始sql,并在表中显示结果。它基本上是一个报告服务器。 我们通过在DataSo
无法比较来自不同容器的迭代器(参见这里的示例: https://stackoverflow.com/a/4664519/225186 )(或者从技术上讲,它不需要有意义。) 这就提出了另一个问题,来自
我有以下情况: 家长 Activity : ParentActivityClass { private Intent intent; @Override public void onCreate(Bu
我经常将元素与附加功能 Hook ,例如: $('.myfav').autocomplete(); $('.myfav').datepicker(); $('.myfav').click(somefu
因此,我将 tooltipster.js 库用于工具提示,并尝试更改工具提示在不同屏幕尺寸上的默认距离。 所以这是默认的 init 的样子: $(inputTooltipTrigger).tool
我在 ARM7 嵌入式环境中工作。我使用的编译器不支持完整的 C++ 功能。它不支持的一项功能是动态类型转换。 有没有办法实现dynamic_cast<>() ? 我使用 Google 寻找代码,但到
我是一名优秀的程序员,十分优秀!