
Java : Spark iterate through custom objects

Reposted. Author: 行者123. Updated: 2023-11-30 06:08:08

I have the following code in my program.

finalJoined is of type Dataset<Row>.

RuleParams and RuleOutputParams are Java POJO classes with setters and getters.
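Encoders.bean, which the code below uses to build the output Dataset, works on JavaBean-style classes: a public no-argument constructor plus a getter/setter pair for each property, and the class must be publicly accessible. A minimal sketch of what such a POJO might look like, assuming a hypothetical, trimmed-down RuleOutputParamsSketch with only three of the real fields. Serializable is added only as a precaution in case instances end up captured in Spark closures.

```java
import java.io.Serializable;

// Hypothetical, trimmed-down stand-in for RuleOutputParams (three fields only).
// For Encoders.bean the real class should be a public top-level class in its own file.
class RuleOutputParamsSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private String txnDttm;
    private long accountId;
    private String priceItemCd;

    // Public no-argument constructor, as the JavaBean convention requires.
    public RuleOutputParamsSketch() {
    }

    public String getTxnDttm() {
        return txnDttm;
    }

    public void setTxnDttm(String txnDttm) {
        this.txnDttm = txnDttm;
    }

    public long getAccountId() {
        return accountId;
    }

    public void setAccountId(long accountId) {
        this.accountId = accountId;
    }

    public String getPriceItemCd() {
        return priceItemCd;
    }

    public void setPriceItemCd(String priceItemCd) {
        this.priceItemCd = priceItemCd;
    }
}
```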

I call the Drools rule engine in the code below:

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// finalJoined (Dataset<Row>), kSession (a Drools KieSession) and sparkSession are created earlier.
// collectAsList() brings every row to the driver, so the loop below runs sequentially in one JVM.
List<Row> finalList = finalJoined.collectAsList();
List<RuleOutputParams> ruleOutputParamsList = new ArrayList<RuleOutputParams>();
// The processing date is the same for every output object, so format it once.
String processingDate = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
Iterator<Row> iterator = finalList.iterator();
while (iterator.hasNext()) {
    Row row = iterator.next();
    RuleParams ruleParams = new RuleParams();
    // setting up parameters from the positional Row columns
    System.out.println("Value of TXN DTTM is : " + row.getString(0));
    ruleParams.setTxnDttm(row.getString(0));
    ruleParams.setCisDivision(row.getString(1));
    System.out.println("Division is : " + ruleParams.getCisDivision());
    ruleParams.setTxnVol(row.getInt(2));
    System.out.println("TXN Volume is : " + ruleParams.getTxnVol());
    ruleParams.setTxnAmt(row.getInt(3));
    System.out.println("TXN Amount is : " + ruleParams.getTxnAmt());
    ruleParams.setCurrencyCode(row.getString(4));
    ruleParams.setAcctNumberTypeCode(row.getString(5));
    ruleParams.setAccountNumber(row.getLong(6));
    ruleParams.setUdfChar1(row.getString(7));
    System.out.println("UDF Char1 is : " + ruleParams.getUdfChar1());
    ruleParams.setUdfChar2(row.getString(8));
    ruleParams.setUdfChar3(row.getString(9));
    ruleParams.setAccountId(row.getLong(10));

    // Insert the fact and fire the rules; the rules are expected to fill in the price item lists.
    // Note: each inserted fact stays in kSession's working memory across iterations unless deleted.
    kSession.insert(ruleParams);
    int output = kSession.fireAllRules();

    System.out.println("fireAllRules output: " + output);
    System.out.println("After firing rules");
    System.out.println(ruleParams.getPriceItemParam1());
    System.out.println(ruleParams.getCisDivision());
    // generating one output object per derived price item
    System.out.println("No. of priceitems derived : " + ruleParams.getPriceItemCd().size());
    for (int index = 0; index < ruleParams.getPriceItemCd().size(); index++) {
        System.out.println("Inside a for loop");

        RuleOutputParams ruleOutputParams = new RuleOutputParams();

        // copy the input fields, then the price item at this index
        ruleOutputParams.setTxnDttm(ruleParams.getTxnDttm());
        ruleOutputParams.setCisDivision(ruleParams.getCisDivision());
        ruleOutputParams.setTxnVol(ruleParams.getTxnVol());
        ruleOutputParams.setTxnAmt(ruleParams.getTxnAmt());
        ruleOutputParams.setCurrencyCode(ruleParams.getCurrencyCode());
        ruleOutputParams.setAcctNumberTypeCode(ruleParams.getAcctNumberTypeCode());
        ruleOutputParams.setAccountNumber(ruleParams.getAccountNumber());
        ruleOutputParams.setAccountId(ruleParams.getAccountId());
        ruleOutputParams.setPriceItemCd(ruleParams.getPriceItemCd().get(index));
        System.out.println(ruleOutputParams.getPriceItemCd());
        ruleOutputParams.setPriceItemParam(ruleParams.getPriceItemParams().get(index));
        System.out.println(ruleOutputParams.getPriceItemParam());
        ruleOutputParams.setPriceItemParamCode(ruleParams.getPriceItemParamCodes().get(index));
        ruleOutputParams.setProcessingDate(processingDate);
        ruleOutputParams.setUdfChar1(ruleParams.getUdfChar1());
        ruleOutputParams.setUdfChar2(ruleParams.getUdfChar2());
        ruleOutputParams.setUdfChar3(ruleParams.getUdfChar3());

        ruleOutputParamsList.add(ruleOutputParams);
    }
}
System.out.println("Size of ruleOutputParamsList is : " + ruleOutputParamsList.size());
// Build a typed Dataset from the collected JavaBeans.
Encoder<RuleOutputParams> rulesOutputParamEncoder = Encoders.bean(RuleOutputParams.class);
Dataset<RuleOutputParams> rulesParamDS =
        sparkSession.createDataset(ruleOutputParamsList, rulesOutputParamEncoder);
rulesParamDS.show();

I have used while and for loops in the code.

Can this code be rewritten with Spark's map, flatMap or forEach functions? How can this be done?
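The while/for pair is a one-to-many transformation: each input row produces one output object per derived price item, which is the shape flatMap expresses. A minimal stdlib sketch of that shape, assuming hypothetical Input/Output classes and a stub derivePriceItems method standing in for the Drools call. Spark's Dataset.flatMap takes a per-element function of the same form, except that the function returns an Iterator rather than a Stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapShape {
    // Hypothetical input fact, one per source row (stands in for RuleParams).
    static final class Input {
        final long accountId;
        final String division;

        Input(long accountId, String division) {
            this.accountId = accountId;
            this.division = division;
        }
    }

    // Hypothetical output, one per derived price item (stands in for RuleOutputParams).
    static final class Output {
        final long accountId;
        final String priceItemCd;

        Output(long accountId, String priceItemCd) {
            this.accountId = accountId;
            this.priceItemCd = priceItemCd;
        }
    }

    // Stub for kSession.insert(...) + fireAllRules(): derives price items from the division.
    static List<String> derivePriceItems(Input in) {
        if ("A".equals(in.division)) {
            return Arrays.asList("PI_A1", "PI_A2");
        }
        return Arrays.asList("PI_DEFAULT");
    }

    // Body of the original while loop as a pure function: one input, zero or more outputs.
    static Stream<Output> toOutputs(Input in) {
        return derivePriceItems(in).stream().map(cd -> new Output(in.accountId, cd));
    }

    // flatMap concatenates the per-input outputs into one flat list, replacing the nested loops.
    static List<Output> run(List<Input> inputs) {
        return inputs.stream().flatMap(FlatMapShape::toOutputs).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Output> outputs = run(Arrays.asList(new Input(1L, "A"), new Input(2L, "B")));
        for (Output o : outputs) {
            System.out.println(o.accountId + " -> " + o.priceItemCd);
        }
    }
}
```

Keeping the per-row logic in a side-effect-free function is what makes it easy to hand to a distributed flatMap later.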

The problem here is that the Drools rule engine is called sequentially, one row at a time. I want to execute it in parallel.

Edit: as the code above shows, I first convert the DataFrame to a List and then iterate over it. Can I use the DataFrame or an RDD directly for this purpose?
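Working on the Dataset directly avoids collectAsList, which pulls every row to the driver. To run rules in parallel, a common approach is to give each worker its own session instead of sharing one, since a stateful session keeps its facts between calls and is generally not meant to be used from several threads at once; in Spark, Dataset.mapPartitions is the usual place to create one session per partition. The sketch below imitates that pattern in plain Java, assuming a hypothetical RuleSession class (not the Drools API) and doubling numbers as stub "rules": the input is split into partitions, and each partition is processed on a parallel stream with a fresh session.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PerPartitionSessions {
    // Hypothetical stand-in for a stateful rule session (NOT the Drools API).
    // Each instance is used by exactly one thread.
    static final class RuleSession {
        private final List<Integer> facts = new ArrayList<>();

        void insert(int fact) {
            facts.add(fact);
        }

        // Stub "rules": double every inserted fact, then clear this stub's fact list.
        List<Integer> fireAllRules() {
            List<Integer> results = facts.stream().map(f -> f * 2).collect(Collectors.toList());
            facts.clear();
            return results;
        }
    }

    // Splits the input into at most numPartitions contiguous chunks (numPartitions must be > 0).
    static List<List<Integer>> partition(List<Integer> input, int numPartitions) {
        List<List<Integer>> parts = new ArrayList<>();
        int size = (input.size() + numPartitions - 1) / numPartitions;
        for (int start = 0; start < input.size(); start += size) {
            parts.add(input.subList(start, Math.min(start + size, input.size())));
        }
        return parts;
    }

    // mapPartitions-style processing: one fresh session per partition,
    // partitions handled in parallel on the common ForkJoinPool.
    static List<Integer> run(List<Integer> input, int numPartitions) {
        return partition(input, numPartitions).parallelStream()
                .map(part -> {
                    RuleSession session = new RuleSession();
                    part.forEach(session::insert);
                    return session.fireAllRules();
                })
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

Because no session is shared, partitions never contend for the same working memory; the trade-off is the cost of creating one session per partition rather than per row.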

Best answer

A very simple demo from my tests, showing parallelStream and CompletableFuture.

With a parallel stream:

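// Requires java.util.stream.IntStream; TOP and getIoBoundNumber(int) come from the surrounding test code (not shown).
int parallelGet() {
    // Each getIoBoundNumber call may run on a different common ForkJoinPool worker thread.
    return IntStream.rangeClosed(0, TOP).parallel().map(i -> getIoBoundNumber(i)).sum();
}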
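To try the parallel-stream version on its own, TOP and getIoBoundNumber need definitions; the ones below are hypothetical, with a short sleep simulating an I/O-bound call. Parallel streams run on the common ForkJoinPool, which is sized from the number of available processors by default, so blocking calls only overlap as far as that pool allows.

```java
import java.util.stream.IntStream;

class ParallelStreamDemo {
    // Hypothetical upper bound for the demo.
    static final int TOP = 100;

    // Hypothetical I/O-bound call: sleeps briefly, then returns its argument.
    static int getIoBoundNumber(int i) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i;
    }

    // Same pipeline as parallelGet above, using a method reference.
    static int parallelGet() {
        return IntStream.rangeClosed(0, TOP).parallel().map(ParallelStreamDemo::getIoBoundNumber).sum();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int sum = parallelGet();
        long ms = (System.nanoTime() - start) / 1_000_000;
        System.out.println("sum=" + sum + " in " + ms + " ms");
    }
}
```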

With CompletableFuture:

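// Requires java.util.List, java.util.concurrent.CompletableFuture,
// java.util.stream.Collectors and java.util.stream.IntStream.
int concurrencyGetBasic() {
    // Start every call first: supplyAsync submits each task to the common ForkJoinPool.
    List<CompletableFuture<Integer>> futureList = IntStream.rangeClosed(0, TOP).boxed()
            .map(i -> CompletableFuture.supplyAsync(() -> getIoBoundNumber(i)))
            .collect(Collectors.toList());
    // Then wait for each result and add them up.
    return futureList.stream().map(CompletableFuture::join).reduce(0, Integer::sum);
}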
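Without an explicit executor, supplyAsync also runs on the common ForkJoinPool. For blocking work, the two-argument supplyAsync(Supplier, Executor) overload lets the thread count be chosen independently of the CPU count. A sketch of concurrencyGetBasic with a dedicated fixed-size pool, assuming a hypothetical TOP of 100 and a hypothetical getIoBoundNumber that sleeps briefly:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CompletableFutureDemo {
    // Hypothetical upper bound for the demo.
    static final int TOP = 100;

    // Hypothetical I/O-bound call: sleeps briefly, then returns its argument.
    static int getIoBoundNumber(int i) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i;
    }

    // Same shape as concurrencyGetBasic, but tasks run on a dedicated pool of `threads` threads.
    static int concurrencyGetWithExecutor(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Integer>> futureList = IntStream.rangeClosed(0, TOP).boxed()
                    .map(i -> CompletableFuture.supplyAsync(() -> getIoBoundNumber(i), pool))
                    .collect(Collectors.toList());
            // All tasks are submitted before the first join, so the sleeps overlap.
            return futureList.stream().map(CompletableFuture::join).reduce(0, Integer::sum);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(concurrencyGetWithExecutor(20));
    }
}
```

Shutting the pool down in finally lets the JVM exit once the joined tasks are done.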

For more tutorials, see the Java 8 Tutorial and Java 8 in Action.

Regarding Java : Spark iterate through custom objects, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/50851316/
