gpt4 book ai didi

java - 使用 RxJava 缓冲分组项目

转载 作者:行者123 更新时间:2023-11-30 12:05:27 24 4
gpt4 key购买 nike

我正在尝试使用 RxJava 实现流处理。我想处理不同的步骤。

  • 我从推送传感器数据的发布主题开始
  • 我想按传感器类型(标识符)对这些传感器数据进行分组
  • 我想为每个组缓冲这些传感器数据
  • 当缓冲区已满或超时时,我想计算该组中所有传感器值的平均值
  • 最后我想将所有这些组重新加入到一个输出流中

到目前为止,对于下面的代码示例,我有一个为所有传感器数据共享的缓冲区。我不明白如何为每个组创建缓冲区然后进行计算。由于我是 RxJava 的新手,我不理解所有的概念并且我被我的问题困住了。

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class Main {

private static final int SENSOR_TEMPERATURE = 1;
private static final int SENSOR_HUMIDITY = 2;

private PublishSubject<Sensor> publishSubject = PublishSubject.create();

static class Sensor {
int type;
float value;

Sensor(int type, float value) {
this.type = type;
this.value = value;
}
}

private PublishSubject<Sensor> listenSensors() {
return publishSubject;
}

private static Sensor getValueAverage(List<Sensor> sensors) {
int count = sensors.size();
float total = sensors.stream().map(sensor -> sensor.value).reduce(Float::sum).orElse(0f);
float avg = total / count;
return new Sensor(sensors.get(0).type, avg);
}

//Map type
private static String getStringType(int type) {
if (type == SENSOR_HUMIDITY) {
return "HUMIDITY";
}
else if (type == SENSOR_TEMPERATURE) {
return "TEMPERATURE";
}
return "OTHER";
}

private static void emitRandomValue(PublishSubject<Sensor> sensorPublishSubject) throws InterruptedException {

new Thread(() -> {
int randomDelay = 0;

while (true) {
int randomType = (int) ((Math.random() * 10 % 2) + 1);
randomDelay = (int) (Math.random() * 3000);
float randomValue = (float) (Math.random() * 100);
System.out.println("EMIT: " + getStringType(randomType) + " " + randomValue);
sensorPublishSubject.onNext(new Sensor(randomType, randomValue));
try {
Thread.sleep(randomDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}).start();
}
static Observable<List<Sensor>> flatpMapSensor(List<Sensor> sensors) {
return Observable
.fromIterable(sensors)
.groupBy(s -> s.type)
.flatMapSingle(Observable::toList);
}

// Testing code
static public void main(String args[]) throws InterruptedException {
Main main = new Main();
main.listenSensors()
.publish(p -> p
.buffer(20, TimeUnit.SECONDS, 10)
.filter(list -> !list.isEmpty()))
.flatMap(Main::flatpMapSensor)
.map(Main::getValueAverage)
.subscribe(sensor -> System.out.println("AVG " + getStringType(sensor.type) + " " + sensor.value));
emitRandomValue(main.publishSubject);

Thread.sleep(90000);

}
}

所以我的问题是:如何为每种传感器类型设置单独的缓冲区?

最佳答案

如果您移动 buffer()groupBy() 调用会怎样?

static public void main(String args[]) throws InterruptedException {
Main main = new Main();
main.listenSensors()
.groupBy(s -> s.type) // group by type
.flatMap(l -> l.buffer(20, SECONDS, 10).map(Main::getValueAverage)) // buffer groups by type and compute the average
.subscribe(sensor -> System.out.println("AVG " + getStringType(sensor.type) + " " + sensor.value));
emitRandomValue(main.publishSubject);

Thread.sleep(90000);
}

关于java - 使用 RxJava 缓冲分组项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56288622/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com