- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个数据集如下:
id email Date_of_purchase time_of_purchase
1 abc@gmail.com 11/10/18 12:10 PM
2 abc@gmail.com 11/10/18 02:11 PM
3 abc@gmail.com 11/10/18 03:14 PM
4 abc@gmail.com 11/11/18 06:16 AM
5 abc@gmail.com 11/11/18 09:10 AM
6 def@gmail.com 11/10/18 12:17 PM
7 def@gmail.com 11/10/18 03:24 PM
8 def@gmail.com 11/10/18 08:16 PM
9 def@gmail.com 11/10/18 09:13 PM
10 def@gmail.com 11/11/18 12:01 AM
我想计算每个电子邮件 ID 在 4 小时内进行的交易数量。例如,电子邮件 ID:abc@gmail.com 从 11/10/18 12.10 PM 到 11/10/18 4.10 PM 进行了 3 笔交易,从 11/11/18 6.16 AM 到 11/11/18 进行了 2 笔交易上午 10.16。电子邮件 ID:def@gmail.com 从 11/10/18 12.17 PM 到 11/10/18 4.17 PM 进行了 2 笔交易,从 11/10/18 8.16 PM 到 11/11/18 12.16 AM 进行了 3 笔交易。
我想要的输出是:
email hour_interval purchase_in_4_hours
abc@gmail.com [11/10/18 12.10 PM to 11/10/18 4.10 PM] 3
abc@gmail.com [11/11/18 6.16 AM to 11/11/18 10.16 AM] 2
def@gmail.com [11/10/18 12.17 PM to 11/10/18 4.17 PM] 2
def@gmail.com [11/10/18 8.16 PM to 11/11/18 12.16 AM] 3
我的数据集有 1000k 行。我对 Spark 很陌生。任何帮助将不胜感激。附:时间间隔可以从 4 小时更改为 1 小时、6 小时、1 天等。
TIA。
最佳答案
其想法是通过电子邮件对数据进行分区,按日期和时间在每个分区内进行排序,然后将每个分区映射到所需的输出。如果每个分区的数据(=一个电子邮件地址的数据)适合一个 Spark 执行器的内存,则此方法将起作用。
实际的 Spark 逻辑遵循以下步骤
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from datetime import datetime, timedelta
spark = SparkSession.builder.appName("test").getOrCreate()
df = spark.read.option("header", "true").csv(<path>) #or any other data source
df = df.withColumn("date_time", to_timestamp(concat(col("Date_of_purchase"), lit(" "), col("time_of_purchase")), "MM/dd/yy hh:mm aa")) \
.drop("Date_of_purchase", "time_of_purchase") \
.repartition(col("email")) \
.sortWithinPartitions(col("email"), col("date_time"))
def process_partition(df_chunk):
row_list = list(df_chunk)
if len(row_list) == 0:
return
email = row_list[0]['email']
start = row_list[0]['date_time']
end = start + timedelta(hours=4)
count = 0
for row in row_list:
if email == row['email'] and end > row['date_time']:
count = count +1
else:
yield Row(email, start, end, count)
email = row['email']
start = row['date_time']
end = start + timedelta(hours=4)
count = 1
yield Row(email, start, end, count)
result = df.rdd.mapPartitions(process_partition).toDF(["email", "from", "to", "count"])
result.show()
输出:
+-------------+-------------------+-------------------+-----+
| email| from| to|count|
+-------------+-------------------+-------------------+-----+
|def@gmail.com|2018-11-10 12:17:00|2018-11-10 16:17:00| 2|
|def@gmail.com|2018-11-10 20:16:00|2018-11-11 00:16:00| 3|
|abc@gmail.com|2018-11-10 12:10:00|2018-11-10 16:10:00| 3|
|abc@gmail.com|2018-11-11 06:16:00|2018-11-11 10:16:00| 2|
+-------------+-------------------+-------------------+-----+
要更改周期长度,可以将 timedelta
设置为任何值。
关于python - Pyspark - 如何将 '4 hours' 多个窗口分组聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58221350/
我有一个 Cassandra 集群,里面有 4 个表和数据。 我想使用聚合函数(sum,max ...)发出请求,但我在这里读到这是不可能的: http://www.datastax.com/docu
我有以下两张表 Table: items ID | TITLE 249 | One 250 | Two 251 | Three 我投票给这些: Table: votes VID | IID | u
这个问题在这里已经有了答案: Update MongoDB field using value of another field (12 个答案) 关闭 3 年前。 我想根据另一个“源”集合的文档中
我的收藏包含以下文件。我想使用聚合来计算里面有多少客户,但我遇到了一些问题。我可以获得总行数,但不能获得总(唯一)客户。 [{ _id: "n001", channel: "Kalip
我有下表 Id Letter 1001 A 1001 H 1001 H 1001 H 1001 B 1001 H 1001 H 1001
得到一列的表 ABC。 “创建”的日期列。所以样本值就像; created 2009-06-18 13:56:00 2009-06-18 12:56:00 2009-06-17 14:02:0
我有一个带有数组字段的集合: {[ name:String buyPrice:Int sellPrice:Int ]} 我试图找到最低和最高买入/卖出价格。在某些条目中,买入或卖出价格为零
我有以下问题: 在我的 mongo db 中,我有以下结构: { "instanceId": "12", "eventId": "0-1b", "activityType":
下面给出的是我要在其上触发聚合查询的 Elasticsearch 文档。 { "id": 1, "attributes": [ { "fieldId": 1,
我正在使用 Django 的 aggregate query expression总计一些值。最终值是一个除法表达式,有时可能以零作为分母。如果是这种情况,我需要一种方法来逃避,以便它只返回 0。 我
我正在学习核心数据,特别是聚合。 当前我想要做的事情:计算表中在某些条件上具有逆关系的多对关系的记录数。 目前我正在这样做: NSExpression *ex = [NSExpression expr
我需要有关 Delphi 中的 ClientDatasets 的一些帮助。 我想要实现的是一个显示客户的网格,其中一列显示每个客户的订单数量。我将 ClientDataset 放在表单上并从 Delp
我的集合有 10M 个文档,并且有一个名为 movieId 的字段;该文档具有以下结构: { "_id" : ObjectId("589bed43e3d78e89bfd9b779"), "us
这个问题已经有答案了: What is the difference between association, aggregation and composition? (21 个回答) 已关闭 9
我在 elasticsearch 中有一些类似于这些示例的文档: { "id": ">", "list": [ "a", "b", "c" ] } { "id"
我正在做一些聚合。但是结果完全不是我所期望的,似乎它们没有聚合索引中与我的查询匹配的所有文档,在这种情况下 - 它有什么好处? 例如,首先我做这个查询: {"index":"datalayer","t
假设我在 ES 中有这些数据。 | KEY | value | |:-----------|------------:| | A |
可能在我的文档中,我有一个被分析的文本字段。我只是在ElasticSearch AggregationAPI中迷路了。我需要2种不同情况的支持: 情况A)结果是带有计数标记(条款)的篮子下降。 情况B
我正在为网上商店构建多面过滤功能,如下所示: Filter on Brand: [ ] LG (10) [ ] Apple (5) [ ] HTC (3) Filter on OS: [ ] Andr
我有一个父/子关系并且正在搜索 child 。 是否可以在父属性上创建聚合? 例如parent 是 POST,children 是 COMMENT。如果父项具有“类别”属性,是否可以搜索 COMMEN
我是一名优秀的程序员,十分优秀!