gpt4 book ai didi

elasticsearch - pyspark-将dstream写入elasticsearch时出错

转载 作者:行者123 更新时间:2023-12-03 01:44:15 25 4
gpt4 key购买 nike

我在将数据从Spark Streaming(PySpark)索引到Elasticserach时遇到问题。数据的类型为dstream。下面看起来

(u'01B', 0)
(u'1A5', 1)
....

这是我正在使用的 flex 索引:index = clus和type = data
GET /clus/_mapping/data
{
"clus": {
"mappings": {
"data": {
"properties": {
"content": {
"type": "text"
}
}
}
}
}
}

这是我的代码:
ES_HOST = {
"host" : "localhost",
"port" : 9200
}

INDEX_NAME = 'clus'
TYPE_NAME = 'data'
ID_FIELD = 'responseID'

# create ES client
es = Elasticsearch(hosts = [ES_HOST])

# some config before sending to elastic
if not es.indices.exists(INDEX_NAME):
request_body = {
"settings" : {
"number_of_shards": 1,
"number_of_replicas": 0
}
}
res = es.indices.create(index = INDEX_NAME, body = request_body)
es_write_conf = {
"es.nodes": "localhost",
"es.port": "9200",
"es.resource": INDEX_NAME+"/"+TYPE_NAME
}
sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 30)

# .....
#loading data to put in elastic : lines4

lines4.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf))




ssc.start()
ssc.awaitTermination()

这是错误:

17/07/25 15:31:31 ERROR Executor: Exception in task 2.0 in stage 11.0 (TID 23) org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [127.0.0.1:9200] returned Bad Request(400) - failed to parse; Bailing out.. at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251) at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203) at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220) at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267) at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214) at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1119) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1295) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 17/07/25 15:31:31 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 21) org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [127.0.0.1:9200] returned Bad Request(400) - failed to parse; Bailing out.. at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251) at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203) at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220) at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267) at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214) at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1119) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1295) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 17/07/25 15:31:31 ERROR Executor: Exception in task 1.0 in stage 11.0 (TID 22) org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [127.0.0.1:9200] returned Bad Request(400) - failed to parse; Bailing out.. at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251) at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203) at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220) at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267) at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214) at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1119) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1295) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)

最佳答案

您创建索引的方式似乎有误。创建索引时,您需要在请求的mapping中发送body。这是一个工作示例:

from elasticsearch import Elasticsearch 

es = Elasticsearch(["http://localhost:9200"])
# create index
index_name = "clus"
index_mapping = {
"clus": {
"mappings": {
"data": {
"properties": {
"content": {
"type": "text"
}
}
}
}
}
}


if not es.indices.exists(index_name):
res = es.indices.create(index=index_name, body=index_mapping)
print res

您应该将此 {u'acknowledged': True}作为答复,以确认您已创建索引。

然后,使用foreachRDD遍历数据dstream,并应用一个函数,该函数会将数据转换为json结构 {"content": str((u'1A5', 1))}并对其进行索引,如下所示
doc = {"content": str((u'1A5', 1))}
res = es.index(index="clus", doc_type='data', body=doc)

附带说明一下,不建议将数据作为列表 (u'1A5', 1)进行索引,否则您将很难在其他上下文中使用它,例如在kibana上进行可视化。

关于elasticsearch - pyspark-将dstream写入elasticsearch时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45305721/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com