gpt4 book ai didi

python - 如何使用 kafka 主题并通过 http 进行解析/服务?

转载 作者:行者123 更新时间:2023-12-01 09:24:10 26 4
gpt4 key购买 nike

我正在尝试在 python 中使用 kafka 主题并使用 prometheus 客户端通过 http 提供服务,但我似乎在主题使用上被阻止。我放置了一些占位符来简单地添加指标,但看起来该部分已被阻止。

import os
from pykafka import KafkaClient
import threading
from kafka import KafkaConsumer
from prometheus_client import start_http_server, Metric, REGISTRY

class CustomCollector(threading.Thread):
daemon = True

def collect(self):
client = KafkaClient(hosts=os.environ['KAFKA_ADDRESS'])
topic = client.topics[b'os.environ['KAFKA_TOPIC']
consumer = topic.get_simple_consumer()
for message in consumer:
if message is not None:
print(message.value)

metric = Metric('test_name', 'description', 'summary')
metric.add_sample('test_name', 'description', 'summary')
yield metric

if __name__ == '__main__':
start_http_server(9998)
REGISTRY.register(CustomCollector())
while True: time.sleep(1)

如果我运行代码,我会看到主题数据按预期流式传输到控制台。但是,我的指标端点从未被填充,并且对 Web 服务器的任何请求都会挂起,直到我终止该应用程序,它会使用库中的标准指标进行响应。

最佳答案

对于每个消费的消息,Metric 实例的构造应该发生一次。也就是说,Metric() 调用应该位于 for message in Consumer 循环内部。此外,您可能希望在创建 Metric 实例时以某种方式使用 message.value

关于python - 如何使用 kafka 主题并通过 http 进行解析/服务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50573918/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com