gpt4 book ai didi

java - RxJava2 批处理项

转载 作者:行者123 更新时间:2023-11-29 08:30:31 25 4
gpt4 key购买 nike

我有一个连续生成的日志流,即只要系统中有新日志可用就会调用的方法。我不想在每次生成日志时都对其进行处理(因为日志每毫秒左右生成一次)。

我想收集在一段时间内发出的日志,比如 5 秒,然后批量处理它们。

我如何使用 rxjava 实现此目的。

我试过类似的东西

private static void logResults(LogData logData) {
Observable.create((ObservableOnSubscribe<LogData>) e -> {
e.onNext(logData);
}).buffer(5, TimeUnit.SECONDS).subscribeWith(new DisposableObserver<List<LogData>>() {
@Override
public void onNext(List<LogData> logData) {
System.out.print(logData.toString()));
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
}
});
}


/**
This method get called every time when new log is there
*/
public static void logGenerated(LogData log) {
logResults(log);
}

最佳答案

您需要创建一个流,该流在对 logResults 的多次调用中保持 Activity 状态。最简单的方法是使用静态 PublishSubject:

private static final Subject<LogData> subject =
PublishSubject.<LogData>create(); // .toSerialized();

private static final Disposable logProcessing =
subject.buffer(5, TimeUnit.SECONDS)
.subscribeWith(new DisposableObserver<List<LogData>>() {
@Override
public void onNext(List<LogData> logData) {
System.out.print(logData.toString()));
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
}
});

private static void logResults(LogData logData) {
subject.onNext(logData);
}


/**
* This method get called every time when new log is there
*/
public static void logGenerated(LogData log) {
logResults(log);
}

关于java - RxJava2 批处理项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48705223/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com