gpt4 book ai didi

kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams()方法的使用及代码示例

转载 作者:知者 更新时间:2024-03-18 12:39:31 26 4
gpt4 key购买 nike

本文整理了Java中kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams()方法的一些代码示例,展示了ZookeeperConsumerConnector.createMessageStreams()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZookeeperConsumerConnector.createMessageStreams()方法的具体详情如下:
包路径:kafka.javaapi.consumer.ZookeeperConsumerConnector
类名称:ZookeeperConsumerConnector
方法名:createMessageStreams

ZookeeperConsumerConnector.createMessageStreams介绍

暂无

代码示例

代码示例来源:origin: io.zipkin.zipkin2/zipkin-collector-kafka08

ExecutorService compute() {
 ExecutorService pool =
   streams == 1
     ? Executors.newSingleThreadExecutor()
     : Executors.newFixedThreadPool(streams);
 Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
 topicCountMap.put(topic, streams);
 for (KafkaStream<byte[], byte[]> stream :
   connector.get().createMessageStreams(topicCountMap).get(topic)) {
  pool.execute(guardFailures(new KafkaStreamProcessor(stream, collector, metrics)));
 }
 return pool;
}

代码示例来源:origin: io.zipkin.java/zipkin-transport-kafka

KafkaCollector(Builder builder, Lazy<AsyncSpanConsumer> consumer) {
 Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
 topicCountMap.put(builder.topic, builder.streams);
 // Settings below correspond to "Old Consumer Configs"
 // http://kafka.apache.org/documentation.html
 Properties props = new Properties();
 props.put("zookeeper.connect", builder.zookeeper);
 props.put("group.id", builder.groupId);
 props.put("fetch.message.max.bytes", String.valueOf(builder.maxMessageSize));
 // Same default as zipkin-scala, and keeps tests from hanging
 props.put("auto.offset.reset", "smallest");
 connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(new ConsumerConfig(props));
 pool = builder.streams == 1
   ? Executors.newSingleThreadExecutor()
   : Executors.newFixedThreadPool(builder.streams);
 for (KafkaStream<byte[], byte[]> stream : connector.createMessageStreams(topicCountMap).get(builder.topic)) {
  pool.execute(new KafkaStreamProcessor(stream, consumer, builder.metrics));
 }
}

代码示例来源:origin: io.zipkin.java/transport-kafka

KafkaCollector(Builder builder, Lazy<AsyncSpanConsumer> consumer) {
 Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
 topicCountMap.put(builder.topic, builder.streams);
 // Settings below correspond to "Old Consumer Configs"
 // http://kafka.apache.org/documentation.html
 Properties props = new Properties();
 props.put("zookeeper.connect", builder.zookeeper);
 props.put("group.id", builder.groupId);
 props.put("fetch.message.max.bytes", String.valueOf(builder.maxMessageSize));
 // Same default as zipkin-scala, and keeps tests from hanging
 props.put("auto.offset.reset", "smallest");
 connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(new ConsumerConfig(props));
 pool = builder.streams == 1
   ? Executors.newSingleThreadExecutor()
   : Executors.newFixedThreadPool(builder.streams);
 for (KafkaStream<byte[], byte[]> stream : connector.createMessageStreams(topicCountMap).get(builder.topic)) {
  pool.execute(new KafkaStreamProcessor(stream, consumer, builder.metrics));
 }
}

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com