gpt4 book ai didi

java - 如何运行多个 Spark Cassandra 查询

转载 作者:行者123 更新时间:2023-11-30 05:34:44 24 4
gpt4 key购买 nike

我需要在下面运行这样的任务。不知何故,我错过了一点。我知道,我不能像这样使用 javasparkcontext 并传递 javafunctions,因为存在序列化问题。

我需要以 cartesian.size() 的大小运行多个 cassandra 查询。有什么建议吗?

JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<DateTime> dateTimeJavaRDD = jsc.parallelize(dateTimes); //List<DateTime>
JavaRDD<Integer> virtualPartitionJavaRDD = jsc.parallelize(virtualPartitions); //List<Integer>
JavaPairRDD<DateTime, Integer> cartesian = dateTimeJavaRDD.cartesian(virtualPartitionJavaRDD);

long c = cartesian.map(new Function<Tuple2<DateTime, Integer>, Long>() {
@Override
public Long call(Tuple2<DateTime, Integer> tuple2) throws Exception {
return javaFunctions(jsc).cassandraTable("keyspace", "table").where("p1 = ? and p2 = ?", tuple2._1(), tuple2._2()).count();
}
}).reduce((a,b) -> a + b);


System.out.println("TOTAL ROW COUNT IS: " + c);

最佳答案

正确的解决方案应该是在数据和 Casasndra 表之间执行联接。有joinWithCassandraTable function这就是您需要做的事情 - 您只需生成包含 p1p2 值的 Tuple2 的 RDD,然后调用 joinWithCassandra 表,类似于这个(未经测试,采用我的示例 here ):

JavaRDD<Tuple2<Integer, Integer>> trdd = cartesian.map(new Function<Tuple2<DateTime, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<DateTime, Integer> tuple2) throws Exception {
return new Tuple2<Integer, Integer>(tuple2._1(), tuple2._2());
}
});
CassandraJavaPairRDD<Tuple2<Integer, Integer>, Tuple2<Integer, String>> joinedRDD =
trdd.joinWithCassandraTable("test", "jtest",
someColumns("p1", "p2"), someColumns("p1", "p2"),
mapRowToTuple(Integer.class, String.class), mapTupleToRow(Integer.class));
// perform counting here...

关于java - 如何运行多个 Spark Cassandra 查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56867238/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com