gpt4 book ai didi

python - Flask API 作为实时 kafka 消费者

转载 作者:行者123 更新时间:2023-12-03 15:46:54 30 4
gpt4 key购买 nike

我想构建一个使用 Flask 框架开发的 python API,该框架使用 Kafka 主题并将流推送到客户端(html 页面或其他应用程序)。

我尝试使用虚拟数据生成实时流(请参阅下面的实时路由)。出现的问题是result变量仅在循环完成后被推送,而 result变量应该在每次迭代时推送。

我还尝试使用 Kafka 连接生成实时流(请参阅下面的 kafka 路由)。问题是没有返回数据,而是请求没有完成。

from flask import Response, Flask
import time
from kafka import KafkaConsumer

application = Flask(__name__)

@application.route('/')
def index():
return "Hello, World!"


@application.route('/realtime/')
def realtime():

def createGenerator():

for i in range(1,10):
yield str(i) + '\n'
time.sleep(0.2)

return Response(createGenerator())


@application.route('/kafka/')
def kafkaStream():
consumer = KafkaConsumer(bootstrap_servers = 'serverlocation',
client_id = 'name of client',
auto_offset_reset = 'earliest',
value_deserializer = lambda m: json.loads(m.decode('ascii')))

consumer.subscribe(topics=['my-topic'])

def events():
result = []
for message in consumer:
if message is not None:
result.append(message.value)
yield result
return Response(events())

if __name__ == '__main__':
application.run(debug = True)

到目前为止,我有效地从 Kafka 接收数据的唯一方法是在控制台中打印结果。
from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers = 'serverlocation',
client_id = 'name of client',
auto_offset_reset = 'earliest',
value_deserializer = lambda m: json.loads(m.decode('ascii')))

consumer.subscribe(topics=['my-topic'])

for message in consumer:
print message

我认为问题是API在进程完成之前无法推送数据,并且因为 KafkaConsumer连接是无限的,没有任何东西被推送到客户端。

我怎样才能克服这个问题?

最佳答案

对于任何其他人正在寻找解决此问题的方法。基于套接字的解决方案将在 kafka 消费者之上工作,该消费者要么不断地监听队列和帖子中的消息。
查看此链接以获取更多信息。
kafka SocketIo

关于python - Flask API 作为实时 kafka 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40927773/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com