作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在开发 Kafka 自定义分区器类。在这里,我尝试将数据推送到单独的分区中。我的 Kafka 生产者类:
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaCustomPartitioner {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
int blocks = Integer.parseInt(args[1]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class","kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class","com.kafka.partdecider.CustomPartitioner");
props.put("producer.type", "sync");
props.put("request.required.acks","1");
ProducerConfig config = new ProducerConfig(props);
Producer producer = new Producer(config);
for(int nBlocks=0; nBlocks<blocks; nBlocks++) {
for(long nEvents=0; nEvents<events; nEvents++) {
long runTime = new Date().getTime();
String msg = runTime + ": " + (50+nBlocks) + ": " + nEvents + ": " + rnd;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("CustPartTopic",String.valueOf(nBlocks),msg);
producer.send(data);
}
}
producer.close();
}
}
客户分区器类:
import kafka.producer.Partitioner;
public class CustomPartitioner implements Partitioner {
public int partition(Object key, int arg1) {
String receivingkey = (String) key;
long id = Long.parseLong(receivingkey);
return (int) (id%arg1);
}
}
项目的参数部分的值为:3 2如果我运行该类,我会在这一行收到“ArrayOutOfBoundsException”:
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
at com.kafka.custompartitioner.KafkaCustomPartitioner.main(KafkaCustomPartitioner.java:13)
错误显示在以下行:long events = Long.parseLong(args[0]);
但我不明白为什么该行会给出错误。谁能告诉我如何解决这个问题?
最佳答案
这对我有用,API 完全不同:
package mypackage.io;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
public class KafkaCustomPartitioner {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long events = Long.parseLong(args[0]);
int blocks = Integer.parseInt(args[1]);
Random rnd = new Random();
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "mypackage.io.CustomPartitioner");
props.put(ProducerConfig.ACKS_CONFIG, "1");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for(int nBlocks=0; nBlocks<blocks; nBlocks++) {
for(long nEvents=0; nEvents<events; nEvents++) {
long runTime = new Date().getTime();
String msg = runTime + ": " + (50+nBlocks) + ": " + nEvents + ": " + rnd;
producer.send(new ProducerRecord<String, String>("CustPartTopic", String.valueOf(nBlocks), msg)).get();
}
}
producer.close();
}
}
然后是自定义分区器
package mypackage.io;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String receivingkey = (String) key;
long id = Long.parseLong(receivingkey);
int numPartitions = cluster.availablePartitionsForTopic(topic).size();
return (int) (id % numPartitions);
}
public void close() {
}
public void configure(Map<String, ?> map) {
}
}
关于java - 我的 Kafka 自定义分区器类中出现错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44797937/
我刚开始使用 Dagger 2,想知道与我目前用来实现依赖注入(inject)的技术相比,它有什么优势。 目前,为了实现 DI,我创建了一个具有两种风格的项目:mock 和 prod。在这些风格中,我
我是一名优秀的程序员,十分优秀!