gpt4 book ai didi

java - 带分组的 Spark 数据处理

转载 作者:行者123 更新时间:2023-12-02 05:19:28 27 4
gpt4 key购买 nike

我需要按特定列对一组 csv 行进行分组,并对每个组进行一些处理。

    JavaRDD<String> lines = sc.textFile
("somefile.csv");
JavaPairRDD<String, String> pairRDD = lines.mapToPair(new SomeParser());
List<String> keys = pairRDD.keys().distinct().collect();
for (String key : keys)
{
List<String> rows = pairRDD.lookup(key);

noOfVisits = rows.size();
country = COMMA.split(rows.get(0))[6];
accessDuration = getAccessDuration(rows,timeFormat);
Map<String,Integer> counts = getCounts(rows);
whitepapers = counts.get("whitepapers");
tutorials = counts.get("tutorials");
workshops = counts.get("workshops");
casestudies = counts.get("casestudies");
productPages = counts.get("productpages");
}

private static long dateParser(String dateString) throws ParseException {
SimpleDateFormat format = new SimpleDateFormat("MMM dd yyyy HH:mma");
Date date = format.parse(dateString);
return date.getTime();
}
dateParser is called for each row. Then min and max for the group is calculated to get the access duration. Others are string matches.

pairRDD.lookup 非常慢..有没有更好的方法用 Spark 来做到这一点。

最佳答案

我认为您可以简单地使用该列作为键并执行groupByKey。没有提及对这些行的操作。如果它是一个以某种方式组合这些行的函数,您甚至可以使用reduceByKey

类似于:

import org.apache.spark.SparkContext._  // implicit pair functions
val pairs = lines.map(parser _)
val grouped = pairs.groupByKey
// here grouped is of the form: (key, Iterator[String])

* 编辑*查看该过程后,我认为将每一行映射到它贡献的数据,然后使用aggregateByKey将它们全部减少到总数会更有效。aggregateByKey 需要 2 个函数和一个零:

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]

第一个函数是分区聚合器,将有效地运行本地分区,为每个分区创建本地聚合部分。组合操作将获取这些部分聚合并将它们组合在一起以获得最终结果。

类似这样的事情:

val lines = sc.textFile("somefile.csv")
// parse returns a key and a decomposed Record of values tracked:(key, Record("country", timestamp,"whitepaper",...))

val records = lines.map(parse(_))

val totals = records.aggregateByKey((0,Set[String].empty,Long.MaxValue, Long.MinValue, Map[String,Int].empty),
(record, (count, countrySet, minTime, maxTime, counterMap )) => (count+1,countrySet + record.country, math.min(minTime,record.timestamp), math.max(maxTime, record.timestamp), ...)
(cumm1, cumm2) => ??? // add each field of the cummulator
)

这是 Spark 中执行基于键的聚合的最有效方法。

关于java - 带分组的 Spark 数据处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26611471/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com