- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我编写了一个小型 Spark 应用程序,它应该测量 Spark 在分区 RDD 上运行操作所需的时间(combineByKey 函数对值求和)。
我的问题是,第一次迭代似乎工作正常(计算的持续时间约为 25 毫秒),但接下来的迭代花费的时间要少得多(约为 5 毫秒)。在我看来,Spark 在没有任何请求的情况下保留数据!?我可以通过编程方式避免这种情况吗?
我必须知道 Spark 计算新 RDD 所需的持续时间(没有任何缓存/保留早期迭代)--> 我认为持续时间应始终约为 20-25 毫秒!
为了确保重新计算,我将 SparkContext 生成移到了 for 循环中,但这并没有带来任何变化...
感谢您的建议!
我的代码似乎保留了任何数据:
public static void main(String[] args) {
switchOffLogging();
// jetzt
try {
// Setup: Read out parameters & initialize SparkContext
String path = args[0];
SparkConf conf = new SparkConf(true);
JavaSparkContext sc;
// Create output file & writer
System.out.println("\npar.\tCount\tinput.p\tcons.p\tTime");
// The RDDs used for the benchmark
JavaRDD<String> input = null;
JavaPairRDD<Integer, String> pairRDD = null;
JavaPairRDD<Integer, String> partitionedRDD = null;
JavaPairRDD<Integer, Float> consumptionRDD = null;
// Do the tasks iterative (10 times the same benchmark for testing)
for (int i = 0; i < 10; i++) {
boolean partitioning = true;
int partitionsCount = 8;
sc = new JavaSparkContext(conf);
setS3credentials(sc, path);
input = sc.textFile(path);
pairRDD = mapToPair(input);
partitionedRDD = partition(pairRDD, partitioning, partitionsCount);
// Measure the duration
long duration = System.currentTimeMillis();
// Do the relevant function
consumptionRDD = partitionedRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
duration = System.currentTimeMillis() - duration;
// So some action to invoke the calculation
System.out.println(consumptionRDD.collect().size());
// Print the results
System.out.println("\n" + partitioning + "\t" + partitionsCount + "\t" + input.partitions().size() + "\t" + consumptionRDD.partitions().size() + "\t" + duration + " ms");
input = null;
pairRDD = null;
partitionedRDD = null;
consumptionRDD = null;
sc.close();
sc.stop();
}
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getMessage());
}
}
一些辅助函数(应该不是问题所在):
private static void switchOffLogging() {
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
}
private static void setS3credentials(JavaSparkContext sc, String path) {
if (path.startsWith("s3n://")) {
Configuration hadoopConf = sc.hadoopConfiguration();
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", "mycredentials");
hadoopConf.set("fs.s3n.awsSecretAccessKey", "mycredentials");
}
}
// Initial element
private static Function<String, Float> createCombiner = new Function<String, Float>() {
public Float call(String dataSet) throws Exception {
String[] data = dataSet.split(",");
float value = Float.valueOf(data[2]);
return value;
}
};
// merging function for a new dataset
private static Function2<Float, String, Float> mergeValue = new Function2<Float, String, Float>() {
public Float call(Float sumYet, String dataSet) throws Exception {
String[] data = dataSet.split(",");
float value = Float.valueOf(data[2]);
sumYet += value;
return sumYet;
}
};
// function to sum the consumption
private static Function2<Float, Float, Float> mergeCombiners = new Function2<Float, Float, Float>() {
public Float call(Float a, Float b) throws Exception {
a += b;
return a;
}
};
private static JavaPairRDD<Integer, String> partition(JavaPairRDD<Integer, String> pairRDD, boolean partitioning, int partitionsCount) {
if (partitioning) {
return pairRDD.partitionBy(new HashPartitioner(partitionsCount));
} else {
return pairRDD;
}
}
private static JavaPairRDD<Integer, String> mapToPair(JavaRDD<String> input) {
return input.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String debsDataSet) throws Exception {
String[] data = debsDataSet.split(",");
int houseId = Integer.valueOf(data[6]);
return new Tuple2<Integer, String>(houseId, debsDataSet);
}
});
}
最后是 Spark 控制台的输出:
part. Count input.p cons.p Time
true 8 6 8 20 ms
true 8 6 8 23 ms
true 8 6 8 7 ms // Too less!!!
true 8 6 8 21 ms
true 8 6 8 13 ms
true 8 6 8 6 ms // Too less!!!
true 8 6 8 5 ms // Too less!!!
true 8 6 8 6 ms // Too less!!!
true 8 6 8 4 ms // Too less!!!
true 8 6 8 7 ms // Too less!!!
最佳答案
我现在找到了一个解决方案:我编写了一个单独的类,它在新进程上调用 spark-submit 命令。这可以在一个循环中完成,因此每个基准测试都在一个新线程中启动,并且每个进程的 sparkContext 也是分开的。所以垃圾收集已经完成,一切正常!
String submitCommand = "/root/spark/bin/spark-submit " + submitParams + " -- class partitioning.PartitionExample /root/partitioning.jar " + javaFlags;
Process p = Runtime.getRuntime().exec(submitCommand);
BufferedReader reader;
String line;
System.out.println(p.waitFor());
reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
while ((line = reader.readLine())!= null) {
System.out.println(line);
}
关于java - 如何测量 Spark 需要在分区 RDD 上运行操作的时间?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30626003/
是一种在 Neo4j 分区之间进行物理分离的方法吗? 这意味着以下查询将转到 node1: Match (a:User:Facebook) 虽然此查询将转到另一个节点(可能托管在 docker 上)
我尝试在我的 SQL 服务器上使用分区函数对我的一个大表进行分区,但我收到一条错误消息 “只能在SQL Server企业版中创建分区功能。只有SQL Server企业版支持分区。” 所以我想知道没有企
在hadoop文件系统中,我有两个文件,分别是X和Y。通常,hadoop制作的文件X和Y的大小为64 MB。是否可以强制hadoop划分两个文件,以便从X的32 MB和Y的32 MB中创建一个64 M
据我了解,如果我们有一个主键,则使用该键对数据进行分区并将其存储在节点中(例如使用随机分区器)。 现在我不确定的是,如果我有多个键(又名复合键),是用于分区数据的键的组合还是它将是第一个主键? 例如,
我正在向我的 SSAS 多维数据集添加分区,我想知道是否有多个分区可以保留在下面?多少太多了,最佳实践限制是 20 还是 200?有没有人可以分享任何真实世界的知识? 最佳答案 这是 another
我有一个包含大约 200 万条记录的大表,我想对其进行分区。 我将 id 列设置为 PRIMARY AUTO_INCRMENT int (并且它必须始终是唯一的)。我有一列“theyear”int(4
我正在做 mysql 列表分区。我的表数据如下 ---------------------------------------- id | unique_token | city | student_
我有一个表,我们每天在其中插入大约 2000 万个条目(没有任何限制的盲插入)。我们有两个外键,其中一个是对包含大约 1000 万个条目的表的引用 ID。 我打算删除此表中超过一个月的所有数据,因为不
我想在一款足球奇幻游戏中尝试使用 MySQL Partitioning,该游戏的用户分布在联赛中,每个联赛都有一个用户可以买卖球员的市场。当很多用户同时玩时,我在这张表中遇到了一些僵局(在撰写本文时大
我是 jQuery 的新手,想知道是否可以获取一些变量并将它们的除法作为 CSS 宽度。到目前为止我在这里: var x = $(".some-container").length; var y =
所以我正在做家庭作业,我需要为分区、斯特林数(第一类和第二类)和第一类的切比雪夫多项式创建递归函数。我的程序应该能够让用户输入一个正整数 n,然后创建名为 Partitions.txt、Stirlin
我在数据框中有一列,其中包含大约 1,4M 行聊天对话,其中每个单元格中的一般格式为 (1): “名称代理 : 对话” 但是,并非列中的所有单元格都采用这种格式。有些单元格只是 (2): “对话” 我
我在尝试隐藏 a 时遇到了一些问题,直到用户单击某个元素为止。 HTML 看起来像: BRAND item 1 item 2 item 3
一.为什么kafka要做分区? 因为当一台机器有可能扛不住(类比:就像redis集群中的redis-cluster一样,一个master抗不住写,那么就多个master去抗写)
我有一些销售数据,我需要发送存储在单独表中的可用槽中的数量。 销售数据示例: id数量112131415369 create table sales (id serial primary key, q
我计划设置多个节点以使用 glusterfs 创建分布式复制卷 我使用主(也是唯一)分区上的目录在两个节点上创建了一个 gluster 复制卷。 gluster volume create vol_d
我正在尝试使用 sum() over (partition by) 但在总和中过滤。我的用例是将每个产品的 12 个月累计到一个月的条目,因此: ITEM MONTH SALES Item
是否可以创建多个 Enumerators出单Enumerator ? 我正在寻找的相当于 List.partition返回 (List[A], List[A]) ,比如 List().partitio
我正在创建一个基于 x86 的非常简单的 Yocto 图像。 我希望/文件系统是只读的,所以我设置了 IMAGE_FEATURES_append = " read-only-rootfs " 在原件的
是否可以使用一次 collect 调用来创建 2 个新列表?如果没有,我该如何使用分区来做到这一点? 最佳答案 collect(在TraversableLike上定义并在所有子类中可用)与集合和Par
我是一名优秀的程序员,十分优秀!