By way of background, I'm trying to implement Kaplan-Meier in Spark. In particular, I assume I have a dataframe/Dataset with a Double column called data and an Int column named censorFlag (0 if the observation is censored, 1 if not; I prefer this over a Boolean type).
Example:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // already in scope in spark-shell

val df = Seq((1.0, 1), (2.3, 0), (4.5, 1), (0.8, 1), (0.7, 0), (4.0, 1), (0.8, 1)).toDF("data", "censorFlag").as[(Double, Int)]
Now I need to compute a column wins that counts the uncensored instances (censorFlag == 1) of each data value. I achieved that with the following code:
val distDF = df.withColumn("wins", sum(col("censorFlag")).over(Window.partitionBy("data").orderBy("data")))
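Because data is both the partition key and the ordering key, every row in a partition is an ordering peer and the default RANGE frame covers the whole partition, so duplicated values each carry the full per-value sum. On the sample rows this should print:

distDF.orderBy("data").show
+----+----------+----+
|data|censorFlag|wins|
+----+----------+----+
| 0.7|         0|   0|
| 0.8|         1|   2|
| 0.8|         1|   2|
| 1.0|         1|   1|
| 2.3|         0|   0|
| 4.0|         1|   1|
| 4.5|         1|   1|
+----+----------+----+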
The problem comes when I need to compute a quantity called atRisk which, for each value of data, counts the number of data points greater than or equal to it (a cumulative filtered count, if you will).
The following code works:
// We perform the counts per value of "bins". This is an array of doubles
val bins = df.select(col("data").as("dataBins")).distinct().sort("dataBins").as[Double].collect
val atRiskCounts = bins.map(x => (x, df.filter(col("data").geq(x)).count)).toSeq.toDF("data", "atRisk")
// this works:
atRiskCounts.show
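With the seven sample rows above, this should print:

+----+------+
|data|atRisk|
+----+------+
| 0.7|     7|
| 0.8|     6|
| 1.0|     4|
| 2.3|     3|
| 4.0|     2|
| 4.5|     1|
+----+------+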
However, my use case involves deriving the bins from the column data itself, and I'd much rather keep them as a single-column Dataset (or, worst case, an RDD), but certainly not as a local array. This, however, does not work:
// Here, 'bins' rightfully comes from the data itself.
val bins = df.select(col("data").as("dataBins")).distinct().as[Double]
val atRiskCounts = bins.map(x => (x, df.filter(col("data").geq(x)).count)).toDF("data", "atRisk")
// This doesn't work -- NullPointerException
atRiskCounts.show
Neither does this:
// Manually creating the bins and then parallelizing them.
val bins = Seq(0.7, 0.8, 1.0, 3.0).toDS
val atRiskCounts = bins.map(x => (x, df.filter(col("data").geq(x)).count)).toDF("data", "atRisk")
// Also fails with a NullPointerException
atRiskCounts.show
Another approach that does work, but is equally unsatisfying from a parallelization standpoint, is to use a Window:
// Do the counts in one fell swoop using a giant window per value.
val atRiskCounts = df
  .withColumn("atRisk", count("censorFlag").over(Window.orderBy("data").rowsBetween(0, Window.unboundedFollowing)))
  .groupBy("data")
  .agg(first("atRisk").as("atRisk"))
// Works, BUT, we get a "WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation."
atRiskCounts.show
The last solution isn't useful, as it ends up shuffling my data to a single partition (and in that case, I may as well go with option 1, which does work).
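A quick way to confirm where the bottleneck sits (the exact plan text varies by Spark version) is to inspect the physical plan:

atRiskCounts.explain()
// Look for "Exchange SinglePartition" feeding the Window operator; that is
// the move-everything-to-one-partition step the WARN message refers to.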
The successful approach is fine, except that the bins are not parallelized, and I'd really like to keep them that way if at all possible. I've looked at groupBy aggregations and pivot-type aggregations, but nothing seems to fit.
My questions: is there a way to compute the atRisk column in a distributed fashion? And why do I get a NullPointerException in the failing solutions?
EDIT, per the comments:
I didn't originally post the NullPointerException because it didn't seem to contain anything useful. I'll note that this is Spark installed via Homebrew on my MacBook Pro (Spark version 2.2.1, standalone local-host mode).
18/03/12 11:41:00 ERROR ExecutorClassLoader: Failed to check existence of class <root>.package on REPL class server at spark://10.37.109.111:53360/classes
java.net.URISyntaxException: Illegal character in path at index 36: spark://10.37.109.111:53360/classes/<root>/package.class
at java.net.URI$Parser.fail(URI.java:2848)
at java.net.URI$Parser.checkChars(URI.java:3021)
at java.net.URI$Parser.parseHierarchical(URI.java:3105)
at java.net.URI$Parser.parse(URI.java:3053)
at java.net.URI.<init>(URI.java:588)
at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:327)
at org.apache.spark.repl.ExecutorClassLoader.org$apache$spark$repl$ExecutorClassLoader$$getClassFileInputStreamFromSparkRPC(ExecutorClassLoader.scala:90)
at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
at org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:162)
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:80)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
. . . .
18/03/12 11:41:00 ERROR ExecutorClassLoader: Failed to check existence of class <root>.scala on REPL class server at spark://10.37.109.111:53360/classes
java.net.URISyntaxException: Illegal character in path at index 36: spark://10.37.109.111:53360/classes/<root>/scala.class
at java.net.URI$Parser.fail(URI.java:2848)
at java.net.URI$Parser.checkChars(URI.java:3021)
at java.net.URI$Parser.parseHierarchical(URI.java:3105)
at java.net.URI$Parser.parse(URI.java:3053)
at java.net.URI.<init>(URI.java:588)
at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:327)
at org.apache.spark.repl.ExecutorClassLoader.org$apache$spark$repl$ExecutorClassLoader$$getClassFileInputStreamFromSparkRPC(ExecutorClassLoader.scala:90)
at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
at org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:162)
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:80)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
. . .
18/03/12 11:41:00 ERROR ExecutorClassLoader: Failed to check existence of class <root>.org on REPL class server at spark://10.37.109.111:53360/classes
java.net.URISyntaxException: Illegal character in path at index 36: spark://10.37.109.111:53360/classes/<root>/org.class
at java.net.URI$Parser.fail(URI.java:2848)
at java.net.URI$Parser.checkChars(URI.java:3021)
at java.net.URI$Parser.parseHierarchical(URI.java:3105)
at java.net.URI$Parser.parse(URI.java:3053)
at java.net.URI.<init>(URI.java:588)
at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:327)
at org.apache.spark.repl.ExecutorClassLoader.org$apache$spark$repl$ExecutorClassLoader$$getClassFileInputStreamFromSparkRPC(ExecutorClassLoader.scala:90)
at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
at org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:162)
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:80)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
. . .
18/03/12 11:41:00 ERROR ExecutorClassLoader: Failed to check existence of class <root>.java on REPL class server at spark://10.37.109.111:53360/classes
java.net.URISyntaxException: Illegal character in path at index 36: spark://10.37.109.111:53360/classes/<root>/java.class
at java.net.URI$Parser.fail(URI.java:2848)
at java.net.URI$Parser.checkChars(URI.java:3021)
at java.net.URI$Parser.parseHierarchical(URI.java:3105)
at java.net.URI$Parser.parse(URI.java:3053)
at java.net.URI.<init>(URI.java:588)
at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:327)
at org.apache.spark.repl.ExecutorClassLoader.org$apache$spark$repl$ExecutorClassLoader$$getClassFileInputStreamFromSparkRPC(ExecutorClassLoader.scala:90)
at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
at org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:162)
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:80)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
. . .
18/03/12 11:41:00 ERROR Executor: Exception in task 0.0 in stage 55.0 (TID 432)
java.lang.NullPointerException
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:171)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:62)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2889)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1301)
at $line124.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:33)
at $line124.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:33)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/03/12 11:41:00 WARN TaskSetManager: Lost task 0.0 in stage 55.0 (TID 432, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:171)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:62)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2889)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1301)
at $line124.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:33)
at $line124.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:33)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
18/03/12 11:41:00 ERROR TaskSetManager: Task 0 in stage 55.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 55.0 failed 1 times, most recent failure: Lost task 0.0 in stage 55.0 (TID 432, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:171)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:62)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2889)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1301)
at $anonfun$1.apply(<console>:33)
at $anonfun$1.apply(<console>:33)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
... 50 elided
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:171)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:62)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2889)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1301)
at $anonfun$1.apply(<console>:33)
at $anonfun$1.apply(<console>:33)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
My best guess is that the line df.filter(col("data").geq(x)).count is the part that barfs, since not every node may have x, hence the null pointer?
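For what it's worth, the null reference is more likely df than x: Spark does not support calling one Dataset from inside a transformation of another. The df captured by the map closure arrives on the executors with its internals null, so the very first method invoked on it, Dataset.filter, which is exactly where the stack trace points, blows up. A join expresses the same comparison while keeping every Dataset call on the driver; here is a minimal sketch reusing df from above (binsDF and atRiskDF are new names introduced for this sketch):

// All Dataset operations here run on the driver; only rows are distributed.
// (If Spark refuses the non-equi join as an implicit cartesian product,
// set spark.sql.crossJoin.enabled=true or use crossJoin plus where.)
val binsDF = df.select(col("data").as("dataBins")).distinct()
val atRiskDF = df.join(binsDF, col("data") >= col("dataBins"))
  .groupBy("dataBins")
  .count()
  .withColumnRenamed("count", "atRisk")

On the sample data this yields atRisk = 7, 6, 4, 3, 2, 1 for bins 0.7 through 4.5, duplicates included, with nothing collected to the driver.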
Best Answer
I haven't tested this, so the syntax may be silly, but I would do a series of joins:
I believe your first statement is equivalent to this: for each data value, count the wins:
val distDF = df.groupBy($"data").agg(sum($"censorFlag").as("wins"))
Then, as you described, we can build the dataframe of bins:
val distinctData = df.select($"data".as("dataBins")).distinct()
Then join on the >= condition:
val atRiskCounts = distDF.join(distinctData, distDF("data") >= distinctData("dataBins"))
  .groupBy($"data", $"wins")
  .count()
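One caveat on this untested sketch: grouping by $"data" counts, for each value, how many bins sit at or below it, while atRisk asks for the count at or above. Grouping by the bin side of the join gives the intended direction (atRiskPerBin is a new name for this variant):

// Count, per bin, the distinct data values at or above it.
val atRiskPerBin = distDF.join(distinctData, distDF("data") >= distinctData("dataBins"))
  .groupBy($"dataBins")
  .count()
  .withColumnRenamed("count", "atRisk")

Note that this counts distinct values: 0.8 appears twice in df, so it yields 6, 5, 4, 3, 2, 1 here. If row-level counts are wanted, join df itself rather than distDF (as in the sketch under the stack trace), which restores 7, 6, 4, 3, 2, 1.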
Regarding "scala - Spark RDD or SQL operations to compute conditional counts", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/49184830/