gpt4 book ai didi

hadoop - Oozie 自定义异步操作

转载 作者:可可西里 更新时间:2023-11-01 16:13:50 27 4
gpt4 key购买 nike

我在 Oozie 中实现自定义异步操作时遇到问题。我的类扩展自 ActionExecutor,并覆盖了方法 initActionType、start、end、check、kill 和 isCompleted。

在启动方法中,我想启动一个 YARN 作业,它是通过我的 BiohadoopClient 类实现的。为了使调用异步,我将 client.run() 方法包装在 Callable 中:

public void start(final Context context, final WorkflowAction action) {
...
Callable<String> biohadoop = new Callable<String>() {
BiohadoopClient client = new BiohadoopClient();
client.run();
}

// submit callable to executor
executor.submit(biohadoop);

// set the start data, according to https://oozie.apache.org/docs/4.0.1/DG_CustomActionExecutor.html
context.setStartData(externalId, callBackUrl, callBackUrl);
...
}

这很好用,例如,当我以 fork/join 方式使用我的自定义操作时,操作的执行是并行运行的。

现在的问题是,Oozie 对于此操作仍处于 RUNNING 状态。似乎不可能将其更改为完成状态。 Oozie 永远不会调用 check() 方法, end() 方法也是如此。在 Callable 中手动设置 context.setExternalStatus()、context.setExecutionData() 和 context.setEndData() 没有帮助(在 client.run() 完成后)。我也尝试手动排队 ActionEndXCommand,但没有成功。

当我在 start() 方法中等待 Callable 完成时,状态得到正确更新,但 fork/join 中的执行不再并行(这看起来合乎逻辑,因为执行等待 Callable 完成).

How external clients notify Oozie workflow with HTTP callback没有帮助,因为使用回调似乎没有任何改变(好吧,我可以看到它发生在日志文件中,但除此之外,什么都没有......)。此外,提到的答案是 SSH 操作异步运行,但我还没有找到这是如何完成的。 Callable 内部有一些包装,但最后直接调用 Callable 的 call() 方法(不提交给 Executor)。

到目前为止,我还没有找到任何如何编写异步自定义操作的示例。谁能帮帮我?

谢谢

编辑

这里是initActionType()、start()、check()、end()的实现,可调用的实现可以在start() Action 中找到。

可调用对象在 start() 操作中提交给执行程序,之后调用其 shutdown() 方法 - 因此执行程序在可调用对象完成后关闭。下一步,调用 context.setStartData(externalId, callBackUrl, callBackUrl)。

private final AtomicBoolean finished = new AtomicBoolean(false);

public void initActionType() {
super.initActionType();
log.info("initActionType() invoked");
}

public void start(final Context context, final WorkflowAction action)
throws ActionExecutorException {
log.info("start() invoked");

// Get parameters from Node configuration
final String parameter = getParameters(action.getConf());

Callable<String> biohadoop = new Callable<String>() {
@Override
public String call() throws Exception {
log.info("Starting Biohadoop");

// No difference if check() is called manually
// or if the next line is commented out
check(context, action);

BiohadoopClient client = new BiohadoopClient();
client.run(parameter);
log.info("Biohadoop finished");

finished.set(true);
// No difference if check() is called manually
// or if the next line is commented out
check(context, action);

return null;
}
};

ExecutorService executor = Executors.newCachedThreadPool();
biohadoopResult = executor.submit(biohadoop);
executor.shutdown();

String externalId = action.getId();
String callBackUrl = context.getCallbackUrl("finished");
context.setStartData(externalId, callBackUrl, callBackUrl);
}

public void check(final Context context, final WorkflowAction action)
throws ActionExecutorException {
// finished is an AtomicBoolean, that is set to true,
// after Biohadoop has finished (see implementation of Callable)
if (finished.get()) {
log.info("check(Context, WorkflowAction) invoked -
Callable has finished");
context.setExternalStatus(Status.OK.toString());
context.setExecutionData(Status.OK.toString(), null);
} else {
log.info("check(Context, WorkflowAction) invoked");
context.setExternalStatus(Status.RUNNING.toString());
}
}

public void end(Context context, WorkflowAction action)
throws ActionExecutorException {
log.info("end(Context, WorkflowAction) invoked");
context.setEndData(Status.OK, Status.OK.toString());
}

最佳答案

一件事 - 我可以看到您在提交作业后立即关闭执行程序 - executor.shutdown();。这可能是导致问题的原因。您能否尝试将此语句移至 end() 方法?

关于hadoop - Oozie 自定义异步操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26181512/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com