gpt4 book ai didi

java - 具有太多 Observable 的 RxJava1 StackOverflow 异常

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:52:36 25 4
gpt4 key购买 nike

我正在使用 rxjava1 开发一个项目,我有一个 Observable 链,偶尔会包含数千个合并或连接在一起的 observable。当发生这种情况时,将发生 StackOverflow 异常,我们将得到如下信息:

java.lang.StackOverflowError
at java.util.HashMap.putVal(HashMap.java:631)
at java.util.HashMap.put(HashMap.java:612)
at rx.internal.operators.OnSubscribeToMap$ToMapSubscriber.onNext(OnSubscribeToMap.java:127)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

堆栈跟踪将持续数百行。我看到的唯一相关帖子是 github 中的这个问题:https://github.com/ReactiveX/RxJava/issues/3035 .但是,将可观察对象添加到列表中的建议解决方案是我们已经使用过的,但行不通。

我能做些什么来防止这些 StackOverflow 异常?我是否需要进行某种节流或背压?

这是当前代码的样子并导致计算器溢出的示例:

public Observable<Map<String, JsonObject>> extractTopLevelSummariesFromForms(JsonArray summaries, Func2<String, String, Observable<JsonObject>> summaryGatherer) {
List<Observable<JsonObject>> summaryObservables = new LinkedList<>();
summaries.stream()
.map(JsonUtil::safeJsonObject)
.filter(summary -> StringUtils.isNotEmpty(summary.getString(NAME))|| StringUtils.isNotEmpty(summary.getString(Form.TITLE)))
.forEach(summary -> {
if (StringUtils.isNotEmpty(summary.getString(TEXT)))
summaryObservables.add(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".hidden",
summary.getString(VALUE), summaryGatherer));
if (StringUtils.isNotEmpty(summary.getString(Form.TEXT)))
summaryObservables.add(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".title",
summary.getString(Summary.VALUE), summaryGatherer, true));
});
return Observable.merge(Observable.from(summaryObservables))
.filter(summaryResult -> summaryResult != null)
.toMap(summaryResult -> summaryResult.getString(KEY), summaryResult -> summaryResult.getJsonObject(TEXT));
}

private Observable<JsonObject> gatherSummariesFromElement(String parentName, String parentType, String elementName, String summaryValue, Func2<String, String, Observable<JsonObject>> summaryGatherer, Set<String> visited, boolean isList) {
if (visited.contains(elementName))
return Observable.just(null);
visited.add(elementName);

Map<String, JsonObject> summariesMap = new HashMap<>();

summariesMap.put(elementName, new JsonObject().put(Summary.SummaryValues.FORM, form).put(SUMMARY_TYPE, parentType));
Set<String> variables = TextEngine.getVariables(summariesMap);

Observable<JsonObject> elementSummaryObservable = Observable.just(getSummaryEntry(elementName, form, parentType, isList));

if (variables != null && !variables.isEmpty()) {
elementSummaryObservable = elementSummaryObservable.mergeWith(Observable.from(variables).flatMap(variable -> {
if (StringUtils.contains(variable, ".") && StringUtils.equals(parentName, StringUtils.split(variable, ".")[0]))
return Observable.just(null);
else
return summaryGatherer.call(parentName, variable).flatMap(variableEntry -> {
if (variableEntry == null)
return Observable.just(null);
else
return gatherSummariesFromElement(parentName, variableEntry.getString(SOURCE_TYPE), variable, variableEntry.getString(FORM), summaryGatherer, visited, variableEntry.getBoolean(Summary.SummaryValues.IS_LIST, false));
});
}));
}
return elementSummaryObservable;
}

我已经尝试在 Schedulers.computation() 调度程序中运行除网络请求之外的所有内容,那些在 Schedulers.io() 调度程序中运行,我是仍然得到计算器溢出:

Exception in thread "pool-26-thread-2" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)
at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

最佳答案

  1. 您对 gatherSummariesFromElement 进行了递归调用,这可能会进行得非常深入
  2. Observable.merge(Observable.from(summaryObservables)) 调用看起来很奇怪,你应该只使用 Observable.merge(summaryObservables)
  3. 您可以使用 Observable.empty()
  4. 而不是使用 Observable.just(null) 然后过滤空值
  5. summaryObservables 的构建看起来有点矫枉过正。您可以构建有效摘要列表而不是然后在 flatMap 中处理它们
  6. gatherSummariesFromElement 中的递归 Observable 创建替换为元素创建的递归列表,然后从该列表创建 observable

_

public Observable<Map<String, JsonObject>> extractTopLevelSummariesFromForms(JsonArray summaries, Func2<String, String, JsonObject> summaryGatherer) {
List<JsonObject> validSummaries = new LinkedList<>();
summaries.stream()
.map(JsonUtil::safeJsonObject)
.filter(summary -> StringUtils.isNotEmpty(summary.getString(NAME)) || StringUtils.isNotEmpty(summary.getString(Form.TITLE)))
.forEach(validSummaries::add);
Set<String> visited = new HashSet<>();
return Observable.from(validSummaries)
.flatMap(summary -> {
if (StringUtils.isNotEmpty(summary.getString(TEXT)))
Observable.from(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".hidden",
summary.getString(VALUE), visited, summaryGatherer)));
if (StringUtils.isNotEmpty(summary.getString(Form.TEXT)))
Observable.from(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".title",
summary.getString(Summary.VALUE), summaryGatherer, visited,true)));
})
.toMap(summaryResult -> summaryResult.getString(KEY), summaryResult -> summaryResult.getJsonObject(TEXT));
}

private List<JsonObject> gatherSummariesFromElement(String parentName, String parentType, String elementName, String summaryValue, Func2<String, String, JsonObject> summaryGatherer, Set<String> visited, boolean isList) {
if (visited.contains(elementName))
return Collections.emptyList();
visited.add(elementName);
List<JsonObject> result = new ArrayList<>()

Map<String, JsonObject> summariesMap = new HashMap<>();

summariesMap.put(elementName, new JsonObject().put(Summary.SummaryValues.FORM, form).put(SUMMARY_TYPE, parentType));
Set<String> variables = TextEngine.getVariables(summariesMap);

result.add(getSummaryEntry(elementName, form, parentType, isList));

if (variables != null && !variables.isEmpty()) {
for (String variable : variables) {
if (StringUtils.contains(variable, ".") && StringUtils.equals(parentName, StringUtils.split(variable, ".")[0])) {
// do nothing
} else {
JsonObject variableEntry = summaryGatherer.call(parentName, variable)
if (variableEntry != null) {
result.addAll(gatherSummariesFromElement(parentName, variableEntry.getString(SOURCE_TYPE), variable, variableEntry.getString(FORM), summaryGatherer, visited, variableEntry.getBoolean(Summary.SummaryValues.IS_LIST, false));
}
}
}
}
return result;
}

关于java - 具有太多 Observable 的 RxJava1 StackOverflow 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50753349/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com