gpt4 book ai didi

java - 未施加助焊剂背压

转载 作者:行者123 更新时间:2023-11-29 04:23:56 26 4
gpt4 key购买 nike

我有无穷无尽的事件流,我想:

  1. 考虑微批处理,每个 100 毫秒,或 10000 个元素
  2. 根据分组函数对它们进行分组
  3. 并行处理每个分组项目列表

我的代码如下所示:

Flux<SuchEvent> suchFlux = Flux.fromStream(events);
Scheduler parallel = Schedulers.newParallel("asd", 64);
suchFlux
.bufferTimeout(10000, Duration.ofMillis(100))
.map(rawEvents -> {
Map<UUID, List<SuchEvent>> groupedEvents = new HashMap<>();
for (SuchEvent stuff : rawEvents) {
if (!groupedEvents.containsKey(stuff.getProfileId())) {
groupedEvents.put(stuff.getProfileId(), new ArrayList<>());
}
groupedEvents.get(stuff.getProfileId()).add(stuff);
}
return groupedEvents.values();
})
.subscribe(groupedEvents -> {
for (List<SuchEvent> suchEvents : groupedEvents) {
Flux.fromIterable(suchEvents)
.subscribeOn(parallel)
.subscribe(suchEvent -> {
//do stuff (this is fairly slow, each call takes 50ms)
});
}
});

我希望内部 subscribe(suchEvent) 会对生产造成一些背压,但是,在运行一段时间后,一切似乎都停止了。我逻辑的谬误在哪里?

最佳答案

当您使用 lambda 进行订阅时,它会触发一个无限请求。要微调背压,您需要实现自己的 Subscriber .

推荐的解决方案是从 BaseSubscriber 扩展.

在 react 器文档中,他们提供了有关它的有用信息:

The bare minimum implementation is to implement both hookOnSubscribe(Subscription subscription) and hookOnNext(T value).

所以你可以这样做:

public static class SubscriberWithBackPressure<T> extends BaseSubscriber<T> {
private final int maxRequest;
private final Consumer<T> consumer;

public SubscriberWithBackPressure(int maxRequest, Consumer<T> consumer) {
this.maxRequest = maxRequest;
this.consumer = consumer;
}

@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(maxRequest);
}

@Override
protected void hookOnNext(T value) {
if (consumer != null) {
this.consumer.accept(value);
}
request(maxRequest);
}
}

像这样使用它:

suchFlux
.bufferTimeout(10000, Duration.ofMillis(100))
.map(rawEvents -> {
Map<UUID, List<SuchEvent>> groupedEvents = new HashMap<>();
for (SuchEvent stuff : rawEvents) {
if (!groupedEvents.containsKey(stuff.getProfileId())) {
groupedEvents.put(stuff.getProfileId(), new ArrayList<>());
}
groupedEvents.get(stuff.getProfileId()).add(stuff);
}
return groupedEvents.values();
})
.subscribe(groupedEvents -> {
for (List<SuchEvent> suchEvents : groupedEvents) {
Flux.fromIterable(suchEvents)
.subscribeOn(parallel)
.subscribe(new SubscriberWithBackPressure<>(100, suchEvent -> /*do stuff*/));
}
});

更新版本(不确定在每个转换级别处理背压):

suchFlux.bufferTimeout(10000, Duration.ofMillis(100))
.map(events -> events.stream().collect(Collectors.groupingBy(SuchEvent::getProfileId)))
.flatMap(group -> Flux.fromStream(group.values().stream()))
.flatMap(suchEvents -> Flux.fromIterable(suchEvents))
.subscribeOn(parallel)
.subscribe(new SubscriberWithBackPressure<>(100, suchEvent -> System.out.println(suchEvent)));

关于java - 未施加助焊剂背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47416317/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com