- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想使用多线程执行 Kafka 生产者。下面是我试过的代码。我不知道如何在 Kafka 生产者中实现线程,因为我不熟悉线程编程。下面是我的制作人的代码。
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaProducerWithThread {
//init params
final String bootstrapServer = "127.0.0.1:9092";
final String topicName = "spark-data-topic";
final String csvFileName = "unique_products.csv";
final static int MAX_THREAD = 2; //created number of threads
//Logger
final Logger logger = LoggerFactory.getLogger(KafkaProducerWithThread.class);
public KafkaProducerWithThread() throws FileNotFoundException {
}
public static void main(String[] args) throws IOException {
new KafkaProducerWithThread().runProducer();
}
public void runProducer() throws IOException {
//Read the CSV file from Resources folder as BufferedReader
ClassLoader classLoader = new KafkaProducerWithThread().getClass().getClassLoader();
BufferedReader reader = new BufferedReader(new FileReader(classLoader.getResource(csvFileName).getFile()));
//Create a Kafka Producer
org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = createKafkaProducer();
//Kafka Producer Metrics
Metric requestTotalMetric = null;
for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
if ("request-total".equals(entry.getKey().name())) {
requestTotalMetric = entry.getValue();
}
}
//Thread
ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD);
//Read the CSV file line by line
String line = "";
int i = 0;
while ((line = reader.readLine()) != null) {
i++;
String key = "products_" + i;
//Create a ProducerRecord
ProducerRecord<String, String> csvProducerRecord = new ProducerRecord<>(topicName, key, line.trim());
//Send the data - Asynchronously
producer.send(csvProducerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//executes every time a record is sent successfully or an exception is thrown
if (e == null) {
//the record was sent successfully
// logger.info("Received new metadata. \n" +
// "Topic: " + recordMetadata.topic() + "\n" +
// "Partition: " + recordMetadata.partition() + "\n" +
// "Offset: " + recordMetadata.offset() + "\n" +
// "Timestamp: " + recordMetadata.timestamp());
} else {
logger.error("Error while producing", e);
}
}
});
if (i % 1000 == 0){
logger.info("Record #: " + i + " Request rate: " + requestTotalMetric.metricValue());
}
}
//Adding a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Stopping the Producer!");
producer.flush();
producer.close();
logger.info("Stopped the Producer!");
}));
}
public org.apache.kafka.clients.producer.KafkaProducer<String, String> createKafkaProducer() {
//Create Producer Properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // For an idempotent producer
//kafka can detect whether it's a duplicate data based on the producer request id.
//Create high throughput Producer at the expense of latency & CPU
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "60");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); //32KB batch size
//Create Kafka Producer
org.apache.kafka.clients.producer.KafkaProducer<String, String> csvProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
return csvProducer;
}
}
任何人都可以帮助我在我的 Kafka 生产者程序中实现线程吗?我的制作人将制作超过一百万条记录,所以我想为此实现线程。我知道 ExecutorService
用于线程编程,但我不确定在这种情况下如何实现。谢谢。
最佳答案
调用 executorService.submit() 来执行任务。
class Producer {
ExecutorService executorService =
Executors.newFixedThreadPool(MAX_THREAD);
//Read the CSV file line by line
String line = "";
int i = 0;
while ((line = reader.readLine()) != null) {
//create produver record
ProducerRecord<String, String> csvProducerRecord = new ProducerRecord<>(topicName, key, line.trim());
MessageSender sendMessage= new MessageSender(csvProducerRecord,producer);
executorService.submit()...
}
}
//Thread class
class MessageSender implements Runnable<>{
MessageSender(Producerrecord,producer{
//store in class level variable in thread class
}
public void run(){
producer.send(csvProducerRecord...);
}
关于java - 具有多线程的 Kafka Producer - Java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56056317/
我有一个生产者/消费者场景,我不希望一个生产者交付产品,也不希望多个消费者消费这些产品。然而,常见的情况是交付的产品仅由一个消费者消费,而其他消费者永远看不到该特定产品。我不想完成的是每个消费者消费一
我正在设计一个系统,其中将有 n 个生产者和 m 个消费者,其中 n 和 m 是数字,n != m。 我想这样设计系统, 任何生产者在生产时不得阻止其他生产者 任何消费者都不应在消费时阻止其他消费者
关于 REST Web 服务。 @Produces("application/json") 和 @Produces(MediaType.APPICATION_JSON) 两者的工作方式相同,但第二个需
我正在尝试使用 Kafka: import java.util.Properties; import org.apache.kafka.clients.producer.Producer; impor
当我使用 Producer.flush() 时,它可以工作,但根据 kafka confluent issue 性能较差,但按照建议,我使用 Producer.poll(0) 但不会向主题生成任何消息
我正在针对 Python 的 confluent-kafka 使用 native java 实现测试 Apache Kafka Producer,以查看哪个具有最大吞吐量。 我正在使用 docker-
我看到 @products 注释允许我传递单个字符串和字符串列表。所以我只是想知道这是如何在java中完成的,如果我需要使用允许以下行为的方法来实现它,我该怎么做?或者这个注释是特定的,所以我们不能在
我正在开发一个迁移学习应用程序,我正在其中针对我的数据流重新训练 MobileNetV2。 我正在使用 retrain.py 重新训练模型来自tensorflow-hub并且没有做任何修改。 当我从终
在 Cloud Foundry 中,我能够向非 ssl url(“kafkaURL:9092”)生成消息。但它不适用于 ssl url(“kafkaURL:9093”)。 Kafka 服务器版本 0.
我正在使用 kafka 向消费者发送消息。但是由于某种原因,当我使用 Producer.send(record, new MyProducerCallback()); 向主题发送记录时,该主题的使用者
我正在编写一个演示应用程序来创建一个 Kafka Producer。我创建了一个主题并在 Kafka 上运行了一个生产者和消费者,它似乎正在工作。我正在编写一个 spring 应用程序来创建一个生产者
我在我的项目中使用 spring boot v2.2.4 和 Apache Kafka。 下面是我的pom.xml文件: org.springframewo
我正在尝试使用 java 程序制作 Kafka 生产者。但是当我运行程序时我收到了一些警告,没有任何错误但是生产者没有发送数据并且警告如下所示。 [kafka-producer-network-thr
我正在尝试加载一个简单的文本文件而不是 Kafka 中的标准输入。下载 Kafka 后,我执行了以下步骤: 启动动物园管理员: bin/zookeeper-server-start.sh config
我有一个类,它生成一个 ElasticSearch 客户端以与 @Inject 一起使用 @Produces @ApplicationScoped public Client createClient
对于一个新项目,我们在客户端使用 jQuery 组件,其中之一是 blueImp 文件 uploader 。我们愉快地编写代码,在 Chrome 和 Firefox 中一切都运行良好……直到有人尝试在
我有一些开发要做,我尝试看看是否有可以使用的设计模式。问题很简单: 我有一个启动许多线程的主线程。主线程必须等待每个线程完成然后再做其他事情。现有的代码有点难看。我有一个 while 循环来检查线程组
我正在使用驱动对象模型工具 CodeFluentEntities以便将模型部署到数据库引擎。 我正在考虑使用 localStorage 数据库引擎(如 IndexedDB 或 Web SQL)来为没有
我无法停止 ActiveMQ Producer。 场景是:我为内存使用和临时存储设置了较低的值。
我正在尝试结合使用 CDI (weld-se 2) 和 JavaFX,并且我想使用自定义创建的注释来注释我的 Controller 类,以便使用我的工厂方法管理此类创建。我想应该如下所示,但这段代码不
我是一名优秀的程序员,十分优秀!