- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如果在任何一个中遇到错误(例如 stop()
),是否可以请求 parallel::mclapply()
尽快放弃所有进一步处理它的流程?
最佳答案
这是另一种方法:想法是在用#!!
指示的三个位置修改parallel::mclapply()
。新参数 stop.on.error 可用于指定发生错误时是否应停止执行。
library(parallel)
Mclapply <- function (X, FUN, ..., mc.preschedule = TRUE,
mc.set.seed = TRUE, mc.silent = FALSE,
mc.cores = getOption("mc.cores", 2L),
mc.cleanup = TRUE, mc.allow.recursive = TRUE,
affinity.list = NULL, stop.on.error=FALSE)
{
stop.on.error <- stop.on.error[1] #!!
stopifnot(is.logical(stop.on.error)) #!!
cores <- as.integer(mc.cores)
if ((is.na(cores) || cores < 1L) && is.null(affinity.list))
stop("'mc.cores' must be >= 1")
parallel:::.check_ncores(cores)
if (parallel:::isChild() && !isTRUE(mc.allow.recursive))
return(lapply(X = X, FUN = FUN, ...))
if (!is.vector(X) || is.object(X))
X <- as.list(X)
if (!is.null(affinity.list) && length(affinity.list) < length(X))
stop("affinity.list and X must have the same length")
if (mc.set.seed)
mc.reset.stream()
if (length(X) < 2) {
old.aff <- mcaffinity()
mcaffinity(affinity.list[[1]])
res <- lapply(X = X, FUN = FUN, ...)
mcaffinity(old.aff)
return(res)
}
if (length(X) < cores)
cores <- length(X)
if (cores < 2L && is.null(affinity.list))
return(lapply(X = X, FUN = FUN, ...))
jobs <- list()
parallel:::prepareCleanup()
on.exit(parallel:::cleanup(mc.cleanup))
if (!mc.preschedule) {
FUN <- match.fun(FUN)
if (length(X) <= cores && is.null(affinity.list)) {
jobs <- lapply(seq_along(X), function(i) mcparallel(FUN(X[[i]],
...), name = names(X)[i], mc.set.seed = mc.set.seed,
silent = mc.silent))
res <- mccollect(jobs)
if (length(res) == length(X))
names(res) <- names(X)
has.errors <- sum(sapply(res, inherits, "try-error"))
}
else {
sx <- seq_along(X)
res <- vector("list", length(sx))
names(res) <- names(X)
fin <- rep(FALSE, length(X))
if (!is.null(affinity.list)) {
cores <- max(unlist(x = affinity.list, recursive = TRUE))
d0 <- logical(cores)
cpu.map <- lapply(sx, function(i) {
data <- d0
data[as.vector(affinity.list[[i]])] <- TRUE
data
})
ava <- do.call(rbind, cpu.map)
}
else {
ava <- matrix(TRUE, nrow = length(X), ncol = cores)
}
jobid <- integer(cores)
for (i in 1:cores) {
jobid[i] <- match(TRUE, ava[, i])
ava[jobid[i], ] <- FALSE
}
if (anyNA(jobid)) {
unused <- which(is.na(jobid))
jobid <- jobid[-unused]
ava <- ava[, -unused, drop = FALSE]
}
jobs <- lapply(jobid, function(i) mcparallel(FUN(X[[i]],
...), mc.set.seed = mc.set.seed, silent = mc.silent,
mc.affinity = affinity.list[[i]]))
jobsp <- parallel:::processID(jobs)
has.errors <- 0L
delivered.result <- 0L
while (!all(fin)) {
s <- parallel:::selectChildren(jobs[!is.na(jobsp)], -1)
if (is.null(s))
break
if (is.integer(s))
for (ch in s) {
ji <- match(TRUE, jobsp == ch)
ci <- jobid[ji]
r <- parallel:::readChild(ch)
if (is.raw(r)) {
child.res <- unserialize(r)
if (inherits(child.res, "try-error")){
if(stop.on.error) #!!
stop("error in process X = ", ci, "\n", attr(child.res, "condition")$message) #!!
has.errors <- has.errors + 1L
}
if (!is.null(child.res))
res[[ci]] <- child.res
delivered.result <- delivered.result +
1L
}
else {
fin[ci] <- TRUE
jobsp[ji] <- jobid[ji] <- NA
if (any(ava)) {
nexti <- which.max(ava[, ji])
if (!is.na(nexti)) {
jobid[ji] <- nexti
jobs[[ji]] <- mcparallel(FUN(X[[nexti]],
...), mc.set.seed = mc.set.seed,
silent = mc.silent, mc.affinity = affinity.list[[nexti]])
jobsp[ji] <- parallel:::processID(jobs[[ji]])
ava[nexti, ] <- FALSE
}
}
}
}
}
nores <- length(X) - delivered.result
if (nores > 0)
warning(sprintf(ngettext(nores, "%d parallel function call did not deliver a result",
"%d parallel function calls did not deliver results"),
nores), domain = NA)
}
if (has.errors)
warning(gettextf("%d function calls resulted in an error",
has.errors), domain = NA)
return(res)
}
if (!is.null(affinity.list))
warning("'mc.preschedule' must be false if 'affinity.list' is used")
sindex <- lapply(seq_len(cores), function(i) seq(i, length(X),
by = cores))
schedule <- lapply(seq_len(cores), function(i) X[seq(i, length(X),
by = cores)])
ch <- list()
res <- vector("list", length(X))
names(res) <- names(X)
cp <- rep(0L, cores)
fin <- rep(FALSE, cores)
dr <- rep(FALSE, cores)
inner.do <- function(core) {
S <- schedule[[core]]
f <- parallel:::mcfork()
if (isTRUE(mc.set.seed))
parallel:::mc.advance.stream()
if (inherits(f, "masterProcess")) {
on.exit(mcexit(1L, structure("fatal error in wrapper code",
class = "try-error")))
if (isTRUE(mc.set.seed))
parallel:::mc.set.stream()
if (isTRUE(mc.silent))
closeStdout(TRUE)
parallel:::sendMaster(try(lapply(X = S, FUN = FUN, ...), silent = TRUE))
parallel:::mcexit(0L)
}
jobs[[core]] <<- ch[[core]] <<- f
cp[core] <<- parallel:::processID(f)
NULL
}
job.res <- lapply(seq_len(cores), inner.do)
ac <- cp[cp > 0]
has.errors <- integer(0)
while (!all(fin)) {
s <- parallel:::selectChildren(ac[!fin], -1)
if (is.null(s))
break
if (is.integer(s))
for (ch in s) {
a <- parallel:::readChild(ch)
if (is.integer(a)) {
core <- which(cp == a)
fin[core] <- TRUE
}
else if (is.raw(a)) {
core <- which(cp == attr(a, "pid"))
job.res[[core]] <- ijr <- unserialize(a)
if (inherits(ijr, "try-error")){
has.errors <- c(has.errors, core)
if(stop.on.error) #!!
stop("error in one of X = ", paste(schedule[[core]], collapse=", "), "\n", attr(ijr, "condition")$message) #!!
}
dr[core] <- TRUE
}
else if (is.null(a)) {
core <- which(cp == ch)
fin[core] <- TRUE
}
}
}
for (i in seq_len(cores)) {
this <- job.res[[i]]
if (inherits(this, "try-error")) {
for (j in sindex[[i]]) res[[j]] <- this
}
else if (!is.null(this))
res[sindex[[i]]] <- this
}
nores <- cores - sum(dr)
if (nores > 0)
warning(sprintf(ngettext(nores, "scheduled core %s did not deliver a result, all values of the job will be affected",
"scheduled cores %s did not deliver results, all values of the jobs will be affected"),
paste(which(dr == FALSE), collapse = ", ")), domain = NA)
if (length(has.errors)) {
if (length(has.errors) == cores)
warning("all scheduled cores encountered errors in user code")
else warning(sprintf(ngettext(has.errors, "scheduled core %s encountered error in user code, all values of the job will be affected",
"scheduled cores %s encountered errors in user code, all values of the jobs will be affected"),
paste(has.errors, collapse = ", ")), domain = NA)
}
res
}
测试:
f <- function(x, errorAt=1, sleep=2){
if(x==errorAt) stop("-->> test error <<--")
Sys.sleep(sleep)
x
}
options(mc.cores=2)
Mclapply(X=1:4, FUN=f, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, stop.on.error = TRUE) :
## error in one of X = 1, 3
## -->> test error <<--
Mclapply(X=1:4, FUN=f, errorAt=3, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, errorAt = 3, stop.on.error = TRUE) :
## error in one of X = 1, 3
## -->> test error <<--
Mclapply(X=1:4, FUN=f, errorAt=Inf, stop.on.error=TRUE)
## [[1]]
## [1] 1
##
## [[2]]
## [1] 2
##
## [[3]]
## [1] 3
##
## [[4]]
## [1] 4
Mclapply(X=1:4, FUN=f, mc.preschedule=FALSE, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, mc.preschedule = FALSE, stop.on.error = TRUE) :
## error in process X = 1
## -->> test error <<--
Mclapply(X=1:4, FUN=f, errorAt=3, mc.preschedule=FALSE, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, errorAt = 3, mc.preschedule = FALSE, :
## error in process X = 3
## -->> test error <<--
Mclapply(X=1:4, FUN=f, errorAt=Inf, mc.preschedule=FALSE, stop.on.error=TRUE)
## [[1]]
## [1] 1
##
## [[2]]
## [1] 2
##
## [[3]]
## [1] 3
##
## [[4]]
## [1] 4
此方法使用了parallel包的许多内部函数(例如parallel:::isChild()
)。它适用于 R 版本 3.6.0。
关于R 并行中止所有 mclapply 操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56353069/
我正在努力做到这一点 在我的操作中从数据库获取对象列表(确定) 在 JSP 上打印(确定) 此列表作为 JSP 中的可编辑表出现。我想修改然后将其提交回同一操作以将其保存在我的数据库中(失败。当我使用
我有以下形式的 Linq to Entities 查询: var x = from a in SomeData where ... some conditions ... select
我有以下查询。 var query = Repository.Query() .Where(p => !p.IsDeleted && p.Article.ArticleSections.Cou
我正在编写一个应用程序包,其中包含一个主类,其中主方法与GUI类分开,GUI类包含一个带有jtabbedpane的jframe,它有两个选项卡,第一个选项卡包含一个jtable,称为jtable1,第
以下代码产生错误 The nested query is not supported. Operation1='Case' Operation2='Collect' 问题是我做错了什么?我该如何解决?
我已经为 HA redis 集群(2 个副本、1 个主节点、3 个哨兵)设置了本地 docker 环境。只有哨兵暴露端口(10021、10022、10023)。 我使用的是 stackexchange
我正在 Desk.com 中构建一个“集成 URL”,它使用 Shopify Liquid 模板过滤器语法。对于开始日期为 7 天前而结束日期为现在的查询,此 URL 需要包含“开始日期”和“结束日期
你一定想过。然而情况却不理想,python中只能使用类似于 i++/i--等操作。 python中的自增操作 下面代码几乎是所有程序员在python中进行自增(减)操作的常用
我需要在每个使用 github 操作的手动构建中显示分支。例如:https://gyazo.com/2131bf83b0df1e2157480e5be842d4fb 我应该显示分支而不是一个。 最佳答
我有一个关于 Perl qr 运算符的问题: #!/usr/bin/perl -w &mysplit("a:b:c", /:/); sub mysplit { my($str, $patt
我已经使用 ArgoUML 创建了一个 ERD(实体关系图),我希望在一个类中创建两个操作,它们都具有 void 返回类型。但是,我只能创建一个返回 void 类型的操作。 例如: 我能够将 book
Github 操作仍处于测试阶段并且很新,但我希望有人可以提供帮助。我认为可以在主分支和拉取请求上运行 github 操作,如下所示: on: pull_request push: b
我正在尝试创建一个 Twilio 工作流来调用电话并记录用户所说的内容。为此,我正在使用 Record,但我不确定要在 action 参数中放置什么。 尽管我知道 Twilio 会发送有关调用该 UR
我不确定这是否可行,但值得一试。我正在使用模板缓冲区来减少使用此算法的延迟渲染器中光体积的过度绘制(当相机位于体积之外时): 使用廉价的着色器,将深度测试设置为 LEQUAL 绘制背面,将它们标记在模
有没有聪明的方法来复制 和 重命名 文件通过 GitHub 操作? 我想将一些自述文件复制到 /docs文件夹(:= 同一个 repo,不是远程的!),它们将根据它们的 frontmatter 重命名
我有一个 .csv 文件,其中第一列包含用户名。它们采用 FirstName LastName 的形式。我想获取 FirstName 并将 LastName 的第一个字符添加到它上面,然后删除空格。然
Sitecore 根据 Sitecore 树中定义的项目名称生成 URL, http://samplewebsite/Pages/Sample Page 但我们的客户有兴趣降低所有 URL(页面/示例
我正在尝试进行一些计算,但是一旦我输入金额,它就会完成。我只是希望通过单击按钮而不是自动发生这种情况。 到目前为止我做了什么: Angular JS - programming-fr
我的公司创建了一种在环境之间移动文件的复杂方法,现在我们希望将某些构建的 JS 文件(已转换和缩小)从一个 github 存储库移动到另一个。使用 github 操作可以实现这一点吗? 最佳答案 最简
在我的代码中,我创建了一个 JSONArray 对象。并向 JSONArray 对象添加了两个 JSONObject。我使用的是 json-simple-1.1.jar。我的代码是 package j
我是一名优秀的程序员,十分优秀!