gpt4 book ai didi

python - 如何在 Python 中生成 JSON 格式的 Kafka 消息

转载 作者:行者123 更新时间:2023-12-05 03:52:32 25 4
gpt4 key购买 nike

如何像原始格式一样删除引号和发送数据原始的 JSON 格式是:

{
"@timestamp": "2020-06-02T09:38:03.183186Z"
}

此数据在另一个主题中

"{\"@timestamp\": \"2020-05-25T17:40:47.582778Z\"}"

这是服务器之间发送数据的代码

def parse(d):   
if str(type(d)) == "<class 'dict'>":
return (r)
return -1

producer = KafkaProducer(bootstrap_servers=param["BOOTSTRAP_SERVERS"],
value_serializer=lambda x: dumps(x).encode('utf-8')) # utf-8
consumer = KafkaConsumer(bootstrap_servers=param["BOOTSTRAP_SERVERS"]+'1',
auto_offset_reset=param["AUTO_OFFSET_RESET"],
consumer_timeout_ms=param["CONSUMER_TIMEOUT_MS"],
enable_auto_commit=False,
auto_commit_interval_ms=60000,
group_id=param["GROUP_ID"],
client_id=param["CLIENT_ID"]
)
consumer.subscribe([param["TOPIC_IN"]])
while True:
num_rows = 0
for msg in consumer:
num_rows = num_rows + 1
m = json.loads(msg.value)
j = parse(m)
if j != -1:
data = json.dumps(j)
producer.send(param["TOPIC_OUT"], value=data)

最佳答案

您目前正在将您的值序列化为字符串。如果您想要 JSON 而不是字符串,那么您将需要正确序列化您的值。


以下应该可以解决问题:

import json  

producer = KafkaProducer(
bootstrap_servers='mykafka-broker',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

关于python - 如何在 Python 中生成 JSON 格式的 Kafka 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62149261/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com