gpt4 book ai didi

apache-spark - 使用结构化流将数组扩展到 spark 中的列

转载 作者:行者123 更新时间:2023-12-04 17:44:02 24 4
gpt4 key购买 nike

我有这个问题:

我正在使用结构化流从 Kafka 读取数据,数据是 CSV 行。当我从 Kafka 获取数据时,我有一个流式数据帧,其中 CSV 行位于“值”内,它是一个字节序列。

 sDF2 = sDF.selectExpr("CAST(value as string)").select( split("value",","))

使用它我有一个新的数据框,其中“值”是一个字符串,它是 CSV 行。

如何在解析 CSV 字段并将其拆分为数据帧列的情况下获取新数据帧?

例子: csv 行是“abcd,123,frgh,1321”

sDF schema, which contains the data downloaded from Kafka, is  
key, value, topic, timestamp etc... and here value is a byte sequence with no type

sDF2.schema has only a column ( named value of type string )

我喜欢新的数据框是

sDF3.col1 = abcd
sDF3.col2 = 123
sDF3.col3 = frgh ...etc

其中所有列都是字符串。

我仍然可以这样做:

 sDF3 = sDF2.select( sDF2.csv[0].alias("EventId").cast("string"),
sDF2.csv[1].alias("DOEntitlementId").cast("string"),
sDF2.csv[3].alias("AmazonSubscriptionId").cast("string"),
sDF2.csv[4].alias("AmazonPlanId").cast("string"),
... etc ...

但它看起来很丑。

最佳答案

我还没有尝试过,但是像这样的东西应该可以。

sDF2 = 
sDF.selectExpr("CAST(value as string)")
.alias("csv").select("csv.*")
.select("split(value,',')[0] as DOEntitlementId",
"split(value,',')[1] as AmazonSubscriptionId",
"split(value,',')[2] as AmazonPlanId")

关于apache-spark - 使用结构化流将数组扩展到 spark 中的列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53030128/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com