gpt4 book ai didi

JavaRDD 从 IgniteRDD 需要很长时间

转载 作者:太空宇宙 更新时间:2023-11-04 12:32:19 25 4
gpt4 key购买 nike

我已经使用 Spark 创建了一个 Apache Ignite 应用程序

  • Ignite 版本 - 1.6.0
  • Spark 版本 - 1.5.2(基于 Scala 2.11 构建)

应用程序将两个元组存储到IgniteRDD

当调用retrieve时,collect函数花费的时间超过3分钟。

提交的职位数量超过1000

代码片段:

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.ignite.spark.IgniteContext;
import org.apache.ignite.spark.IgniteRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CopyOfMainIgnite {

public static void main(String args[]) {
SparkConf conf = new SparkConf().setAppName("Demo").setMaster(
"spark://169.254.228.183:7077");
System.out.println("Spark conf initialized.");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.addJar("./target/IgnitePOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar");
System.out.println("Spark context initialized.");
IgniteContext ic = new IgniteContext(sc.sc(),
"ignite/client-default-config.xml");
System.out.println("Ignite Context initialized.");
String cacheName = "demo6";
save(sc, ic, cacheName);

retrieve(ic, cacheName);
ic.close(false);
sc.close();

}

private static void retrieve(IgniteContext ic, String cacheName) {
System.out.println("Getting IgniteRDD saved.");
IgniteRDD<String, String> javaIRDDRet = ic.fromCache(cacheName);
long temp1 = System.currentTimeMillis();

JavaRDD<Tuple2<String, String>> javardd = javaIRDDRet.toJavaRDD();
System.out
.println("Is empty Start Time: " + System.currentTimeMillis());
System.out.println("javaIRDDRet.isEmpty(): " + javardd.isEmpty());
System.out.println("Is empty End Time: " + System.currentTimeMillis());
long temp2 = System.currentTimeMillis();
long temp3 = System.currentTimeMillis();
System.out.println("collect and println Start Time: "
+ System.currentTimeMillis());
javardd.collect().forEach(System.out::println);
System.out.println("collect and println End Time: "
+ System.currentTimeMillis());
long temp4 = System.currentTimeMillis();
System.out.println("Is empty : " + temp1 + " " + temp2
+ " Collect and print: " + temp3 + " " + temp4);
}

private static void save(JavaSparkContext sc, IgniteContext ic,
String cacheName) {
IgniteRDD<String, String> igniteRDD = ic.fromCache(cacheName);
System.out.println("IgniteRDD from cache initialized.");
Map<String, String> tempMap = new HashMap<String, String>();
tempMap.put("Aditya", "Jain");
tempMap.put("Pranjal", "Jaju");
Tuple2<String, String> tempTuple1 = new Tuple2<String, String>(
"Aditya", "Jain");
Tuple2<String, String> tempTuple2 = new Tuple2<String, String>(
"Pranjal", "Jaju");
List<Tuple2<String, String>> list = new LinkedList<Tuple2<String, String>>();
list.add(tempTuple1);
list.add(tempTuple2);
JavaPairRDD<String, String> jpr = sc.parallelizePairs(list, 4);
System.out.println("Random RDD saved.");
igniteRDD.savePairs(jpr.rdd(), false);
System.out.println("IgniteRDD saved.");
}
}

所以我的问题:从 Ignite 获取 2 个 Rdd 元组并在我的进程中收集它们是否需要 3-4 分钟?

或者我对它会在几毫秒内响应的期望是错误的?

调试后,我发现它在 ignite rdd 中创建 1024 个分区,这导致它触发 1024 个作业。而且我没有任何方法来控制分区数量。

最佳答案

您可以减少CacheConfiguration中的分区数量:

<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="affinity">
<bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
<property name="partitions" value="32"/>
</bean>
</property>
</bean>

您还可以使用 IgniteRDD.sql(..)IgniteRDD.objectSql(..) 方法利用快速索引搜索直接从 Ignite 检索数据。有关如何在 Ignite 中配置 SQL 的详细信息,请参阅此页面:https://apacheignite.readme.io/docs/sql-queries

关于JavaRDD 从 IgniteRDD 需要很长时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37727599/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com