gpt4 book ai didi

org.apache.storm.kafka.ZkHosts类的使用及代码示例

转载 作者:知者 更新时间:2024-03-19 19:41:31 28 4
gpt4 key购买 nike

本文整理了Java中org.apache.storm.kafka.ZkHosts类的一些代码示例,展示了ZkHosts类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZkHosts类的具体详情如下:
包路径:org.apache.storm.kafka.ZkHosts
类名称:ZkHosts

ZkHosts介绍

暂无

代码示例

代码示例来源:origin: intel-hadoop/HiBench

public static BaseRichSpout getSpout(StormBenchConfig conf) {
 String topic = conf.topic;
 String consumerGroup = conf.consumerGroup;
 String zkHost = conf.zkHost;
 BrokerHosts brokerHosts = new ZkHosts(zkHost);
 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, "", consumerGroup);
 spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
 spoutConfig.ignoreZkOffsets = true;
 spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
 return new KafkaSpout(spoutConfig);
}

代码示例来源:origin: intel-hadoop/HiBench

public static ITridentDataSource getTridentSpout(StormBenchConfig conf, boolean opaque) {
 String topic = conf.topic;
 String consumerGroup = conf.consumerGroup;
 String zkHost = conf.zkHost;
 BrokerHosts brokerHosts = new ZkHosts(zkHost);
 TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(brokerHosts, topic, consumerGroup);
 tridentKafkaConfig.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
 tridentKafkaConfig.ignoreZkOffsets = true;
 tridentKafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
 if (opaque) {
  return new OpaqueTridentKafkaSpout(tridentKafkaConfig);
 } else {
  return new TransactionalTridentKafkaSpout(tridentKafkaConfig);
 }
}

代码示例来源:origin: Paleozoic/storm_spring_boot_demo

@Bean("kafkaSpout")
  public KafkaSpout buildSpout() {
    super.setId("kafkaSpout");
    /**
     *
     * new ZkHosts(brokerZkStr):brokerZkStr逗号分隔,kafka的zookeeper集群
     * topic:storm订阅的topic,即从哪个topic读取消息
     * spout会根据config的zkRoot和id两个参数在zookeeper上为每个kafka分区创建保存kafka偏移量的路径,如:/zkRoot/id/partitionId。
     * {@link PartitionManager#committedPath()}
     * zkRoot:偏移量保存的zk根路径
     * id:如果重新运行,希望获取同样的偏移量,则设置为固定的ID
     * PS:kafka新版本已经不将偏移量保存在zookeeper了。而且也不推荐将offset写入zk(低效)。
     */
    SpoutConfig spoutConf = new SpoutConfig(new ZkHosts(brokerZkStr), topic, zkRoot, "kafkaSpout");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new KafkaSpout(spoutConf);
  }
}

代码示例来源:origin: JiuzhouSec/nightwatch

private static TopologyBuilder buildTopology() throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    String topicName = Configuration.getConfig().getString("rtc.mq.spout.topic");
    String groupName = Configuration.getConfig().getString("rtc.mq.spout.group");
    BrokerHosts hosts = new ZkHosts(Configuration.getConfig().getString("rtc.zk.hosts"));
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/consumers", groupName);
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    spoutConfig.zkServers = Arrays.asList(Configuration.getConfig().getString("rtc.storm.zkServers").split(","));
    spoutConfig.zkPort = Configuration.getConfig().getInt("rtc.storm.zkPort");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    builder.setSpout("MQSpout", kafkaSpout, Configuration.getConfig().getInt("rtc.storm.spout.parallelismHint")).setNumTasks(Configuration.getConfig().getInt("rtc.storm.spout.task"));
    builder.setBolt("ExtractBolt", new ExtractBolt(), Configuration.getConfig().getInt("rtc.storm.extract.bolt.parallelismHint")).setNumTasks(Configuration.getConfig().getInt("rtc.storm.extract.bolt.task")).shuffleGrouping("MQSpout");
    builder.setBolt("Statistic", new StatisticBolt(), Configuration.getConfig().getInt("rtc.storm.statistic.bolt.parallelismHint")).setNumTasks(Configuration.getConfig().getInt("rtc.storm.statistic.bolt.task")).fieldsGrouping("ExtractBolt", new Fields(new String[]{"hashKeys"}));
//        builder.setBolt("Alarm", new AlarmBolt(), Configuration.getConfig().getInt("rtc.storm.alarm.bolt.parallelismHint")).setNumTasks(Configuration.getConfig().getInt("rtc.storm.alarm.bolt.task")).fieldsGrouping("Statistic", new Fields(new String[]{"EventName"}));
    return builder;
  }

代码示例来源:origin: JiuzhouSec/nightwatch

private static TopologyBuilder buildTopology() throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    String topicName = Configuration.getConfig().getString("rtc.mq.spout.topic");
    String groupName = Configuration.getConfig().getString("rtc.mq.spout.group");
    BrokerHosts hosts = new ZkHosts(Configuration.getConfig().getString("rtc.zk.hosts"));
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/consumers", groupName);

    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    spoutConfig.zkServers = Arrays.asList(Configuration.getConfig().getString("rtc.storm.zkServers").split(","));
    spoutConfig.zkPort = Configuration.getConfig().getInt("rtc.storm.zkPort");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    builder.setSpout("MQSpout", kafkaSpout, Configuration.getConfig().getInt("rtc.storm.spout.parallelismHint")).setNumTasks(Configuration.getConfig().getInt("rtc.storm.spout.task"));
    builder.setBolt("ExtractBolt", new ExtractBolt(), Configuration.getConfig().getInt("rtc.storm.extract.bolt.parallelismHint")).setNumTasks(Configuration.getConfig().getInt("rtc.storm.extract.bolt.task")).shuffleGrouping("MQSpout");
    builder.setBolt("Statistic", new StatisticBolt(), Configuration.getConfig().getInt("rtc.storm.statistic.bolt.parallelismHint")).setNumTasks(Configuration.getConfig().getInt("rtc.storm.statistic.bolt.task")).fieldsGrouping("ExtractBolt", new Fields(new String[]{"hashKeys"}));
//        builder.setBolt("Alarm", new AlarmBolt(), Configuration.getConfig().getInt("rtc.storm.alarm.bolt.parallelismHint")).setNumTasks(Configuration.getConfig().getInt("rtc.storm.alarm.bolt.task")).fieldsGrouping("Statistic", new Fields(new String[]{"EventName"}));
    return builder;
  }

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com