r - How to train a random forest with a sparse matrix in Spark?

Consider this simple example using sparklyr:

library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)

mytext <- austen_books() %>%
  mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

# sc is an existing spark_connect() connection
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

# Source: table<mytext_spark> [?? x 3]
# Database: spark_connection
text book label
<chr> <chr> <int>
1 SENSE AND SENSIBILITY Sense & Sensibility 0
2 "" Sense & Sensibility 0
3 by Jane Austen Sense & Sensibility 0
4 "" Sense & Sensibility 0
5 (1811) Sense & Sensibility 0
6 "" Sense & Sensibility 0
7 "" Sense & Sensibility 0
8 "" Sense & Sensibility 0
9 "" Sense & Sensibility 0
10 CHAPTER 1 Sense & Sensibility 0
11 "" Sense & Sensibility 0
12 "" Sense & Sensibility 0
13 The family of Dashwood had long been settled in Sussex. Their estate Sense & Sensibility 0
14 was large, and their residence was at Norland Park, in the centre of Sense & Sensibility 0
15 their property, where, for many generations, they had lived in so Sense & Sensibility 0
16 respectable a manner as to engage the general good opinion of their Sense & Sensibility 0

The data frame is reasonably small (roughly 70k rows and 14k unique words).
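If you want to double-check those figures, a rough local count on the R data frame (not the Spark table) could look like the sketch below; the whitespace tokenisation here is only an approximation of the vocabulary the pipeline's CountVectorizer will later build, so the numbers will not match it exactly:

# Rough sanity check of the size quoted above (run locally, before copy_to).
nrow(mytext)                                          # roughly 70k rows
length(unique(unlist(strsplit(mytext$text, "\\s+")))) # rough count of distinct whitespace tokens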

Now, training a naive Bayes model on my cluster only takes a few seconds. First, I define the pipeline:

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col = 'text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_naive_bayes(label_col = "label",
                 features_col = "finaltoken",
                 prediction_col = "pcol",
                 probability_col = "prcol",
                 raw_prediction_col = "rpcol",
                 model_type = "multinomial",
                 smoothing = 0,
                 thresholds = c(1, 1))

then train the naive Bayes model:

> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
expr min lq mean median uq max neval
model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832 3

Now the issue is that trying to fit any tree-based model (random forest, boosted trees, etc.) on the same (actually tiny!!) dataset will not work.

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col = 'text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_gbt_classifier(label_col = "label",
                    features_col = "finaltoken",
                    prediction_col = "pcol",
                    probability_col = "prcol",
                    raw_prediction_col = "rpcol",
                    max_memory_in_mb = 10240,
                    cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)
# won't work :(

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 1580, 1.1.1.1.1, executor 5): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

I think this is due to the sparsity of the matrix representation of the tokens, but is there anything that can be done here? Is this a sparklyr problem? A Spark problem? Is my code inefficient?

Thanks!

Best answer

You are getting this error because you are actually hitting the famous 2G limit that we have in Spark: https://issues.apache.org/jira/browse/SPARK-6235

The solution is to repartition your data before feeding it to the algorithm.

There are actually two gotchas in this post:

  • Working with local data.
  • Tree-based models in Spark are memory hungry.

So, let's review your code, which seems harmless:

library(janeaustenr) # to get some text data
library(stringr)

mytext <- austen_books() %>%
  mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

So what does that last line do?

copy_to (which is not designed for big data sets) actually just copies the local R data frame into a single-partition Spark DataFrame.
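If you want to see this for yourself, sparklyr exposes the partition count of a Spark DataFrame through sdf_num_partitions(); right after the copy_to() above it should report a single partition:

# Check how many partitions the Spark DataFrame currently has.
# Immediately after copy_to() this should be 1.
sdf_num_partitions(mytext_spark)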

So you just need to repartition your data to make sure that once the pipeline has prepared your data, right before it is fed to gbt, the partition size is less than 2GB.

So you can simply do the following to repartition your data:

# 20 is an arbitrary number I chose to test and it seems to work well in this case,
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <-
  copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>%
  sdf_repartition(partitions = 20)

PS1: max_memory_in_mb is the amount of memory you give the gbt to compute its statistics. It is not directly related to the amount of data coming in.

PS2: If you have not set up enough memory for your executors, you may run into a java.lang.OutOfMemoryError: GC overhead limit exceeded.
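As a minimal sketch (the same settings appear in the full code at the end of this answer), the driver and executor memory can be raised through spark_config() before connecting; the "4G" values are only examples and should be sized to your cluster:

# Give the driver and executors more memory before connecting.
config <- spark_config()
config$`sparklyr.shell.driver-memory`   <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
sc <- spark_connect(master = "local", config = config)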

EDIT: What does repartitioning the data mean?

Before talking about repartitioning, we can always refer to the definition of a partition. I'll try to be brief.

A partition is a logical chunk of a large distributed data set.

Spark manages data using partitions that helps parallelize distributed data processing with minimal network traffic for sending data between executors. By default, Spark tries to read data into an RDD from the nodes that are close to it. Since Spark usually accesses distributed partitioned data, to optimize transformation operations it creates partitions to hold the data chunks.

Increasing partitions count will make each partition to have less data (or not at all!)

source: excerpt from @JacekLaskowski Mastering Apache Spark book.

But data partitioning isn't always right, as in this case. So repartitioning is needed. (sdf_repartition in sparklyr)

sdf_repartition will scatter and shuffle your data across the nodes; i.e. sdf_repartition(20) will create 20 partitions of your data instead of the 1 you originally had in this case.
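As a side remark not in the original answer, sdf_repartition() also accepts a partition_by argument in case you want the rows spread by one or more columns rather than randomly; the column chosen below ('book') is purely illustrative:

# Repartition into 20 partitions, distributing rows by the 'book' column.
mytext_spark_by_book <- mytext_spark %>%
  sdf_repartition(partitions = 20, partition_by = "book")

sdf_num_partitions(mytext_spark_by_book)  # should now report 20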

I hope this helps.

The whole code:

library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)

library(janeaustenr) # to get some text data
library(stringr)

mytext <- austen_books() %>%
  mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>%
  sdf_repartition(partitions = 20)

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col = 'text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_naive_bayes(label_col = "label",
                 features_col = "finaltoken",
                 prediction_col = "pcol",
                 probability_col = "prcol",
                 raw_prediction_col = "rpcol",
                 model_type = "multinomial",
                 smoothing = 0,
                 thresholds = c(1, 1))

library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark), times = 3)

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col = 'text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_gbt_classifier(label_col = "label",
                    features_col = "finaltoken",
                    prediction_col = "pcol",
                    probability_col = "prcol",
                    raw_prediction_col = "rpcol",
                    max_memory_in_mb = 10240, # memory the GBT may use to aggregate statistics (see PS1 above)
                    cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)

pipeline3 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col = 'text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken')

model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7>
# Stages
# |--1 RegexTokenizer (Transformer)
# | <regex_tokenizer_1ce4342b543b>
# | (Parameters -- Column Names)
# | input_col: text
# | output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# | <count_vectorizer_1ce4e0e6489>
# | (Parameters -- Column Names)
# | input_col: mytoken
# | output_col: finaltoken
# | (Transformer Info)
# | vocabulary: <list>
# |--3 GBTClassificationModel (Transformer)
# | <gbt_classifier_1ce41ab30213>
# | (Parameters -- Column Names)
# | features_col: finaltoken
# | label_col: label
# | prediction_col: pcol
# | probability_col: prcol
# | raw_prediction_col: rpcol
# | (Transformer Info)
# | feature_importances: num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ...
# | num_classes: int 2
# | num_features: int 39158
# | total_num_nodes: int 540
# | tree_weights: num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ...
# | trees: <list>
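Not part of the original answer, but as a follow-up, here is a minimal sketch of how the fitted pipeline could be used to score the same data and compute an AUC; it assumes the model2 and mytext_spark objects defined above, and note the metric is computed on the training data, so it is optimistic:

# Score the data with the fitted pipeline model.
scored <- ml_transform(model2, mytext_spark)

# Peek at the label, prediction and probability columns produced by the pipeline.
scored %>%
  select(label, pcol, prcol) %>%
  head(10)

# Area under the ROC curve, using the raw prediction column from the classifier.
ml_binary_classification_evaluator(
  scored,
  label_col = "label",
  raw_prediction_col = "rpcol",
  metric_name = "areaUnderROC"
)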

Regarding "r - How to train a random forest with a sparse matrix in Spark?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50700233/
