- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
在 this blog ,他给this (复制/粘贴以下代码)回调 hell 的示例。但是,没有提到如何通过使用 Reactive Extensions 来消除该问题。
所以这里 F3 取决于 F1 完成,而 F4 和 F5 取决于 F2 完成。
注意:我目前正试图围绕 Rx 进行思考,所以在问这个问题之前我没有尝试解决这个例子。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class CallbackB {
/**
* Demonstration of nested callbacks which then need to composes their responses together.
* <p>
* Various different approaches for composition can be done but eventually they end up relying upon
* synchronization techniques such as the CountDownLatch used here or converge on callback design
* changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
*/
public static void run() throws Exception {
final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
/* the following are used to synchronize and compose the asynchronous callbacks */
final CountDownLatch latch = new CountDownLatch(3);
final AtomicReference<String> f3Value = new AtomicReference<String>();
final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();
try {
// get f3 with dependent result from f1
executor.execute(new CallToRemoteServiceA(new Callback<String>() {
@Override
public void call(String f1) {
executor.execute(new CallToRemoteServiceC(new Callback<String>() {
@Override
public void call(String f3) {
// we have f1 and f3 now need to compose with others
System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
// set to thread-safe variable accessible by external scope
f3Value.set(f3);
latch.countDown();
}
}, f1));
}
}));
// get f4/f5 after dependency f2 completes
executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {
@Override
public void call(Integer f2) {
executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {
@Override
public void call(Integer f4) {
// we have f2 and f4 now need to compose with others
System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
// set to thread-safe variable accessible by external scope
f4Value.set(f4);
latch.countDown();
}
}, f2));
executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {
@Override
public void call(Integer f5) {
// we have f2 and f5 now need to compose with others
System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
// set to thread-safe variable accessible by external scope
f5Value.set(f5);
latch.countDown();
}
}, f2));
}
}));
/* we must wait for all callbacks to complete */
latch.await();
System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
} finally {
executor.shutdownNow();
}
}
public static void main(String[] args) {
try {
run();
} catch (Exception e) {
e.printStackTrace();
}
}
private static final class CallToRemoteServiceA implements Runnable {
private final Callback<String> callback;
private CallToRemoteServiceA(Callback<String> callback) {
this.callback = callback;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call("responseA");
}
}
private static final class CallToRemoteServiceB implements Runnable {
private final Callback<Integer> callback;
private CallToRemoteServiceB(Callback<Integer> callback) {
this.callback = callback;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(100);
}
}
private static final class CallToRemoteServiceC implements Runnable {
private final Callback<String> callback;
private final String dependencyFromA;
private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
this.callback = callback;
this.dependencyFromA = dependencyFromA;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call("responseB_" + dependencyFromA);
}
}
private static final class CallToRemoteServiceD implements Runnable {
private final Callback<Integer> callback;
private final Integer dependencyFromB;
private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
this.callback = callback;
this.dependencyFromB = dependencyFromB;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(140);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(40 + dependencyFromB);
}
}
private static final class CallToRemoteServiceE implements Runnable {
private final Callback<Integer> callback;
private final Integer dependencyFromB;
private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
this.callback = callback;
this.dependencyFromB = dependencyFromB;
}
@Override
public void run() {
// simulate fetching data from remote service
try {
Thread.sleep(55);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.call(5000 + dependencyFromB);
}
}
private static interface Callback<T> {
public void call(T value);
}
}
最佳答案
我是有关回调和 Java future 的引用博客文章的原作者。这是一个使用 flatMap、zip 和 merge 异步进行服务组合的示例。
它获取一个 User 对象,然后同时获取 Social 和 PersonalizedCatalog 数据,然后为 PersonalizedCatalog 中的每个视频同时获取一个书签、评级和元数据,将它们压缩在一起,并将所有响应合并到一个渐进式流输出中作为服务器- 发送事件。
return getUser(userId).flatMap(user -> {
Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
.flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
video -> {
Observable<Bookmark> bookmark = getBookmark(video);
Observable<Rating> rating = getRatings(video);
Observable<VideoMetadata> metadata = getVideoMetadata(video);
return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
}));
Observable<Map<String, Object>> social = getSocial(user).map(s -> {
return s.getDataAsMap();
});
return Observable.merge(catalog, social);
}).flatMap(data -> {
String json = SimpleJson.mapToJson(data);
return response.writeStringAndFlush("data: " + json + "\n");
});
这个例子可以在 https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33 的一个正常运行的应用程序的上下文中看到。
由于我无法在此处提供所有信息,您还可以在 https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32 以演示文稿形式(带有视频链接)找到解释。 .
关于java - 如何组合 Observables 以避免给定的嵌套和依赖回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28402376/
所以我试图设置“内容”类的高度,但它似乎不起作用。我对嵌套 DIV 非常陌生,我已经尝试了我在谷歌搜索中发现的修复程序,但似乎没有任何效果。帮助?
好的,所以我一直在四处寻找,但找不到这个问题的答案。但是,我需要将一个 View 嵌套在另一个 View 中。 我有一个 $layout 正在使用我拥有的 default.layout Blade 文
好的,所以我一直在四处寻找,但找不到这个问题的答案。但是,我需要将一个 View 嵌套在另一个 View 中。 我有一个 $layout 正在使用我拥有的 default.layout Blade 文
基本上,我的问题很简单,但它需要知道 Struts 1.1 并且还活着的人。 我尝试构建的伪代码看起来像这样: IF element.method1 = true THEN IF element
我正在尝试将 Excel 嵌套 IF 语句转换为代码语言,但我不确定我是否正确执行此操作,希望能得到一些帮助 这是Excel语句: =IF(D3="Feather",IF(OR(I3>1000,R3=
如果我们创建两个或三个评论并对其进行多次回复,则“有用”链接在单击时会导致问题,它会对具有相同编号的索引执行 ng-click 操作,从而显示具有相同索引的所有文本。如何解决此嵌套问题,以便在单击链接
我在项目中使用Scala,想与Stripe集成,但它只提供Java API。例如,要创建 session ,我使用: val params = new util.HashMap[String, Any
以下代码有一个 Div,其中连续包含四个较小的 Div。四个 Div 中的每一个还包含一个较小的 Div,但此 Div 未显示。我尝试了各种显示和位置组合,看看 div 是否会出现。 classGoa
我在这里有一个问题,循环是: for (i=0; i < n; ++i) for (j = 3; j < n; ++j) { ...
我正在尝试编写代码来显示具有奇数宽度的形状。形状完成后,将其放置在外部形状内。用户将能够输入用于形状的字符和行数。我希望生成一个形状,并通过 for 循环生成一个外部形状。 ***** .
$(".globalTabs").each(function(){ var $globalTabs = $(this); var parent = $globalTabs.parent
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 9 年前。 Improve th
所以我在这个问题上遇到了一些麻烦,因为变量 i。我只是不确定在第二个 while 循环中如何处理它。对于我的外循环,我知道它将运行 log_4(n^2) 次迭代。对于内部 while 循环,我计算的迭
我似乎找不到在枚举上应用多个 if/then 逻辑的工作方式。 anyOf 不应用条件逻辑,而是表示如果其中任何一个匹配则很好。 allOf 再次不应用条件逻辑,而是测试属性/必填字段的超集。 这是一
如何访问 ReaderT 的内部 monad。 在我的例子中,我有类型: newtype VCSSetupAction a = VCSSetupAction (ReaderT (Maybe VCSCo
这个问题在这里已经有了答案: Add leading zeroes/0's to existing Excel values to certain length (7 个回答) 7年前关闭。 我正在寻
我已经绑定(bind)了很多 AND/OR 函数的组合并且没有运气。 这是我需要创建的: 在 B 列中,我有公司 ID,范围从两个数字字符到六个数字字符。 我需要在 B 列中的每个公司 ID 之前的每
我是 VBA 新手,在尝试编写的宏中使用 If 语句时遇到了一些困难。每个月我都会收到一份 Excel 报告,其中列出了我们公司的哪些员工执行了某些任务。我正在编写的宏旨在将每个员工的数据复制并粘贴到
如果在 B 列中找到单元格 A1 中的值,则使用文本 321 填充除非在 C 列中找到单元格 A1 中的值,在这种情况下填充文本 121反而。如果单元格 A1 的内容不在 B 列或 C 列中,则使用
我有几十万个地址。其中一些在整数之后有粒子。如 4356 A Horse Avenue , 其他格式正常4358 Horse Avenue .有些有“A”,有些有“B”。我正在尝试删除整数和粒子之间的
我是一名优秀的程序员,十分优秀!