- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要按特定列对一组 csv 行进行分组,并对每个组进行一些处理。
JavaRDD<String> lines = sc.textFile
("somefile.csv");
JavaPairRDD<String, String> pairRDD = lines.mapToPair(new SomeParser());
List<String> keys = pairRDD.keys().distinct().collect();
for (String key : keys)
{
List<String> rows = pairRDD.lookup(key);
noOfVisits = rows.size();
country = COMMA.split(rows.get(0))[6];
accessDuration = getAccessDuration(rows,timeFormat);
Map<String,Integer> counts = getCounts(rows);
whitepapers = counts.get("whitepapers");
tutorials = counts.get("tutorials");
workshops = counts.get("workshops");
casestudies = counts.get("casestudies");
productPages = counts.get("productpages");
}
private static long dateParser(String dateString) throws ParseException {
SimpleDateFormat format = new SimpleDateFormat("MMM dd yyyy HH:mma");
Date date = format.parse(dateString);
return date.getTime();
}
dateParser is called for each row. Then min and max for the group is calculated to get the access duration. Others are string matches.
pairRDD.lookup 非常慢..有没有更好的方法用 Spark 来做到这一点。
最佳答案
我认为您可以简单地使用该列作为键并执行groupByKey
。没有提及对这些行的操作。如果它是一个以某种方式组合这些行的函数,您甚至可以使用reduceByKey
。
类似于:
import org.apache.spark.SparkContext._ // implicit pair functions
val pairs = lines.map(parser _)
val grouped = pairs.groupByKey
// here grouped is of the form: (key, Iterator[String])
* 编辑*查看该过程后,我认为将每一行映射到它贡献的数据,然后使用aggregateByKey将它们全部减少到总数会更有效。aggregateByKey
需要 2 个函数和一个零:
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
第一个函数是分区聚合器,将有效地运行本地分区,为每个分区创建本地聚合部分。组合操作将获取这些部分聚合并将它们组合在一起以获得最终结果。
类似这样的事情:
val lines = sc.textFile("somefile.csv")
// parse returns a key and a decomposed Record of values tracked:(key, Record("country", timestamp,"whitepaper",...))
val records = lines.map(parse(_))
val totals = records.aggregateByKey((0,Set[String].empty,Long.MaxValue, Long.MinValue, Map[String,Int].empty),
(record, (count, countrySet, minTime, maxTime, counterMap )) => (count+1,countrySet + record.country, math.min(minTime,record.timestamp), math.max(maxTime, record.timestamp), ...)
(cumm1, cumm2) => ??? // add each field of the cummulator
)
这是 Spark 中执行基于键的聚合的最有效方法。
关于java - 带分组的 Spark 数据处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26611471/
您好,我正在处理 BIRT 报告。我有一个查询,我必须对父级的重复数据进行分组,但子级也不能分组! 在我的查询中: item 是父项,item_ledger_entry 是子项。我有来自 item.N
我正在使用 GA API。 这是针对 MCF 目标报告(底部)的标准目标完成指标表(顶部) 看一下这个: 总数加起来 (12,238),但看看按 channel 分组的分割有多么不同!我以为这些会很接
我正在开发一个流量计数器,我想获得 IP 和重复计数,但是如何? 就像是 :select ip, count(ip) from Redirect 返回 : null total ip count 重定
我尝试编写一个正则表达式来匹配条件表达式,例如: a!=2 1+2=2+a 我尝试提取运算符。我当前的正则表达式是“.+([!=<>]+).+” 但问题是匹配器总是尝试匹配组中可能的最短字符串
在 MS Transact SQL 中,假设我有一个这样的表(订单): Order Date Order Total Customer # 09/30/2008 8
我想按 m.ID 分组,并对每个 m.id 求和 (pm.amount_construction* prod.anzahl) 实际上我有以下结果: Meterial_id | amount_const
我想根据多列中的值对值进行分组。这是一个例子: 我想得到输出: {{-30,-50,20},{-20,30,60},{-30,NULL or other value, 20}} 我设法到达: SELE
我正在尝试找出运行此查询的最佳方式。我基本上需要返回在我们的系统中只下了一个订单的客户的“登录”字段列表(登录字段基本上是客户 ID/ key )。 我们系统的一些背景...... 客户在同一日期下的
给定以下mysql结果集: id code name importance '1234', 'ID-CS-B', 'Chocolate Sauce'
大家好,我的数据框中有以下列: LC_REF 1 DT 16 2C 2 DT 16 2C 3 DT 16 2C 1 DT 16 3C 6 DT 16 3C 3
我有这样的 mongoDB 集合 { "_id" : "EkKTRrpH4FY9AuRLj", "stage" : 10, }, { "_id" : "EkKTRrpH4FY9
假设我有一组数据对,其中 index 0 是值,index 1 是类型: input = [ ('11013331', 'KAT'), ('9085267',
java中用stream进行去重,排序,分组 一、distinct 1. 八大基本数据类型 List collect = ListUtil.of(1, 2, 3, 1, 2).stream().fil
基本上,我从 TABLE_A 中的这个开始 France - 100 France - 200 France - 300 Mexico - 50 Mexico - 50 Mexico - 56 Pol
我希望这个正则表达式 ([A-Z]+)$ 将选择此示例中的最后一次出现: AB.012.00.022ABC-1 AB.013.00.022AB-1 AB.014.00.022ABAB-1 但我没有匹配
我创建了一个数据透视表,但数据没有组合在一起。 任何人都可以帮助我获得所需的格式吗? 我为获取数据透视表而编写的查询: DECLARE @cols AS NVARCHAR(MAX), -- f
我想按时间段(月,周,日,小时,...)选择计数和分组。例如,我想选择行数并将它们按 24 小时分组。 我的表创建如下。日期是时间戳。 CREATE TABLE MSG ( MSG_ID dec
在 SQL Server 2005 中,我有一个包含如下数据的表: WTN------------Date 555-111-1212 2009-01-01 555-111-1212 2009-
题 假设我有 k 个标量列,如果它们沿着每列彼此在一定距离内,我想对它们进行分组。 假设简单 k 是 2 并且它们是我唯一的列。 pd.DataFrame(list(zip(sorted(choice
问题 在以下数据框中 df : import random import pandas as pd random.seed(999) sz = 50 qty = {'one': 1, 'two': 2
我是一名优秀的程序员,十分优秀!