- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要创建这个:
现在我有了一个工作解决方案,用于创建新线程并在新线程上完成所有 signHash(..) 时在主线程上恢复。 [请参阅帖子末尾的工作代码]
正如您在图中所看到的,我必须创建一种在新的独立线程上管理方法的一部分的“等待和恢复功能”的方法 signHash(..) 现在是通过 sleep 来模拟的。
特别是,在必须出现在该位置的String hash = doStuff(..)之后,我想填充一个像这样的bean:
import java.util.Hashtable;
import java.util.Map;
public class DocumentHashBucket {
private int numberNeededHashes;
private boolean completedStoreHashes;
private boolean completedStoreSignedHashes;
private Map<String,byte[]> map = new Hashtable<>();
}
为了存储由 String hash = doStuff(..) 创建的哈希,并且只有所有线程都完成其 doStuff(..) 到达 >numberNeededHashes 我必须在单个线程上只调用一次对互联网上签署哈希值的云服务的调用。当云上的签名哈希完成后,我将可以使用签名哈希更改映射,并允许恢复将执行 doStuff2(signedHash); 的 signHash(..) em> 然后关闭方法/线程。
要求:必须在该位置调用 doStuff(..) 和 doStuff2(signedHash) 并管理单个外部云他们之间通话。
问题:如何使用 RxJava 轻松做到这一点?
提前非常感谢
我正在使用的工作代码:
package com.example.unit;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Executor;
import static org.junit.jupiter.api.Assertions.*;
public class UTestRxJava {
@Test
void uRxJavaSimple() {
// Create an iterable observables
List<Integer> calls = new LinkedList<>();
calls.add(1);
calls.add(2);
calls.add(3);
calls.add(4);
final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
System.out.println("Starting parallel executions");
// Create an iterable observables
List<Observable<Integer>> observables = new LinkedList<>();
for (final Integer i: calls) {
System.out.println("Adding... "+i);
observables.add(Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return signHash(i);
}
}).subscribeOn(Schedulers.newThread())); // subscribeOn => specify the Scheduler on which an Observable will operate
}
final Map<String,String> mapResults = new HashMap<>();
Observable.zip(observables, new Function<Object[], Object>() {
@Override
public Object apply(Object[] objects) throws Throwable { // Zip observables
System.out.println("apply()");
List<String> observables = new LinkedList<>();
for (Object obj:objects) {
System.out.println("Applying... "+obj.toString());
observables.add(obj.toString());
}
return observables;
}
})
.doOnNext(new Consumer<Object>() {
@Override
public void accept(Object results) throws Throwable {
System.out.println("Ending parallel executions");
}
})
.observeOn(Schedulers.from(new Executor() {
@Override
public void execute(Runnable runnable) {
tasks.add(runnable);// Add a scheduler with executor from the current thread
}}))
.subscribe(new Consumer<Object>() {
// The Subscribe operator is the glue that connects an observer to an Observable
@Override
public void accept(Object results) throws Throwable { // Subscribe to the result.
// Put your code that needs to "wait"
for (String x : (List<String>)results) {
System.out.println("Results: "+x);
mapResults.put(x,"OK");
}
}
});
System.out.println("[START] TAKE-RUN");
try {
tasks.take().run();
} catch (InterruptedException e) {e.printStackTrace();fail("it's not possible that there is an exception");}
System.out.println("[END] TAKE-RUN");
assertTrue(mapResults.size()==4);
}
private Integer signHash(Integer number) {
String hash = doStuff(..)
Integer result = number * number;
System.out.println("Pre log \t"+Thread.currentThread().getName()+"\t"+ result);
try {
// TODO chage it with List<Hash> signHashesOnInternet(List<Hash>) only on one of all threads
Thread.sleep(number * 1000);
} catch (Exception e) {e.printStackTrace();}
System.out.println("Post log \t"+Thread.currentThread().getName()+"\t"+ result);
doStuff2(signedHash);
return result;
}
}
输出为:
Starting parallel executions
Adding... 1
Adding... 2
Adding... 3
Adding... 4
[START] TAKE-RUN
Pre log RxNewThreadScheduler-1 1
Pre log RxNewThreadScheduler-2 4
Pre log RxNewThreadScheduler-3 9
Pre log RxNewThreadScheduler-4 16
Post log RxNewThreadScheduler-1 1
Post log RxNewThreadScheduler-2 4
Post log RxNewThreadScheduler-3 9
Post log RxNewThreadScheduler-4 16
apply()
Applying... 1
Applying... 4
Applying... 9
Applying... 16
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
[END] TAKE-RUN
Process finished with exit code 0
最佳答案
这是我发现并测试的内容。我不知道这是否是最佳解决方案,但对我来说现在它正在发挥作用。
我使用了BehaviorSubject和他的方法blockingForEach来观察DocumentHashBucket<上的变化 (这是我创建的 bean)。
通过这种方法,blockingForEach允许我拥有一个阻塞代码,该代码不会继续执行其余代码,直到behaviorSubject.onComplete() 被调用。
package com.example.unit;
import com.example.DocumentHashBucket;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import org.apache.commons.codec.binary.Base64;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class UTestRxJavaStackOverflowSolution {
@Test
void uRxJavaSimple() {
// Create an iterable observables
List<Integer> calls = new LinkedList<>();
calls.add(1);
calls.add(2);
calls.add(3);
calls.add(4);
// create the Observable that will be used for collect all hashes and store the signed version of each
final DocumentHashBucket hashBucket = new DocumentHashBucket();
hashBucket.setNumberDocuments(calls.size());
final BehaviorSubject<DocumentHashBucket> behaviorSubject = BehaviorSubject.createDefault(hashBucket);
// create the BlockingQueue in order to have a blocking point for the main thread
final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
System.out.println("Starting parallel executions");
// Create an iterable observables
List<Observable<Integer>> observables = new LinkedList<>();
for (final Integer i: calls) {
System.out.println("Adding... "+i);
observables.add(Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call(){
Integer res = signHash(behaviorSubject, i);
assertTrue(res==i*i,"The result must be the square of the variable");
return res;
}
}).subscribeOn(Schedulers.newThread())); // subscribeOn => specify the Scheduler on which an Observable will operate
}
final Map<String,String> mapResults = new HashMap<>();
Observable.zip(observables, new Function<Object[], Object>() {
@Override
public Object apply(Object[] objects) throws Throwable { // Zip observables
System.out.println("apply()");
List<String> observables = new LinkedList<>();
for (Object obj:objects) {
System.out.println("Applying... "+obj.toString());
observables.add(obj.toString());
}
return observables;
}
})
.doOnNext(new Consumer<Object>() {
@Override
public void accept(Object results){
System.out.println("Ending parallel executions");
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable){
System.err.println("Error on execution of "+Thread.currentThread().getName()+" : "+ throwable.getMessage());
throwable.printStackTrace();
}
})
.observeOn(Schedulers.from(new Executor() {
@Override
public void execute(Runnable runnable) {
tasks.add(runnable);// Add a scheduler with executor from the current thread
}}))
.subscribe(new Consumer<Object>() {
// The Subscribe operator is the glue that connects an observer to an Observable
@Override
public void accept(Object onNext) { // Subscribe to the result.
// Put your code that needs to "wait"
for (String x : (List<String>) onNext) {
System.out.println("Results: " + x);
mapResults.put(x, "OK");
}
}
},
new Consumer<Throwable>() {
// The Subscribe operator is the glue that connects an observer to an Observable
@Override
public void accept(Throwable onError) { // Subscribe to the result.
System.err.println("Error on execution in one thread detected on this (main) thread : " + onError.getMessage());
onError.printStackTrace();
}
},
new Action() {
@Override
public void run(){
System.out.println("onComplete");
}
});
System.out.println("[START] TAKE-RUN");
try {
tasks.take().run();
} catch (InterruptedException e) {
System.err.println("Error on execution of zip: "+e.getMessage());
e.printStackTrace();
fail("it's not possible that there is an exception");
}
System.out.println("[END] TAKE-RUN");
assertTrue(mapResults.size()==4);
}
private Integer signHash(final BehaviorSubject<DocumentHashBucket> behaviorSubject, Integer number) {
System.out.println(Thread.currentThread().getName()+" [START] doStuff()");
try {
Thread.sleep(number * 1000);
} catch (Exception e) {e.printStackTrace();}
System.out.println(Thread.currentThread().getName()+" [END] doStuff()");
DocumentHashBucket hashBucket = behaviorSubject.getValue();
byte[] numberStringInBytes = ("" + number).getBytes();
String key = Base64.encodeBase64String(numberStringInBytes);
hashBucket.getMap().put(key, numberStringInBytes);
System.out.println(Thread.currentThread().getName()+" hashBucket.getMap().put new hash : "+key);
behaviorSubject.blockingForEach(new Consumer<DocumentHashBucket>() {
@Override
public void accept(DocumentHashBucket documentHashBucket) throws Throwable {
System.out.println(Thread.currentThread().getName()+" [blockingForEach] DocumentHashBucket is changed and now contains "+documentHashBucket.getMap().size()+" elements");
synchronized (documentHashBucket){
if(documentHashBucket.getNumberDocuments()==documentHashBucket.getMap().size() && documentHashBucket.isCompletedStoreHashes()==false){
System.out.println(Thread.currentThread().getName()+"|> --------- all hashes arrived ---------");
// all hashes arrived
documentHashBucket.setCompletedStoreHashes(true);
System.out.println(Thread.currentThread().getName()+" [START] simulate signHashesOnInternet(..)");
try {
// simulate network call
Thread.sleep(10 * 1000);
} catch (Exception e) {e.printStackTrace();}
// simulate signing hash on the DocumentHashBucket
for(String key : documentHashBucket.getMap().keySet()){
int value = Integer.parseInt(new String(documentHashBucket.getMap().get(key)));
documentHashBucket.getMap().put(key,(""+(value*value)).getBytes());
}
System.out.println(Thread.currentThread().getName()+" [END] simulate signHashesOnInternet(..)");
// DocumentHashBucket - now hashes are signed
documentHashBucket.setCompletedStoreSignedHashes(true);
// unlock the blockingForEach
behaviorSubject.onComplete();
System.out.println(Thread.currentThread().getName()+"|> --------- all hashes SAVED ---------");
}else{
System.out.println(Thread.currentThread().getName()+"changed but hashes are: "+documentHashBucket.getMap().size());
}
}// synchronized
}
});
// check that the signing process is applied correctly
assertTrue(Arrays.equals(hashBucket.getMap().get(key),(""+(number*number)).getBytes()));
System.out.println(Thread.currentThread().getName()+" [START] doStuff2()");
try {
Thread.sleep(number * 1000);
} catch (Exception e) {e.printStackTrace();}
System.out.println(Thread.currentThread().getName()+" [END] doStuff2()");
assertTrue(hashBucket.getMap().size()==4,"The map must contains 4 elements");
assertTrue(hashBucket.isCompletedStoreHashes(),"The flag completedStoreHashes must be true");
assertTrue(hashBucket.isCompletedStoreSignedHashes(),"The flag completedStoreSignedHashes must be true");
return number*number;
}
}
执行日志:
Starting parallel executions
Adding... 1
Adding... 2
Adding... 3
Adding... 4
RxNewThreadScheduler-1 [START] doStuff()
RxNewThreadScheduler-2 [START] doStuff()
RxNewThreadScheduler-3 [START] doStuff()
[START] TAKE-RUN
RxNewThreadScheduler-4 [START] doStuff()
RxNewThreadScheduler-1 [END] doStuff()
RxNewThreadScheduler-1 hashBucket.getMap().put new hash : MQ==
RxNewThreadScheduler-1 [blockingForEach] DocumentHashBucket is changed and now contains 1 elements
RxNewThreadScheduler-1changed but hashes are: 1
RxNewThreadScheduler-2 [END] doStuff()
RxNewThreadScheduler-2 hashBucket.getMap().put new hash : Mg==
RxNewThreadScheduler-2 [blockingForEach] DocumentHashBucket is changed and now contains 2 elements
RxNewThreadScheduler-2changed but hashes are: 2
RxNewThreadScheduler-3 [END] doStuff()
RxNewThreadScheduler-3 hashBucket.getMap().put new hash : Mw==
RxNewThreadScheduler-3 [blockingForEach] DocumentHashBucket is changed and now contains 3 elements
RxNewThreadScheduler-3changed but hashes are: 3
RxNewThreadScheduler-4 [END] doStuff()
RxNewThreadScheduler-4 hashBucket.getMap().put new hash : NA==
RxNewThreadScheduler-4 [blockingForEach] DocumentHashBucket is changed and now contains 4 elements
RxNewThreadScheduler-4|> --------- all hashes arrived ---------
RxNewThreadScheduler-4 [START] simulate signHashesOnInternet(..)
RxNewThreadScheduler-4 [END] simulate signHashesOnInternet(..)
RxNewThreadScheduler-4|> --------- all hashes SAVED ---------
RxNewThreadScheduler-2 [START] doStuff2()
RxNewThreadScheduler-1 [START] doStuff2()
RxNewThreadScheduler-4 [START] doStuff2()
RxNewThreadScheduler-3 [START] doStuff2()
RxNewThreadScheduler-1 [END] doStuff2()
RxNewThreadScheduler-2 [END] doStuff2()
RxNewThreadScheduler-3 [END] doStuff2()
RxNewThreadScheduler-4 [END] doStuff2()
apply()
Applying... 1
Applying... 4
Applying... 9
Applying... 16
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
onComplete
[END] TAKE-RUN
Process finished with exit code 0
关于java - 等到每个外部线程上的 RxJava 出现更多条件 - 跨线程执行单个操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58821297/
我正在进行 2 个 RX 调用,这些调用相互嵌套且相互依赖。服务器存在问题(由于各种原因目前无法解决),该问题在第二个嵌套调用中返回错误。 在这个问题得到解决之前,我需要确保如果第二次调用返回错误,则
这个问题在这里已经有了答案: How to resolve Duplicate files copied in APK META-INF/rxjava.properties (8 个答案) 关闭 5
我正在尝试将此 RxJava1 代码转换为 RxJava2 public static Observable listFolder(Path dir, String glob) { retur
这个问题在这里已经有了答案: RxJava 1 and RxJava 2 in the same project [duplicate] (1 个回答) How to resolve Duplica
有显示项目的RecyclerViewAdapter(该项目已经映射到数据库中)。 RecyclerViewAdapter 包含对 Presenter 的引用以加载项目。它还包含带有项目 ID 的 Ar
我想弄清楚如何在 Android 中使用 RxJava 将 Realm 对象保存在 Realm 中。到目前为止,结合所有这些的所有示例都是如何从 Realm 读取数据的。我想在 android 中使用
我在日志中收到此错误: Caused by java.lang.ClassCastException: java.net.UnknownHostException cannot be cast to
我有一个 API 服务类,其方法返回 Retrofit 提供的调用。 最近,Rx2Java 引入了 Single,所以我想将 Call 更改为 Single,但我不想更改逻辑。 例如 : 类接口(in
如何使用运算符让我始终获得以前和当前的值?如果可能的话,我想避免在管道外创建状态。 - time -> 1 2 3 4 | | | | Op
我正在努力实现以下目标。我加载了一个对象列表,我想获取稍后放入列表中的值。 首先,我使用 flatmap 将所有值收集到一个数组中(按山顺序),然后当一切完成后,我填充一个适配器。 我无法做的是每
是否可以选择使用 timeout 的变体不发射 Throwable ? 我要 complete事件发出。 最佳答案 您不需要使用 onErrorResumeNext 映射错误。您可以使用以下方法提供备
我们可以在 C# Rx 中异步执行一些代码,如下所示,使用 Observable.Start()。我想知道 RxJava 中的等价物是什么。 void Main() { AddTwoNum
问题:我有一个用户可以输入查询字符串的功能,我制作了 2 个可观察对象,一个用于查询我的本地数据库,另一个用于从 API 获取结果。这两个操作必须并行运行。我需要尽快显示来自数据库的结果,当 API
我正在尝试在 MVVM 中实现 ViewModel,将可观察对象作为“输入流”提供,将观察者作为“输出流”提供以供 View 绑定(bind)。 如果 getUser() 调用成功,下面的代码似乎可以
出于某种原因,我有时想使用 RxOperators 而不是普通的 java 方式来转换数据结构,因为它越来越干净。例如: Observable.from(listOfStrings) .filter(
我是 RxJava 新手,我需要以异步方式使用 Observable 功能。 我还需要使用超时:在我的示例中,我希望每个进程在 1 秒或更短的时间内结束。 这是我现在所做的: public stati
我正在尝试在网络请求期间在UI中显示进度条至少3秒钟。 此答案中描述的相同方法似乎不适用于Single。 RxJava Observable minimum execution time Single
我有一个可观察的(很热),它通过系统进程执行操作,并且我希望也运行一个间隔,直到该进程可观察达到 onComplete。 我看到区间运算符:http://reactivex.io/documentat
好吧,我是 RxJava2 的新手(嗯,我也不了解 RxJava),并且正在尝试使用 RxJava2 和 MVP 结构开发 Android 应用程序。 在该应用程序中,我正在对使用监听器的库进行异步调
如何将单个流拆分为单独的单个流,这样就可以执行以下操作而无需两次计算getUserId()? // getUserId() returns Single getUserId().flatMap { g
我是一名优秀的程序员,十分优秀!