
java - Connecting to ElasticSearch from Spark using Hadoop not working


I'm having trouble connecting to a locally running ElasticSearch node from my Java code when it runs as a job submitted to Spark (also running locally). When I don't use Spark, however, connecting works fine. Running Python jobs and submitting them to Spark also works fine.

I know that for Java I need to connect through port 9300 rather than 9200 (the HTTP port). Even so, I always get the same exception, whether reading or writing:

    16/08/04 16:51:55 ERROR NetworkClient: Node [The server localhost failed to respond with a valid HTTP response] failed (localhost:9300); no other nodes left - aborting...
    Exception in thread "main" org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[localhost:9300]]
        at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:102)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:282)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:266)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:270)
        at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:108)
        at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:90)
        at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:61)
        at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:434)
        at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:415)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
        at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1342)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.first(RDD.scala:1341)
        at org.apache.spark.api.java.JavaPairRDD.first(JavaPairRDD.scala:211)
        at com.dd.mediaforce.spark.most_popular.ExecutorMostPopular.main(ExecutorMostPopular.java:564)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

We are running Spark and ElasticSearch on a number of nodes. The Python code runs fine there, but trying the Java code against that ES setup didn't solve the problem either.

The code I'm using to connect from Java:

    SparkConf _sparkConf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("Test");
    JavaSparkContext jsc = new JavaSparkContext(_sparkConf);

    Configuration conf = new Configuration();
    conf.set("cluster.name", "our_clustername");
    conf.set("es.nodes", "localhost");
    conf.setInt("es.port", 9300);
    conf.set("es.resource", index_and_type);

    JavaPairRDD readRdd = jsc.newAPIHadoopRDD(conf,
            org.elasticsearch.hadoop.mr.EsInputFormat.class,
            org.apache.hadoop.io.NullWritable.class,
            org.elasticsearch.hadoop.mr.LinkedMapWritable.class);
    System.out.println(readRdd.first());
    jsc.stop();

As mentioned, the following Java code using the TransportClient (without Spark) connects to ES without any problem; both writing and reading work fine:

    Client client = TransportClient.builder().settings(settings).build()
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

    ImmutableOpenMap<String, IndexMetaData> indices =
            client.admin().cluster().prepareState().get().getState().getMetaData().getIndices();
    for (ObjectCursor<IndexMetaData> value : indices.values()) {
        log.info("Index: " + value.index + " : " + value.toString());
    }

    GetResponse response = client.prepareGet("index_name", "type_name", "1").get();
    log.info(response.getIndex() + " : " + response.getId() + " : " + response.isExists());

    String field_id = "6";
    IndexRequest indexRequest = new IndexRequest("index_name", "type", "2")
            .source(jsonBuilder()
                    .startObject()
                    .prettyPrint()
                    .field("field_id", field_id)
                    .field("another_field", "value")
                    .field("integer_field", 100)
                    .endObject());

    UpdateRequest updateRequest = new UpdateRequest("index_name", "type_name", article_id)
            .doc(jsonBuilder()
                    .startObject()
                    .prettyPrint()
                    .field("field_id", field_id)
                    .field("another_field", "value")
                    .field("integer_field", 100)
                    .endObject())
            .upsert(indexRequest);

    UpdateResponse responseUpdate = client.update(updateRequest).get();
    log.info(responseUpdate.getIndex() + " : " + responseUpdate.getGetResult() + " : " + responseUpdate.getType());
    client.close();

Any suggestions are welcome, as I've been stuck on this for days without getting any further. I have obviously Googled the problem and searched Stack Overflow, but so far I haven't found an answer to my question.

For completeness, here is some of the Python code that reads from and writes to ES via Spark without any problems:

    conf = SparkConf()
    conf = conf.setAppName('Test')
    sc = SparkContext(conf=conf)

    # Omitting some of the code in creating some_rdd on Spark:

    index_and_type = index_name + '/type_name'
    groovy_script = "if (ctx._source.%s) { ctx._source.%s+=value } else { ctx._source.%s=value }" % (field, field, field)

    es_db_connection_dictionary = {
        "es.nodes": db_hosts,
        "es.port": db_port,
        "es.resource": index_and_type,
        "es.write.operation": "upsert",
        "es.mapping.id": "field_id",
        "es.update.script": groovy_script,
        "es.update.script.params": "value:%s" % integer_field,
        "es.http.timeout": "10s"
    }

    es_input = views_tuple_rdd.map(lambda item: (item[0],
        {
            'field_id': item[0],
            "integer_field": item[1],
            "another_field": client_name,
        }))

    es_input.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_db_connection_dictionary)

Best Answer

Normally, if you are using the elasticsearch-spark connector, you don't need port 9300 at all; the connector talks to Elasticsearch over HTTP, so the default port 9200 is the one to use. It does not behave like the regular Elasticsearch (TransportClient) API.
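
For illustration, a minimal, untested sketch of the asker's Hadoop-based read with only the port changed to the connector's HTTP port; the index/type string is a placeholder. (The `cluster.name` setting from the original snippet is a transport-client setting rather than one of the connector's `es.*` options, so it is omitted here.)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.hadoop.mr.EsInputFormat;
    import org.elasticsearch.hadoop.mr.LinkedMapWritable;

    public class EsSparkReadSketch {
        public static void main(String[] args) {
            JavaSparkContext jsc = new JavaSparkContext(
                    new SparkConf().setMaster("local[*]").setAppName("Test"));

            Configuration conf = new Configuration();
            conf.set("es.nodes", "localhost");               // ES host(s)
            conf.set("es.port", "9200");                     // HTTP/REST port, not the 9300 transport port
            conf.set("es.resource", "index_name/type_name"); // placeholder index/type

            // Same read as in the question, just against port 9200
            JavaPairRDD readRdd = jsc.newAPIHadoopRDD(conf,
                    EsInputFormat.class, NullWritable.class, LinkedMapWritable.class);
            System.out.println(readRdd.first());
            jsc.stop();
        }
    }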

You also seem to be using a connector version that is incompatible with your Elasticsearch version. This is a common mistake, since most of them are on 2.x.

I believe this shouldn't happen with Elasticsearch 5.x, since they have aligned the versions of all the other Elastic products with it.
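
A related note: the same connector also exposes a native Spark Java API (JavaEsSpark), which avoids the Hadoop Configuration plumbing entirely. A rough sketch, assuming a connector version matching the cluster is on the classpath and using a placeholder index/type:

    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

    public class EsSparkNativeSketch {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf()
                    .setMaster("local[*]")
                    .setAppName("Test")
                    .set("es.nodes", "localhost")   // the connector reads es.* settings from the SparkConf
                    .set("es.port", "9200");        // HTTP port, as noted above

            JavaSparkContext jsc = new JavaSparkContext(sparkConf);

            // esRDD returns (document id, document source as a Map) pairs
            JavaPairRDD<String, Map<String, Object>> esRdd =
                    JavaEsSpark.esRDD(jsc, "index_name/type_name");
            System.out.println(esRdd.first());

            jsc.stop();
        }
    }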

Regarding "java - Connecting to ElasticSearch from Spark using Hadoop not working", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38771569/
