gpt4 book ai didi

java - 具有多线程的 Kafka Producer - Java

转载 作者:行者123 更新时间:2023-11-29 04:09:15 26 4
gpt4 key购买 nike

我想使用多线程执行 Kafka 生产者。下面是我试过的代码。我不知道如何在 Kafka 生产者中实现线程,因为我不熟悉线程编程。下面是我的生产者(producer)的代码。

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Reads a CSV resource line by line and publishes each line to a Kafka topic,
 * dispatching the send calls through a fixed-size thread pool.
 *
 * <p>A single {@code KafkaProducer} is shared by all worker threads — the Kafka
 * producer is documented as thread-safe, so sharing one instance is both correct
 * and faster than one producer per thread.
 */
public class KafkaProducerWithThread {
    // Connection / input parameters.
    final String bootstrapServer = "127.0.0.1:9092";
    final String topicName = "spark-data-topic";
    final String csvFileName = "unique_products.csv";
    final static int MAX_THREAD = 2; // size of the send thread pool

    // Logger
    final Logger logger = LoggerFactory.getLogger(KafkaProducerWithThread.class);

    public KafkaProducerWithThread() throws FileNotFoundException {
    }

    public static void main(String[] args) throws IOException {
        new KafkaProducerWithThread().runProducer();
    }

    /**
     * Produces every line of the CSV resource to {@code topicName}.
     *
     * @throws IOException if the CSV resource cannot be read
     */
    public void runProducer() throws IOException {
        // Create a Kafka Producer. KafkaProducer is thread-safe, so one shared
        // instance serves all worker threads.
        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = createKafkaProducer();

        // Look up the "request-total" producer metric for periodic progress logging.
        Metric requestTotalMetric = null;
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            if ("request-total".equals(entry.getKey().name())) {
                requestTotalMetric = entry.getValue();
            }
        }
        // Effectively-final copy so it can be captured by the task lambda below.
        final Metric requestTotal = requestTotalMetric;

        // Worker pool that performs the send calls.
        ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD);

        // Register the shutdown hook BEFORE producing, so an abrupt JVM exit
        // mid-run still flushes buffered records. (Originally it was registered
        // only after the whole loop completed, which defeats its purpose.)
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Stopping the Producer!");
            producer.flush();
            producer.close();
            logger.info("Stopped the Producer!");
        }));

        // Read the CSV from the classpath. getResourceAsStream (unlike
        // getResource().getFile()) also works when packaged inside a jar;
        // try-with-resources guarantees the reader is closed.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                Objects.requireNonNull(getClass().getClassLoader().getResourceAsStream(csvFileName),
                        "resource not found: " + csvFileName),
                StandardCharsets.UTF_8))) {

            String line;
            int i = 0;
            while ((line = reader.readLine()) != null) {
                i++;
                // Effectively-final copies for capture by the submitted task.
                final int recordNo = i;
                final String key = "products_" + i;
                final String value = line.trim();

                // Hand the send off to the pool. KafkaProducer.send is itself
                // asynchronous; the pool parallelizes record creation + enqueueing.
                executorService.submit(() -> {
                    ProducerRecord<String, String> csvProducerRecord =
                            new ProducerRecord<>(topicName, key, value);

                    // Send the data - Asynchronously
                    producer.send(csvProducerRecord, new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            // Executes once per record, on success or failure.
                            if (e != null) {
                                logger.error("Error while producing", e);
                            }
                        }
                    });

                    // Guard against the metric not existing (was a latent NPE).
                    if (recordNo % 1000 == 0 && requestTotal != null) {
                        logger.info("Record #: " + recordNo + " Request rate: " + requestTotal.metricValue());
                    }
                });
            }
        }

        // Orderly shutdown: stop accepting new tasks and wait for queued sends
        // to be handed to the producer before returning.
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(5, TimeUnit.MINUTES)) {
                logger.warn("Executor did not terminate within timeout; forcing shutdown");
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
    }

    /**
     * Builds an idempotent, high-throughput {@code String -> String} producer.
     *
     * @return a configured {@link org.apache.kafka.clients.producer.KafkaProducer}
     */
    public org.apache.kafka.clients.producer.KafkaProducer<String, String> createKafkaProducer() {
        // Create Producer Properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotent producer: the broker de-duplicates retried sends using the
        // producer request id.
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        // High-throughput settings, at the expense of latency & CPU.
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "60");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32KB batch size

        // Create Kafka Producer
        return new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
    }
}

任何人都可以帮助我在我的 Kafka 生产者程序中实现线程吗?我的生产者将产生超过一百万条记录,所以我想为此实现线程。我知道 ExecutorService 用于线程编程,但我不确定在这种情况下如何实现。谢谢。

最佳答案

  • 创建一个 MessageSender 类,如下所示。
  • 创建生产者类后,创建一个新的 MessageSender 对象,将生产者记录和生产者作为构造函数参数。
  • 调用 executorService.submit() 来执行任务。

          class Producer {
    ExecutorService executorService =
    Executors.newFixedThreadPool(MAX_THREAD);
    //Read the CSV file line by line
    String line = "";
    int i = 0;
    while ((line = reader.readLine()) != null) {
    //create produver record
    ProducerRecord<String, String> csvProducerRecord = new ProducerRecord<>(topicName, key, line.trim());
    MessageSender sendMessage= new MessageSender(csvProducerRecord,producer);
    executorService.submit()...
    }
    }

    //Thread class
    class MessageSender implements Runnable<>{
    MessageSender(Producerrecord,producer{
    //store in class level variable in thread class
    }
    public void run(){
    producer.send(csvProducerRecord...);
    }

关于java - 具有多线程的 Kafka Producer - Java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56056317/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com