gpt4 book ai didi

r - 如何使用最新版本的 dplyr (1.0)、sparklyr (1.4) 和 SPARK (3.0)/Hadoop (2.7) 从 Spark 数据帧中提取每组的前 n 行?

转载 作者:行者123 更新时间:2023-12-05 06:10:03 25 4
gpt4 key购买 nike

我对 top_n()scale_head() 的尝试均因错误而失败。

https://github.com/tidyverse/dplyr/issues/4467 中报告了 top_n() 的问题并由 Hadley 以评论结束:

This will be resolved by #4687 + tidyverse/dbplyr#394 through theintroduction of new slice_min() and slice_max() functions, whichalso allow us to resolve some interface issues with top_n().

尽管更新了我所有的包,但调用 top_n() 失败并显示:

Error: org.apache.spark.sql.AnalysisException: Undefined function: 'top_n_rank'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 3 pos 7

(查看下面的完整代码和日志)

好的,top_n() 现在已在 dplyr 1.0 中被取代,所以我尝试了 slice_head()。这也失败了,因为:

Error in UseMethod("slice_head") : 
no applicable method for 'slice_head' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"

我刚开始使用 sparklyr...有人可以重现这些问题吗?或者我应该寻找安装问题/某些软件包的不兼容版本?

如果问题得到确认,我还能如何从 Spark 数据框中提取每组的前 n 行?

代码示例:

library(sparklyr)
library(dplyr)

# Just in case already connected
spark_disconnect_all()

# Connect to local Spark cluster
sc <- spark_connect(master = "local")

# Print the version of Spark
spark_version(sc = sc)

# Copy data frame to Spark
iris_tbl <- copy_to(sc, iris)

# List the data frames available in Spark
src_tbls(sc)

# Get some info from the data frame
dim(iris_tbl)
glimpse(iris_tbl)

# Return the first 10 rows for each Species
# Using top_n()
top_10 <- iris_tbl %>%
group_by(Species) %>%
top_n(10)
glimpse(top_10)

# Using slice_head()
slice_head_10 <- iris_tbl %>%
group_by(Species) %>%
slice_head(n = 10)
slice_head_10
glimpse(slice_head_10)

# Disconnect from Spark
spark_disconnect(sc = sc)

# session Info
sessionInfo()

完整日志(在 Rmarkdown 中运行):

Restarting R session...

> # Chunk 1: setup
> knitr::opts_chunk$set(echo = TRUE)
>
> # Chunk 2
> library(sparklyr)
> library(dplyr)

Attaching package: ‘dplyr’

The following objects are masked from ‘package:stats’:

filter, lag

The following objects are masked from ‘package:base’:

intersect, setdiff, setequal, union

>
> # Just in case already connected
> spark_disconnect_all()
[1] 0
>
> # Connect to local Spark cluster
> sc <- spark_connect(master = "local")
* Using Spark: 3.0.0
>
> # Print the version of Spark
> spark_version(sc = sc)
[1] ‘3.0.0’
>
> # Copy data frame to Spark
> iris_tbl <- copy_to(sc, iris)
>
> # List the data frames available in Spark
> src_tbls(sc)
[1] "iris"
>
> # Get some info from the data frame
> dim(iris_tbl)
[1] NA 5
> glimpse(iris_tbl)
Rows: ??
Columns: 5
Database: spark_connection
$ Sepal_Length <dbl> 5.1, 4.9, 4.7, 4.6, 5.0, 5.4, 4.6, 5.0, 4.4, 4.9, 5.4, 4.8…
$ Sepal_Width <dbl> 3.5, 3.0, 3.2, 3.1, 3.6, 3.9, 3.4, 3.4, 2.9, 3.1, 3.7, 3.4…
$ Petal_Length <dbl> 1.4, 1.4, 1.3, 1.5, 1.4, 1.7, 1.4, 1.5, 1.4, 1.5, 1.5, 1.6…
$ Petal_Width <dbl> 0.2, 0.2, 0.2, 0.2, 0.2, 0.4, 0.3, 0.2, 0.2, 0.1, 0.2, 0.2…
$ Species <chr> "setosa", "setosa", "setosa", "setosa", "setosa", "setosa"…
>
> # Return the first 10 rows for each Species
> # Using top_n()
> top_10 <- iris_tbl %>%
+ group_by(Species) %>%
+ top_n(10)
Selecting by Species
> glimpse(top_10)
Error: org.apache.spark.sql.AnalysisException: Undefined function: 'top_n_rank'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 3 pos 7
at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.$anonfun$applyOrElse$102(Analyzer.scala:1852)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.applyOrElse(Analyzer.scala:1852)
at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.applyOrElse(Analyzer.scala:1843)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:96)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:129)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:139)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:96)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:87)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveExpressions$1.applyOrElse(AnalysisHelper.scala:129)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveExpressions$1.applyOrElse(AnalysisHelper.scala:128)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$4(AnalysisHelper.scala:113)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:113)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressions(AnalysisHelper.scala:128)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressions$(AnalysisHelper.scala:127)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveExpressions(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$.apply(Analyzer.scala:1843)
at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$.apply(Analyzer.scala:1840)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:176)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:170)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:130)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:154)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:153)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:68)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke.invoke(invoke.scala:147)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
at sparklyr.StreamHandler.read(stream.scala:61)
at sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:58)
at scala.util.control.Brea
>
> # Using slice_head()
> slice_head_10 <- iris_tbl %>%
+ group_by(Species) %>%
+ slice_head(n = 10)
Error in UseMethod("slice_head") :
no applicable method for 'slice_head' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
> slice_head_10
Error: object 'slice_head_10' not found
> glimpse(slice_head_10)
Error in glimpse(slice_head_10) : object 'slice_head_10' not found
>
> # Disconnect from Spark
> spark_disconnect(sc = sc)
>
> # session Info
> sessionInfo()
R version 4.0.3 (2020-10-10)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 20.04.1 LTS

