- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我正在尝试学习 Akka Actor 和 future ,但在阅读了 http://akka.io 上的文档之后做http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-java.html我仍然有一些理解问题。我猜计算 Pi 的值很多人也可以联系起来,但不是我=)。我搜索了一下但还没有找到任何适合我的例子。因此,我想我会拿一些我的真实代码并把它扔在这里,并用它来交换一个如何用 Akka 做到这一点的例子。
好的,我们开始吧:
我有一个 java play2 应用程序,我需要从我的数据库中获取一些数据并将其索引到我的 elasticsearch 实例中。
我调用数据库并获取 field 的 ID。
然后我拆分列表并创建几个可调用的索引任务。
之后,我调用所有任务,其中每个任务为分配的 ID 收集场所来自数据库。
对于每个 field ,将其索引到 elasticsearch 实例并使其可搜索。
完成。
Application.java:
public class Application extends Controller {
private static final int VENUE_BATCH = 1000;
private static int size;
public static Result index() {
List<Long> venueIds = DbService.getAllVenueIds();
size = venueIds.size();
Logger.info("Will index " + size + " items in total.");
ExecutorService service = Executors.newFixedThreadPool(getRuntime().availableProcessors());
int startIx = 0;
Collection<Callable<Object>> indexTasks = new ArrayList<Callable<Object>>();
do {
int endIx = Math.min(startIx + VENUE_BATCH, size);
List<Long> subList = venueIds.subList(startIx, endIx);
VenueIndexTask indexTask = new VenueIndexTask(subList);
indexTasks.add(indexTask);
} while ((startIx += VENUE_BATCH) < size);
Logger.info("Invoking all tasks!");
try {
service.invokeAll(indexTasks);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ok(index.render("Done indexing."));
}
}
地点任务:
public class VenueIndexTask implements Callable<Object> {
private List<Long> idSubList;
public VenueIndexTask(List<Long> idSubList){
this.idSubList = idSubList;
Logger.debug("Creating task which will index " + idSubList.size() + " items. " +
"Range: " + rangeAsString() + ".");
}
@Override
public Object call() throws Exception {
List<Venue> venues = DbService.getVenuesForIds(idSubList);
Logger.debug("Doing some indexing: "+venues.size());
for(Venue venue : venues) {
venue.index();
}
return null;
}
private String rangeAsString() {
return "[" + idSubList.get(0) + "-" + idSubList.get(idSubList.size() - 1) + "]";
}
}
地点:
@IndexType(name = "venue")
public class Venue extends Index {
private String name;
// Find method static for request
public static Finder<Venue> find = new Finder<Venue>(Venue.class);
public Venue() {
}
public Venue(String id, String name) {
super.id = id;
this.name = name;
}
@Override
public Map toIndex() {
HashMap map = new HashMap();
map.put("id", super.id);
map.put("name", name);
return map;
}
@Override
public Indexable fromIndex(Map map) {
if (map == null) {
return this;
}
this.name = (String) map.get("name");
return this;
}
}
所以你们所有的 Akka 人都发疯了!请尽可能多地提出可以使用的酷 future 功能或我可以用来学习这些东西的任何其他知识/代码。
最佳答案
我喜欢将 Akka(或任何其他基于消息的系统)视为像传送带一样思考,就像在工厂中一样。 Actors 的一种简化思维方式可能是订购披萨。
你,饥饿的顾客( Actor /角色)向披萨店发送订单(一条消息)
客服( Actor /角色)接您的订单,给您订单号( future )
如果您不耐烦,您可能已经在电话/互联网/商店等到您拿到披萨(同步/阻塞交易),否则您会对订单号感到满意,稍后再检查(非阻塞)
客服在厨房经理( Actor )的监督下将消息发送给厨师( Actor )。这是一个流程很重的厨房,有层次感。 Akka 喜欢这样。见 Supervision
厨师创建一个新的比萨饼,并附上订单的详细信息(一条新消息),然后通过送货经理(Supervisor Actor)将其传递给送货员(Actor)。
在此过程中,您的订单详情没有改变,那将是一场噩梦。如果你想要纯奶酪,如果你有意大利辣香肠,你会不高兴!所有消息都应该是不可变的!但是,对于不同的参与者,消息可能会有所不同。送货员会期待披萨和附上的订单详细信息,厨师会期待订单。当消息需要更改时,会创建一条新消息。
每个 Actor 都擅长一个角色,如果一个人必须完成所有任务,效果如何?可能是某些 Actor 的数量超过了其他 Actor (例如,厨师 10 个线程,送货员 2 个线程,客户服务 1 个线程)。
阻止行为是一种痛苦,想象一下客户服务正在等待厨师和送货员在看到下一位客户之前?
希望我对您有所帮助,这是一个巨大的话题,也是一个很大的转变。祝你好运
关于java - Akka Actor 和 future : Understanding by example,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13805610/
我正在通过这个示例https://www.rusoto.org/futures.html学习Rust和Rusoto 而且我发现许多代码已经过时了。所以我改变了这样的代码: use rusoto_cor
这是一个理论问题。我有一个服务可以调用来完成工作,但该服务可能无法完成所有工作,因此我需要调用第二个服务来完成它。 我想知道是否有办法在没有 Await.result 的情况下做类似的事情map 函数
这个问题是关于如何阅读 Rust 文档并提高我对 Rust 的理解,从而了解如何解决这个特定的编译器错误。 我读过 tokio docs并试验了许多 examples .在编写自己的代码时,我经常遇到
我有一个使用分页的 HTTP api,我想将它包装到一个通用的 Rust 流中,以便所有端点都可以使用相同的接口(interface),这样我就可以使用 Stream 附带的特征函数特征。 我收到了这
我正在查看 AKKA 的 Java Futures API,我看到了很多处理同一类型的多个 future 的方法,但我没有看到任何处理不同类型的 future 的方法。我猜我让事情变得更加复杂了。 无
环境:Akka 2.1,scala 版本 2.10.M6,JDK 1.7,u5 现在是我的问题: 我有: future1 = Futures.future(new Callable>(){...});
我有一些代码可以将请求提交给另一个线程,该线程可能会也可能不会将该请求提交给另一个线程。这会产生 Future> 的返回类型.是否有一些非令人发指的方法可以立即将其变成 Future等待整个 futu
如果我有以下代码: Future a = new Future(() { print('a'); return 1; }); Future b = new Future.error('Error!')
我一直试图简化我在 Scala 中做 future 的方式。我有一次收到了 Future[Option[Future[Option[Boolean]]但我在下面进一步简化了它。有没有更好的方法来简化这
Scala 中从 Future[Option[Future[Int]]] 转换的最干净的方法是什么?至 Future[Option[Int]] ?甚至有可能吗? 最佳答案 有两个嵌套Future s
使用下面的示例,future2 如何在 future1 完成后使用 future1 的结果(不阻塞 future3 从被提交)? from concurrent.futures import Proc
这两个类代表了并发编程的优秀抽象,因此它们不支持相同的 API 有点令人不安。 具体根据docs : asyncio.Future is almost compatible with concurre
我正在尝试使用 wasm_bindgen 实现 API 类使用异步调用。 #![allow(non_snake_case)] use std::future::Future; use serde::{
这个问题在这里已经有了答案: Futures / Success race (3 个回答) 去年关闭。 所有的 future 最终可能会成功(有些可能会失败),但我们希望第一个成功。并希望将这一结果表
我在练习asyncio在编写多线程代码多年之后。 注意到一些我觉得很奇怪的东西。都在 asyncio在 concurrent有一个Future目的。 from asyncio import Futur
如何将Future[Option[Future[Option[X]]]]转换为Future[Option[X]]? 如果它是 TraversableOnce 而不是 Option 我会使用 Futur
我正在尝试同时发送 HTTP 请求。为此,我使用 concurrent.futures 这是简单的代码: import requests from concurrent import futures
我们在 vertx 中使用 Futures 的例子如下: Future fetchVehicle = getUserBookedVehicle(routingContext, client);
下面的函数,取自 here : fn connection_for( &self, pool_key: PoolKey, ) -> impl Future>, ClientError>
我正在围绕Java库编写一个小的Scala包装器。 Java库有一个对象QueryExecutor,它公开了2种方法: execute(query):结果 asyncExecute(query):Li
我是一名优秀的程序员,十分优秀!