gpt4 book ai didi

python-3.x - 如何在不自动提交的情况下长时间(4-60 分钟)处理 Kafka 消息,并在不遭受重新平衡的情况下提交它

转载 作者:行者123 更新时间:2023-12-04 13:15:11 32 4
gpt4 key购买 nike

如何在不自动提交的情况下消费 Kafka 消息,长时间(4-60 分钟)处理它,并在不经历重新平衡,分区重新分配或阻止其他组消费者消费其他消息的情况下提交它。

我正在使用 Python 3.8 Kafka 消费者来:

  • 一次使用一条消息,无需自动提交。
  • 启动一个长时间运行的脚本(在 Python 中读取它的标准输出)
  • 有条件地提交消息。

  • 我的问题是 Kafka 分区经常被重新平衡到另一个消费者组成员。

    在翻阅文档后,我尝试使用这些配置属性:
  • session_timeout_ms
  • request_timeout_ms
  • max_poll_interval_ms
    from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition

    def consume_one_message_at_a_time(conf):

    conf.models_dir = f'{conf.project_root}/{conf.models_dir}'
    group_id = conf.group_id
    group_conf = conf.consumer_groups[group_id]

    kafka_brokers = conf.KAFKA_BROKERS
    topic = group_conf.subscribe[0].name

    print(f'KAFKA_BROKERS: {kafka_brokers}\n Topic {topic}\n group id: {group_id}')

    consumer = KafkaConsumer(
    topic,
    bootstrap_servers=kafka_brokers,
    group_id=group_id,
    enable_auto_commit=False,
    max_poll_records=1,
    max_poll_interval_ms=1800000,
    # session_timeout_ms=1800000,
    # request_timeout_ms=1800002,
    # connections_max_idle_ms=1800003
    # heartbeat_interval_ms=1800000,
    )

    print(f'bootstrap_servers: {kafka_brokers} subscribing to {topic}')
    consumer.subscribe([topic])

    for message in consumer:
    print(f"message is of type: {type(message)}")

    if not group_conf.use_cmd:
    do_something_time_consuming(message)
    else:
    if group_id == 'bots' and check_bot_id(message):

    bot_action(conf, group_conf, message)
    else:
    print(f'no action for group_id: {group_id}')
    print(f'key : {message.key}')
    print(f'value: {message.value}')

    meta = consumer.partitions_for_topic(message.topic)

    partition = TopicPartition(message.topic, message.partition)
    offsets = OffsetAndMetadata(message.offset + 1, meta)
    options = {partition: offsets}

    print(f'\noptions: {options}\n')

    response = consumer.commit(offsets=options)

  • 当其他组成员订阅或完成他们的工作并消费时,我收到此错误:
        Traceback (most recent call last):
    File "./consumer_one_at_a_time.py", line 148, in <module>
    consume_one_message_at_a_time(_conf)
    File "./consumer_one_at_a_time.py", line 141, in consume_one_message_at_a_time
    response = consumer.commit(offsets=options)
    File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 526, in commit
    self._coordinator.commit_offsets_sync(offsets)
    File "/usr/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 518, in commit_offsets_sync
    raise future.exception # pylint: disable-msg=raising-bad-type
    kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
    rebalanced and assigned the partitions to another member.
    This means that the time between subsequent calls to poll()
    was longer than the configured max_poll_interval_ms, which
    typically implies that the poll loop is spending too much
    time message processing. You can address this either by
    increasing the rebalance timeout with max_poll_interval_ms,
    or by reducing the maximum size of batches returned in poll()
    with max_poll_records.

    添加这些配置后,我发现新的消费者被阻止了! IE。不要消费消息,直到一个被提交!
    session_timeout_ms=1800000,
    request_timeout_ms=1800002,
    connections_max_idle_ms=1800003
    # heartbeat_interval_ms=1800000,

    我读到后台线程应该发送心跳。有没有办法在没有轮询的情况下发送心跳?

    最佳答案

    Is there a way to send a heartbeat without polling?



    它已经像这样工作了。自 0.10.1.0 版起,心跳通过 Kafka 中的单独线程发送。 (您可以查看 this 了解更多信息)

    通常重新平衡发生在这些情况下:
  • 新消费者加入消费群
  • 添加新分区
  • 彻底关闭消费者
  • 消费者被卡夫卡认为已死
  • 使 session.timeout.ms 过期而不发送心跳
  • 过期 max.poll.timeout.ms 而不发送轮询请求

  • 看来你的情况是最后一个了。您轮询记录但不在 max.poll.interval.ms 中再次轮询(在您的情况下为 30 分钟)因为长时间运行。要解决这个问题:
  • 您可以增加max.poll.interval.ms .但它可能会导致重新平衡时间过长。因为 rebalance.timeout = max.poll.interval.ms .重新平衡开始后,消费者组中的所有消费者都被撤销,Kafka 等待所有仍在向 poll() 发送心跳的消费者(通过轮询消费者在那时发送 joinGroupRequest),直到重新平衡超时到期,即 max.poll.interval.ms .假设您设置了 max.poll.interval.ms到 60 分钟,您的过程需要 50 分钟才能完成。如果在你漫长的过程的第 5 分钟因为我上面提到的任何原因开始重新平衡,那么 Kafka 将等待你的消费者轮询 45 分钟。在此期间,所有消费者都将被撤销。 (对于这个消费者群体,消费将完全停止)所以恕我直言,这不是一个好主意。 (当然这取决于您的需求)
  • 另一种解决方案是不将 Kafka 用于这种长时间运行的操作。因为Kafka不适合长时间运行的处理。您可以将有关长进程的元数据作为使用 Kafka 的消息消费的一部分进行持久化,然后在不使用 Kafka 的情况下进行适当的操作。
  • 关于python-3.x - 如何在不自动提交的情况下长时间(4-60 分钟)处理 Kafka 消息,并在不遭受重新平衡的情况下提交它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61207633/

    32 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com