- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如果在任何一个中遇到错误(例如 stop()
),是否可以请求 parallel::mclapply()
尽快放弃所有进一步处理它的流程?
最佳答案
这是另一种方法:想法是在用#!!
指示的三个位置修改parallel::mclapply()
。新参数 stop.on.error 可用于指定发生错误时是否应停止执行。
library(parallel)
Mclapply <- function (X, FUN, ..., mc.preschedule = TRUE,
mc.set.seed = TRUE, mc.silent = FALSE,
mc.cores = getOption("mc.cores", 2L),
mc.cleanup = TRUE, mc.allow.recursive = TRUE,
affinity.list = NULL, stop.on.error=FALSE)
{
stop.on.error <- stop.on.error[1] #!!
stopifnot(is.logical(stop.on.error)) #!!
cores <- as.integer(mc.cores)
if ((is.na(cores) || cores < 1L) && is.null(affinity.list))
stop("'mc.cores' must be >= 1")
parallel:::.check_ncores(cores)
if (parallel:::isChild() && !isTRUE(mc.allow.recursive))
return(lapply(X = X, FUN = FUN, ...))
if (!is.vector(X) || is.object(X))
X <- as.list(X)
if (!is.null(affinity.list) && length(affinity.list) < length(X))
stop("affinity.list and X must have the same length")
if (mc.set.seed)
mc.reset.stream()
if (length(X) < 2) {
old.aff <- mcaffinity()
mcaffinity(affinity.list[[1]])
res <- lapply(X = X, FUN = FUN, ...)
mcaffinity(old.aff)
return(res)
}
if (length(X) < cores)
cores <- length(X)
if (cores < 2L && is.null(affinity.list))
return(lapply(X = X, FUN = FUN, ...))
jobs <- list()
parallel:::prepareCleanup()
on.exit(parallel:::cleanup(mc.cleanup))
if (!mc.preschedule) {
FUN <- match.fun(FUN)
if (length(X) <= cores && is.null(affinity.list)) {
jobs <- lapply(seq_along(X), function(i) mcparallel(FUN(X[[i]],
...), name = names(X)[i], mc.set.seed = mc.set.seed,
silent = mc.silent))
res <- mccollect(jobs)
if (length(res) == length(X))
names(res) <- names(X)
has.errors <- sum(sapply(res, inherits, "try-error"))
}
else {
sx <- seq_along(X)
res <- vector("list", length(sx))
names(res) <- names(X)
fin <- rep(FALSE, length(X))
if (!is.null(affinity.list)) {
cores <- max(unlist(x = affinity.list, recursive = TRUE))
d0 <- logical(cores)
cpu.map <- lapply(sx, function(i) {
data <- d0
data[as.vector(affinity.list[[i]])] <- TRUE
data
})
ava <- do.call(rbind, cpu.map)
}
else {
ava <- matrix(TRUE, nrow = length(X), ncol = cores)
}
jobid <- integer(cores)
for (i in 1:cores) {
jobid[i] <- match(TRUE, ava[, i])
ava[jobid[i], ] <- FALSE
}
if (anyNA(jobid)) {
unused <- which(is.na(jobid))
jobid <- jobid[-unused]
ava <- ava[, -unused, drop = FALSE]
}
jobs <- lapply(jobid, function(i) mcparallel(FUN(X[[i]],
...), mc.set.seed = mc.set.seed, silent = mc.silent,
mc.affinity = affinity.list[[i]]))
jobsp <- parallel:::processID(jobs)
has.errors <- 0L
delivered.result <- 0L
while (!all(fin)) {
s <- parallel:::selectChildren(jobs[!is.na(jobsp)], -1)
if (is.null(s))
break
if (is.integer(s))
for (ch in s) {
ji <- match(TRUE, jobsp == ch)
ci <- jobid[ji]
r <- parallel:::readChild(ch)
if (is.raw(r)) {
child.res <- unserialize(r)
if (inherits(child.res, "try-error")){
if(stop.on.error) #!!
stop("error in process X = ", ci, "\n", attr(child.res, "condition")$message) #!!
has.errors <- has.errors + 1L
}
if (!is.null(child.res))
res[[ci]] <- child.res
delivered.result <- delivered.result +
1L
}
else {
fin[ci] <- TRUE
jobsp[ji] <- jobid[ji] <- NA
if (any(ava)) {
nexti <- which.max(ava[, ji])
if (!is.na(nexti)) {
jobid[ji] <- nexti
jobs[[ji]] <- mcparallel(FUN(X[[nexti]],
...), mc.set.seed = mc.set.seed,
silent = mc.silent, mc.affinity = affinity.list[[nexti]])
jobsp[ji] <- parallel:::processID(jobs[[ji]])
ava[nexti, ] <- FALSE
}
}
}
}
}
nores <- length(X) - delivered.result
if (nores > 0)
warning(sprintf(ngettext(nores, "%d parallel function call did not deliver a result",
"%d parallel function calls did not deliver results"),
nores), domain = NA)
}
if (has.errors)
warning(gettextf("%d function calls resulted in an error",
has.errors), domain = NA)
return(res)
}
if (!is.null(affinity.list))
warning("'mc.preschedule' must be false if 'affinity.list' is used")
sindex <- lapply(seq_len(cores), function(i) seq(i, length(X),
by = cores))
schedule <- lapply(seq_len(cores), function(i) X[seq(i, length(X),
by = cores)])
ch <- list()
res <- vector("list", length(X))
names(res) <- names(X)
cp <- rep(0L, cores)
fin <- rep(FALSE, cores)
dr <- rep(FALSE, cores)
inner.do <- function(core) {
S <- schedule[[core]]
f <- parallel:::mcfork()
if (isTRUE(mc.set.seed))
parallel:::mc.advance.stream()
if (inherits(f, "masterProcess")) {
on.exit(mcexit(1L, structure("fatal error in wrapper code",
class = "try-error")))
if (isTRUE(mc.set.seed))
parallel:::mc.set.stream()
if (isTRUE(mc.silent))
closeStdout(TRUE)
parallel:::sendMaster(try(lapply(X = S, FUN = FUN, ...), silent = TRUE))
parallel:::mcexit(0L)
}
jobs[[core]] <<- ch[[core]] <<- f
cp[core] <<- parallel:::processID(f)
NULL
}
job.res <- lapply(seq_len(cores), inner.do)
ac <- cp[cp > 0]
has.errors <- integer(0)
while (!all(fin)) {
s <- parallel:::selectChildren(ac[!fin], -1)
if (is.null(s))
break
if (is.integer(s))
for (ch in s) {
a <- parallel:::readChild(ch)
if (is.integer(a)) {
core <- which(cp == a)
fin[core] <- TRUE
}
else if (is.raw(a)) {
core <- which(cp == attr(a, "pid"))
job.res[[core]] <- ijr <- unserialize(a)
if (inherits(ijr, "try-error")){
has.errors <- c(has.errors, core)
if(stop.on.error) #!!
stop("error in one of X = ", paste(schedule[[core]], collapse=", "), "\n", attr(ijr, "condition")$message) #!!
}
dr[core] <- TRUE
}
else if (is.null(a)) {
core <- which(cp == ch)
fin[core] <- TRUE
}
}
}
for (i in seq_len(cores)) {
this <- job.res[[i]]
if (inherits(this, "try-error")) {
for (j in sindex[[i]]) res[[j]] <- this
}
else if (!is.null(this))
res[sindex[[i]]] <- this
}
nores <- cores - sum(dr)
if (nores > 0)
warning(sprintf(ngettext(nores, "scheduled core %s did not deliver a result, all values of the job will be affected",
"scheduled cores %s did not deliver results, all values of the jobs will be affected"),
paste(which(dr == FALSE), collapse = ", ")), domain = NA)
if (length(has.errors)) {
if (length(has.errors) == cores)
warning("all scheduled cores encountered errors in user code")
else warning(sprintf(ngettext(has.errors, "scheduled core %s encountered error in user code, all values of the job will be affected",
"scheduled cores %s encountered errors in user code, all values of the jobs will be affected"),
paste(has.errors, collapse = ", ")), domain = NA)
}
res
}
测试:
f <- function(x, errorAt=1, sleep=2){
if(x==errorAt) stop("-->> test error <<--")
Sys.sleep(sleep)
x
}
options(mc.cores=2)
Mclapply(X=1:4, FUN=f, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, stop.on.error = TRUE) :
## error in one of X = 1, 3
## -->> test error <<--
Mclapply(X=1:4, FUN=f, errorAt=3, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, errorAt = 3, stop.on.error = TRUE) :
## error in one of X = 1, 3
## -->> test error <<--
Mclapply(X=1:4, FUN=f, errorAt=Inf, stop.on.error=TRUE)
## [[1]]
## [1] 1
##
## [[2]]
## [1] 2
##
## [[3]]
## [1] 3
##
## [[4]]
## [1] 4
Mclapply(X=1:4, FUN=f, mc.preschedule=FALSE, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, mc.preschedule = FALSE, stop.on.error = TRUE) :
## error in process X = 1
## -->> test error <<--
Mclapply(X=1:4, FUN=f, errorAt=3, mc.preschedule=FALSE, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, errorAt = 3, mc.preschedule = FALSE, :
## error in process X = 3
## -->> test error <<--
Mclapply(X=1:4, FUN=f, errorAt=Inf, mc.preschedule=FALSE, stop.on.error=TRUE)
## [[1]]
## [1] 1
##
## [[2]]
## [1] 2
##
## [[3]]
## [1] 3
##
## [[4]]
## [1] 4
此方法使用了parallel包的许多内部函数(例如parallel:::isChild()
)。它适用于 R 版本 3.6.0。
关于R 并行中止所有 mclapply 操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56353069/
我正在从 Stata 迁移到 R(plm 包),以便进行面板模型计量经济学。在 Stata 中,面板模型(例如随机效应)通常报告组内、组间和整体 R 平方。 I have found plm 随机效应
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 想改进这个问题?将问题更新为 on-topic对于堆栈溢出。 6年前关闭。 Improve this qu
我想要求用户输入整数值列表。用户可以输入单个值或一组多个值,如 1 2 3(spcae 或逗号分隔)然后使用输入的数据进行进一步计算。 我正在使用下面的代码 EXP <- as.integer(rea
当 R 使用分类变量执行回归时,它实际上是虚拟编码。也就是说,省略了一个级别作为基础或引用,并且回归公式包括所有其他级别的虚拟变量。但是,R 选择了哪一个作为引用,以及我如何影响这个选择? 具有四个级
这个问题基本上是我之前问过的问题的延伸:How to only print (adjusted) R-squared of regression model? 我想建立一个线性回归模型来预测具有 15
我在一台安装了多个软件包的 Linux 计算机上安装了 R。现在我正在另一台 Linux 计算机上设置 R。从他们的存储库安装 R 很容易,但我将不得不使用 安装许多包 install.package
我正在阅读 Hadley 的高级 R 编程,当它讨论字符的内存大小时,它说: R has a global string pool. This means that each unique strin
我们可以将 Shiny 代码写在两个单独的文件中,"ui.R"和 "server.R" , 或者我们可以将两个模块写入一个文件 "app.R"并调用函数shinyApp() 这两种方法中的任何一种在性
我正在使用 R 通过 RGP 包进行遗传编程。环境创造了解决问题的功能。我想将这些函数保存在它们自己的 .R 源文件中。我这辈子都想不通怎么办。我尝试过的一种方法是: bf_str = print(b
假设我创建了一个函数“function.r”,在编辑该函数后我必须通过 source('function.r') 重新加载到我的全局环境中。无论如何,每次我进行编辑时,我是否可以避免将其重新加载到我的
例如,test.R 是一个单行文件: $ cat test.R # print('Hello, world!') 我们可以通过Rscript test.R 或R CMD BATCH test.R 来
我知道我可以使用 Rmd 来构建包插图,但想知道是否可以更具体地使用 R Notebooks 来制作包插图。如果是这样,我需要将 R Notebooks 编写为包小插图有什么不同吗?我正在使用最新版本
我正在考虑使用 R 包的共享库进行 R 的站点安装。 多台计算机将访问该库,以便每个人共享相同的设置。 问题是我注意到有时您无法更新包,因为另一个 R 实例正在锁定库。我不能要求每个人都关闭它的 R
我知道如何从命令行启动 R 并执行表达式(例如, R -e 'print("hello")' )或从文件中获取输入(例如, R -f filename.r )。但是,在这两种情况下,R 都会运行文件中
我正在尝试使我当前的项目可重现,因此我正在创建一个主文档(最终是一个 .rmd 文件),用于调用和执行其他几个文档。这样我自己和其他调查员只需要打开和运行一个文件。 当前设置分为三层:主文件、2 个读
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 想改进这个问题?将问题更新为 on-topic对于堆栈溢出。 5年前关闭。 Improve this qu
我的 R 包中有以下描述文件 Package: blah Title: What the Package Does (one line, title case) Version: 0.0.0.9000
有没有办法更有效地编写以下语句?accel 是一个数据框。 accel[[2]]<- accel[[2]]-weighted.mean(accel[[2]]) accel[[3]]<- accel[[
例如,在尝试安装 R 包时 curl作为 usethis 的依赖项: * installing *source* package ‘curl’ ... ** package ‘curl’ succes
我想将一些软件作为一个包共享,但我的一些脚本似乎并不能很自然地作为函数运行。例如,考虑以下代码块,其中“raw.df”是一个包含离散和连续类型变量的数据框。函数“count.unique”和“squa
我是一名优秀的程序员,十分优秀!