gpt4 book ai didi

python - 我正在尝试通过 docker compose 运行 Flask 应用程序和 kafka..但无法正确设置

转载 作者:太空宇宙 更新时间:2023-11-03 19:46:36 26 4
gpt4 key购买 nike

我正在尝试通过 docker compose.. 和 kafka 运行一个 Flask 应用程序。我正在一起运行 Flask 应用程序和 Consumer.py。当我通过 Flask api 调用 Producer.py 时,它似乎正在发送数据,但消费者没有收到任何东西。由于我同时运行 Flask 应用程序和消费者文件..我也无法看到日志...有人可以帮忙吗..

我的 docker-compose 文件

version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: "zoo1"
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: "kafka1"
ports:
- "9092:9092"
expose:
- "9093"
depends_on:
- zookeeper
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
volumes:
- /var/run/docker.sock:/var/run/docker.sock
backend:
build: .
image: backend:v1
command: bash -c "python3 run.py run && python run.py consumer"
container_name: backend
restart: always
volumes:
- .:/app
ports:
- '5000:5000'
depends_on:
- mongodb
- neodb
- elasticsearch
- kafka

我的制作人.py

from kafka import KafkaProducer
from json import dumps

class Producer:
def __init__(self):
pass

def producer(self, topic,data):

producer = KafkaProducer(
value_serializer=lambda m: dumps(m).encode('utf-8'),
bootstrap_servers=['kafka:9092','kafka:9093','172.17.0.1:32783','172.17.0.1:32782','172.17.0.1:32781'])
producer.send(topic, data)

我的消费者.py

from kafka import KafkaConsumer
from json import loads
import os

class Consumer:
def consumer(self,topic):
print("hello")
consumer = KafkaConsumer(
topic,
auto_offset_reset='latest',
enable_auto_commit=True,
group_id='my-group-1',
value_deserializer=lambda m: loads(m.decode('utf-8')),
bootstrap_servers=['kafka:9092','kafka:9093','172.17.0.1:32783','172.17.0.1:32782','172.17.0.1:32781'])
while True:
dirName = "bad"
if not os.path.exists(dirName):
os.mkdir(dirName)
for m in consumer:
pass

最佳答案

看来您没有从 consumer = KafkaCo 获得任何对象...

这是您必须确保输入正确的文档。

Apache Kafka Docs

关于python - 我正在尝试通过 docker compose 运行 Flask 应用程序和 kafka..但无法正确设置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60096241/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com