gpt4 book ai didi

Python Kafka 客户端 - 没有错误但无法正常工作

转载 作者:行者123 更新时间:2023-12-04 17:44:31 26 4
gpt4 key购买 nike

我在 python 中运行 confluent_kafka 客户端。目前我在尝试生成和消费消息时没有收到任何错误,但问题是生产者说它成功了,但消费者找不到任何消息。

我创建了一个主题,这是我构建的我正在使用的类:

from confluent_kafka import Producer, Consumer
from config import config
import json

class Kafka:
"""
Kafka Handler.
"""

def __init__(self, kafka_brokers_sasl, api_key):
"""
Arguments:
kafka_brokers_sasl {str} -- String containing kafka brokers separated by comma (no spaces)
api_key {str} -- Kafka Api Key
"""

self.driver_options = {
'bootstrap.servers': kafka_brokers_sasl,
'sasl.mechanisms': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': 'token',
'sasl.password': api_key,
'log.connection.close' : False,
#'debug': 'all'
}

self.producer_options = {
'client.id': 'kafka-python-console-sample-producer'
}
self.producer_options.update(self.driver_options)

self.consumer_options = {
'client.id': 'kafka-python-console-sample-consumer',
'group.id': 'kafka-python-console-sample-group'
}
self.consumer_options.update(self.driver_options)

self.running = None


def stop(self):
self.running = False


def delivery_report(self, err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def produce(self, topic, data): # Function for producing/uploading data to a Kafka topic

p = Producer(self.producer_options)

print("Running?")

# Asynchronously produce a message, the delivery report callback will be triggered from poll() above, or flush() below, when the message has been successfully delivered or failed permanently.
p.produce(topic, data, callback=self.delivery_report)

# Wait for any outstanding messages to be delivered and delivery report callbacks to be triggered.
p.flush()
print("Done?")


def consume(self, topic, method_class=None): # Function for consuming/reading data from a Kafka topic. Works as a listener and triggers the run() function on a method_class
print("raaa")

kafka_consumer = Consumer(self.consumer_options)

kafka_consumer.subscribe([topic])

# Now loop on the consumer to read messages
print("Running?")
self.running = True
while self.running:
msg = kafka_consumer.poll()

print(msg)

if msg is not None and msg.error() is None:
print('Message consumed: topic={0}, partition={1}, offset={2}, key={3}, value={4}'.format(
msg.topic(),
msg.partition(),
msg.offset(),
msg.key().decode('utf-8'),
msg.value().decode('utf-8')))
else:
print('No messages consumed')

print("Here?")
kafka_consumer.unsubscribe()
kafka_consumer.close()
print("Ending?")

mock = {'yas': 'yas', 'yas2': 'yas2'}
kafka = Kafka(config['kafka']['kafka_brokers_sasl'], config['kafka']['api_key'])
kafka.produce(config['kafka']['topic'], json.dumps(mock))
kafka.consume(config['kafka']['topic'])

运行这个我得到打印:

Running?
Message delivered to DANIEL_TEST [0]
Done?
raaa
Running?
<cimpl.Message object at 0x104e4c390>
No messages consumed

最佳答案

我遇到了同样的问题。driver_options 必须包含 SSL 证书路径,因此您必须设置 'ssl.ca.location': '/etc/pki/tls/cert.pem'或此处记录的等效位置:https://github.com/ibm-messaging/event-streams-samples/blob/master/kafka-python-console-sample/app.py#L75

然后成功了!

关于Python Kafka 客户端 - 没有错误但无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52737167/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com