gpt4 book ai didi

java - 如何使用Rabbitmq Spark Stram查询ES

转载 作者:行者123 更新时间:2023-12-01 09:20:04 25 4
gpt4 key购买 nike

我正在使用 Spark2。我正在尝试从 Rabbitmq 获取搜索文本流并再次查询 Elasticsearch。

params.put("hosts", "IP");
params.put("queueName", "query");
params.put("exchangeName", "Exchangequery");
params.put("vHost", "/");
params.put("userName", "test");
params.put("password", "test");

Function<byte[], String> messageHandler = new Function<byte[], String>() {

public String call(byte[] message) {
return new String(message);
}
};

JavaReceiverInputDStream<String> messages = RabbitMQUtils.createJavaStream(jssc, String.class, params, messageHandler);

messages.foreachRDD();

上面的代码从rabbitmq接收stram。但我不知道如何连接 ES 并查询流批处理。一件事是,如果我使用messages.foreachRDD();并查询elasticsearch的每个输入项,那么它将影响性能。

我总是只使用一个字段来查询elasticsearch。例如

我的 stram messages 的输入类似于

apple 
orange

我在 es fruit 中有一个索引,我想查询 ?q=apple or Orange。我知道我必须在 elasticsearch 中使用 should 构建查询。 我的问题是如何使用从 Rabbitmq 流收到的值查询 ES

最佳答案

该代码仅对elasticsearch服务器进行一次调用(基本上它构建了一个带有大量should子句的单个查询)

public static void main(String[] args) throws UnknownHostException {

Client client = TransportClient.builder().build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

List<String> messages = new ArrayList<>();
messages.add("apple");
messages.add("orange");

String index = "fruit";
String fieldName = "fruit_type";

BoolQueryBuilder query = QueryBuilders.boolQuery();

for (String message : messages) {
query.should(QueryBuilders.matchQuery(fieldName, message));
// alternative if you are not analyzing fields
// query.should(QueryBuilders.termQuery(fieldName, message));
}


int size = 60; //you may want to change this since it defaults to 10
SearchResponse response = client.prepareSearch(index).setQuery(query).setSize(size).execute().actionGet();

long totalHits = response.getHits().getTotalHits();
System.out.println("Found " + totalHits + " documents");
for (SearchHit hit : response.getHits().getHits()) {
System.out.println(hit.getSource());
}
}

生成的查询:

{
"bool" : {
"should" : [ {
"match" : {
"fruit_type" : {
"query" : "apple",
"type" : "boolean"
}
}
}, {
"match" : {
"fruit_type" : {
"query" : "orange",
"type" : "boolean"
}
}
} ]
}
}

关于java - 如何使用Rabbitmq Spark Stram查询ES,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40221905/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com