- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
在 this blog ,他给this (复制/粘贴以下代码)回调 hell 的示例。但是,没有提到如何通过使用 Reactive Extensions 来消除该问题。
所以这里 F3 取决于 F1 完成,而 F4 和 F5 取决于 F2 完成。
注意:我目前正试图围绕 Rx 进行思考,所以在问这个问题之前我没有尝试解决这个例子。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class CallbackB {
/**
* Demonstration of nested callbacks which then need to composes their responses together.
* <p>
* Various different approaches for composition can be done but eventually they end up relying upon
* synchronization techniques such as the CountDownLatch used here or converge on callback design
* changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
*/
public static void run() throws Exception {
final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
/* the following are used to synchronize and compose the asynchronous callbacks */
final CountDownLatch latch = new CountDownLatch(3);
final AtomicReference<String> f3Value = new AtomicReference<String>();
final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();
try {
// get f3 with dependent result from f1
executor.execute(new CallToRemoteServiceA(new Callback<String>() {
@Override
public void call(String f1) {
executor.execute(new CallToRemoteServiceC(new Callback<String>() {
@Override
public void call(String f3) {
// we have f1 and f3 now need to compose with others
System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
// set to thread-safe variable accessible by external scope
f3Value.set(f3);
latch.countDown();
}
}, f1));
}
}));
// get f4/f5 after dependency f2 completes
executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {
@Override
public void call(Integer f2) {
executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {
@Override
public void call(Integer f4) {
// we have f2 and f4 now need to compose with others
System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
// set to thread-safe variable accessible by external scope
f4Value.set(f4);
latch.countDown();
}
}, f2));
executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {
@Override
public void call(Integer f5) {
// we have f2 and f5 now need to compose with others
System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
// set to thread-safe variable accessible by external scope
f5Value.set(f5);
latch.countDown();
}
}, f2));
}
}));
/* we must wait for all callbacks to complete */
latch.await();
System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
} finally {
executor.shutdownNow();
}
}
public static void main(String[] args) {
try {
run();
} catch (Exception e) {
e.printStackTrace();
}
}
private static final class CallToRemoteServiceA implements Runnable {
private final Callback<String> callback;
private CallToRemoteServiceA(Callback<String> callback) {
this.callback = callback;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call("responseA");
}
}
private static final class CallToRemoteServiceB implements Runnable {
private final Callback<Integer> callback;
private CallToRemoteServiceB(Callback<Integer> callback) {
this.callback = callback;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(100);
}
}
private static final class CallToRemoteServiceC implements Runnable {
private final Callback<String> callback;
private final String dependencyFromA;
private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
this.callback = callback;
this.dependencyFromA = dependencyFromA;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call("responseB_" + dependencyFromA);
}
}
private static final class CallToRemoteServiceD implements Runnable {
private final Callback<Integer> callback;
private final Integer dependencyFromB;
private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
this.callback = callback;
this.dependencyFromB = dependencyFromB;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(140);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(40 + dependencyFromB);
}
}
private static final class CallToRemoteServiceE implements Runnable {
private final Callback<Integer> callback;
private final Integer dependencyFromB;
private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
this.callback = callback;
this.dependencyFromB = dependencyFromB;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(55);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(5000 + dependencyFromB);
}
}
private static interface Callback<T> {
public void call(T value);
}
}
最佳答案
我是有关回调和 Java future 的引用博客文章的原作者。这是一个使用 flatMap、zip 和 merge 异步进行服务组合的示例。
它获取一个 User 对象,然后同时获取 Social 和 PersonalizedCatalog 数据,然后为 PersonalizedCatalog 中的每个视频同时获取一个书签、评级和元数据,将它们压缩在一起,并将所有响应合并到一个渐进式流输出中作为服务器- 发送事件。
return getUser(userId).flatMap(user -> {
Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
.flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
video -> {
Observable<Bookmark> bookmark = getBookmark(video);
Observable<Rating> rating = getRatings(video);
Observable<VideoMetadata> metadata = getVideoMetadata(video);
return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
}));
Observable<Map<String, Object>> social = getSocial(user).map(s -> {
return s.getDataAsMap();
});
return Observable.merge(catalog, social);
}).flatMap(data -> {
String json = SimpleJson.mapToJson(data);
return response.writeStringAndFlush("data: " + json + "\n");
});
这个例子可以在 https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33 的一个正常运行的应用程序的上下文中看到。
由于我无法在此处提供所有信息,您还可以在 https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32 以演示文稿形式(带有视频链接)找到解释。 .
关于java - 如何组合 Observables 以避免给定的嵌套和依赖回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28402376/
在下面的代码中,我得到一个 uninitialized value警告,但仅限于第二个 given/when例子。为什么是这样? #!/usr/bin/env perl use warnings; u
整个“开关”功能是否已成为实验性的?在没有 Perl 的 future 版本破坏我的代码的情况下,我可以依赖其中的某些部分吗?一般来说,将稳定功能更改为实验性的政策是什么? 背景use feature
有没有办法在一个条件语句中写出如下语句? a和b不能同时等于5。 (a可以是5,b可以是5,但是a AND b不能是5) 最佳答案 正如克里斯指出的那样,您要查找的是逻辑异或,相当于逻辑不等于 !=:
我正在寻找一种算法来找到给定 n 条线段的所有交点。以下是来自 http://jeffe.cs.illinois.edu/teaching/373/notes/x06-sweepline.pdf 的伪
数组中有 N 个元素。我可以选择第一项最多 N 次,第二项最多选择 N-1 次,依此类推。 我有 K 个 token 要使用并且需要使用它们以便我可以拥有最大数量的项目。 arr = [3, 4, 8
我正在尝试修复法语文本中的语法性别,想知道是否有办法从某个词条中获取所有单词的列表,以及是否可以在此类列表中进行查找? 最佳答案 尝试: import spacy lemma_lookup = spa
我正在为 Win32 编写一个简单的自动化测试应用程序。它作为一个单独的进程运行,并通过 Windows API 访问目标应用程序。我可以阅读窗口层次结构,查找标签和文本框,并通过发送/发布消息等来单
在 nodeJs 中使用 Sequelize 时,我从 Sequelize 收到此错误,如下所示: { [SequelizeUniqueConstraintError: Validation erro
本文https://arxiv.org/pdf/1703.10757.pdf使用回归激活映射 (RAM) - 而不是类激活映射 (CAM) 来解决问题。有几篇文章描述了如何实现 CAM。但是我找不到
我正在研究 Mach 动态链接器 dyld。这个问题适用于所有 Apple 平台,但很高兴得到特定于平台的答案;我正在使用 ObjC,但如果对你有用的话,我也很乐意翻译 Swift。 The rele
我有一个包含数千个 Instagram 用户 ID 的列表。我如何获得他们的 Instagram 用户名/句柄? 最佳答案 你必须使用这个 Instagram API: https://api.ins
我在下面的代码: def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("Spark-Hbase").s
我有一个表格,其中包含从 1 到 10 的数字。(从 D2 到 M2) 假设A1中有03/09/2019 并且在B1中有06/09/2019 并且在C1中有Hello 在A 列中,我有多个系列的单词,
我想在给定服务对应的 URI 的情况下检索服务的注释(特别是 @RolesAllowed )。这是一个例子: 服务: @GET @Path("/example") @RolesAllowed({ "B
我看到 OraclePreparedStatementexecuteQuery() 表现出序列化。也就是说,我想使用相同的连接对 Oracle 数据库同时运行两个查询。然而,OraclePrepare
import java.util.Scanner; public class GeometricSumFromK { public static int geometricSum(int k,
我创建了一个抽象基类Page,它说明了如何构建动态网页。我正在尝试想出一种基于作为 HttpServletRequest 传入的 GET 请求生成 Page 的好方法。例如... public cla
我的字符串是一条短信,采用以下两种格式之一: 潜在客户短信: 您已收到 1 条线索 标题:我的领导 潜在客户 ID:12345-2365 警报设置 ID:890 短信回复: 您已收到 1 条回复 标题
我在 python 中有以下代码: class CreateMap: def changeme(listOne, lisrTwo, listThree, listFour, listfive):
这是在 Hibernate 上运行的 JPA2。 我想检索相同实体类型的多个实例,给定它们的 ID。其中许多已经在持久性上下文和/或二级缓存中。 我尝试了几种方法,但似乎都有其缺点: 当我使用 ent
我是一名优秀的程序员,十分优秀!