gpt4 book ai didi

python - 数据不会使用python多处理推送到Kafka队列

转载 作者:行者123 更新时间:2023-12-03 12:59:19 24 4
gpt4 key购买 nike

我正在使用Python(2.7)多处理,使用kafka-python(1.3.5)KafkaProducer将数据推送到Kafka队列。

from kafka import KafkaProducer
import multiprocessing
# other imports


class TestClass(object):
def __init__(self, producer):
self.kafka_producer = producer

def main(self, conf, nthreads):
try:
for i in range(nthreads):
logger.info("Starting process number = %d " % (i + 1))
p = Process(target=self.do_some_task, args=(conf, 2))
p.start()
processes.append(p)
for p in processes:
logger.info("Joining process")
p.join()
except Exception, ex:
logger.error("Exception occurred : %s" % str(ex))

def do_some_task(conf, retry):
# some task happening
self.record(arg1, arg2)

# pushing to kafka
def record(self, arg1, arg2)
message = json.dumps({"a": "arg1", "b": "arg2"})
self.kafka_producer.send(KAFKA_TOPIC, message)


if __name__ == '__main__':
kafka_producer = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
request_timeout_ms=60000,
retries=2)
obj = TestClass(kafka_producer)

try:
parser = argparse.ArgumentParser(description='Description')
parser.add_argument('-threads', type=int, default=1) # 20 threads
parser.add_argument('-debug', type=int, default=0)
args = parser.parse_args()
me = SingleInstance(args.src)
TestClass.main(CONF[args.src], args.threads)

产生了20个线程,其中写入kafka。我看到了日志,发现该进程等待用kafka编写消息,但最终它继续运行而没有写入Kafka。没有异常(exception)。

我尝试从python命令行运行相同的代码而没有线程,并且一切正常。可能是什么问题。

最佳答案

请在 fork 过程之后产生与kafka的连接。并且请关闭连接,并在遇到与连接有关的错误时重新连接。

关于python - 数据不会使用python多处理推送到Kafka队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47176083/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com