
r - Why does the furrr package in R not keep spreading the work across all cores?


My task is to compute cosine dissimilarity.

Given a data frame of user observations, I compute the cosine dissimilarity between every pair of rows.

Long story short, I am using the furrr::future_map2_dfr function to spread the computation across all the cores I have.

For some reason, when some cores go idle while others are still working hard, the remaining work does not keep getting spread to the idle cores.

For example:

Here is the starting point:

[Screenshot: per-core CPU utilization at the start]

And here it is in the middle of the computation:

[Screenshot: per-core CPU utilization during the computation]

Why don't cores 1, 2, 5, 6, 8, 11, 12 and 15 step in and share the remaining work?

The same thing happens with other computations.

Am I missing a furrr setting that would change this behavior?

P.S.

Right now 5 cores are working "hard", and for some reason furrr is not spreading their work across all 16 cores to finish faster.

The functions:

# Packages used below:
library(dplyr)
library(data.table)
library(stringr)
library(future)
library(furrr)

dissimilarity_wrapper <- function(n_users,
                                  train_data,
                                  train_data_std,
                                  test_data,
                                  std_thresh = 0.5) {

  # NOTE:
  # n_users must be set to the maximum number of users in order to make this
  # function work properly.

  # Generating the options:

  user_combinations <- expand.grid(i = seq_len(n_users),
                                   j = seq_len(n_users))

  plan(strategy = multicore)

  expand_grid_options <- furrr::future_map2_dfr(.x = user_combinations$i,
                                                .y = user_combinations$j,
                                                function(x, y) {
                                                  expand.grid(test_idx = which(test_data$user_id == x),
                                                              train_idx = which(train_data$user_id == y))})

  drop <- c("user_id", "row_num",
            "obs_id", "scroll_id",
            "time_stamp", "seq_label",
            "scroll_length")

  test <- test_data[expand_grid_options$test_idx, !names(test_data) %in% drop]

  train <- train_data[expand_grid_options$train_idx, !names(train_data) %in% drop]

  train_std <- train_data_std[expand_grid_options$train_idx, ]

  # Calculate the different D's:

  D_manhattan_scaled <- (abs(test - train) / train_std) %>% rowSums()

  D_cosinus <- 1 - (rowSums(test * train) / (sqrt(rowSums(test^2) * rowSums(train^2))))

  train_std[train_std < std_thresh] <- 1

  D_manhattan_scaled_adj_std <- (abs(test - train) / train_std) %>% rowSums()

  D_manhattan <- (abs(test - train)) %>% rowSums()

  return(expand_grid_options %>%
           dplyr::mutate(
             D_manhattan_scaled = D_manhattan_scaled,
             D_cosinus = D_cosinus,
             D_manhattan_scaled_adj_std = D_manhattan_scaled_adj_std,
             D_manhattan = D_manhattan,
             isSame = test_data[test_idx, ]$user_id == train_data[train_idx, ]$user_id))

}


train_test_std_split <- function(data,
                                 train_size,
                                 test_size,
                                 feature_selection) {

  train_set <- data %>%
    dplyr::ungroup() %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::group_by(user_id) %>%
    dplyr::filter(row_number() <= train_size) %>%
    dplyr::ungroup()

  if (length(feature_selection) > 1) {

    # Manual:
    # scaling_param_est <- scale_param_est_total_UG

    scaling_param_est <- train_set %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), funs(mean, sd))

  } else if (length(feature_selection) == 1) {

    scaling_param_est <- train_set %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), funs(mean, sd)) %>%
      dplyr::rename_at(vars("mean", "sd"),
                       funs(paste(feature_selection, ., sep = "_")))

  }

  train_set <- train_set %>%
    dplyr::group_by(user_id) %>%
    dplyr::mutate_at(vars(feature_selection), scale) %>%
    data.table::as.data.table() %>%
    dplyr::ungroup() %>%
    dplyr::as_tibble() %>%
    dplyr::arrange(time_stamp)

  train_set_std <- train_set %>%
    dplyr::left_join(train_set %>%
                       dplyr::group_by(user_id) %>%
                       dplyr::summarize_at(feature_selection, sd) %>%
                       dplyr::rename_at(vars(-"user_id"),
                                        funs(paste0(feature_selection, "_sd"))),
                     by = "user_id") %>%
    dplyr::ungroup() %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::select(matches("_sd"))

  test_set_unscaled <- data %>%
    dplyr::ungroup() %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::filter(!(obs_id %in% train_set$obs_id)) %>%
    dplyr::group_by(user_id) %>%
    dplyr::filter(row_number() <= test_size) %>%
    dplyr::ungroup()

  # Manual:
  # test_set_joined_with_scaling_params <- cbind(test_set_unscaled, scaling_param_est)
  test_set_unscaled_joined_with_scaling_params <- test_set_unscaled %>%
    dplyr::left_join(scaling_param_est, by = "user_id")

  test_set_unscaled_joined_with_scaling_params[, feature_selection] <-
    (test_set_unscaled_joined_with_scaling_params[, feature_selection] -
       test_set_unscaled_joined_with_scaling_params[, paste0(feature_selection, "_mean")]) /
    test_set_unscaled_joined_with_scaling_params[, paste0(feature_selection, "_sd")]

  test_set <- test_set_unscaled_joined_with_scaling_params %>%
    dplyr::select(user_id, obs_id, scroll_id,
                  time_stamp, row_num, scroll_length,
                  feature_selection)


  # Validate:

  # intersect(unique(test_set$obs_id), unique(train_set$obs_id))

  # compute_std <- train_set %>%
  #   dplyr::group_by(user_id) %>%
  #   dplyr::select(-row_num) %>%
  #   dplyr::rename_at(vars(-user_id, -obs_id, -scroll_id,
  #                         -time_stamp, -scroll_length),
  #                    funs(paste(., "std", sep = "_"))) %>%
  #   dplyr::summarize_at(vars(matches("_std$")), funs(sd)) %>%
  #   dplyr::ungroup()

  return(list("train_set" = train_set,
              "train_set_std" = train_set_std,
              "test_set" = test_set,
              "test_set_unscaled" = test_set_unscaled))

}

build_dissimilarity_rank <- function(n_users,
                                     train_set,
                                     train_set_std,
                                     test_set,
                                     D_type = "D_cosinus") {

  return(dissimilarity_wrapper(n_users, train_set, train_set_std, test_set) %>%
           dplyr::mutate(train_user_id = train_set[train_idx, ]$user_id,
                         test_user_id = test_set[test_idx, ]$user_id) %>%
           dplyr::select(test_idx,
                         train_user_id,
                         test_user_id,
                         train_idx,
                         D_manhattan_scaled,
                         D_cosinus,
                         D_manhattan_scaled_adj_std,
                         D_manhattan,
                         isSame) %>%
           dplyr::group_by(test_idx, train_user_id) %>%
           dplyr::arrange(train_user_id, !!rlang::sym(D_type)) %>%
           dplyr::mutate(D_manhattan_rank = rank(D_manhattan),
                         D_manhattan_scaled_rank = rank(D_manhattan_scaled, ties.method = "first"),
                         D_cosinus_rank = rank(D_cosinus, ties.method = "first")) %>%
           dplyr::ungroup())

}

build_param_est <- function(dissimilarity_rank,
                            K,
                            D_type_rank = "D_manhattan_scaled") {

  return(dissimilarity_rank %>%
           dplyr::filter(isSame, (!!rlang::sym(paste0(D_type_rank, "_rank"))) == K) %>%
           dplyr::group_by(train_user_id) %>%
           dplyr::summarise_at(vars(D_manhattan_scaled,
                                    D_cosinus,
                                    D_manhattan_scaled_adj_std,
                                    D_manhattan),
                               funs(mean, median, sd, quantile(., probs = .9))) %>%
           dplyr::rename_at(vars(matches("_quantile")),
                            funs(str_replace(., "_quantile", "_percentile_90"))) %>%
           dplyr::rename_at(vars(matches("_sd")),
                            funs(str_replace(., "_sd", "_std")))
  )
}

build_dissimilarity_table <- function(dissimilarity_rank,
                                      param_est,
                                      K,
                                      i,
                                      D_type_rank = "D_manhattan_scaled",
                                      D_s = c("D_manhattan_scaled",
                                              "D_cosinus",
                                              "D_manhattan_scaled_adj_std",
                                              "D_manhattan")) {

  dissimilarity_table <- dissimilarity_rank %>%
    dplyr::filter(isSame, (!!rlang::sym(paste0(D_type_rank, "_rank"))) == K) %>%
    dplyr::left_join(param_est, by = c("train_user_id")) %>%
    dplyr::ungroup()

  dissimilarity_table[paste0(D_s, "_norm_standard")] <-
    (dissimilarity_table[D_s] - dissimilarity_table[paste0(D_s, "_mean")]) /
    dissimilarity_table[paste0(D_s, "_std")]

  dissimilarity_table[paste0(D_s, "_norm_median")] <-
    (dissimilarity_table[D_s] - dissimilarity_table[paste0(D_s, "_median")]) /
    (dissimilarity_table[paste0(D_s, "_percentile_90")] - dissimilarity_table[paste0(D_s, "_median")])

  # dplyr::mutate(experiment = i))

  return(dissimilarity_table)

}

k_fold_data_prepare <- function(df, min_scroll_len = 3) {

  # Given the data, split it by user id:

  return(df %>%
           dplyr::filter(scroll_length >= min_scroll_len) %>%
           dplyr::arrange(time_stamp) %>%
           dplyr::ungroup() %>%
           split(.$user_id))

}

k_fold_engine <- function(df,
                          obs,
                          n_users,
                          K = 2,
                          feature_selection,
                          D_type = "D_cosinus") {

  # Train - Test Split:

  train_set <- df %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::filter(obs_id != obs)

  if (length(feature_selection) > 1) {

    # Manual:
    # scaling_param_est <- scale_param_est_total_UG
    scaling_param_est <- train_set %>%
      dplyr::arrange(time_stamp) %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection),
                          funs(mean, sd))

  } else if (length(feature_selection) == 1) {

    scaling_param_est <- train_set %>%
      dplyr::arrange(time_stamp) %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), funs(mean, sd)) %>%
      dplyr::rename_at(vars("mean", "sd"),
                       funs(paste(feature_selection, ., sep = "_")))

  }

  train_set <- train_set %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::group_by(user_id) %>%
    dplyr::mutate_at(vars(feature_selection), scale) %>%
    as.data.table() %>%
    dplyr::ungroup() %>%
    as_tibble()

  # Compute std for each train variable:

  train_set_std <- train_set %>%
    dplyr::left_join(train_set %>%
                       dplyr::group_by(user_id) %>%
                       dplyr::summarize_at(feature_selection, sd) %>%
                       dplyr::rename_at(vars(-"user_id"),
                                        funs(paste0(feature_selection, "_sd"))),
                     by = "user_id") %>%
    dplyr::select(matches("_sd"))

  test_set <- df %>%
    dplyr::filter(obs_id == obs)

  test_set_joined_with_scaling_params <- test_set %>%
    dplyr::left_join(scaling_param_est, by = "user_id") %>%
    dplyr::arrange(time_stamp)

  # Manual:
  # test_set_joined_with_scaling_params <- cbind(test_set, scaling_param_est)

  test_set_joined_with_scaling_params[, feature_selection] <-
    (test_set_joined_with_scaling_params[, feature_selection] -
       test_set_joined_with_scaling_params[, paste0(feature_selection, "_mean")]) /
    test_set_joined_with_scaling_params[, paste0(feature_selection, "_sd")]

  test_set <- test_set_joined_with_scaling_params %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::select(user_id, obs_id, scroll_id,
                  time_stamp, row_num, scroll_length,
                  feature_selection)

  # Compute std for each train variable:
  # compute_std <- train_set %>%
  #   dplyr::group_by(user_id) %>%
  #   dplyr::select(-row_num) %>%
  #   dplyr::rename_at(vars(-user_id, -obs_id, -scroll_id,
  #                         -time_stamp, -scroll_length),
  #                    funs(paste(., "std", sep = "_"))) %>%
  #   dplyr::summarize_at(vars(matches("_std$")), funs(sd)) %>%
  #   dplyr::ungroup()
  #
  # train_set_std <- dplyr::left_join(train_set,
  #                                   compute_std,
  #                                   by = "user_id") %>%
  #   dplyr::ungroup() %>%
  #   dplyr::select(matches("_std$"))

  # Compute the dissimilarities:

  return(build_dissimilarity_rank(n_users,
                                  train_set,
                                  train_set_std,
                                  test_set,
                                  D_type))

}

k_fold_wrapper <- function(data_df,
                           K = 2,
                           D_type_rank = "D_cosinus",
                           feature_selection) {

  data_seqed <- k_fold_data_prepare(data_df)

  # Given the data split by user id, split it further by observation id:

  data_seqed_by_obs <- future_imap(data_seqed, ~split(., .$obs_id))

  # Get the observation ids for each sub data frame:

  obs_ids <- future_imap(data_seqed_by_obs, ~as.integer(names(.)))

  # Feed the k-fold engine with the data split by user id and the observation ids:

  plan(strategy = multicore)

  dissimilarity_rank <- furrr::future_map_dfr(data_seqed, function(x) {

    furrr::future_map_dfr(obs_ids[[as.character(x$user_id[1])]],

                          function(df,
                                   obs,
                                   n_users,
                                   K,
                                   feature_selection,
                                   D_type_rank) {

                            k_fold_engine(df,
                                          obs,
                                          n_users,
                                          K,
                                          feature_selection,
                                          D_type_rank) },

                          df = x, n_users = x$user_id[1],

                          K = K, feature_selection = feature_selection,

                          D_type = D_type_rank) })


  if (nrow(dissimilarity_rank[which(rowSums(is.na(dissimilarity_rank)) > 0), ])) {

    dissimilarity_rank <- dissimilarity_rank[which(rowSums(is.na(dissimilarity_rank)) == 0), ] %>%
      dplyr::mutate(row_num = row_number())

  }

  param_estimations <- dissimilarity_rank %>%
    build_param_est(K, D_type_rank = D_type_rank)

  # Summarize and return the final param estimation (average):

  # return(param_estimations %>%
  #          dplyr::group_by(train_user_id) %>%
  #          summarize_at(vars(-"train_user_id"), mean))

  return(list("dissimilarity_rank" = dissimilarity_rank,
              "param_estimations" = param_estimations))

}

The final script that triggers the problem:

n_users <- max(unique(data$user_id))

train_df <- data %>%
  dplyr::group_by(user_id) %>%
  dplyr::filter(row_number() <= 50)

filter_users_low_amount_obs <- train_df %>%
  dplyr::group_by(user_id) %>%
  dplyr::summarise(n_obs = length(unique(obs_id))) %>%
  dplyr::arrange(n_obs) %>%
  dplyr::filter(n_obs >= 3) %>%
  select(user_id)

train_df <- train_df %>%
  filter(user_id %in% filter_users_low_amount_obs$user_id)

k_fold_d_rank_param_est <- k_fold_wrapper(train_df, K, D_type_rank = D_type, feature_selection)

dissimilarity_rank_1 <- k_fold_d_rank_param_est$dissimilarity_rank

param_est <- k_fold_d_rank_param_est$param_estimations

train_test_std_split_2 <- train_test_std_split(data,
                                               train_size_2,
                                               test_size = Inf,
                                               feature_selection)

dissimilarity_rank_2 <- build_dissimilarity_rank(n_users,
                                                 train_test_std_split_2$train_set,
                                                 train_test_std_split_2$train_set_std,
                                                 train_test_std_split_2$test_set)

Best answer

I believe the option you are missing is furrr's scheduling option. By default, your data is split into as many chunks as there are workers specified at the start of the future_map call, and each worker is then assigned one chunk to process. Once a worker finishes its chunk, it looks for another chunk and starts working on that. If no chunks are left, the worker sits idle.
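As a quick illustration of that default (a minimal sketch, assuming the future and furrr packages are attached and a 16-core machine like the one in the question):

library(future)
library(furrr)

plan(multicore, workers = 16)  # multicore forks; on Windows use multisession instead

# With the default furrr_options(scheduling = 1), future_map() splits the input
# into (at most) nbrOfWorkers() chunks up front -- one chunk per worker -- so a
# worker that finishes its chunk early has nothing left to pick up.
nbrOfWorkers()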

With the scheduling option you can specify how many chunks of data should be created per worker. For example, .options = furrr_options(scheduling = 2) creates two chunks per worker, and a worker that finishes early will start processing another chunk.
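Applied to the future_map2_dfr call from the question, that would look roughly like this (a sketch only; scheduling = 2 is an arbitrary example value, and larger values or Inf trade less idle time for more scheduling overhead):

expand_grid_options <- furrr::future_map2_dfr(
  .x = user_combinations$i,
  .y = user_combinations$j,
  function(x, y) {
    expand.grid(test_idx  = which(test_data$user_id == x),
                train_idx = which(train_data$user_id == y))
  },
  # Two chunks per worker: a worker that finishes early grabs a spare chunk.
  .options = furrr::furrr_options(scheduling = 2)
)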

For more information, see the vignette on chunking: https://davisvaughan.github.io/furrr/articles/articles/chunking.html

P.S.: Your code contains some nested future calls; depending on the future::plan() you specify, these will only slow the code down.
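For example, k_fold_wrapper runs a future_map_dfr inside another future_map_dfr. One way to remove that level of nesting (a sketch only, meant to stand in for the corresponding block inside k_fold_wrapper, where data_seqed, obs_ids, K, feature_selection and D_type_rank are already in scope) is to keep the outer map parallel and make the inner map a plain purrr::map_dfr:

library(purrr)

plan(strategy = multicore)

dissimilarity_rank <- furrr::future_map_dfr(data_seqed, function(x) {
  # Inner loop runs sequentially inside each worker; note that the futures
  # spawned inside dissimilarity_wrapper() are a separate source of nesting.
  purrr::map_dfr(obs_ids[[as.character(x$user_id[1])]],
                 function(obs) {
                   k_fold_engine(df = x,
                                 obs = obs,
                                 n_users = x$user_id[1],
                                 K = K,
                                 feature_selection = feature_selection,
                                 D_type = D_type_rank)
                 })
})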

The original question, "r - Why does the furrr package in R not keep spreading the work across all cores?", is on Stack Overflow: https://stackoverflow.com/questions/54106675/
