gpt4 book ai didi

elasticsearch - 使用 elasticsearch-hadoop 库将元组从 storm 索引到 elasticsearch 不起作用

转载 作者:行者123 更新时间:2023-11-29 02:56:54 25 4
gpt4 key购买 nike

我想将文档从 Storm 索引到 Elasticsearch,但是我无法将任何文档索引到 Elasticsearch。

在我的拓扑结构中,我有一个 KafkaSpout,它发出一个像这样的 json { “tweetId”: 1, “text”: “hello” } 到 EsBolt,它是来自 elasticsearch-hadoop 库的本地 bolt ,它将 Storm 元组写入到Elasticsearch(文档在这里:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html)。这些是我的 EsBolt 的配置:

Map conf = new HashMap();
conf.put("es.nodes","127.0.0.1");
conf.put("es.port","9200");
conf.put("es.resource","twitter/tweet");
conf.put("es.index.auto.create","no");
conf.put("es.input.json", "true");
conf.put("es.mapping.id", "tweetId");
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf);

前两个配置默认有这些值,但我选择显式设置它们。我也试过没有它们,得到了相同的结果。

这就是我构建拓扑的方式:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(TWEETS_DATA_KAFKA_SPOUT_ID, kafkaSpout, kafkaSpoutParallelism)
.setNumTasks(kafkaSpoutNumberOfTasks);


builder.setBolt(ELASTICSEARCH_BOLT_ID, elasticsearchBolt, elasticsearchBoltParallelism)
.setNumTasks(elasticsearchBoltNumberOfTasks)
.shuffleGrouping(TWEETS_DATA_KAFKA_SPOUT_ID);

return builder.createTopology();

在本地运行拓扑之前,我在 Elasticsearch 中创建了“twitter”索引,并为该索引创建了一个映射“tweet”。如果我为新创建的类型检索映射 (curl -XGET ' http://localhost:9200/twitter/_mapping/tweet '),这就是我得到的:

{
"twitter": {
"mappings": {
"tweet": {
"properties": {
"text": {
"type": "string"
},
"tweetId": {
"type": "string"
}
}
}
}
}
}

我在本地运行拓扑,这是我在处理元组时在控制台中得到的:

Processing received message FOR 6 TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]

Emitting: elasticsearch-bolt __ack_ack [-8010897758788654352 -6240339405307942979]

TRANSFERING tuple TASK: 2 TUPLE: source: elasticsearch-bolt:6, stream: __ack_ack, id: {}, [-8010897758788654352 -6240339405307942979]

BOLT ack TASK: 6 TIME: TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]

Execute done TUPLE source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] TASK: 6 DELTA:

因此元组似乎已被处理。但是,我没有在 Elasticsearch 中索引任何文档。

我想我在为 EsBolt 设置配置时做错了什么,可能缺少配置或其他东西。

最佳答案

文档只有在达到由 es.storm.bolt.flush.entries.size 指定的刷新大小后才会被索引

或者,您可以设置触发队列刷新的 TICK 频率。

config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);

默认情况下,es-hadoop 根据 es.storm.bolt.tick.tuple.flush 刷新。参数。

关于elasticsearch - 使用 elasticsearch-hadoop 库将元组从 storm 索引到 elasticsearch 不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36662213/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com