gpt4 book ai didi

apache-spark - Elasticsearch Spark 解析问题 - 无法解析字段 [Y] 的值 [X]

转载 作者:行者123 更新时间:2023-11-29 02:55:58 24 4
gpt4 key购买 nike

我正在使用 Spark 2.3 (Pyspark) 从 Elasticsearch 6.6 索引中读取数据。
Spark 作业正在尝试创建 df,但因解析问题而失败:

Spark 代码:

df = spark.read.format("org.elasticsearch.spark.sql").option("es.resource.read", index_name).option("es.nodes", hosts).load ()

错误信息:

org.elasticsearch.hadoop.rest.EsHadoopParsingException:无法解析字段 [GenerateTime] 的值 [2019/05/06 19:31:21]

我认为这在一定程度上是由于源日期格式未在公认的 ISO 8601 中造成的格式。

此外,在阅读 Time/Date Mapping docs ,我知道这可以通过创建映射来解决,但这只会影响新索引,不会更改历史索引的映射。

问题:

有没有办法解决这个问题,以便我可以通过 Spark 成功读取历史索引(例如,在可能需要的任何映射更改之前)?我也试过 .option("es.mapping.date.rich", False) 没有任何运气。

最佳答案

我已经根据您在 ES 6.4/Spark 2.1 版本中的数据创建了一个示例文档,并使用了以下代码,以便读取 GenerateTime 字段text 而不是 spark 中的日期类型。

ES中的映射

PUT somedateindex
{
"mappings": {
"mydocs":{
"properties": {
"GenerateTime": {
"type": "date",
"format": "yyyy/MM/dd HH:mm:ss"
}
}
}
}
}

注意该字段在ES中是date类型的。

Spark 代码将 ES 中的日期字段用作字符串

请注意,我使用了配置选项("es.mapping.date.rich", false)

    val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local")
.getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

val df = spark.read.format("org.elasticsearch.spark.sql")
.option("es.resource.read","somedateindex")
.option("es.nodes", "some_host_name")
.option("es.mapping.date.rich", false)
.option("es.port","9200")
.load()

df.show()
df.printSchema()

我的 Eclipse 控制台中的 Spark 代码结果:

19/05/13 03:10:53 INFO DAGScheduler: Job 1 finished: show at Elasticsearch.scala:134, took 9.424294 s
19/05/13 03:10:53 INFO CodeGenerator: Code generated in 21.256205 ms
+-------------------+
| GenerateTime|
+-------------------+
|2019/05/06 19:31:21|
+-------------------+

root
|-- GenerateTime: string (nullable = true)

19/05/13 03:10:53 INFO SparkUI: Stopped Spark web UI at....

请注意,printSchema 显示该表有一个 GenerateTime 列,其类型为 string

如果您不想继续更改映射,上面的内容应该对您有所帮助。

我建议日期字段采用日期格式而不是文本格式,并且也采用 ISO-8601 支持的格式,这样当类型推断开始时,您最终会在 Spark 中获得正确类型的数据,您可以简单地专注于业务逻辑,很多时候正确的解决方案在于我们如何存储数据而不是我们如何处理数据。

将字符串转换为时间戳/日期的 Spark 代码

但是,如果由于某种原因您无法从源(即 elasticsearch)更改映射,您可以进一步添加以下代码,使用以下代码将字符串值转换为时间戳:

    import org.apache.spark.sql.functions._

//String into Timestamp Transformation
val df2_timestamp = df.withColumn("GenerateTime_timestamp", from_unixtime(unix_timestamp($"GenerateTime", "yyyy/MM/dd HH:mm:ss")).cast(TimestampType))
df2_timestamp.show(false)
df2_timestamp.printSchema();

如果你运行上面的代码,你会看到如下输出:

19/05/14 11:33:10 INFO CodeGenerator: Code generated in 23.742359 ms
+-------------------+----------------------+
|GenerateTime |GenerateTime_timestamp|
+-------------------+----------------------+
|2019/05/06 19:31:21|2019-05-06 19:31:21.0 |
+-------------------+----------------------+

root
|-- GenerateTime: string (nullable = true)
|-- GenerateTime_timestamp: timestamp (nullable = true)

19/05/14 11:33:10 INFO SparkContext: Invoking stop() from shutdown hook

另请注意,我的解决方案是在 Scala 中。如果有帮助,请告诉我!

关于apache-spark - Elasticsearch Spark 解析问题 - 无法解析字段 [Y] 的值 [X],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56092231/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com