gpt4 book ai didi

elasticsearch - 如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引

转载 作者:行者123 更新时间:2023-11-29 02:47:33 28 4
gpt4 key购买 nike

Elasticsaerch 的文档仅涵盖将完整索引加载到 Spark。

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()

如何执行查询以从 Elasticsearch 索引返回数据并使用 pyspark 将它们作为 DataFrame 加载到 Spark?

最佳答案

下面是我的做法。

一般环境设置和命令:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar

代码:

from pyspark import SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

q ="""{
"query": {
"filtered": {
"filter": {
"exists": {
"field": "label"
}
},
"query": {
"match_all": {}
}
}
}
}"""

es_read_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : "titanic/passenger",
"es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()

您还可以定义数据框列。引用Here了解更多信息。

希望对您有所帮助!

关于elasticsearch - 如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38162901/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com