- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有两个数据帧,我想基于一列加入它们,但需要注意的是,该列是一个时间戳,并且该时间戳必须在某个偏移量(5 秒)内才能加入记录。更具体地说,记录在 dates_df
与 date=1/3/2015:00:00:00
应该加入 events_df
与 time=1/3/2015:00:00:01
因为两个时间戳彼此相差 5 秒以内。
我试图让这个逻辑与 python spark 一起工作,这非常痛苦。人们如何在 Spark 中进行这样的连接?
我的方法是向 dates_df
添加两列额外的列。这将确定 lower_timestamp
和 upper_timestamp
以 5 秒偏移为界,并执行条件连接。这就是它失败的地方,更具体地说:
joined_df = dates_df.join(events_df,
dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)
joined_df.explain()
Filter (time#6 < upper_timestamp#4)
CartesianProduct
....
from datetime import datetime, timedelta
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
master = 'local[*]'
app_name = 'stackoverflow_join'
conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
def lower_range_func(x, offset=5):
return x - timedelta(seconds=offset)
def upper_range_func(x, offset=5):
return x + timedelta(seconds=offset)
lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())
dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
dates_schema = StructType(dates_fields)
dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
dates_df = sqlContext.createDataFrame(dates, dates_schema)
dates_df.show()
# extend dates_df with time ranges
dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
withColumn('upper_timestamp', upper_range(dates_df['date']))
event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
event_schema = StructType(event_fields)
events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
events_df = sqlContext.createDataFrame(events, event_schema)
events_df.show()
# finally, join the data
joined_df = dates_df.join(events_df,
dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)
joined_df.show()
+-----+--------------------+
| name| date|
+-----+--------------------+
|day_1|2015-01-01 00:00:...|
|day_2|2015-01-02 00:00:...|
|day_3|2015-01-03 00:00:...|
|day_4|2015-01-04 00:00:...|
+-----+--------------------+
+--------------------+-------+
| time| event|
+--------------------+-------+
|2015-01-03 00:00:...|meeting|
+--------------------+-------+
+-----+--------------------+--------------------+--------------------+--------------------+-------+
| name| date| lower_timestamp| upper_timestamp| time| event|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
|day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
|day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
最佳答案
我确实用 explain()
触发了 SQL 查询看看它是如何完成的,并在 python 中复制了相同的行为。首先是如何对 SQL spark 执行相同的操作:
dates_df.registerTempTable("dates")
events_df.registerTempTable("events")
results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time and events.time < dates.upper_timestamp")
results.explain()
joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp)
joined_df.explain()
产生与 sql spark
results.explain()
相同的查询所以我认为这就是事情的完成方式。
关于在python中按时加入两个 Spark 数据帧(TimestampType),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30630296/
我正在尝试创建自己的 Hibernate 映射以使用 Hibernate 将 UTC 时间戳读写到 DB,因此我正在扩展 Hibernate TimestampType 类,如下所示,但我总是收到以下
我有两个数据帧,我想基于一列加入它们,但需要注意的是,该列是一个时间戳,并且该时间戳必须在某个偏移量(5 秒)内才能加入记录。更具体地说,记录在 dates_df与 date=1/3/2015:00:
我有一个数据框,我想将其插入到 Spark 中的 Postgresql 中。在 spark 中,DateTimestamp 列为字符串格式。在 postgreSQL 中,它是没有时区的 TimeSta
我是 spark 和 scala 的新手。正在尝试读取文本文件并将其保存为 parquet 文件。对我来说,我正在使用的字段之一是 TimeStamp,它的文档说 spark1.1.0 支持 java
我有一些包含在字符串数组中的数据,如下所示(仅作为示例): val myArray = Array("1499955986039", "1499955986051", "1499955986122")
有一个包含事件和特定时间戳的表。我很难使用 Pyspark 2.0 API 计算过去的天数。当时间戳遵循另一种格式(yyyy-mm-dd)时,我设法做同样的事情 +--------------
我有以下在 Pyspark 中无法完全理解的问题。我有以下日期时间对象 utc_now = datetime.now().replace(tzinfo=tz.tzutc()) utc_now # da
当我尝试将字符串字段转换为 Spark DataFrame 中的 TimestampType 时,输出值具有微秒精度(yyyy-MM-dd HH:mm:ss.S)。但我需要格式为 yyyy-MM-dd
在我的数据框中,我有一列 TimestampType 格式为“2019-03-16T16:54:42.968Z”的列,我想将其转换为格式为“201903161654”的 StringType 列,而不
在我的数据框中,我有一列 TimestampType 格式为“2019-03-16T16:54:42.968Z”的列,我想将其转换为格式为“201903161654”的 StringType 列,而不
我有一个看起来像这样的 DataFrame。我想在 date_time 字段的当天进行操作。 root |-- host: string (nullable = true) |-- user_id
我有带有 take(5) 顶行的 Spark DataFrame,如下所示: [Row(date=datetime.datetime(1984, 1, 1, 0, 0), hour=1, value=
我正在尝试从以下列表中创建一个数据框: data = [(1,'abc','2020-08-20 10:00:00', 'I'), (1,'abc','2020-08-20 10:01:00', 'U
我有一个 Pandas 数据框,我正在写入 HDFS 中的一个表。我可以在 Srum_Entry_Creation 时将数据写入表是 StringType() ,但我需要它是 TimestampTyp
我有这个包含以下内容的 csv 文件 (test.csv): COLUMN_STRING;COLUMN_INT;COLUMN_TIMESTAMP String_Value_1;123456;20131
我有一列使用 org.hibernate.type.TimestampType 在 hibernate 中映射。如何使用 native Oracle SQL 根据 Oracle TIMESTAMP 存
我一直在使用 pyspark 3.0。我在 StringType 中有一个包含“时间”列的数据框。我正在尝试将其转换为时间戳。数据框如下所示。 +---------------+ |
我在Databricks笔记本上使用Spark 2.1和Scala 2.11 确切的TimestampType是什么? 从SparkSQL's documentation知道,官方的时间戳类型是Tim
我正在阅读的 CSV 文件包含 3 列。以下是列的格式。 DateTime1 的格式为 "mm/dd/yyyy hh:mm:ss" DateTime2 格式为"dd/mm/yy hh:mm:ss" 日
我在连接到 cassandra 时遇到问题。它总是告诉我: Exception in thread "main" java.lang.RuntimeException: org.apache.cass
我是一名优秀的程序员,十分优秀!