- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我有一个编译时 directed acyclic graph异步任务。 DAG 显示了任务之间的依赖关系:通过分析它,可以了解哪些任务可以并行运行(在单独的线程中)以及哪些任务需要等待其他任务完成才能开始(依赖关系)。
我想从 DAG 生成一个回调链,使用 boost::future
和 .then(...)
, when_all(...)
延续辅助函数。这一生成的结果将是一个函数,当调用该函数时,将启动回调链并执行 DAG 所描述的任务,并行运行尽可能多的任务。
但是,我很难找到适用于所有情况的通用算法。
我画了几张图,让问题更容易理解。这是一个图例,将向您展示图中符号的含义:
让我们从一个简单的线性 DAG 开始:
这个依赖图由三个任务组成( A
、 B
和 C
)。 C
取决于 B
. B
取决于 A
.这里没有并行的可能性 - 生成算法将构建类似于以下内容的内容:
boost::future<void> A, B, C, end;
A.then([]
{
B.then([]
{
C.get();
end.get();
});
});
end
取决于 B
和 D
. (加入)D
取决于 C
. B
和 C
依赖 A
. (叉)boost::future<void> A, B, C, D, end;
A.then([]
{
boost::when_all(B, C.then([]
{
D.get();
}))
.then([]
{
end.get();
});
});
when_all
的事实,还存在额外的困难。需要将其参数移入其中。
E
:
E
可以与
[B, C, D]
中的任何一个并行运行.
boost::future<void> A, B, C, D, E, F, end;
A.then([]
{
boost::when_all(boost::when_all(B, C).then([]
{
D.get();
}),
E)
.then([]
{
F.then([]
{
end.get();
});
});
});
.then(...)
建立链延续。这不适用于连接,因为目标连接任务会重复多次。 when_all(...)
生成链延续。这对于 fork 失败了,因为创建 fork 的节点会重复多次。 .then(...)
和
when_all(...)
延续。
future
基于任务依赖关系的 DAG 的回调链,其中每个任务在回调链中只出现一次? ([dependencies...] -> [dependents...])
从 DAG 映射数据结构,并从该映射生成回调链。
len(dependencies...) > 1
,然后
value
是一个连接节点。
len(dependents...) > 1
,然后
key
是一个 fork 节点。
when_all(keys...).then(values...)
延续。
depenendencies | dependents
----------------|-------------
[F] : [end]
[D, E] : [F]
[B, C] : [D]
[A] : [E, C, B]
[begin] : [A]
// First pass:
// Convert everything to `when_all(...).then(...)` notation
when_all(F).then(end)
when_all(D, E).then(F)
when_all(B, C).then(D)
when_all(A).then(E, C, B)
when_all(begin).then(A)
// Second pass:
// Solve linear (trivial) transformations
when_all(D, E).then(
when_all(F).then(end)
)
when_all(B, C).then(D)
when_all(
when_all(begin).then(A)
).then(E, C, B)
// Third pass:
// Solve fork/join transformations
when_all(
when_all(begin).then(A)
).then(
when_all(
E,
when_all(B, C).then(D)
).then(
when_all(F).then(end)
)
)
[B, C]
必须在
[E, C, B]
内找到列表,以及如何,在
[D, E]
中依赖列表,
D
必须解释为
when_all(B, C).then(D)
的结果并与
E
链接在一起在
when_all(E, when_all(B, C).then(D))
.
[dependencies...] -> [dependents...]
组成的 map 键值对,如何将这些对转换为 when_all(...)
的算法/.then(...)
延续链能实现吗?
最佳答案
如果可能出现冗余依赖项,请先删除它们(参见例如 https://mathematica.stackexchange.com/questions/33638/remove-redundant-dependencies-from-a-directed-acyclic-graph )。
然后执行以下图形转换(在合并节点中构建子表达式),直到您得到单个节点(以类似于计算电阻网络的方式):
*
:额外的传入或传出依赖项,取决于位置(...)
: 单个节点中的表达式
Java 代码,包括更复杂示例的设置:
public class DirectedGraph {
/** Set of all nodes in the graph */
static Set<Node> allNodes = new LinkedHashSet<>();
static class Node {
/** Set of all preceeding nodes */
Set<Node> prev = new LinkedHashSet<>();
/** Set of all following nodes */
Set<Node> next = new LinkedHashSet<>();
String value;
Node(String value) {
this.value = value;
allNodes.add(this);
}
void addPrev(Node other) {
prev.add(other);
other.next.add(this);
}
/** Returns one of the next nodes */
Node anyNext() {
return next.iterator().next();
}
/** Merges this node with other, then removes other */
void merge(Node other) {
prev.addAll(other.prev);
next.addAll(other.next);
for (Node on: other.next) {
on.prev.remove(other);
on.prev.add(this);
}
for (Node op: other.prev) {
op.next.remove(other);
op.next.add(this);
}
prev.remove(this);
next.remove(this);
allNodes.remove(other);
}
public String toString() {
return value;
}
}
/**
* Merges sequential or parallel nodes following the given node.
* Returns true if any node was merged.
*/
public static boolean processNode(Node node) {
// Check if we are the start of a sequence. Merge if so.
if (node.next.size() == 1 && node.anyNext().prev.size() == 1) {
Node then = node.anyNext();
node.value += " then " + then.value;
node.merge(then);
return true;
}
// See if any of the next nodes has a parallel node with
// the same one level indirect target.
for (Node next : node.next) {
// Nodes must have only one in and out connection to be merged.
if (next.prev.size() == 1 && next.next.size() == 1) {
// Collect all parallel nodes with only one in and out connection
// and the same target; the same source is implied by iterating over
// node.next again.
Node target = next.anyNext().next();
Set<Node> parallel = new LinkedHashSet<Node>();
for (Node other: node.next) {
if (other != next && other.prev.size() == 1
&& other.next.size() == 1 && other.anyNext() == target) {
parallel.add(other);
}
}
// If we have found any "parallel" nodes, merge them
if (parallel.size() > 0) {
StringBuilder sb = new StringBuilder("allNodes(");
sb.append(next.value);
for (Node other: parallel) {
sb.append(", ").append(other.value);
next.merge(other);
}
sb.append(")");
next.value = sb.toString();
return true;
}
}
}
return false;
}
public static void main(String[] args) {
Node a = new Node("A");
Node b = new Node("B");
Node c = new Node("C");
Node d = new Node("D");
Node e = new Node("E");
Node f = new Node("F");
f.addPrev(d);
f.addPrev(e);
e.addPrev(a);
d.addPrev(b);
d.addPrev(c);
b.addPrev(a);
c.addPrev(a);
boolean anyChange;
do {
anyChange = false;
for (Node node: allNodes) {
if (processNode(node)) {
anyChange = true;
// We need to leave the inner loop here because changes
// invalidate the for iteration.
break;
}
}
// We are done if we can't find any node to merge.
} while (anyChange);
System.out.println(allNodes.toString());
}
}
A then all(E, all(B, C) then D) then F
关于c++ - 从编译时依赖图 (DAG) 构建异步 `future` 回调链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35778864/
我按如下方式创建我的 Airflow DAG: dag = DAG(...) 但在多个教程和类(class)中,我看到他们像这样使用 with ... as 子句: with DAG(...) as
我对 DAG、Airflow 和 Python 语法有点陌生(我从 Java 学习编码),但我有一个 DAG,其中包含大约 10 个相互独立的任务,而我有另一个 DAG,只有在所有 10 个任务都运行
下面是 Airflow DAG 代码。当 Airflow 在本地托管和在云 Composer 上托管时,它都能完美运行。但是,DAG 本身在 Composer UI 中不可单击。我发现了一个类似的问题
我有兴趣在使用 https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#passing-parameters-when-t
我有一个 DAG(有向无环图),其顶点具有黑色或白色两种颜色中的任何一种。我需要将尽可能多的黑色顶点与图形应保持非循环的约束合并在一起。因此最终的 DAG 应该有最小值。的黑色顶点。这个问题的最佳算法
我正在尝试根据用户输入在 Airflow 中生成动态工作流。我知道可以根据文件和数据库中的数据选择它,但在所有这些情况下,工作流不会直接依赖于用户输入,如果多个用户使用相同的 dag,那么在这种情况下
我正在尝试拥有一个主 dag,它将根据我的需要创建更多 dags。我在 airflow.cfg 的 dags_folder 中有以下 python 文件。此代码在数据库中创建主 dag。该主 dag
我根据教程在 dags 文件夹中放置了一个 dag 文件,稍作修改,但它没有显示在 GUI 中或运行 airflow dags list 时。 最佳答案 回答我自己的问题:通过直接运行来检查 pyth
我根据教程在 dags 文件夹中放置了一个 dag 文件,稍作修改,但它没有显示在 GUI 中或运行 airflow dags list 时。 最佳答案 回答我自己的问题:通过直接运行来检查 pyth
有调用主 dag 中不同 dags 的任务列表。我正在使用 TriggerDagrunoperator 来完成此操作。但面临一些问题。 TriggerDagrunoperator 不会等待外部 dag
我设置了 Airflow 并运行一些 DAG,计划每天一次“0 0 * * *”。 我想检查下一次安排运行特定 dag 的时间,但我看不到我可以在管理员中的什么地方执行此操作。 最佳答案 如果你想使用
我通过包管理器在我的计算机上安装了 llc 程序(当然我已经安装了 LLVM,6.0.0 版本)。另外,我从源代码构建了它。我想要的是查看由 llvm 生成的 DAG。但是,不幸的是,我在 llc-d
我在 spark 中有一个操作,应该对数据框中的几列执行。通常,有 2 种可能性来指定此类操作 硬编码 handleBias("bar", df) .join(handleBias("baz",
Airflow 似乎跳过了我添加到/usr/local/airflow/dags 的 dags。 当我跑 airflow list_dags 输出显示 [2017-08-06 17:03:47,220
非常喜欢 Airflow 工作流调度程序,但在运行一个简单的 DAG 时遇到错误:“{jobs.py:538} 错误 - DAG 运行因 DAG 陷入僵局:TEST_SCHEDULER_DAG”。 这
我使用“pip install 'apache-airflow[statsd]' 安装了 airflow[statsd] 并安装了 statsd_exporter。现在我可以看到来自 Promethe
我想查找特定执行日期的特定 dag 的所有 dag 运行。 当我阅读文档时,有这个功能:dag_runs = DagRun.find(dag_id=self.dag_name, execution_d
我有一个 python DAG Parent Job和 DAG Child Job . Child Job中的任务应该在成功完成 Parent Job 时触发每天运行的任务。如何添加外部作业触发器?
我有一个由 TriggerDagRunOperator 触发的 DAG。它似乎运行良好,除非我尝试从 Airflow GUI 中“标记失败”或“标记成功”。当我这样做时,它总是尝试将更改应用到所有以前
Airflow 正在将所有 dags 加载到数据库中,但不会触发它们。 日志文件显示以下错误 [2020-01-05 02:55:06,226] {{dagbag.py:436}} [2020-0
我是一名优秀的程序员,十分优秀!