gpt4 book ai didi

python - 无法使用 kafka-python 库从 kafka 代理获取最新数据

转载 作者:太空宇宙 更新时间:2023-11-03 20:31:27 26 4
gpt4 key购买 nike

我正在尝试使用 kafka-python 库使用来自 Kafka 代理的数据,并且有多个代理正在高频率地生成数据,但在 Kafka 消费者端我需要大约 5 的处理时间sec ,所以在处理第一条消息后,我应该得到最新的消息,而不是上次提交偏移后的下一条消息。

我尝试过设置 enable_auto_commit=Falseauto_offset_reset="latest" 我也尝试过设置随机组 id ,我也尝试过设置 group_id = None。这样做的唯一效果是我只在开始时获得最新的数据,但之后每个数据都按偏移量顺序出现,而不是队列末尾或最新数据。

consumer = KafkaConsumer(bootstrap_servers=kafka_brokers_address,
api_version=(2, 3, 0),
group_id='abcd',
value_deserializer=lambda v:json.loads(v.decode('utf-8')),
enable_auto_commit=False,
auto_offset_reset="latest")
consumer_rpnl.assign([TopicPartition('topic', 0)])

c = next(consumer)
## also tried
for c in consumer:
print(c.values)

最佳答案

如何从 https://github.com/dpkp/kafka-python/issues/1405 移动到最后一个示例

def seek_to_last():
consumer = KafkaConsumer(bootstrap_servers=config.kafka_bootstrap_server,
group_id=config.kafka_check_proxy_thread_group,
value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest',
enable_auto_commit=True)

partitions = consumer.partitions_for_topic(config.kafka_raw_proxy_topic)

if len(partitions) > config.TASK_OK_PROXY_SCAN_THREAD_N:
logger.error("...................")

for partation in partitions:
p = TopicPartition(config.kafka_raw_proxy_topic, partation)
mypartition = [p]
consumer.assign(mypartition)
# consumer.seek_to_end(p)
last_pos = consumer.end_offsets(mypartition)
pos = last_pos[p]
logger.info("%s, %s" % (partation, pos))
# consumer.seek(p, pos)
offset = OffsetAndMetadata(pos, b'')
consumer.commit(offsets={p: offset})

关于python - 无法使用 kafka-python 库从 kafka 代理获取最新数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57478851/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com