gpt4 book ai didi

json - 使用 Spark 2 从 json 解析纪元毫秒

转载 作者:行者123 更新时间:2023-12-01 01:51:18 25 4
gpt4 key购买 nike

有没有人使用 from_json 解析毫秒时间戳在 Spark 2+ 中?它是怎么做的?

所以Spark changed TimestampType将纪元数值解析为秒而不是 v2 中的毫秒。

我的输入是一个 hive 表,在我试图这样解析的列中有一个 json 格式的字符串:

val spark = SparkSession
.builder
.appName("Problematic Timestamps")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val schema = StructType(
StructField("categoryId", LongType) ::
StructField("cleared", BooleanType) ::
StructField("dataVersion", LongType) ::
StructField("details", DataTypes.createArrayType(StringType)) ::

StructField("timestamp", TimestampType) ::
StructField("version", StringType) :: Nil
)
val item_parsed =
spark.sql("select * FROM source.jsonStrInOrc")
.select('itemid, 'locale,
from_json('internalitem, schema)
as 'internalitem,
'version, 'createdat, 'modifiedat)
val item_flattened = item_parsed
.select('itemid, 'locale,
$"internalitem.*",
'version as'outer_version, 'createdat, 'modifiedat)

这可以解析包含以下内容的列的行:

{"timestamp": 1494790299549, "cleared": false, "version": "V1", "dataVersion": 2, "categoryId": 2641, "details": [], …}



这给了我 timestamp字段,如 49338-01-08 00:39:09.0来自值 1494790299549我宁愿读为: 2017-05-14 19:31:39.549
现在我可以将时间戳的架构设置为长,然后将值除以 1000 并转换为时间戳,但随后我会得到 2017-05-14 19:31:39.000不是 2017-05-14 19:31:39.549 .我无法弄清楚我该怎么做:
  • 告诉 from_json解析毫秒时间戳(可能通过以某种方式对 TimestampType 进行子类化以在架构中使用)
  • 使用 LongType在架构中并将其转换为 保留毫秒的时间戳 .

  • UDF 附录

    我发现尝试在选择中进行除法然后进行转换对我来说看起来并不干净,尽管这是一种完全有效的方法。我选择了使用 java.sql.timestamp 的 UDF这实际上是以纪元毫秒为单位指定的。
    import java.sql.Timestamp

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{explode, from_json, udf}
    import org.apache.spark.sql.types.
    {BooleanType, DataTypes, IntegerType, LongType,
    StringType, StructField, StructType, TimestampType}

    val tsmillis = udf { t: Long => new Timestamp (t) }

    val spark = SparkSession
    .builder
    .appName("Problematic Timestamps")
    .enableHiveSupport()
    .getOrCreate()
    import spark.implicits._
    val schema = StructType(
    StructField("categoryId", LongType) ::
    StructField("cleared", BooleanType) ::
    StructField("dataVersion", LongType) ::
    StructField("details", DataTypes.createArrayType(StringType)) ::

    StructField("timestamp", LongType) ::
    StructField("version", StringType) :: Nil
    )
    val item_parsed =
    spark.sql("select * FROM source.jsonStrInOrc")
    .select('itemid, 'locale,
    from_json('internalitem, schema)
    as 'internalitem,
    'version, 'createdat, 'modifiedat)
    val item_flattened = item_parsed
    .select('itemid, 'locale,
    $"internalitem.categoryId", $"internalitem.cleared",
    $"internalitem.dataVersion", $"internalitem.details",
    tsmillis($"internalitem.timestamp"),
    $"internalitem.version",
    'version as'outer_version, 'createdat, 'modifiedat)

    看看它是如何选择的。
    我认为进行性能测试以查看是否使用 withcolumn 是值得的。除法和类型转换比 udf 更快.

    最佳答案

    Now I could set the schema for timestamp to be a long, then divide the value by 1000


    实际上这正是您所需要的,只需保持正确的类型即可。假设您只有 Long timestamp field :
    val df = spark.range(0, 1).select(lit(1494790299549L).alias("timestamp"))
    // df: org.apache.spark.sql.DataFrame = [timestamp: bigint]
    如果除以 1000:
    val inSeconds = df.withColumn("timestamp_seconds", $"timestamp" / 1000)
    // org.apache.spark.sql.DataFrame = [timestamp: bigint, timestamp_seconds: double]
    您将获得以秒为单位的双倍时间戳(请注意,这是 SQL,而不是 Scala 行为)。
    剩下的就是 cast ( Spark < 3.1 )
    inSeconds.select($"timestamp_seconds".cast("timestamp")).show(false)
    // +-----------------------+
    // |timestamp_seconds |
    // +-----------------------+
    // |2017-05-14 21:31:39.549|
    // +-----------------------+
    或 ( Spark >= 3.1 ) timestamp_seconds (或直接 timestamp_millis)
    import org.apache.spark.sql.functions.{expr, timestamp_seconds}

    inSeconds.select(timestamp_seconds($"timestamp_seconds")).show(false)

    // +------------------------------------+
    // |timestamp_seconds(timestamp_seconds)|
    // +------------------------------------+
    // |2017-05-14 21:31:39.549 |
    // +------------------------------------+

    df.select(expr("timestamp_millis(timestamp)")).show(false)
    // +---------------------------+
    // |timestamp_millis(timestamp)|
    // +---------------------------+
    // |2017-05-14 21:31:39.549 |
    // +---------------------------+

    关于json - 使用 Spark 2 从 json 解析纪元毫秒,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44314199/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com