作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有无穷无尽的事件流,我想:
我的代码如下所示:
Flux<SuchEvent> suchFlux = Flux.fromStream(events);
Scheduler parallel = Schedulers.newParallel("asd", 64);
suchFlux
.bufferTimeout(10000, Duration.ofMillis(100))
.map(rawEvents -> {
Map<UUID, List<SuchEvent>> groupedEvents = new HashMap<>();
for (SuchEvent stuff : rawEvents) {
if (!groupedEvents.containsKey(stuff.getProfileId())) {
groupedEvents.put(stuff.getProfileId(), new ArrayList<>());
}
groupedEvents.get(stuff.getProfileId()).add(stuff);
}
return groupedEvents.values();
})
.subscribe(groupedEvents -> {
for (List<SuchEvent> suchEvents : groupedEvents) {
Flux.fromIterable(suchEvents)
.subscribeOn(parallel)
.subscribe(suchEvent -> {
//do stuff (this is fairly slow, each call takes 50ms)
});
}
});
我希望内部 subscribe(suchEvent) 会对生产造成一些背压,但是,在运行一段时间后,一切似乎都停止了。我逻辑的谬误在哪里?
最佳答案
当您使用 lambda 进行订阅时,它会触发一个无限请求。要微调背压,您需要实现自己的 Subscriber .
推荐的解决方案是从 BaseSubscriber 扩展.
在 react 器文档中,他们提供了有关它的有用信息:
The bare minimum implementation is to implement both hookOnSubscribe(Subscription subscription) and hookOnNext(T value).
所以你可以这样做:
public static class SubscriberWithBackPressure<T> extends BaseSubscriber<T> {
private final int maxRequest;
private final Consumer<T> consumer;
public SubscriberWithBackPressure(int maxRequest, Consumer<T> consumer) {
this.maxRequest = maxRequest;
this.consumer = consumer;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(maxRequest);
}
@Override
protected void hookOnNext(T value) {
if (consumer != null) {
this.consumer.accept(value);
}
request(maxRequest);
}
}
像这样使用它:
suchFlux
.bufferTimeout(10000, Duration.ofMillis(100))
.map(rawEvents -> {
Map<UUID, List<SuchEvent>> groupedEvents = new HashMap<>();
for (SuchEvent stuff : rawEvents) {
if (!groupedEvents.containsKey(stuff.getProfileId())) {
groupedEvents.put(stuff.getProfileId(), new ArrayList<>());
}
groupedEvents.get(stuff.getProfileId()).add(stuff);
}
return groupedEvents.values();
})
.subscribe(groupedEvents -> {
for (List<SuchEvent> suchEvents : groupedEvents) {
Flux.fromIterable(suchEvents)
.subscribeOn(parallel)
.subscribe(new SubscriberWithBackPressure<>(100, suchEvent -> /*do stuff*/));
}
});
更新版本(不确定在每个转换级别处理背压):
suchFlux.bufferTimeout(10000, Duration.ofMillis(100))
.map(events -> events.stream().collect(Collectors.groupingBy(SuchEvent::getProfileId)))
.flatMap(group -> Flux.fromStream(group.values().stream()))
.flatMap(suchEvents -> Flux.fromIterable(suchEvents))
.subscribeOn(parallel)
.subscribe(new SubscriberWithBackPressure<>(100, suchEvent -> System.out.println(suchEvent)));
关于java - 未施加助焊剂背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47416317/
我是一名优秀的程序员,十分优秀!