- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
尝试在 sparklyr 中拆分一个字符串,然后将其用于连接/过滤
我尝试了将字符串标记化然后将其分离到新列的建议方法。这是一个可重现的示例(请注意,我必须将在 copy_to 之后变成字符串“NA”的 NA 翻译成实际的 NA,有没有办法不必这样做)
x <- data.frame(Id=c(1,2,3,4),A=c('A-B','A-C','A-D',NA))
df <- copy_to(sc,x,'df')
df %>% mutate(A = ifelse(A=='NA',NA,A)) %>% ft_regex_tokenizer(input.col="A", output.col="B", pattern="-",to_lower_case=F) %>%
sdf_separate_column("B", into=c("C", "D")) %>% filter(C=='A')
问题是,如果我尝试过滤新创建的列(例如 %>% filter(C=='A')
或加入它们,我会收到错误,见下文
Error : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 367.0 failed 4 times, most recent failure: Lost task 0.3 in stage 367.0 (TID 5062, 10.139.64.4, executor 0): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:622)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:51)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:148)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:147)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
at java.util.regex.Matcher.reset(Matcher.java:309)
at java.util.regex.Matcher.<init>(Matcher.java:229)
at java.util.regex.Pattern.matcher(Pattern.java:1093)
at java.util.regex.Pattern.split(Pattern.java:1206)
at java.util.regex.Pattern.split(Pattern.java:1273)
at scala.util.matching.Regex.split(Regex.scala:526)
at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:144)
at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:141)
... 15 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2252)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:259)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:48)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2827)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3439)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2794)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2794)
at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3423)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2794)
at sparklyr.Utils$.collect(utils.scala:200)
at sparklyr.Utils.collect(utils.scala)
at sun.reflect.GeneratedMethodAccessor577.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke.invoke(invoke.scala:139)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
at sparklyr.StreamHandler.read(stream.scala:66)
at sparklyr.BackendHandler.channelRead0(handler.scala:51)
at sparklyr.BackendHandler.channelRead0(handler.scala:4)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkExcepti
In addition: Warning messages:
1: The parameter `input.col` is deprecated and will be removed in a future release. Please use `input_col` instead.
2: The parameter `output.col` is deprecated and will be removed in a future release. Please use `output_col` instead
不确定为什么创建的列的类型根据 sdf_schema 是“StringType”。
是否有使用 sparklyr 实际分离列的解决方案,我以后可以将其用作字符串,而不必将框架写到文件中,或者不必收集到驱动程序节点?
最佳答案
在这里使用 Spark ML 转换器不是一个好的选择。相反,你应该 split
函数:
df %>%
mutate(B = split(A, "-")) %>%
sdf_separate_column("B", into = c("C", "D")) %>%
filter(C %IS NOT DISTINCT FROM% "A")
# Source: spark<?> [?? x 5]
Id A B C D
<dbl> <chr> <list> <chr> <chr>
1 1 A-B <list [2]> A B
2 2 A-C <list [2]> A C
3 3 A-D <list [2]> A D
或regexp_extract
pattern <- "^(.*)-(.*)$"
df %>%
mutate(
C = regexp_extract(A, pattern, 1),
D = regexp_extract(A, pattern, 2)
) %>%
filter(C %IS NOT DISTINCT FROM% "A")
# Source: spark<?> [?? x 4]
Id A C D
<dbl> <chr> <chr> <chr>
1 1 A-B A B
2 2 A-C A C
3 3 A-D A D
尽管如此,如果你想让 RegexpTokenzier
工作,你必须先处理 NULL
(外部 R 类型中的 NA
)。例如可以使用 coalesce
tokenizer <- ft_regex_tokenizer(
sc, input_col = "A", output_col = "B",
pattern = "-", to_lower_case = F
)
df %>%
mutate(A = coalesce(A, "")) %>%
ml_transform(tokenizer, .) %>%
sdf_separate_column("B", into=c("C", "D")) %>%
filter(C %IS NOT DISTINCT FROM% "A")
# Source: spark<?> [?? x 5]
Id A B C D
<dbl> <chr> <list> <chr> <chr>
1 1 A-B <list [2]> A B
2 2 A-C <list [2]> A C
3 3 A-D <list [2]> A D
或者先删除丢失的数据:
df %>%
# or filter(!is.na(A))
na.omit(columns=c("A")) %>%
ml_transform(tokenizer, .) %>%
sdf_separate_column("B", into=c("C", "D")) %>%
filter(C %IS NOT DISTINCT FROM% "A")
* Dropped 1 rows with 'na.omit' (4 => 3)
# Source: spark<?> [?? x 5]
Id A B C D
<dbl> <chr> <list> <chr> <chr>
1 1 A-B <list [2]> A B
2 2 A-C <list [2]> A C
3 3 A-D <list [2]> A D
关于r - Sparklyr 拆分字符串(到字符串),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54522707/
当我尝试使用 Rstudio 和 sparklyr 使用此代码访问 Hive 表时: library(sparklyr) library(dplyr) Sys.setenv(SPARK_HOME="/
以下示例描述了如何在不使用 dplyr 和 sparklyr 聚合行的情况下计算不同值的数量。 有没有不破坏命令链的解决方法? 更一般地说,如何在 sparklyr 数据帧上使用类似 sql 的窗口函
我正在尝试在 sparklyr 中复制 tidyr:complete 函数。我有一个包含一些缺失值的数据框,我必须填写这些行。在 dplyr/tidyr 中我可以这样做: data
我想从 sparklyr 中我的 Spark DataFrame 的每个类中采样 n 行。 我知道 dplyr::sample_n 函数不能用于此 (Is sample_n really a rand
希望将一些 R 代码转换为 Sparklyr,函数如 lmtest::coeftest() 和 sandwich::sandwich()。尝试开始使用 Sparklyr 扩展,但对 Spark API
我想跳过(退出)文本文件的前两行: 据我所知,使用 sparklyr 方法是不可能的 spark_read_csv .有一些解决方法可以解决这个简单的问题吗? 我知道 sparklyr extensi
在 Spark 2.0 中,我可以将多个文件路径合并为一个加载(参见例如 How to import multiple csv files in a single load?)。 如何使用 spark
Sparklyr 处理分类变量 我来自 R 背景,习惯于在后端处理分类变量(作为因子)。对于 Sparklyr,使用 string_indexer 或 onehotencoder 非常令人困惑。 例如
我正在尝试在sparklyr中读取2GB〜(5mi行)的.csv: bigcsvspark <- spark_read_csv(sc, "bigtxt", "path",
我很抱歉这个问题很难完全重现,因为它涉及一个正在运行的 spark 上下文(在下面引用为 sc),但我正在尝试在 sparklyr 中设置一个 hadoopConfiguration,专门用于从 RS
我有一个朴素贝叶斯模型在 sparklyr 中使用 ml_naive_bayes 运行,如下所示: library(sparklyr) library(dplyr) sc model Call: m
我在使用 ft_.. sparklyr R 包中的函数时遇到了一些问题。 ft_bucketizer 有效,但 ft_normalizer 或 ft_min_max_scaler 无效。这是一个例子:
即使在相当小的数据集上,我也会遇到堆空间错误。我可以确定我没有耗尽系统内存。例如,考虑一个包含大约 20M 行和 9 列的数据集,它在磁盘上占用 1GB。我在具有 30GB 内存的 Google Co
尝试在 sparklyr 中拆分一个字符串,然后将其用于连接/过滤 我尝试了将字符串标记化然后将其分离到新列的建议方法。这是一个可重现的示例(请注意,我必须将在 copy_to 之后变成字符串“NA”
我对 sparklyr 和 spark 很陌生,所以如果这不是执行此操作的“spark”方式,请告诉我。 我的问题 我有 50 多个 .txt 文件,每个文件大约 300 mb,都在同一个文件夹中,将
我对 Spark 很陌生,目前正在通过 sparkly 包使用 R API 使用它。我从 hive 查询创建了一个 Spark 数据框。源表中未正确指定数据类型,我试图通过利用来自 dplyr 的函数
我需要使用 sparklyr 计算 R 中两个字符串之间的距离。有没有办法使用 stringdist 或任何其他包?我想使用cousine distance。此距离用作 stringdist 函数的方
在以下示例中,我加载了一个 Parquet 文件,该文件包含 meta 中 map 对象的嵌套记录。 field 。 sparklyr似乎在处理这些方面做得很好。然而tidyr::unnest不会转换
我是 sparklyr 的新手(但熟悉 spark 和 pyspark),我有一个非常基本的问题。我正在尝试根据部分匹配过滤列。在 dplyr 中,我会这样写我的操作: businesses %>%
我在 Spark 中有一个数据框,希望在按特定列分组后计算 0.1 分位数。 例如: > library(sparklyr) > library(tidyverse) > con = spark_co
我是一名优秀的程序员,十分优秀!