- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
本文整理了Java中kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams()
方法的一些代码示例,展示了ZookeeperConsumerConnector.createMessageStreams()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZookeeperConsumerConnector.createMessageStreams()
方法的具体详情如下:
包路径:kafka.javaapi.consumer.ZookeeperConsumerConnector
类名称:ZookeeperConsumerConnector
方法名:createMessageStreams
暂无
代码示例来源:origin: io.zipkin.zipkin2/zipkin-collector-kafka08
ExecutorService compute() {
ExecutorService pool =
streams == 1
? Executors.newSingleThreadExecutor()
: Executors.newFixedThreadPool(streams);
Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
topicCountMap.put(topic, streams);
for (KafkaStream<byte[], byte[]> stream :
connector.get().createMessageStreams(topicCountMap).get(topic)) {
pool.execute(guardFailures(new KafkaStreamProcessor(stream, collector, metrics)));
}
return pool;
}
代码示例来源:origin: io.zipkin.java/zipkin-transport-kafka
KafkaCollector(Builder builder, Lazy<AsyncSpanConsumer> consumer) {
Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
topicCountMap.put(builder.topic, builder.streams);
// Settings below correspond to "Old Consumer Configs"
// http://kafka.apache.org/documentation.html
Properties props = new Properties();
props.put("zookeeper.connect", builder.zookeeper);
props.put("group.id", builder.groupId);
props.put("fetch.message.max.bytes", String.valueOf(builder.maxMessageSize));
// Same default as zipkin-scala, and keeps tests from hanging
props.put("auto.offset.reset", "smallest");
connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(new ConsumerConfig(props));
pool = builder.streams == 1
? Executors.newSingleThreadExecutor()
: Executors.newFixedThreadPool(builder.streams);
for (KafkaStream<byte[], byte[]> stream : connector.createMessageStreams(topicCountMap).get(builder.topic)) {
pool.execute(new KafkaStreamProcessor(stream, consumer, builder.metrics));
}
}
代码示例来源:origin: io.zipkin.java/transport-kafka
KafkaCollector(Builder builder, Lazy<AsyncSpanConsumer> consumer) {
Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
topicCountMap.put(builder.topic, builder.streams);
// Settings below correspond to "Old Consumer Configs"
// http://kafka.apache.org/documentation.html
Properties props = new Properties();
props.put("zookeeper.connect", builder.zookeeper);
props.put("group.id", builder.groupId);
props.put("fetch.message.max.bytes", String.valueOf(builder.maxMessageSize));
// Same default as zipkin-scala, and keeps tests from hanging
props.put("auto.offset.reset", "smallest");
connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(new ConsumerConfig(props));
pool = builder.streams == 1
? Executors.newSingleThreadExecutor()
: Executors.newFixedThreadPool(builder.streams);
for (KafkaStream<byte[], byte[]> stream : connector.createMessageStreams(topicCountMap).get(builder.topic)) {
pool.execute(new KafkaStreamProcessor(stream, consumer, builder.metrics));
}
}
本文整理了Java中kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams()方法的一些代码示例,展示了Zooke
我正在尝试从我的 Kafka 0.8.1 集群中检索数据。我创建了一个 ZookeeperConsumerConnector 实例,然后尝试对其调用 createMessageStreams。然而,无
我是一名优秀的程序员,十分优秀!