gpt4 book ai didi

apache-spark - 我如何将时间戳作为额外的列添加到我的数据框

转载 作者:行者123 更新时间:2023-12-03 15:59:28 25 4
gpt4 key购买 nike

*大家好,

我对大家有一个简单的问题。
我有一个使用createStream方法从kafka流创建的RDD。
现在我想在转换为数据帧之前将时间戳记作为此rdd的值添加。
我尝试使用withColumn()向数据框添加值,但返回此错误*

val topicMaps = Map("topic" -> 1)
val now = java.util.Calendar.getInstance().getTime()

val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

messages.foreachRDD(rdd =>
{

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val dataframe = sqlContext.read.json(rdd.map(_._2))



val d =dataframe.withColumn("timeStamp_column",dataframe.col("now"))

val d =dataframe.withColumn("timeStamp_column",dataframe.col("now")) org.apache.spark.sql.AnalysisException: Cannot resolve column name "now" among (action, device_os_ver, device_type, event_name, item_name, lat, lon, memberid, productUpccd, tenantid); at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:15



正如我所知道的那样,DataFrames不能更改,因为它们是不可变的,但是RDD也是不可变的。
那么什么是最好的方式做到这一点。
如何将值添加到RDD(将时间戳动态添加到RDD)。

最佳答案

尝试使用current_timestamp函数。

import org.apache.spark.sql.functions.current_timestamp    
df.withColumn("time_stamp", current_timestamp())

关于apache-spark - 我如何将时间戳作为额外的列添加到我的数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41544253/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com