gpt4 book ai didi

java - 我知道如何在数据集上执行 orderBy ("a"、 "b"...)、groupBy。我需要独立地对每个记录子集进行计算和处理

转载 作者:行者123 更新时间:2023-12-04 15:18:31 26 4
gpt4 key购买 nike

我正在阅读城市的会计文件。我的目标是为每个机构的每个会计编号提供一些有用的小计:

一些列,从 (cumulSD3, cumulSC3) 到 (cumulSD7, cumulSC7) 添加到根帐户的记录和聚合 soldeDebiteursoldeCrediteur:帐号 13248 将在 13248 下聚合,1324132 级别,例如。

+--------------------------+----------+-----------------+---------------------+---------------------+---------+----------+------------+-----------+------------+----------+---------------------+-----------+------------+------------------+-------------------+------------------------+-------------------------+---------------------------+----------------------------+-----------------------------+------------------------------+-------------+--------------+-------------+---------------+--------------------------+--------+--------+-----------------------------------------------------------------------------------------------------+-------------------------+------------+----------------+----------------+----------+----------+----------------+----------+----------+----------------+----------+---------+---------------+-----------+--------------+----------------+--------+---------+
|libelleBudget |typeBudget|typeEtablissement|sousTypeEtablissement|nomenclatureComptable|siren |codeRegion|codeActivite|codeSecteur|numeroFINESS|codeBudget|categorieCollectivite|typeBalance|numeroCompte|balanceEntreeDebit|balanceEntreeCredit|operationBudgetaireDebit|operationBudgetaireCredit|operationNonBudgetaireDebit|operationNonBudgetaireCredit|operationOrdreBudgetaireDebit|operationOrdreBudgetaireCredit|soldeDebiteur|soldeCrediteur|anneeExercice|budgetPrincipal|nombreChiffresNumeroCompte|cumulSD7|cumulSC7|libelleCompte |nomenclatureComptablePlan|sirenCommune|populationTotale|numeroCompteSur3|cumulSD3 |cumulSC3 |numeroCompteSur4|cumulSD4 |cumulSC4 |numeroCompteSur5|cumulSD5 |cumulSC5 |codeDepartement|codeCommune|siret |numeroCompteSur6|cumulSD6|cumulSC6 |
+--------------------------+----------+-----------------+---------------------+---------------------+---------+----------+------------+-----------+------------+----------+---------------------+-----------+------------+------------------+-------------------+------------------------+-------------------------+---------------------------+----------------------------+-----------------------------+------------------------------+-------------+--------------+-------------+---------------+--------------------------+--------+--------+-----------------------------------------------------------------------------------------------------+-------------------------+------------+----------------+----------------+----------+----------+----------------+----------+----------+----------------+----------+---------+---------------+-----------+--------------+----------------+--------+---------+
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |1021 |0.0 |349139.71 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |349139.71 |2019 |true |4 |0.0 |0.0 |Dotation |M14 |210100012 |794 |102 |0.0 |995427.19 |1021 |0.0 |349139.71 |1021 |0.0 |0.0 |01 |01001 |21010001200017|1021 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |10222 |0.0 |554545.85 |0.0 |30003.0 |0.0 |0.0 |0.0 |0.0 |0.0 |584548.85 |2019 |true |5 |0.0 |0.0 |F.C.T.V.A. |M14 |210100012 |794 |102 |0.0 |995427.19 |1022 |0.0 |646287.48 |10222 |0.0 |584548.85|01 |01001 |21010001200017|10222 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |10223 |0.0 |4946.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |4946.0 |2019 |true |5 |0.0 |0.0 |T.L.E. |M14 |210100012 |794 |102 |0.0 |995427.19 |1022 |0.0 |646287.48 |10223 |0.0 |4946.0 |01 |01001 |21010001200017|10223 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |10226 |0.0 |41753.65 |0.0 |12078.54 |0.0 |0.0 |0.0 |0.0 |0.0 |53832.19 |2019 |true |5 |0.0 |0.0 |Taxe d’aménagement |M14 |210100012 |794 |102 |0.0 |995427.19 |1022 |0.0 |646287.48 |10226 |0.0 |53832.19 |01 |01001 |21010001200017|10226 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |10227 |0.0 |2960.44 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |2960.44 |2019 |true |5 |0.0 |0.0 |Versement pour sous-densité |M14 |210100012 |794 |102 |0.0 |995427.19 |1022 |0.0 |646287.48 |10227 |0.0 |2960.44 |01 |01001 |21010001200017|10227 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |1068 |0.0 |2281475.34 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |2281475.34 |2019 |true |4 |0.0 |0.0 |Excédents de fonctionnement capitalisés |M14 |210100012 |794 |106 |0.0 |2281475.34|1068 |0.0 |2281475.34|1068 |0.0 |0.0 |01 |01001 |21010001200017|1068 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |110 |0.0 |97772.73 |0.0 |0.0 |0.0 |112620.66 |0.0 |0.0 |0.0 |210393.39 |2019 |true |3 |0.0 |0.0 |Report à nouveau (solde créditeur) |M14 |210100012 |794 |110 |0.0 |210393.39 |110 |0.0 |0.0 |110 |0.0 |0.0 |01 |01001 |21010001200017|110 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |12 |0.0 |112620.66 |0.0 |0.0 |112620.66 |0.0 |0.0 |0.0 |0.0 |0.0 |2019 |true |2 |0.0 |0.0 |RÉSULTAT DE L'EXERCICE (excédentaire ou déficitaire) |M14 |210100012 |794 |12 |0.0 |0.0 |12 |0.0 |0.0 |12 |0.0 |0.0 |01 |01001 |21010001200017|12 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |1321 |0.0 |29097.78 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |29097.78 |2019 |true |4 |0.0 |0.0 |État et établissements nationaux |M14 |210100012 |794 |132 |0.0 |296722.26 |1321 |0.0 |29097.78 |1321 |0.0 |0.0 |01 |01001 |21010001200017|1321 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |1322 |0.0 |201.67 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |201.67 |2019 |true |4 |0.0 |0.0 |Régions |M14 |210100012 |794 |132 |0.0 |296722.26 |1322 |0.0 |201.67 |1322 |0.0 |0.0 |01 |01001 |21010001200017|1322 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |1323 |0.0 |163194.37 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |163194.37 |2019 |true |4 |0.0 |0.0 |Départements |M14 |210100012 |794 |132 |0.0 |296722.26 |1323 |0.0 |163194.37 |1323 |0.0 |0.0 |01 |01001 |21010001200017|1323 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |13248 |0.0 |1129.37 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |1129.37 |2019 |true |5 |0.0 |0.0 |Autres communes |M14 |210100012 |794 |132 |0.0 |296722.26 |1324 |0.0 |1129.37 |13248 |0.0 |1129.37 |01 |01001 |21010001200017|13248 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |13251 |0.0 |47079.11 |0.0 |2387.05 |0.0 |0.0 |0.0 |0.0 |0.0 |49466.16 |2019 |true |5 |0.0 |0.0 |GFP de rattachement |M14 |210100012 |794 |132 |0.0 |296722.26 |1325 |0.0 |49532.16 |13251 |0.0 |49466.16 |01 |01001 |21010001200017|13251 |0.0 |0.0 |
|ABERGEMENT-CLEMENCIAT (L')|1 |101 |00 |M14 |210100012|084 |40 |null |null |null |Commune |DEF |13258 |0.0 |66.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |66.0 |2019 |true |5 |0.0 |0.0 |Autres groupements |M14 |210100012 |794 |132 |0.0 |296722.26 |1325 |0.0 |49532.16 |13258 |0.0 |66.0 |01 |01001 |21010001200017|13258 |0.0 |0.0 |

为了更清楚,只保留参与计算的主要字段,这里是我的函数关注的地方:

+--------------+------------+-------------+--------------+--------+--------+--------+--------+---------+---------+----------+----------+----------+----------+
| siret|numeroCompte|soldeDebiteur|soldeCrediteur|cumulSD7|cumulSC7|cumulSD6|cumulSC6| cumulSD5| cumulSC5| cumulSD4| cumulSC4| cumulSD3| cumulSC3|
+--------------+------------+-------------+--------------+--------+--------+--------+--------+---------+---------+----------+----------+----------+----------+
|21010001200017| 1021| 0.0| 349139.71| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 349139.71| 0.0| 995427.19|
|21010001200017| 10222| 0.0| 584548.85| 0.0| 0.0| 0.0| 0.0| 0.0|584548.85| 0.0| 646287.48| 0.0| 995427.19|
|21010001200017| 10223| 0.0| 4946.0| 0.0| 0.0| 0.0| 0.0| 0.0| 4946.0| 0.0| 646287.48| 0.0| 995427.19|
|21010001200017| 10226| 0.0| 53832.19| 0.0| 0.0| 0.0| 0.0| 0.0| 53832.19| 0.0| 646287.48| 0.0| 995427.19|
|21010001200017| 10227| 0.0| 2960.44| 0.0| 0.0| 0.0| 0.0| 0.0| 2960.44| 0.0| 646287.48| 0.0| 995427.19|
|21010001200017| 1068| 0.0| 2281475.34| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|2281475.34| 0.0|2281475.34|
|21010001200017| 110| 0.0| 210393.39| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 210393.39|
|21010001200017| 12| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
|21010001200017| 1321| 0.0| 29097.78| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 29097.78| 0.0| 296722.26|
|21010001200017| 1322| 0.0| 201.67| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 201.67| 0.0| 296722.26|
|21010001200017| 1323| 0.0| 163194.37| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 163194.37| 0.0| 296722.26|
|21010001200017| 13248| 0.0| 1129.37| 0.0| 0.0| 0.0| 0.0| 0.0| 1129.37| 0.0| 1129.37| 0.0| 296722.26|
|21010001200017| 13251| 0.0| 49466.16| 0.0| 0.0| 0.0| 0.0| 0.0| 49466.16| 0.0| 49532.16| 0.0| 296722.26|
|21010001200017| 13258| 0.0| 66.0| 0.0| 0.0| 0.0| 0.0| 0.0| 66.0| 0.0| 49532.16| 0.0| 296722.26|
|21010001200017| 1328| 0.0| 53566.91| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 53566.91| 0.0| 296722.26|
|21010001200017| 1341| 0.0| 142734.21| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 142734.21| 0.0| 145233.21|
|21010001200017| 1342| 0.0| 2499.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 2499.0| 0.0| 145233.21|
|21010001200017| 1383| 0.0| 2550.01| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 2550.01| 0.0| 2550.01|
|21010001200017| 1641| 0.0| 236052.94| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 236052.94| 0.0| 236052.94|

这从按部门城市代码帐号siret(我们的机构标识符)。
然而,缺乏知识,我正在做一些令人心碎的事情:

通过 RDD 进行的第一次尝试成本高昂但有效

/**
* Créer un dataset de cumuls de comptes parents par siret.
* @param session Session Spark.
* @param comptes Dataset des comptes de comptabilités de tous les siret.
* @return Dataset avec un siret associés à des cumuls par comptes à 7, 6, 5, 4, 3 chiffres, pour soldes de débit et soldes de crédit.
*/
private Dataset<Row> cumulsComptesParentsParSiret(SparkSession session, Dataset<Row> comptes) {
JavaPairRDD<String, Iterable<Row>> rddComptesParSiret = comptes.javaRDD().groupBy((Function<Row, String>)compte -> compte.getAs("siret"));

// Réaliser les cumuls par siret et compte, par compte parent.
JavaRDD<Row> rdd = rddComptesParSiret.flatMap((FlatMapFunction<Tuple2<String, Iterable<Row>>, Row>)comptesSiret -> {
String siret = comptesSiret._1();
AccumulateurCompte comptesParentsPourSiret = new AccumulateurCompte(siret);

for(Row rowCompte : comptesSiret._2()) {
String numeroCompte = rowCompte.getAs("numeroCompte");
Double soldeSD = rowCompte.getAs("soldeDebiteur");
Double soldeSC = rowCompte.getAs("soldeCrediteur");

comptesParentsPourSiret.add(numeroCompte, soldeSD, soldeSC);
}

// Faire une ligne de regroupement siret, compte et ses comptes parents.
List<Row> rowsCumulsPourSiret = new ArrayList<>();

for(Row rowCompte : comptesSiret._2()) {
String numeroCompte = rowCompte.getAs("numeroCompte");
double sd[] = new double[6];
double sc[] = new double[6];

for(int nombreChiffres = numeroCompte.length(); nombreChiffres >= 3; nombreChiffres--) {
String compteParent = numeroCompte.substring(0, nombreChiffres);
Double cumulDebits = comptesParentsPourSiret.getCumulSD(compteParent);
Double cumulCredits = comptesParentsPourSiret.getCumulSC(compteParent);

sd[nombreChiffres - 3] = cumulDebits != null ? Precision.round(cumulDebits, 2, BigDecimal.ROUND_CEILING) : 0.0;
sc[nombreChiffres - 3] = cumulCredits != null ? Precision.round(cumulCredits, 2, BigDecimal.ROUND_CEILING) : 0.0;
}

Row rowCumulsPourCompte = RowFactory.create(siret, numeroCompte, sd[4], sc[4], sd[3], sc[3], sd[2], sc[2], sd[1], sc[1], sd[0], sc[0]);
rowsCumulsPourSiret.add(rowCumulsPourCompte);
}

return rowsCumulsPourSiret.iterator();
});

return session.createDataFrame(rdd, schemaCumulComptesParents());
}

第二次尝试,通过数据集:高效,但不允许对会计记录进行低级管理

/**
* Cumuler les sous-comptes.
* @param comptes Dataset de comptes.
* @return Dataset aux cumuls de comptes à 3, 4, 5, 6, 7 chiffres réalisés, par commune.
*/
private Dataset<Row> cumulsSousComptes(Dataset<Row> comptes) {
Dataset<Row> comptesAvecCumuls = comptes;

for(int nombreChiffresNiveauCompte = 3; nombreChiffresNiveauCompte < 7; nombreChiffresNiveauCompte ++) {
comptesAvecCumuls = cumulsCompteParent(comptesAvecCumuls, nombreChiffresNiveauCompte);
}

return comptesAvecCumuls;
}

/**
* Cumul par un niveau de compte parent.
* @param comptes Liste des comptes.
* @param nombreChiffres Nombre de chiffres auquel réduire le compte à cummuler. Exemple 4 : 2041582 est cumulé sur 2041.
* @return cumuls par compte parent : dataset au format (cumul des soldes débiteurs, cumul des soldes créditeurs).
*/
private Dataset<Row> cumulsCompteParent(Dataset<Row> comptes, int nombreChiffres) {
// Cumuler pour un niveau de compte parent sur le préfixe de leurs comptes réduits à nombreChiffres.
Column nombreChiffresCompte = comptes.col("nombreChiffresNumeroCompte");

String aliasNumeroCompte = MessageFormat.format("numeroCompteSur{0}", nombreChiffres);
RelationalGroupedDataset group = comptes.groupBy(col("codeDepartement"), col("codeCommune"), col("siret"), col("numeroCompte").substr(1,nombreChiffres).as(aliasNumeroCompte));

String nomChampCumulSD = MessageFormat.format("cumulSD{0}", nombreChiffres);
String nomChampCumulSC = MessageFormat.format("cumulSC{0}", nombreChiffres);
Column sd = sum(when(nombreChiffresCompte.$greater$eq(lit(nombreChiffres)), col("soldeDebiteur")).otherwise(lit(0.0))).as(nomChampCumulSD);
Column sc = sum(when(nombreChiffresCompte.$greater$eq(lit(nombreChiffres)), col("soldeCrediteur")).otherwise(lit(0.0))).as(nomChampCumulSC);

Dataset<Row> cumuls = group.agg(sd, sc);

// Associer à chaque compte la colonne de cumuls de comptes parents, pour le niveau en question.
Column jointure =
comptes.col("codeDepartement").equalTo(cumuls.col("codeDepartement"))
.and(comptes.col("codeCommune").equalTo(cumuls.col("codeCommune")))
.and(comptes.col("siret").equalTo(cumuls.col("siret")))
.and(comptes.col("numeroCompte").substr(1, nombreChiffres).equalTo(cumuls.col(aliasNumeroCompte)));

Dataset<Row> comptesAvecCumuls = comptes.join(cumuls, jointure, "left_outer")
.drop(comptes.col("siret"))
.drop(comptes.col("codeDepartement"))
.drop(comptes.col("codeCommune"))
.drop(comptes.col(nomChampCumulSD))
.drop(comptes.col(nomChampCumulSC))
.withColumnRenamed("cumulSD", nomChampCumulSD)
.withColumnRenamed("cumulSC", nomChampCumulSC)
.withColumn(nomChampCumulSD, round(col(nomChampCumulSD), 2))
.withColumn(nomChampCumulSC, round(col(nomChampCumulSC), 2));

return comptesAvecCumuls;
}

低级别管理,我的意思是:一些最后一刻的验证发出一些警告或在求和时排除一些值:

  • 如果会计术语在机构记录中发生变化。
  • 就我拥有的其他知识而言,警告一个看起来很奇怪的值。

我想要的

我需要独立浏览每个组的行内容。一组接一组。

我需要一个 Spark 函数来实现回调方法,其中:

  • 第一个参数将给出当前键值(部门代码城市代码siret),
  • 第二个是与这些键关联的行。
Dataset<Row> eachGroupContent(Row keys, Dataset<Row> groupContent);

它将由 Spark 使用这些条目参数连续调用:

Row (keys) : {Department : 01, City code : 01001, siret : 21010001200017}

Dataset<Row> (values) associated :
+---------------+-----------+--------------+------------+-------------+--------------+--------+
|codeDepartement|codeCommune| siret|numeroCompte|soldeDebiteur|soldeCrediteur|(others)|
+---------------+-----------+--------------+------------+-------------+--------------+--------+
| 01| 01001|21010001200017| 1021| 0.0| 349139.71| ...|
| 01| 01001|21010001200017| 10222| 0.0| 584548.85| ...|
| 01| 01001|21010001200017| 10223| 0.0| 4946.0| ...|
| 01| 01001|21010001200017| 10226| 0.0| 53832.19| ...|
Row : {Department : 01, City code : 01001, siret : 21010001200033}

Dataset<Row> :
| 01| 01001|21010001200033| 1021| 0.0| 38863.22| ...|
| 01| 01001|21010001200033| 10222| 0.0| 62067.0| ...|
| 01| 01001|21010001200033| 10228| 0.0| 9666.0| ...|
| 01| 01001|21010001200033| 1068| 0.0| 100121.62| ...|
Row : {Department : 01, City code : 01001, siret : 21010001200066}

Dataset<Row> :
| 01| 01001|21010001200066| 1641| 0.0| 100000.0| ...|
| 01| 01001|21010001200066| 3355| 587689.33| 0.0| ...|
| 01| 01001|21010001200066| 4011| 0.0| 0.0| ...|
| 01| 01001|21010001200066| 40171| 0.0| 10036.5| ...|

这是我第一次尝试时能够做到的,

rddComptesParSiret.flatMap((FlatMapFunction<Tuple2<String, Iterable<Row>>, Row>)comptesSiret

但没有提供所有好的键(部门和城市代码丢失,破坏了之前完成的所有排序),而且:RDD 不再受到青睐。

但我无法通过似乎不提供此类工具的 RelationalGroupedDataset 方法在 Java 中实现。

目前,我知道如何进行 groupBy 或排序:

accounting.groupBy("department", "cityCode", "accountNumber", "siret").agg(...);

我的问题

如何浏览
每条记录
每组
[执行子计算或其他工作]
一组接一组

最佳答案

KeyValueGroupedDataset.mapGroups将为您提供一个遍历给定组的所有行的迭代器。实现接口(interface)时 MapGroupsFunction您可以在整个组中访问此迭代器。

Dataset<Row> df = spark.read().option("header", true).option("inferSchema", true).csv(...);

Dataset<Result> resultDf = df
.groupByKey((MapFunction<Row, Key>) (Row r)
-> new Key(r.getInt(r.fieldIndex("codeDepartement")),
r.getInt(r.fieldIndex("codeCommune")),
r.getLong(r.fieldIndex("siret"))),
Encoders.bean(Key.class))
.mapGroups(new MyMapGroupsFunction(), Encoders.bean(Result.class));
resultDf.show();

在 Java 世界中,我们必须为数据集定义 bean 类。

一个用于分组列:

public static class Key {
private int codeDepartement;
private int codeCommune;
private long siret;
//constructors, getters and setters
...
}

还有一个用于结果列:

public static class Result {
private int codeDepartement;
private int codeCommune;
private long siret;
private double result1;
private double result2;
//constructors, getters and setters
...
}

在这个例子中,我使用了一个由三个键列和两个计算列 result1result2 组成的结果结构。可以在此处添加更多结果列。

实际逻辑发生在 MyMapGroupsFunction 内部:

public static class MyMapGroupsFunction implements MapGroupsFunction<Key, Row, Result> {

@Override
public Result call(Key key, Iterator<Row> values) throws Exception {
//drain the iterator into a list. The list now
//contains all rows that belong to one single group
List<Row> rows = new ArrayList<>();
values.forEachRemaining(rows::add);

//now any arbitrary logic can be used to calculate the result values
//based on the contents of the list
double result1 = 0;
double result2 = 0;
for (Row r : rows) {
double cumulSD3 = r.getDouble(r.fieldIndex("cumulSC3"));
double cumulSD4 = r.getDouble(r.fieldIndex("cumulSC4"));
result1 += cumulSD3 + cumulSD4;
result2 += cumulSD3 * cumulSD4;
}

//return the result consisting of the elements of the key and the calculated values
return new Result(key.getCodeDepartement(), key.getCodeCommune(),
key.getSiret(), result1, result2);
}
}

打印我们得到的结果

+-----------+---------------+--------------------+--------------------+--------------+
|codeCommune|codeDepartement| result1| result2| siret|
+-----------+---------------+--------------------+--------------------+--------------+
| 1001| 1| 692508.8400000001|2.939458891576320...|21010001200019|
| 1001| 1|1.4411536300000003E7|8.198151013048245E12|21010001200017|
| 1001| 1| 692508.8400000001|2.939458891576320...|21010001200018|
+-----------+---------------+--------------------+--------------------+--------------+

如果可以切换到 Scala,我会建议这样做。数据集 API 更适用于 Scala。

关于java - 我知道如何在数据集上执行 orderBy ("a"、 "b"...)、groupBy。我需要独立地对每个记录子集进行计算和处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63857257/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com