
bigdata - Failed to write offset data to Zookeeper in Kafka-storm


I was setting up a Storm cluster to compute real-time trends and other statistics, but I ran into problems introducing a "recovery" feature into the project by having the kafka-spout (whose source code comes from https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) remember the offset it last read. I start my kafka-spout this way:

// Kafka brokers are discovered through this ZooKeeper ensemble
BrokerHosts zkHost = new ZkHosts("localhost:2181");
// SpoutConfig(hosts, topic, zkRoot, id): consume topic "test", keep consumer state under id "test"
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
// false: resume from the offset committed in ZooKeeper rather than forcing a rewind
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);

The default settings should take care of this, but that does not seem to be the case for me: every time I start the project, the PartitionManager tries to look up the partition information with the offsets and finds nothing:
2014-06-25 11:57:08 INFO  PartitionManager:73 - Read partition information from: /storm/partition_1  --> null
2014-06-25 11:57:08 INFO PartitionManager:86 - No partition information found, using configuration to determine offset

It then starts reading from the latest possible offset, which would be fine if my project never failed, but is not exactly what I want.
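If the fallback behaviour itself also matters, the storm-kafka version linked above exposes a startOffsetTime field (inherited from KafkaConfig) that controls where the spout starts when no committed offset is found. A minimal sketch, assuming the kafka.api.OffsetRequest constants from the Kafka 0.8 client are on the classpath:

// Prefer the offset already committed in ZooKeeper, if one exists.
kafkaConfig.forceFromStart = false;
// Assumption for illustration: when no committed offset is found, start from the
// oldest retained message instead of the latest one.
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();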

I also looked a bit further into the PartitionManager class, which uses the ZkState class to write the offsets, in these code snippets:

PartitionManager
public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);

        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}
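For reference, the map built above is serialized as one small JSON document per partition. The values below are hypothetical, but the shape follows directly from the code:

{
    "topology": {"id": "topology-instance-id", "name": "trending-topology"},
    "offset": 4853,
    "partition": 1,
    "broker": {"host": "localhost", "port": 9092},
    "topic": "test"
}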

ZkState
public void writeBytes(String path, byte[] bytes) {
    try {
        if (_curator.checkExists().forPath(path) == null) {
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
        } else {
            _curator.setData().forPath(path, bytes);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

I can see that for the first message the writeBytes method enters the if block and tries to create the path, and for the second message it enters the else block, which seems fine. But when I start the project again, the same message as above appears: no partition information can be found.
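One way to narrow this down is to read the node back directly from the ZooKeeper ensemble you expect the offsets to live in. A minimal sketch using the Apache Curator client (which ZkState itself is built on), assuming the path /storm/partition_1 from the log above and a connect string of localhost:2181:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

CuratorFramework client = CuratorFrameworkFactory.newClient(
        "localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
// null means the znode does not exist in THIS ensemble — a hint that the offsets
// were committed to a different ZooKeeper (e.g. Storm's in-memory one in local mode).
if (client.checkExists().forPath("/storm/partition_1") == null) {
    System.out.println("no committed offset found here");
} else {
    System.out.println(new String(client.getData().forPath("/storm/partition_1")));
}
client.close();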

Best Answer

I had the same problem. It turned out I was running in local mode, which uses an in-memory Zookeeper rather than the Zookeeper that Kafka is using.

To make sure the KafkaSpout does not use Storm's ZooKeeper for the ZkState that stores the offsets, you need to set SpoutConfig.zkServers, SpoutConfig.zkPort, and SpoutConfig.zkRoot in addition to ZkHosts. For example:

import org.apache.zookeeper.client.ConnectStringParser;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;
// Standard-library imports used by the snippet below:
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

...

// Parse the Kafka ZooKeeper connect string into host names and a port,
// so the same ensemble can be handed to the spout for offset storage.
final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr);
final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses();
final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size());
final Integer zkPort = serverInetAddresses.get(0).getPort();
for (InetSocketAddress serverInetAddress : serverInetAddresses) {
    serverAddresses.add(serverInetAddress.getHostName());
}

// Point broker discovery at Kafka's chroot within that ensemble.
final ZkHosts zkHosts = new ZkHosts(zkConnectStr);
zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath;

final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup);
spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme);

// Without these three settings the spout falls back to Storm's own ZooKeeper
// (in-memory in local mode) for storing the committed offsets.
spoutConfig.zkServers = serverAddresses;
spoutConfig.zkPort = zkPort;
spoutConfig.zkRoot = kafkaZnode;
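The snippet leaves zkConnectStr, kafkaZnode, inputTopic, kafkaConsumerGroup and inputKafkaKeyValueScheme undefined. The values below are only an illustration of what they might look like; all of them are assumptions, not part of the original answer:

import storm.kafka.KeyValueScheme;
import storm.kafka.StringKeyValueScheme;

// Hypothetical example values — replace with your own environment's settings.
final String zkConnectStr = "localhost:2181";    // ZooKeeper ensemble Kafka registers its brokers in
final String kafkaZnode = "/kafka";              // Kafka's chroot in that ensemble ("" if brokers live at the root)
final String inputTopic = "test";                // topic the spout consumes
final String kafkaConsumerGroup = "test";        // id under which the spout commits its offsets
final KeyValueScheme inputKafkaKeyValueScheme = new StringKeyValueScheme();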

Regarding bigdata - Failed to write offset data to Zookeeper in Kafka-storm, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24408063/
