
java - Mapping Iterable values to keys using Apache Spark

Reposted · Author: 行者123 · Updated: 2023-12-01 08:59:44

I have the following data stored in a JavaPairRDD.

Starting data:

key(nodeName)   PointsTo                    1/n    n

node1           [node2,node3,node4]         0.33   3
node2           [node1,node5]               0.50   2
node3           [node1,node2,node4,node5]   0.25   4
node4           [node1,node2]               0.50   2
node5           [node2,node3,node4]         0.33   3


key(nodeName)   PointsTo   1/n    n

node2           node1      0.33   3
node3           node1      0.33   3
node4           node1      0.33   3

node1           node2      0.50   2
node5           node2      0.50   2

node1           node3      0.25   4
node2           node3      0.25   4
node4           node3      0.25   4
node5           node3      0.25   4

node1           node4      0.50   2
node2           node4      0.50   2

node2           node5      0.33   3
node3           node5      0.33   3
node4           node5      0.33   3

The JavaPairRDD looks like this:

JavaPairRDD<String, Tuple3<Iterable<String>,Double,Double>>

where String = key (nodeName), and the Tuple3 holds the PointsTo, 1/n and n values respectively.
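For reference, the accepted answer below reads each record from a pipe-delimited line such as node1|node2,node3,node4|0.33|3 (the exact file format is taken from the answer's parsing code, not stated in the question). A minimal plain-Java sketch of parsing one such line into the four parts of the pair, without Spark or Scala tuples (the NodeEntry record is illustrative, not from the original):

```java
import java.util.Arrays;
import java.util.List;

public class ParseLine {
    // Illustrative stand-in for Tuple2<String, Tuple3<Iterable<String>, Double, Double>>.
    public record NodeEntry(String key, List<String> pointsTo, double oneOverN, double n) {}

    // Parses one pipe-delimited line: key|pointsTo1,pointsTo2,...|1/n|n
    public static NodeEntry parse(String line) {
        String[] parts = line.split("\\|");
        return new NodeEntry(parts[0],
                Arrays.asList(parts[1].split(",")),
                Double.parseDouble(parts[2]),
                Double.parseDouble(parts[3]));
    }
}
```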

Now I want to perform two steps.

Step 1:

key(nodeName)   PointsTo                    1/n    n

node1           [node2,node3,node4]         0.33   3
node2           [node1,node3,node4,node5]   0.50   2
node3           [node1,node5]               0.25   4
node4           [node1,node3,node5]         0.50   2
node5           [node2,node3]               0.33   3

Step 2:

key(nodeName)   PointsTo                    1/n    n

node1           [0.11,0.14,0.32]            0.33   3
node2           [0.92,0.14,0.32,0.67]       0.50   2
node3           [0.92,0.67]                 0.25   4
node4           [0.92,0.14,0.67]            0.50   2
node5           [0.11,0.14]                 0.33   3

This is much easier to explain with a single node, so let's take node4 as an example. node4 appears in the PointsTo lists whose keys are node1, node3 and node5. So we simply update the PointsTo of key=node4 to node1, node3, node5.

Note that the values in the 1/n and n columns stay the same as each key's original values.
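Step 1 is an inversion of the adjacency map: each node collects the keys whose PointsTo lists contain it. A minimal plain-Java sketch of that logic, without Spark (class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InvertMap {
    // For each node, collect every key whose PointsTo list contains it,
    // preserving the insertion order of the source map.
    public static Map<String, List<String>> invert(Map<String, List<String>> pointsTo) {
        Map<String, List<String>> inverted = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : pointsTo.entrySet())
            for (String target : e.getValue())
                inverted.computeIfAbsent(target, k -> new ArrayList<>()).add(e.getKey());
        return inverted;
    }
}
```

With the starting data, node4 appears in the lists of node1, node3 and node5, so the inverted map contains node4 → [node1, node3, node5], matching the Step 1 table.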

In Step 2, we simply replace the nodeNames with their respective 1/n values.
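Step 2 is then a plain lookup: each node name in a PointsTo list is replaced by that node's own 1/n value from a reference map. A minimal plain-Java sketch (it mirrors the getPointsToN() helper in the answer below; the class name is illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ReplaceWithN {
    // Replace each node name by its 1/n value from the reference map,
    // skipping any name that has no entry.
    public static List<Double> toOneOverN(Map<String, Double> oneOverN, Iterable<String> pointsTo) {
        List<Double> out = new ArrayList<>();
        for (String node : pointsTo)
            if (oneOverN.containsKey(node))
                out.add(oneOverN.get(node));
        return out;
    }
}
```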

So the final JavaPairRDD will look like this:

JavaPairRDD<String, Tuple3<Iterable<Double>,Double,Double>>

Best answer

My attempt at this:

  • From the JavaPairRDD, collect the reference data (refData1, refData2: see the output for details) and store it in Maps.
  • Create a function that scans the input data and, using the reference-data-2 Map, replaces each key's PointsTo with a new Iterable<String>: see getStep1Data().
  • Create a function that scans the Iterable<String> PointsTo from the step-1 RDD and replaces each node with its 1/n value from the reference Map, producing a new Iterable<Double>: see getPointsToN().

Source:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import scala.Tuple3;

public class SparkTest {
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Parse one pipe-delimited line: key|pointsTo1,pointsTo2,...|1/n|n
        PairFunction<String, String, Tuple3<Iterable<String>, Double, Double>> keyData =
            new PairFunction<String, String, Tuple3<Iterable<String>, Double, Double>>() {
                private static final long serialVersionUID = 1L;
                public Tuple2<String, Tuple3<Iterable<String>, Double, Double>> call(String line) throws Exception {
                    String[] parts = line.split("\\|");
                    return new Tuple2(parts[0],
                        new Tuple3(Arrays.asList(parts[1].split(",")),
                            Double.parseDouble(parts[2]), Double.parseDouble(parts[3])));
                }
            };

        System.out.println(" --- Input Data ---"); // data to start with
        JavaPairRDD<String, Tuple3<Iterable<String>, Double, Double>> masterData =
            sc.textFile("Data\\node.txt", 1).mapToPair(keyData);
        masterData.foreach(line -> System.out.println(line));

        Map<String, Double> refData1 = new HashMap<>();
        Map<String, Iterable<String>> refData2 = new LinkedHashMap<>();
        masterData.collect().forEach(line -> { refData1.put(line._1, line._2._2()); refData2.put(line._1, line._2._1()); });

        System.out.println(" --- Reference Data 1 --- "); // each node and its 1/n value
        for (String k : refData1.keySet()) System.out.println(k + " --> " + refData1.get(k));

        System.out.println(" --- Reference Data 2 --- "); // each node and its PointsTo list
        for (String k : refData2.keySet()) System.out.println(k + " --> " + refData2.get(k));

        System.out.println(" --- Data in Step 1 --- ");
        JavaPairRDD<String, Tuple3<Iterable<String>, Double, Double>> step1RDD =
            masterData.mapToPair(line -> new Tuple2(line._1,
                new Tuple3(getStep1Data(line._1, refData2), line._2._2(), line._2._3())));
        step1RDD.foreach(line -> System.out.println(line));

        System.out.println(" --- Data in Step 2 --- ");
        JavaPairRDD<String, Tuple3<Iterable<Double>, Double, Double>> step2RDD =
            step1RDD.mapToPair(line -> new Tuple2(line._1,
                new Tuple3(getPointsToN(refData1, line._2._1()), line._2._2(), line._2._3())));
        step2RDD.foreach(line -> System.out.println(line));
    }

    // get 1/n values from the reference data for each node in pointsTo, in a new list
    public static Iterable<Double> getPointsToN(Map<String, Double> refData, Iterable<String> pointsTo) {
        ArrayList<Double> n1 = new ArrayList<>();
        for (String node : pointsTo)
            if (refData.containsKey(node))
                n1.add(refData.get(node));
        return n1;
    }

    // collect every key of reference data 2 whose PointsTo list contains the given node
    public static Iterable<String> getStep1Data(String node, Map<String, Iterable<String>> refData2) {
        ArrayList<String> n1 = new ArrayList<>();
        for (String n : refData2.keySet())
            refData2.get(n).forEach(element -> { if (element.equals(node)) n1.add(n); });
        return n1;
    }
}


Regarding "java - Mapping Iterable values to keys using Apache Spark", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41783411/
