gpt4 book ai didi

java - 如何在Java中的javaPairRDD上使用aggregateByKey?

转载 作者:行者123 更新时间:2023-12-01 18:08:27 25 4
gpt4 key购买 nike

我搜索了很多,但没有找到在java代码中执行aggregateByKey的示例。

我想找到 JavaPairRDD 中按键减少的行数。

我读到aggregateByKey是最好的方法,但我使用Java而不是scala,并且我无法在Java中使用它。

请帮忙!!!

例如:

input: [(key1,[name:abc,email:def,address:ghi]),(key1,[name:abc,email:def,address:ghi]),(key2,[name:abc,email:def,address:ghi])]

output: [(key1,[name:abc,email:def,address:ghi, count:2]),(key2,[name:abc,email:def,address:ghi, count:1])]

我想要做的与我的示例中完全相同,我想向输出行添加一个额外的列,以减少行数。

谢谢!!!

最佳答案

这是我如何在 java 中按键聚合的示例。

JavaPairRDD<String, Row> result = inputDataFrame.javaRDD().mapToPair(new  PairFunction<Row, String, Row>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Row> call(Row tblRow) throws Exception {
String strID= CommonConstant.BLANKSTRING;
Object[] newRow = new Object[schemaSize];
for(String s: matchKey)
{
if(tblRow.apply(finalSchema.get(s))!=null){
strID+= tblRow.apply(finalSchema.get(s)).toString().trim().toLowerCase();
}
}
int rowSize= tblRow.length();
for (int itr = 0; itr < rowSize; itr++)
{
if(tblRow.apply(itr)!=null)
{
newRow[itr] = tblRow.apply(itr);
}
}
newRow[idIndex]= Utils.generateKey(strID);
return new Tuple2<String, Row>(strID,RowFactory.create(newRow));
}
}).aggregateByKey(RowFactory.create(arr), new Function2<Row,Row,Row>(){

private static final long serialVersionUID = 1L;

public Row call(Row argRow1, Row argRow2) throws Exception {
// TODO Auto-generated method stub

Integer rowThreshold= dataSchemaHashMap.get(CommonConstant.STR_TEMPThreshold);
Object[] newRow = new Object[schemaSize];
int rowSize= argRow1.length();

for (int itr = 0; itr < rowSize; itr++)
{
if(argRow1!=null && argRow2!=null)
{
if(argRow1.apply(itr)!=null && argRow2.apply(itr)!=null)
{
if(itr==rowSize-1){
newRow[itr] = Integer.parseInt(argRow1.apply(itr).toString())+Integer.parseInt(argRow2.apply(itr).toString());
}else{
newRow[itr] = argRow2.apply(itr);
}
}
}
}

return RowFactory.create(newRow);

}

}, new Function2<Row,Row,Row>(){
private static final long serialVersionUID = 1L;

public Row call(Row v1, Row v2) throws Exception {
// TODO Auto-generated method stub
return v1;
}
});

JavaRDD<Row> result1 = result.map(new Function<Tuple2<String,Row>, Row>() {
private static final long serialVersionUID = -5480405270683046298L;
public Row call(Tuple2<String, Row> rddRow) throws Exception {
return rddRow._2();
}
});

关于java - 如何在Java中的javaPairRDD上使用aggregateByKey?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34597255/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com