
Java: testing spark-sql


I wrote a test for the application using spark-sql, and this test fails. Without the spark-sql module all the tests pass (plain RDDs).

Library versions:

  • JUnit: 4.12
  • Spark Core: 2.2.1
  • Spark SQL: 2.2.1
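
The test below calls a getSparkSession() helper that the question does not show. A minimal sketch of such a helper for local unit tests, assuming a plain local-mode session (the builder settings here are an assumption, not taken from the original):

import org.apache.spark.sql.SparkSession;

// Hypothetical test helper: builds (or reuses) a SparkSession that runs
// entirely inside the test JVM. The question does not show its real body.
private SparkSession getSparkSession() {
    return SparkSession.builder()
            .appName("spark-sql-test")
            .master("local[*]") // single-JVM local mode for unit tests
            .getOrCreate();
}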

The test is:

// Load 1000 claims from a JSON test fixture and verify the count.
List<Claim> claims = FileResource.loadListObjOfFile("cg-32-claims-load.json", Claim[].class);
assertTrue(claims.size() == 1000L);

// Turn the list into a typed Dataset using a bean encoder.
Dataset<Claim> dataset = getSparkSession().createDataset(claims, Encoders.bean(Claim.class));
assertTrue(dataset.count() == 1000L);

// Group the claims by member id and map each group to a result bean.
Dataset<ResultBean> resDataSet = dataset
        .groupByKey((MapFunction<Claim, Integer>) Claim::getMbrId, Encoders.INT())
        .mapGroups((MapGroupsFunction<Integer, Claim, ResultBean>) (key, values) -> new ResultBean(), Encoders.bean(ResultBean.class));

assertTrue(resDataSet.count() == 42L);

On the last line I get an exception. The application only throws this exception in tests; a simple main class works fine.

It looks like Spark SQL cannot initialize the Java bean for some reason. Stack trace:

+- AppendColumns <function1>, initializejavabean(newInstance(class test.input.Claim), (setDiag1,diag1#28.toString), .... [input[0, java.lang.Integer, true].intValue AS value#84]
+- LocalTableScan [birthDt#23, birthDtStr#24, clmFromDt#25, .... pcdCd#45, plcOfSvcCd#46, ... 2 more fields]

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
....
Caused by: java.lang.AssertionError: index (23) should < 23
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.assertIndexIsValid(UnsafeRow.java:133)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.isNullAt(UnsafeRow.java:352)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply2_7$(generated.java:52)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:600)
at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows$lzycompute(LocalTableScanExec.scala:41)
at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows(LocalTableScanExec.scala:36)
at org.apache.spark.sql.execution.LocalTableScanExec.rdd$lzycompute(LocalTableScanExec.scala:48)
at org.apache.spark.sql.execution.LocalTableScanExec.rdd(LocalTableScanExec.scala:48)
at org.apache.spark.sql.execution.LocalTableScanExec.doExecute(LocalTableScanExec.scala:52)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.AppendColumnsExec.doExecute(objects.scala:272)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:88)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:124)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:115)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 86 more

Best Answer

This error occurs when something is wrong in the bean class itself, so it is worth checking that the bean class has a matching getter and setter for every field. The bean encoder derives its schema from those getter/setter pairs, so a field with only one of the two can leave the generated row projection expecting a different number of columns than the row actually has, which is what the "index (23) should < 23" assertion is complaining about.
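
For example, a bean that Encoders.bean(Claim.class) can handle needs a public no-arg constructor plus a getter/setter pair for every field. A minimal sketch (only mbrId and diag1 are visible in the question; any real Claim class has more fields, each needing the same treatment):

import java.io.Serializable;

// Sketch of a bean shape that the bean encoder accepts.
public class Claim implements Serializable {
    private Integer mbrId;
    private String diag1;

    public Claim() { } // no-arg constructor required by the bean encoder

    public Integer getMbrId() { return mbrId; }
    public void setMbrId(Integer mbrId) { this.mbrId = mbrId; }

    public String getDiag1() { return diag1; }
    public void setDiag1(String diag1) { this.diag1 = diag1; }
}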

Hope this helps anyone stuck on this!

Regarding "Java: testing spark-sql", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48515627/
