gpt4 book ai didi

java - 使用 Kotlin 的 RxJava - 如何同步 2 个异步方法,从 Java 重构

转载 作者:行者123 更新时间:2023-12-02 12:06:37 25 4
gpt4 key购买 nike

我有 2 个集合,它们缓冲位置更新事件:

     private List<LocationGeoEvent> mUpdateGeoEvents = new ArrayList<>();
private List<LocationRSSIEvent> mUpdateRSSIEvents = new ArrayList<>();

我的代码中也存在:

        private final ScheduledExecutorService mSaveDataExecutor = Executors.newSingleThreadScheduledExecutor();
private boolean mSaveDataScheduled;
private final Object mEventsMonitor = new Object();

private ScheduledFuture<?> mScheduledStopLocationUpdatesFuture;
private final ScheduledExecutorService mStopLocationUpdatesExecutor = Executors.newSingleThreadScheduledExecutor();

我将事件添加到这个集合中,如下所示:

    public void appendGeoEvent(LocationGeoEvent event) {
synchronized (mEventsMonitor) {
mUpdateGeoEvents.add(event);
scheduleSaveEvents();
}
}

RSSI 事件也是如此

现在,scheduleSaveEvents 方法如下所示:

      private void scheduleSaveEvents() {

synchronized (mSaveDataExecutor) {
if (!mSaveDataScheduled) {
mSaveDataScheduled = true;
mSaveDataExecutor.schedule(
new Runnable() {
@Override
public void run() {
synchronized (mSaveDataExecutor) {
saveEvents(false);
mSaveDataScheduled = false;
}
}
},
30,
TimeUnit.SECONDS);
}
}

}

问题是,我需要同步另一个停止更新的方法。它是这样触发的:

      private void scheduleStopLocationUpdates() {

synchronized (mStopLocationUpdatesExecutor) {
if (mScheduledStopLocationUpdatesFuture != null)
mScheduledStopLocationUpdatesFuture.cancel(true);

mScheduledStopLocationUpdatesFuture = mStopLocationUpdatesExecutor.schedule(
new Runnable() {
@Override
public void run() {
synchronized (mStopLocationUpdatesExecutor) {
stopLocationUpdates();
saveEvents(true);
cleanAllReadingsData();
}
}
},
45,
TimeUnit.SECONDS);
}

}

在 saveEvents 方法中我这样做:

    private void saveEvents(boolean locationUpdatesAboutToStop) {

synchronized (mEventsMonitor) {
if (mUpdateGeoEvents.size() > 0 || mUpdateRSSIEvents.size() > 0) {

//do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop

mUpdateGeoEvents.clear();
mUpdateRSSIEvents.clear();
}

}

}

有没有办法使用 Kotlin 将其更简单地重构为 RxJava?

更新

这是我的appendRSSIevents方法:

    private fun appendRSSIEvent(event: LocationRSSIEvent) {
synchronized(mEventsMonitor) {
if (!shouldSkipRSSIData(event.nexoIdentifier)) {
mUpdateRSSIEvents.add(event)
acknowledgeDevice(event.nexoIdentifier)
scheduleSaveEvents()
startLocationUpdates()
} else
removeExpiredData()
}
}

最佳答案

您可以缓冲两个数据流,然后将它们合并保存。此外,您还可以使用缓冲区触发器来停止更新。

PublishSubject<LocationGeoEvent> mUpdateGeoEventsSubject = PublishSubject.create();
PublishSubject<LocationRSSIEvent> mUpdateRSSIEventsSubject = PublishSubject.create();

public void appendGeoEvent(LocationGeoEvent event) {
mUpdateGeoEventsSubject.onNext( event );
triggerSave.onNext( Boolean.TRUE );
}

对于 RSS feed 也是如此。

现在我们需要用于驱动保存步骤的触发器。

PublishSubject<Boolean> triggerSave = PublishSubject.create();
PublishSubject<Boolean> triggerStopAndSave = PublishSubject.create();

Observable<Boolean> normalSaveTrigger = triggerSave.debounce( 30, TimeUnit.SECONDS );
Observable<Boolean> trigger = Observable.merge( normalSaveTrigger, triggerStopAndSave );

当正常保存过程触发或我们停止保存时,trigger observable 就会触发。

private void saveEvents(
List<LocationGeoEvent> geo,
List<LocationRSSIEvent> rss,
boolean locationUpdatesAboutToStop) {

synchronized (mEventsMonitor) {
if (geo.size() > 0 || rss.size() > 0) {
//do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop
}
}
}
private void scheduleStopLocationUpdates() {
stopLocationUpdates();
triggerStopAndSave.onNext( Boolean.FALSE );
cleanAllReadingsData();
}

Observable.zip( mUpdateGeoEventsSubject.buffer( trigger ),
mUpdateRSSIEventsSubject.buffer( trigger ),
trigger, (geo, rss, trgr) -> saveEvents(geo, rss, trgr) )
.subscribe();

您仍然需要对多线程和安全性进行一些调整。第一步是将各种主题转换为 SerializedSubject ,以便多个线程可以发出事件。

如果您希望 saveEvents 在特定调度程序上运行,您需要添加一个中间数据结构(一个三元组),以通过 observeOn() 传递参数运算符,或将 observeOn() 运算符应用于每个 zip() 参数。

关于java - 使用 Kotlin 的 RxJava - 如何同步 2 个异步方法,从 Java 重构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46873977/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com