gpt4 book ai didi

java - 如何锁定线程直到 Observable 完成

转载 作者:行者123 更新时间:2023-11-30 02:32:02 25 4
gpt4 key购买 nike

我正在测试一个场景,我想发送一个事件,并观察消费者何时完成处理以继续流程,即当我触发事件时,我需要阻塞主线程直到事件结束消费者处理,使用rxJava Observable,我没有成功锁定主线程等待可观察结果。

我的制作人

@Service
public class Producer {

private MessageChannel output;

@Autowired
private Consumer consumer;

@Autowired
public Producer(Processor processor) {
this.output = processor.output();
}

public void send(String event) {

System.out.println("SENDING EVENT...");

output.send(MessageBuilder.withPayload(event).build());

//Observable<Boolean> obs = consumer.execute();
//obs.subscribe();

//Blocking process
BlockingObservable.from(consumer.execute()).subscribe();

//Continue to flow
System.out.println("EVENT PROCESSED...");

}
}

我的消费者

@Service
public class Consumer {

@StreamListener(target = Processor.INPUT)
public void receiver(@Payload String event){

System.out.println("EVENT RECEIVED, PROCESSING...");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

execute();

}

public Observable<Boolean> execute() {
return Observable.<Boolean>create(emitter -> {
try {
System.out.println("EVENT STILL PROCESSING...");
emitter.onNext(Boolean.TRUE);
} catch (Exception e) {
emitter.onError(new RuntimeException("ERROR"));
}
emitter.onCompleted();
});
}
}

最佳答案

您可以使用 BlockingObservable.toFuture(consumer.execute()).get() 来阻止线程,而不是 BlockingObservable.from(consumer.execute()).subscribe() .

关于java - 如何锁定线程直到 Observable 完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44049114/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com