
apache-kafka - Python librdkafka Producer vs. native Apache Kafka Producer performance


I'm benchmarking Python's confluent-kafka against the native Java Apache Kafka producer to see which one has the higher throughput.

I'm using docker-compose to deploy a Kafka cluster with 3 Kafka brokers and 3 ZooKeeper instances. My docker-compose file: https://paste.fedoraproject.org/paste/bn7rr2~YRuIihZ06O3Q6vw/raw

The code is very simple: mostly the defaults of Python's confluent-kafka, plus a few configuration changes in the Java producer to match confluent-kafka's configuration.

Python code:

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'kafka-1:19092,kafka-2:29092,kafka-3:39092',
    'linger.ms': 300,
    'max.in.flight.requests.per.connection': 1000000,
    'queue.buffering.max.kbytes': 1048576,
    'message.max.bytes': 1000000,
    'default.topic.config': {'acks': 'all'}
})

ss = '0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357'

def f():
    import time
    start = time.time()
    for i in range(1000000):  # 1,000,000 messages of ~1 KB each
        try:
            producer.produce('test-topic', ss)
        except Exception:
            # Local producer queue is full: serve delivery reports, then retry.
            producer.poll(1)
            try:
                producer.produce('test-topic', ss)
            except Exception:
                # Still full: block until the queue drains, then retry once more.
                producer.flush(30)
                producer.produce('test-topic', ss)
    producer.poll(0)
    producer.flush(30)
    print(time.time() - start)


if __name__ == '__main__':
    f()

The Java implementation. The configuration is the same as the librdkafka configuration above; linger.ms and the callback were changed following Edenhill's suggestions.

package com.amit.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExampleAsync {

    private final static String TOPIC = "test-topic";
    private final static String BOOTSTRAP_SERVERS = "kafka-1:19092,kafka-2:29092,kafka-3:39092";

    private static Producer<String, String> createProducer() {
        int bufferMemory = 67108864;
        int batchSizeBytes = 1000000;
        String acks = "all";

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        // String serializers for both key and value, matching Producer<String, String>.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSizeBytes);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1000000);
        props.put(ProducerConfig.ACKS_CONFIG, acks);

        return new KafkaProducer<>(props);
    }

    static void runProducer(final int sendMessageCount) throws InterruptedException {
        final Producer<String, String> producer = createProducer();
        final String msg = "0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357";

        final ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, msg);
        final long[] new_time = new long[1];

        try {
            for (long index = 0; index < sendMessageCount; index++) {
                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // This if-else only starts timing when the first message reaches Kafka.
                        if (e != null) {
                            e.printStackTrace();
                        } else {
                            if (new_time[0] == 0) {
                                new_time[0] = System.currentTimeMillis();
                            }
                        }
                    }
                });
            }
        } finally {
            // producer.flush();
            // close() blocks until all buffered records have been sent.
            producer.close();
            System.out.printf("Total time %d ms\n", System.currentTimeMillis() - new_time[0]);
        }
    }

    public static void main(String... args) throws Exception {
        if (args.length == 0) {
            runProducer(1000000);
        } else {
            runProducer(Integer.parseInt(args[0]));
        }
    }
}

Benchmark results (edited after making the changes Edenhill recommended):

acks = 0, messages: 1,000,000

Java: 12.066 s

Python: 9.608 s

acks = all, messages: 1,000,000

Java: 45.763 s (11.917 s after the changes)

Python: 14.3029 s

Even after making every change I could think of, plus the changes Edenhill suggested in the comments below, the Java implementation's performance is about the same as the Python implementation's.

There are various benchmarks of Kafka performance in Python out there, but I couldn't find anything comparing librdkafka or the Python Kafka clients against the Apache Kafka Java client.

I have two questions:
  • Is this test sufficient to conclude that, with the default configurations and a message size of 1 KB, librdkafka is faster?
  • Does anyone have experience with, or sources (blogs, docs, etc.) for, benchmarking librdkafka against confluent-kafka?
Best answer:

The Python client uses librdkafka, which overrides some of Kafka's default configuration.

Parameter (Kafka default):
batch.size = 16384
max.in.flight.requests.per.connection = 5 (librdkafka's default is 1000000)

message.max.bytes in librdkafka is probably the equivalent of max.request.size in the Java producer.

I don't think Kafka's producer API has an equivalent of librdkafka's queue.buffering.max.messages. Correct me if you find one.

Also, remove the buffer.memory parameter from the Java program (see the sketch after the links below).

https://kafka.apache.org/documentation/#producerconfigs
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
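
As a concrete illustration of the suggestions above, here is a minimal sketch of a Java producer whose settings are aligned with the librdkafka configuration from the question. The class name AlignedProducerConfig and the smoke test in main are illustrative assumptions, not the asker's code; the broker addresses and values come from the question, message.max.bytes is mapped to max.request.size, max.in.flight.requests.per.connection is raised to librdkafka's default, and buffer.memory and batch.size are left at their Kafka defaults.

package com.amit.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class AlignedProducerConfig {

    // Hypothetical helper: a Java producer whose settings mirror the
    // librdkafka configuration used in the Python snippet of the question.
    static Producer<String, String> createAlignedProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-1:19092,kafka-2:29092,kafka-3:39092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // linger.ms = 300 to match the Python producer's 'linger.ms': 300.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 300);
        // Raised to librdkafka's default, as used in the Python config.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1000000);
        // max.request.size as the closest Java counterpart of librdkafka's message.max.bytes.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1000000);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // batch.size is left at the Kafka default (16384) and buffer.memory is
        // intentionally not set, per the suggestions above.
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        // Minimal smoke test: send one record and wait for delivery.
        try (Producer<String, String> producer = createAlignedProducer()) {
            producer.send(new ProducerRecord<>("test-topic", "hello"));
            producer.flush();
        }
    }
}

Note that librdkafka controls batching mainly through batch.num.messages and linger time, so there is no exact counterpart to the Java client's byte-based batch.size.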

Next, the JVM needs some time to load classes, so you should increase the number of messages your producer sends; it would be good if producing all the messages took at least 20-30 minutes. Then you can compare the Java client with the Python client.
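
One way to act on this, sketched here purely as an illustration (it is not part of the original answer), is to produce a throw-away warm-up batch and only start the clock afterwards. The method is assumed to be added to the question's KafkaProducerExampleAsync class so it can reuse TOPIC and createProducer(); warmUpCount and the dummy payload are arbitrary.

    // Sketch: produce a throw-away warm-up batch, then time only the second batch.
    static void runProducerWithWarmup(final int warmUpCount, final int timedCount) {
        final Producer<String, String> producer = createProducer();
        // Arbitrary ~1 KB payload, standing in for the message used in the question.
        final String payload = new String(new char[1000]).replace('\0', 'x');
        final ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, payload);

        // Warm-up phase: classes are loaded and hot paths are JIT-compiled here.
        for (int i = 0; i < warmUpCount; i++) {
            producer.send(record);
        }
        producer.flush(); // make sure the warm-up batch is fully delivered

        // Timed phase: only this part is measured.
        final long start = System.currentTimeMillis();
        for (int i = 0; i < timedCount; i++) {
            producer.send(record);
        }
        producer.flush(); // wait for all outstanding acks before stopping the clock
        System.out.printf("Timed phase: %d ms for %d messages%n",
                System.currentTimeMillis() - start, timedCount);

        producer.close();
    }

Calling runProducerWithWarmup(100000, 1000000), for example, would then report only the steady-state portion of the run.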

I like the idea of comparing the Python and Java clients. Keep posting your results on Stack Overflow.

This question about Python librdkafka producer vs. native Apache Kafka producer performance was originally asked on Stack Overflow: https://stackoverflow.com/questions/54999370/
