- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个具有以下 mapPartition
函数的程序:
public void mapPartition(Iterable<Tuple> values, Collector<Tuple2<Integer, String>> out)
我从输入的值
中收集 100 个批处理,并将它们发送到网络服务进行转换。我将结果添加回 out
集合。
为了加快该过程,我通过使用 Executors
进行 Web 服务调用异步
。这产生了问题,要么我得到 taskManager released exception ,或AskTimeoutException
。我增加了内存和超时,但没有帮助。有相当多的输入数据。我相信这会导致大量作业与 ExecutorService 一起排队,从而占用大量内存。
最好的方法是什么?
我还查看了taskManager 与taskSlot 配置,但对两者之间的差异有点困惑(我猜它们类似于进程与线程?)。不确定在什么时候增加任务管理器和任务槽?例如如果我有三台机器,每台机器有 4 个 CPU,那么我的 taskManager=3
和我的 taskSlot=4
应该吗?
我还考虑单独增加 mapPartition
的并行度,例如 10
以获得更多线程访问 Web 服务。有意见或建议吗?
最佳答案
您应该查看Flink Asyncio这将使您能够在流应用程序中以异步方式查询您的 Web 服务。
需要注意的一点是,Asyncio 函数不称为多线程,而是按顺序每个分区的每个记录调用一次,因此您的 Web 应用程序需要确定性返回,并且可能快速返回,以免作业被阻塞。
此外,潜在的更多分区数量会对您的情况有所帮助,但您的网络服务需要足够快地满足这些请求
来自 Flinks 网站的示例代码块:
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {
// issue the asynchronous request, receive a future for result
Future<String> resultFuture = client.query(str);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector
resultFuture.thenAccept( (String result) -> {
asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));
});
}
}
// create the original stream (In your case the stream you are mappartitioning)
DataStream<String> stream = ...;
// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
编辑:
由于用户想要创建大小为 100 的批处理,而 asyncio 目前特定于 Streaming API,因此最好的方法是创建大小为 100 的 countwindows。
此外,要清除可能没有 100 个事件的最后一个窗口,自定义 Triggers可以与计数触发器和基于时间的触发器组合使用,以便在元素计数后或每隔几分钟后触发触发器。
在 Flink Mailing List 上可以找到很好的后续信息。用户“Kostya”创建了一个可用的自定义触发器 here
关于java - Apache 弗林克 : Correctly make async webservice calls within MapReduce(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46659043/
我有带皮肤的 DNN。我的 head 标签有 runat="server"所以我尝试在 head 标签内添加一个标签 "> 在后面的代码中,我在属性中设置了 var GoogleAPIkey。问题是它
我在 Node.JS 中有一个导出模块 exports.doSomethingImportant= function(req, res) { var id = req.params.id; Demo.
我是 F# 的新手,我一直在阅读 F# for Fun and Profit。在为什么使用 F#? 系列中,有一个 post描述异步代码。我遇到了 Async.StartChild函数,我不明白为什么
File 中有一堆相当方便的方法类,如 ReadAll***/WriteAll***/AppendAll***。 我遇到过很多情况,当我需要它们的异步对应物时,但它们根本不存在。 为什么?有什么陷阱吗
我最近开始做一个 Node 项目,并且一直在使用 async 库。我有点困惑哪个选项会更快。在某些数据上使用 async.map 并获取其结果,或使用 async.each 迭代一组用户并将他们的相应
您好,我正在试用 Springs 异步执行器,发现您可以使用 @Async。我想知道是否有可能在 @Async 中使用 @Async,要求是需要将任务委托(delegate)给 @Async 方法在第
我需要支持取消一个函数,该函数返回一个可以在启动后取消的对象。在我的例子中,requester 类位于我无法修改的第 3 方库中。 actor MyActor { ... func d
假设 asyncSendMsg不返回任何内容,我想在另一个异步块中启动它,但不等待它完成,这之间有什么区别: async { //(...async stuff...) for msg
我想用 Mocha 测试异步代码. 我跟着这个教程testing-promises-with-mocha .最后,它说最好的方法是 async/await。 以下是我的代码,我打算将 setTimeo
正如我有限(甚至错误)的理解,Async.StartImmediate 和 Async.RunSynchronously 在当前线程上启动异步计算。那么这两个功能究竟有什么区别呢?谁能帮忙解释一下?
我有一行使用await fetch() 的代码。我正在使用一些调用 eval("await fetch ...etc...") 的脚本注入(inject),但问题是 await 在执行时不会执行从ev
我正在尝试使用 nodeJS 构建一个网络抓取工具,它在网站的 HTML 中搜索图像,缓存图像源 URL,然后搜索最大尺寸的图像。 我遇到的问题是 deliverLargestImage() 在循环遍
我想结合使用 async.each 和 async.series,但得到了意想不到的结果。 async.each([1, 2], function(item, nloop) { async.s
我的代码有问题吗?我使用 async.eachSeries 但我的结果总是抛出 undefined。 这里是我的代码: async.eachSeries([1,2,3], function(data,
我想在 trait 中编写异步函数,但是因为 async fn in traits 还不被支持,我试图找到等效的方法接口(interface)。这是我在 Rust nightly (2019-01-0
async setMyPhotos() { const newPhotos = await Promise.all(newPhotoPromises); someOtherPromise();
async.js 中 async.each 与 async.every 的区别?似乎两者都相同,只是 async.every 返回结果。纠正我,我错了。 最佳答案 每个异步 .each(coll, i
我正在尝试对一组项目运行 async.each。 对于每个项目,我想运行一个 async.waterfall。请参阅下面的代码。 var ids = [1, 2]; async.each(ids,
我的目标是测试 API 调用,将延迟考虑在内。我的灵感来自 this post . 我设计了一个沙箱,其中模拟 API 需要 1000 毫秒来响应和更改全局变量 result 的值。测试检查 500
async.each 是否作为异步数组迭代工作? async.eachSeries 是否作为同步数组迭代工作?(它实际上等待响应) 我问这些是因为两者都有回调,但 async.each 的工作方式类似
我是一名优秀的程序员,十分优秀!