gpt4 book ai didi

elasticsearch - Python spark Dataframe 到 Elasticsearch

转载 作者:行者123 更新时间:2023-11-29 02:51:55 25 4
gpt4 key购买 nike

我不知道如何使用来自 spark 的 python 将数据框写入 elasticsearch。我按照 here 中的步骤操作.

这是我的代码:

# Read file
df = sqlContext.read \
.format('com.databricks.spark.csv') \
.options(header='true') \
.load('/vagrant/data/input/input.csv', schema = customSchema)

df.registerTempTable("data")

# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")

es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_conf)

上面的代码给出了

Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

我还从以下位置开始编写脚本: spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar/vagrant/scripts/aggregation。 py 以确保加载 elasticsearch-hadoop

最佳答案

对于初学者,saveAsNewAPIHadoopFile 需要一个 RDD(key, value) 对,在你的例子中是 this may happen only accidentally .这同样适用于您声明的值格式。

我对 Elastic 不熟悉,但仅根据您可能应该尝试类似操作的论点:

kpi1.rdd.map(lambda row: (None, row.asDict()).saveAsNewAPIHadoopFile(...)

由于 Elastic-Hadoop 提供了 SQL 数据源,您应该也可以跳过它并直接保存数据:

df.write.format("org.elasticsearch.spark.sql").save(...)

关于elasticsearch - Python spark Dataframe 到 Elasticsearch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39559121/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com