gpt4 book ai didi

java - 正确终止卡住的 Couchbase Observable

转载 作者:行者123 更新时间:2023-11-30 23:47:39 25 4
gpt4 key购买 nike

我正在尝试根据某些约束以快速方式删除一批 couchbase 文档(或者如果不满足约束则更新文档)。根据我的术语,每次删除都被称为一个“包裹”。

执行时,我遇到了一个非常奇怪的行为 - 负责此任务的线程开始按预期进行几次迭代(最多)。在这个“宽限期”之后,couchbase 会“卡住”并且 Observable 不会调用它的任何 Subscriber 方法(onNextonComplete, onError) 在定义的 30 秒内。

latch 超时发生时(见下面的实现),该方法返回但 Observable 继续执行(我注意到当它停止时继续打印调试消息此方法范围之外的断点)。我怀疑 couchbase 被卡住了,因为几秒钟后,许多 Observable 处于某种“幽灵”状态 - 活着并向它们的 Subscriber 报告,这反过来无事可做,因为创建它们的方法已经完成,最终导致 java.lang.OutOfMemoryError: GC overhead limit exceeded

我不知道我在这里声明的内容是否合理,但我想不出造成这种行为的其他原因。我应该如何在超时时正确终止 Observable?我是不是该?还有别的办法吗?

public List<InfoParcel> upsertParcels(final Collection<InfoParcel> parcels) {
final CountDownLatch latch = new CountDownLatch(parcels.size());

final List<JsonDocument> docRetList = new LinkedList<JsonDocument>();
Observable<JsonDocument> obs = Observable
.from(parcels)
.flatMap(parcel ->
Observable.defer(() ->
{
return bucket.async().get(parcel.key).firstOrDefault(null);
})
.map(doc -> {
// In-memory manipulation of the document
return updateDocs(doc, parcel);
})
.flatMap(doc -> {
boolean shouldDelete = ... // Decide by inner logic
if (shouldDelete) {
if (doc.cas() == 0) {
return Observable.just(doc);
}
return bucket.async().remove(doc);
}
return (doc.cas() == 0 ? bucket.async().insert(doc) : bucket.async().replace(doc));
})
);

obs.subscribe(new Subscriber<JsonDocument>() {
@Override
public void onNext(JsonDocument doc) {
docRetList.add(doc);
latch.countDown();
}

@Override
public void onCompleted() {
// Due to a bug in RxJava, onError() / retryWhen() does not intercept exceptions thrown from within the map/flatMap methods.
// Therefore, we need to recalculate the "conflicted" parcels and send them for update again.
while(latch.getCount() > 0) {
latch.countDown();
}
}

@Override
public void onError(Throwable e) {
// Same reason as above
while (latch.getCount() > 0) {
latch.countDown();
}
}
};
);

latch.await(30, TimeUnit.SECONDS);

// Recalculating remaining failed parcels and returning them for another cycle of this method (there's a loop outside)
}

最佳答案

我认为这确实是因为使用倒计时锁存器不会向源发出数据处理流应该停止的信号。

您可以使用更多的 rxjava,通过使用 toList().timeout(30, TimeUnit.SECONDS).toBlocking().single() 而不是收集(不同步因此不安全) 外部列表和使用 countdownLatch。

这将阻塞,直到返回您的文档列表。

关于java - 正确终止卡住的 Couchbase Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34482448/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com