gpt4 book ai didi

java - Apache 弗林克 : Correctly make async webservice calls within MapReduce()

转载 作者:行者123 更新时间:2023-12-02 02:34:01 27 4
gpt4 key购买 nike

我有一个具有以下 mapPartition 函数的程序:

public void mapPartition(Iterable<Tuple> values, Collector<Tuple2<Integer, String>> out)

我从输入的中收集 100 个批处理,并将它们发送到网络服务进行转换。我将结果添加回 out 集合。

为了加快该过程,我通过使用 Executors 进行 Web 服务调用异步。这产生了问题,要么我得到 taskManager released exception ,或AskTimeoutException。我增加了内存和超时,但没有帮助。有相当多的输入数据。我相信这会导致大量作业与 ExecutorService 一起排队,从而占用大量内存。

最好的方法是什么?

我还查看了taskManager 与taskSlot 配置,但对两者之间的差异有点困惑(我猜它们类似于进程与线程?)。不确定在什么时候增加任务管理器和任务槽?例如如果我有三台机器,每台机器有 4 个 CPU,那么我的 taskManager=3 和我的 taskSlot=4 应该吗?

我还考虑单独增加 mapPartition 的并行度,例如 10 以获得更多线程访问 Web 服务。有意见或建议吗?

最佳答案

您应该查看Flink Asyncio这将使您能够在流应用程序中以异步方式查询您的 Web 服务。

需要注意的一点是,Asyncio 函数不称为多线程,而是按顺序每个分区的每个记录调用一次,因此您的 Web 应用程序需要确定性返回,并且可能快速返回,以免作业被阻塞。

此外,潜在的更多分区数量会对您的情况有所帮助,但您的网络服务需要足够快地满足这些请求

来自 Flinks 网站的示例代码块:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;

@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}

@Override
public void close() throws Exception {
client.close();
}

@Override
public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {

// issue the asynchronous request, receive a future for result
Future<String> resultFuture = client.query(str);

// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector
resultFuture.thenAccept( (String result) -> {

asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));

});
}
}

// create the original stream (In your case the stream you are mappartitioning)
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

编辑:

由于用户想要创建大小为 100 的批处理,而 asyncio 目前特定于 Streaming API,因此最好的方法是创建大小为 100 的 countwindows。

此外,要清除可能没有 100 个事件的最后一个窗口,自定义 Triggers可以与计数触发器和基于时间的触发器组合使用,以便在元素计数后或每隔几分钟后触发触发器。

Flink Mailing List 上可以找到很好的后续信息。用户“Kostya”创建了一个可用的自定义触发器 here

关于java - Apache 弗林克 : Correctly make async webservice calls within MapReduce(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46659043/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com