python - PySpark XML to JSON with time series data


I have nearly 500,000 XML files containing time series data, each about 2-3 MB and each holding roughly 10k rows of time series data. The idea is to convert the XML file for each unique ID into JSON. However, the time series data for each ID needs to be broken into batches of 10 rows, converted to JSON, and written to a NoSQL database. Initially, the code iterated over one overall DataFrame per ID, stepping by a row size of 10, and then wrote each document to the database.

import pandas as pd

def resample_idx(X, resample_rate):
    # Yield consecutive slices of resample_rate rows from the DataFrame
    for idx in range(0, len(X), resample_rate):
        yield X.iloc[idx:idx + resample_rate, :]

# Batch documents of 10 rows each for one ID
for idx, df_batch in enumerate(resample_idx(df, 10)):
    dict_ = {}
    dict_['id'] = soup.find('id').contents[0]
    dict_['data'] = [v for k, v in pd.DataFrame.to_dict(df_batch.T).items()]
    # ... write dict_ to the NoSQL database here

An example of the JSON documents looks like this:

{'id': '123456A',
 'data': [{'A': 251.23,
           'B': 130.56,
           'dtim': Timestamp('2011-03-24 11:18:13.350000')
          },
          {'A': 253.23,
           'B': 140.56,
           'dtim': Timestamp('2011-03-24 11:19:21.310000')
          },
          .........
         ]
},
{'id': '123593X',
 'data': [{'A': 641.13,
           'B': 220.51,
           'C': 10.45,
           'dtim': Timestamp('2011-03-26 12:11:13.350000')
          },
          {'A': 153.25,
           'B': 810.16,
           'C': 12.5,
           'dtim': Timestamp('2011-03-26 12:11:13.310000')
          },
          .........
         ]
}

This works for a small sample, but it quickly became clear that building the batches this way does not scale. Hence the hope to replicate it in Spark. Experience with Spark is limited, but here is what has been attempted so far:

First, load all the time series data for all IDs:

df = sqlContext.read.format("com.databricks.spark.xml").options(rowTag='log').load("dbfs:/mnt/timedata/")

XML schema:

|-- _id: string (nullable = true)
|-- collect_list(TimeData): array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- data: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- ColNames: string (nullable = true)
| | |-- Units: string (nullable = true)

Query to get the Spark DataFrame:

d = df.select("_id", "TimeData.data", "TimeData.ColNames")

Current Spark DataFrame:

+--------------------+--------------------+--------------------+
| id | data| ColNames|
+--------------------+--------------------+--------------------+
|123456A |[2011-03-24 11:18...|dTim,A,B |
|123456A |[2011-03-24 11:19...|dTim,A,B |
|123593X |[2011-03-26 12:11...|dTim,A,B,C |
|123593X |[2011-03-26 12:11...|dTim,A,B,C |
+--------------------+--------------------+--------------------+

Expected Spark DataFrame:

+--------------------+--------------------+----------+----------+
| id | dTime| A| B|
+--------------------+--------------------+----------+----------+
|123456A |2011-03-24 11:18... | 251.23| 130.56|
|123456A |2011-03-24 11:19... | 253.23| 140.56|
+--------------------+--------------------+----------+----------+

+--------------------+--------------------+----------+----------+----------+
| id | dTime| A| B| C|
+--------------------+--------------------+----------+----------+----------+
|123593X |2011-03-26 12:11... | 641.13| 220.51| 10.45|
|123593X |2011-03-26 12:11... | 153.25| 810.16| 12.5|
+--------------------+--------------------+----------+----------+----------+

Only data for two timestamps per ID is shown here, but how can the DataFrame above be turned into batched JSON files for every n rows of each ID, similar to what was done with pandas above? The initial idea was to do a groupBy and apply a UDF to each ID, with output resembling the JSON structure above.

XML structure:

<log>
    <id>"ABC"</id>
    <TimeData>
        <colNames>dTim,colA,colB,colC,</colNames>
        <data>2011-03-24T11:18:13.350Z,0.139,38.988,0,110.307</data>
        <data>2011-03-24T11:18:43.897Z,0.138,39.017,0,110.307</data>
    </TimeData>
</log>

Note that there is no fixed number of colNames per ID; it can range from 5 to 30, depending on the data sources collected for that ID.

Best answer

Well, based on the information given, this could be a solution. Unfortunately my Python is a bit rusty, but there should be Python equivalents for all the Scala functions used here.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// Assume nth is based on dTim ordering
val windowSpec = Window
  .partitionBy($"_id")
  .orderBy($"dTim".desc)

val nthRow = 2 // define the nth item to be fetched

df.select(
    $"_id",
    $"TimeData.data".getItem(0).getItem(0).cast(TimestampType).alias("dTim"),
    $"TimeData.data".getItem(0).getItem(1).cast(DoubleType).alias("A"),
    $"TimeData.data".getItem(0).getItem(2).cast(DoubleType).alias("B"),
    $"TimeData.data".getItem(0).getItem(3).cast(DoubleType).alias("C")
  )
  .withColumn("n", row_number().over(windowSpec))
  .filter(col("n") === nthRow)
  .drop("n")
  .show()

which would output something like:

+-------+--------------------+------+------+-----+
| _id| dTim| A| B| C|
+-------+--------------------+------+------+-----+
|123456A|2011-03-24 11:18:...|251.23|130.56| null|
|123593X|2011-03-26 12:11:...|641.13|220.51|10.45|
+-------+--------------------+------+------+-----+
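
For reference, a rough PySpark sketch of the same nth-row selection (untested, mirroring the Scala snippet above and assuming the same df schema; window_spec and nth_row are just the Scala names carried over) might look like this:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Window per id, ordered by timestamp, mirroring the Scala windowSpec (sketch, not tested)
window_spec = Window.partitionBy("_id").orderBy(col("dTim").desc())
nth_row = 2

(df.select(
        col("_id"),
        col("TimeData.data").getItem(0).getItem(0).cast("timestamp").alias("dTim"),
        col("TimeData.data").getItem(0).getItem(1).cast("double").alias("A"),
        col("TimeData.data").getItem(0).getItem(2).cast("double").alias("B"),
        col("TimeData.data").getItem(0).getItem(3).cast("double").alias("C"))
    .withColumn("n", row_number().over(window_spec))
    .filter(col("n") == nth_row)
    .drop("n")
    .show())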

I'll improve the answer if I learn more.


Update

I like this puzzle, so if the problem is understood correctly, this could be a solution:

I created 3 XML files with 2 data records each, for 2 different IDs in total.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
import spark.implicits._

val df = spark
  .sqlContext
  .read
  .format("com.databricks.spark.xml")
  .option("rowTag", "log")
  .load("src/main/resources/xml")


// Could be computationally heavy; maybe cache df first if possible, otherwise run it on a sample, or hardcode the possible columns
val colNames = df
  .select(explode(split($"TimeData.colNames", ",")).as("col"))
  .distinct()
  .filter($"col" =!= lit("dTim") && $"col" =!= "")
  .collect()
  .map(_.getString(0))
  .toList
  .sorted

// or list all possible columns
//val colNames = List("colA", "colB", "colC")


// Based on the XML, colNames and data are comma-separated strings that have to be split.
// Could be done using the SQL split function, but this UDF maps the columns to the correct field.
def mapColsToData = udf((cols: String, data: Seq[String]) =>
  if (cols == null || data == null) Seq.empty[Map[String, String]]
  else data.map(str => (cols.split(",") zip str.split(",")).toMap)
)

// The result of this action is 1 record per data point across all XMLs; each record is a key -> value map of colName -> data
val denorm = df.select($"id", explode(mapColsToData($"TimeData.colNames", $"TimeData.data")).as("data"))

denorm.show(false)

Output:

+-------+-------------------------------------------------------------------------------+
|id |data |
+-------+-------------------------------------------------------------------------------+
|123456A|Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)|
|123456A|Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0)|
|123593X|Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988) |
|123593X|Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017) |
|123456A|Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)|
|123456A|Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0)|
+-------+-------------------------------------------------------------------------------+
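
On the PySpark side, the UDF-plus-explode step might be sketched roughly like this (a hedged, untested equivalent of mapColsToData above; map_cols_to_data is just an illustrative name):

from pyspark.sql.functions import col, explode, udf
from pyspark.sql.types import ArrayType, MapType, StringType

# Zip each comma-separated data row with the comma-separated column names (sketch, not tested)
@udf(returnType=ArrayType(MapType(StringType(), StringType())))
def map_cols_to_data(cols, data):
    if cols is None or data is None:
        return []
    names = cols.split(",")
    return [dict(zip(names, row.split(","))) for row in data]

denorm = df.select(
    col("id"),
    explode(map_cols_to_data(col("TimeData.colNames"), col("TimeData.data"))).alias("data"))
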
// now create a column for each map value, based on the predefined / discovered column names
val columized = denorm.select(
  $"id",
  $"data.dTim".cast(TimestampType).alias("dTim"),
  $"data"
)

columized.show()

Output:

+-------+--------------------+--------------------+
| id| dTim| data|
+-------+--------------------+--------------------+
|123456A|2011-03-24 12:18:...|Map(dTim -> 2011-...|
|123456A|2011-03-24 12:18:...|Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...|Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...|Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...|Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...|Map(dTim -> 2011-...|
+-------+--------------------+--------------------+
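
A rough PySpark equivalent of this columized step might be (untested sketch; getItem is used here instead of dot access on the map column):

from pyspark.sql.functions import col

# Pull the timestamp out of the map while keeping the full map for the JSON payload (sketch)
columized = denorm.select(
    col("id"),
    col("data").getItem("dTim").cast("timestamp").alias("dTim"),
    col("data"))
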
// create window over which to resample
val windowSpec = Window
  .partitionBy($"id")
  .orderBy($"dTim".desc)

val resampleRate = 2

// add a batchId based on the resample rate, then group by id and batch and collect the data maps into a list
val batched = columized
  .withColumn("batchId", floor((row_number().over(windowSpec) - lit(1)) / lit(resampleRate)))
  .groupBy($"id", $"batchId")
  .agg(collect_list($"data").as("data"))
  .drop("batchId")

batched.show(false)

Output:

+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |data |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|123593X|[Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017), Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988)] |
|123456A|[Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0), Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)]|
|123456A|[Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0), Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)]|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
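
The batching step could presumably be sketched in PySpark as follows (untested, mirroring the Scala names windowSpec and resampleRate):

from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list, floor, lit, row_number

# Assign a batchId per id based on row position, then collect each batch of maps into a list (sketch)
window_spec = Window.partitionBy("id").orderBy(col("dTim").desc())
resample_rate = 2

batched = (columized
    .withColumn("batchId", floor((row_number().over(window_spec) - lit(1)) / lit(resample_rate)))
    .groupBy("id", "batchId")
    .agg(collect_list("data").alias("data"))
    .drop("batchId"))
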
// Store as 1 huge JSON file (drop the repartition if you can handle multiple JSON files; better for the master as well)
batched.repartition(1).write.mode(SaveMode.Overwrite).json("/tmp/xml")
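
In PySpark the final write would presumably be the same call with a string mode:

# PySpark equivalent of the Scala write above (sketch); drop repartition(1) to keep multiple output files
batched.repartition(1).write.mode("overwrite").json("/tmp/xml")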

Output JSON:

{"id":"123593X","data":[{"dTim":"2011-03-26T12:20:43.897+01:00","colA":"1.138","colB":"29.017"},{"dTim":"2011-03-26T12:20:13.350+01:00","colA":"1.139","colB":"28.988"}]}
{"id":"123456A","data":[{"dTim":"2011-03-27T13:18:43.897+02:00","colA":"0.128","colB":"35.017","colC":"0"},{"dTim":"2011-03-27T13:18:13.350+02:00","colA":"0.129","colB":"35.988","colC":"0"}]}
{"id":"123456A","data":[{"dTim":"2011-03-24T12:18:43.897+01:00","colA":"0.138","colB":"39.017","colC":"0"},{"dTim":"2011-03-24T12:18:13.350+01:00","colA":"0.139","colB":"38.988","colC":"0"}]}

For python - PySpark XML to JSON with time series data, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/49967984/
