
quarkus - Restarting Reactive Messaging, e.g. after reconfiguration

Reposted. Author: 行者123. Updated: 2023-12-04 04:07:49

How can I restart, or stop and resume, Reactive Messaging, for example after changing the interval? This example is from the Quarkus guide: https://quarkus.io/guides/kafka-streams

@Outgoing("temperature-values")
public Flowable<KafkaRecord<Integer, String>> generate() {

    return Flowable.interval(500, TimeUnit.MILLISECONDS)
            .onBackpressureDrop()
            .map(tick -> {
                WeatherStation station = stations.get(random.nextInt(stations.size()));
                double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
            });
}

Best answer

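One option is to replace the Flowable with a Subject, and use a Flowable only to feed values into that Subject. The outgoing channel then stays bound to the Subject for the lifetime of the application. When you want to change something, such as the interval, you dispose of the current Flowable and create a new one that feeds the same Subject.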

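// Assumed RxJava 2 package names, matching the Flowable type used in the question.
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;

class YourClass {

    // stations, random and LOG are the same fields as in the question's class.

    // Stable source for the channel; it outlives any single interval Flowable.
    private Subject<KafkaRecord<Integer, String>> temperatureSubject = BehaviorSubject.create();
    // Subscription of the interval Flowable that currently feeds the subject.
    private Disposable currentSubscription;

    // Call once at startup and again whenever the settings change;
    // nothing is emitted until the first call.
    public void setFlowable() {
        // Stop the previous feeder, if there is one.
        if (currentSubscription != null && !currentSubscription.isDisposed()) {
            currentSubscription.dispose();
        }
        // Start a new feeder with the new interval.
        currentSubscription = Flowable.interval(5, TimeUnit.SECONDS)
                .map(it -> {
                    WeatherStation station = stations.get(random.nextInt(stations.size()));
                    double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                            .setScale(1, RoundingMode.HALF_UP)
                            .doubleValue();

                    LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                    return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
                }).subscribe(it -> {
                    temperatureSubject.onNext(it);
                });
    }

    @Outgoing("temperature-values")
    public Flowable<KafkaRecord<Integer, String>> generate() {
        // The channel subscribes to the subject, not to the interval Flowable.
        return temperatureSubject.toFlowable(BackpressureStrategy.LATEST);
    }
}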
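
To try the same swap-the-feeder idea without Kafka or RxJava on the classpath, here is a minimal sketch that uses only the JDK. It is not the RxJava code from the answer: a ScheduledExecutorService stands in for Flowable.interval, and a plain Consumer stands in for the Subject. The names RestartableEmitter, restart, stop and isRunning are hypothetical and are not part of Quarkus or SmallRye.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: a fixed sink fed by a periodic task that can be replaced. */
class RestartableEmitter<T> implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Supplier<T> source;   // produces the next value
    private final Consumer<T> sink;     // stable consumer, never replaced
    private ScheduledFuture<?> current; // the feeder that is currently scheduled

    RestartableEmitter(Supplier<T> source, Consumer<T> sink) {
        this.source = source;
        this.sink = sink;
    }

    /** Cancels the current feeder, if any, and schedules a new one with the given period. */
    synchronized void restart(long period, TimeUnit unit) {
        stop();
        current = scheduler.scheduleAtFixedRate(
                () -> sink.accept(source.get()), period, period, unit);
    }

    /** Cancels the current feeder; the sink stays usable for a later restart. */
    synchronized void stop() {
        if (current != null) {
            current.cancel(false);
            current = null;
        }
    }

    synchronized boolean isRunning() {
        return current != null;
    }

    /** Shuts down the scheduler thread; the emitter cannot be restarted afterwards. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

After a configuration change you would call something like emitter.restart(5, TimeUnit.SECONDS). In the Quarkus case the sink would presumably be temperatureSubject::onNext, so the @Outgoing method keeps returning the same stream while only the feeder changes.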

Regarding quarkus - Restarting Reactive Messaging, e.g. after reconfiguration, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62236650/
