- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Spark ML_pipelines
在使用 SCALA
的生产环境中轻松部署我在 Sparklyr
中开发的操作。它工作得很好,除了一个部分:似乎当我从 Hive
读取一个表然后创建一个将操作应用于该表的管道时,该管道还将保存表读取操作,从而表的名称。但是我希望管道独立于此。
这是一个可重现的例子:
Sparklyr
部分:
sc = spark2_context(memory = "4G")
iris <- copy_to(sc, iris, overwrite=TRUE)
spark_write_table(iris, "base.iris")
spark_write_table(iris, "base.iris2")
df1 <- tbl(sc, "base.iris")
df2 <- df1 %>%
mutate(foo = 5)
pipeline <- ml_pipeline(sc) %>%
ft_dplyr_transformer(df2) %>%
ml_fit(df1)
ml_save(pipeline,
paste0(save_pipeline_path, "test_pipeline_reading_from_table"),
overwrite = TRUE)
df2 <- pipeline %>% ml_transform(df1)
dbSendQuery(sc, "drop table base.iris")
SCALA
部分:
import org.apache.spark.ml.PipelineModel
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val df1 = spark.sql("select * from base.iris2")
val pipeline = PipelineModel.load(pipeline_path + "/test_pipeline_reading_from_table")
val df2 = pipeline.transform(df1)
我收到这个错误:
org.apache.spark.sql.AnalysisException: Table or view not found: `base`.`iris`; line 2 pos 5;
'Project ['Sepal_Length, 'Sepal_Width, 'Petal_Length, 'Petal_Width, 'Species, 5.0 AS foo#110]
+- 'UnresolvedRelation `base`.`iris`
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:82)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:637)
at org.apache.spark.ml.feature.SQLTransformer.transformSchema(SQLTransformer.scala:86)
at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:310)
at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:310)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.ml.PipelineModel.transformSchema(Pipeline.scala:310)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:304)
... 71 elided
我可以看到 2 个解决方案:
持久化 dataframe
似乎是一个解决方案,但我需要找到一种方法来避免让我的内存过载,因此我的问题是 unpersisting
将 Hive 中的表名作为管道参数传递,我试图在 this question 中解决这个问题
现在,说了这么多,我可能会遗漏一些东西,因为我只是一个初学者......
编辑:这与 this question 有很大不同。因为这涉及集成刚刚在管道中读取的数据框的特定问题,如标题中所述。
编辑:对于我的项目,在我阅读表格后保留表格是一个可行的解决方案。不知道有没有更好的解决办法。
最佳答案
Then the pipeline would call my table "base.table", making it impossible to apply it to another table.
事实并非如此。 ft_dplyr_transformer
是Spark自带的SQLTransformer
的语法糖。内部 dplyr
expression is converted to SQL query, and the name of the table is replaced with __THIS__
(Spark 占位符指的是当前表)。
假设您有这样的转换:
copy_to(sc, iris, overwrite=TRUE)
df <- tbl(sc, "iris") %>%
mutate(foo = 5)
pipeline <- ml_pipeline(sc) %>%
ft_dplyr_transformer(df) %>%
ml_fit(tbl(sc, "iris"))
ml_stage(pipeline, "dplyr_transformer") %>% spark_jobj() %>% invoke("getStatement")
[1] "SELECT `Sepal_Length`, `Sepal_Width`, `Petal_Length`, `Petal_Width`, `Species`, 5.0 AS `foo`\nFROM `__THIS__`"
然而,这是一种相当困惑的表达方式,直接使用 native SQL 转换器更有意义:
pipeline <- ml_pipeline(sc) %>%
ft_sql_transformer("SELECT *, 5 as `foo` FROM __THIS__") %>%
ml_fit(df)
编辑:
您在这里遇到的问题看起来像是一个错误。 get_base_name
函数返回不带引号的表名,因此您的情况下的值为
> get_base_name(x$ops)
<IDENT> default.iris
模式将是
> pattern
[1] "\\bdefault.iris\\b"
但是dbplyr::sql_render
返回反引号的完全限定名称:
> dbplyr::sql_render(x)
<SQL> SELECT `Sepal_Length`, `Sepal_Width`, `Petal_Length`, `Petal_Width`, `Species`, 5.0 AS `foo`
FROM `default`.`iris`
因此模式与名称不匹配。
关于r - Spark ML_pipelines : managing table reading,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56344675/
目前正在学习 Spark 的类(class)并了解到执行者的定义: Each executor will hold a chunk of the data to be processed. Thisc
阅读了有关 http://spark.apache.org/docs/0.8.0/cluster-overview.html 的一些文档后,我有一些问题想要澄清。 以 Spark 为例: JavaSp
Spark核心中的调度器与以下Spark Stack(来自Learning Spark:Lightning-Fast Big Data Analysis一书)中的Standalone Schedule
我想在 spark-submit 或 start 处设置 spark.eventLog.enabled 和 spark.eventLog.dir -all level -- 不要求在 scala/ja
我有来自 SQL Server 的数据,需要在 Apache Spark (Databricks) 中进行操作。 在 SQL Server 中,此表的三个键列使用区分大小写的 COLLATION 选项
所有这些有什么区别和用途? spark.local.ip spark.driver.host spark.driver.bind地址 spark.driver.hostname 如何将机器修复为 Sp
我有大约 10 个 Spark 作业,每个作业都会进行一些转换并将数据加载到数据库中。必须为每个作业单独打开和关闭 Spark session ,每次初始化都会耗费时间。 是否可以只创建一次 Spar
/Downloads/spark-3.0.1-bin-hadoop2.7/bin$ ./spark-shell 20/09/23 10:58:45 WARN Utils: Your hostname,
我是 Spark 的完全新手,并且刚刚开始对此进行更多探索。我选择了更长的路径,不使用任何 CDH 发行版安装 hadoop,并且我从 Apache 网站安装了 Hadoop 并自己设置配置文件以了解
TL; 博士 Spark UI 显示的内核和内存数量与我在使用 spark-submit 时要求的数量不同 更多细节: 我在独立模式下运行 Spark 1.6。 当我运行 spark-submit 时
spark-submit 上的文档说明如下: The spark-submit script in Spark’s bin directory is used to launch applicatio
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 6 个月前关闭。 Improve
我想了解接收器如何在 Spark Streaming 中工作。根据我的理解,将有一个接收器任务在执行器中运行,用于收集数据并保存为 RDD。当调用 start() 时,接收器开始读取。需要澄清以下内容
有没有办法在不同线程中使用相同的 spark 上下文并行运行多个 spark 作业? 我尝试使用 Vertx 3,但看起来每个作业都在排队并按顺序启动。 如何让它在相同的 spark 上下文中同时运行
我们有一个 Spark 流应用程序,这是一项长期运行的任务。事件日志指向 hdfs 位置 hdfs://spark-history,当我们开始流式传输应用程序时正在其中创建 application_X
我们正在尝试找到一种加载 Spark (2.x) ML 训练模型的方法,以便根据请求(通过 REST 接口(interface))我们可以查询它并获得预测,例如http://predictor.com
Spark newb 问题:我在 spark-sql 中进行完全相同的 Spark SQL 查询并在 spark-shell . spark-shell版本大约需要 10 秒,而 spark-sql版
我正在使用 Spark 流。根据 Spark 编程指南(参见 http://spark.apache.org/docs/latest/programming-guide.html#accumulato
我正在使用 CDH 5.2。我可以使用 spark-shell 运行命令。 如何运行包含spark命令的文件(file.spark)。 有没有办法在不使用 sbt 的情况下在 CDH 5.2 中运行/
我使用 Elasticsearch 已经有一段时间了,但使用 Cassandra 的经验很少。 现在,我有一个项目想要使用 Spark 来处理数据,但我需要决定是否应该使用 Cassandra 还是
我是一名优秀的程序员,十分优秀!