gpt4 book ai didi

java - RxJava 在 Observable 中收集 AMPQ 事件并使用缓冲区订阅

转载 作者:行者123 更新时间:2023-12-02 02:08:30 25 4
gpt4 key购买 nike

我必须收集一些 AMPQ 事件,然后使用缓冲区每 10 秒打印一次。

private Observable<Event> obs = Observable.empty(); 
private final Disposable disposable = obs.buffer(10, SECONDS)
.retry(t -> true)
.subscribe(System.out::println);

@Override
public void handle(final Event event, final MessageContext context) throws MessageConsumptionException {
obs = obs.concatWith(Observable.just(event));
}

Event 是消息,void handle 是消费者。

我调试这段代码,它只打印一个空列表,这是有道理的,因为 obs 是空的。

如何将事件添加(连接?)到此 Observable 中并连续执行一次性?谢谢。

最佳答案

您需要一个Subject您可以订阅。可以使用 next(T element)

将新元素推送到主题中
private Subject<Event> subject = ReplaySubject.create();

@Override
public void handle(final Event event, final MessageContext context) throws MessageConsumptionException {
subject.next(event);
}

public Observable<Event> getObservable() {
return subject.asObservable();
}

您可以订阅由 getObservable() 方法返回的可观察对象。

关于java - RxJava 在 Observable 中收集 AMPQ 事件并使用缓冲区订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50391202/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com