gpt4 book ai didi

android - 将 Firebase 实时数据监听器与 RxJava 相结合

转载 作者:塔克拉玛干 更新时间:2023-11-01 21:24:56 24 4
gpt4 key购买 nike

我正在使用 Firebase在我的应用程序中,还有 RxJava。每当后端数据发生变化(添加、删除、更改……)时,Firebase 能够通知您的应用程序。我正在尝试将 Firebase 的功能与 RxJava 相结合。

我正在监听的数据称为 LeisureObservable 发出 LeisureUpdate,其中包含一个 Leisure以及更新的类型(添加、删除、移动、更改)。

这是我允许订阅此事件的方法。

private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;

@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {

@Override
public void call(final Subscriber<? super LeisureUpdate> subscriber) {
leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
@Override
public void onChildAdded(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
}

@Override
public void onChildChanged(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
}

@Override
public void onChildRemoved(DataSnapshot dataSnapshot) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
}

@Override
public void onChildMoved(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
}

@Override
public void onCancelled(FirebaseError firebaseError) {
subscriber.onError(new Error(firebaseError.getMessage()));
}
});
}
});
}
leisureUpdatesSubscriptionsCount++;
return leisureUpdatesObservable;
}

首先,我想使用 Observable.fromCallable() 方法来创建 Observable,但我想这是不可能的,因为 Firebase 使用回调,对吧?

我保留了 Observable 的单个实例,以便始终有一个 Observable,其中多个 Subscriber 可以订阅。

当每个人都取消订阅时,问题就来了,我需要停止监听 Firebase 中的事件。无论如何,我没有找到让 Observable 了解是否还有任何订阅的方法。所以我一直在计算我用 leisureUpdatesSubscriptionsCount 调用了多少次 subscribeToLeisuresUpdates()

然后每次有人想取消订阅时都必须调用

@Override
public void unsubscribeFromLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
return;
}
leisureUpdatesSubscriptionsCount--;
if (leisureUpdatesSubscriptionsCount == 0) {
firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
leisureUpdatesObservable = null;
}
}

这是我发现在有订阅者时让 Observable 发出项目的唯一方法,但我觉得必须有更简单的方法,特别是当没有更多订阅者收听时可观察的。

谁遇到过类似的问题或者有不同的方法?

最佳答案

您可以使用 Observable.fromEmitter,类似这些

    return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() {
@Override
public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) {
final ValueEventListener listener = new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
// process update
LeisureUpdate leisureUpdate = ...
leisureUpdateEmitter.onNext(leisureUpdate);
}

@Override
public void onCancelled(DatabaseError databaseError) {
leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage()));
mDatabaseReference.removeEventListener(this);
}
};
mDatabaseReference.addValueEventListener(listener);
leisureUpdateEmitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
mDatabaseReference.removeEventListener(listener);
}
});
}
}, Emitter.BackpressureMode.BUFFER);

关于android - 将 Firebase 实时数据监听器与 RxJava 相结合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36330776/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com