Matrix products: default
BLAS: /usr/lib/x86_64-linux-gnu/openblas-pthread/libblas.so.3
LAPACK: /usr/lib/x86_64-linux-gnu/openblas-pthread/liblapack.so.3

locale:
[1] LC_CTYPE=C.UTF-8 LC_NUMERIC=C LC_TIME=C.UTF-8
[4] LC_COLLATE=C.UTF-8 LC_MONETARY=C.UTF-8 LC_MESSAGES=C.UTF-8
[7] LC_PAPER=C.UTF-8 LC_NAME=C LC_ADDRESS=C
[10] LC_TELEPHONE=C LC_MEASUREMENT=C.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] stats graphics grDevices utils datasets methods base

other attached packages:
[1] dplyr_1.0.2.9000 sparklyr_1.4.0

loaded via a namespace (and not attached):
[1] pillar_1.4.6 compiler_4.0.3 dbplyr_1.4.4 r2d3_0.2.3
[5] base64enc_0.1-3 tools_4.0.3 digest_0.6.27 jsonlite_1.7.1
[9] lifecycle_0.2.0 tibble_3.0.4 pkgconfig_2.0.3 rlang_0.4.8
[13] DBI_1.1.0 cli_2.1.0 rstudioapi_0.11 yaml_2.2.1
[17] parallel_4.0.3 xfun_0.18 withr_2.3.0 httr_1.4.2
[21] knitr_1.30 generics_0.0.2 htmlwidgets_1.5.2 vctrs_0.3.4
[25] askpass_1.1 rappdirs_0.3.1 rprojroot_1.3-2 tidyselect_1.1.0
[29] glue_1.4.2 forge_0.2.0 R6_2.4.1 fansi_0.4.1
[33] purrr_0.3.4 tidyr_1.1.2 blob_1.2.1 magrittr_1.5
[37] backports_1.1.10 ellipsis_0.3.1 htmltools_0.5.0 assertthat_0.2.1
[41] config_0.3 utf8_1.1.4 openssl_1.4.3 crayon_1.3.4
>

最佳答案

使用filterrow_number。请注意,您需要先指定 arrange 才能使 row_numbersparklyr 中工作。

iris_tbl %>%
group_by(Species) %>%
arrange(Sepal_Length) %>%
filter(row_number() <= 3)
#> # Source: spark<?> [?? x 5]
#> # Groups: Species
#> # Ordered by: Sepal_Length
#> Sepal_Length Sepal_Width Petal_Length Petal_Width Species
#> <dbl> <dbl> <dbl> <dbl> <chr>
#> 1 4.9 2.4 3.3 1 versicolor
#> 2 5 2 3.5 1 versicolor
#> 3 5 2.3 3.3 1 versicolor
#> 4 4.9 2.5 4.5 1.7 virginica
#> 5 5.6 2.8 4.9 2 virginica
#> 6 5.7 2.5 5 2 virginica
#> 7 4.3 3 1.1 0.1 setosa
#> 8 4.4 2.9 1.4 0.2 setosa
#> 9 4.4 3 1.3 0.2 setosa

关于r - 如何使用最新版本的 dplyr (1.0)、sparklyr (1.4) 和 SPARK (3.0)/Hadoop (2.7) 从 Spark 数据帧中提取每组的前 n 行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64569388/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com