gpt4 book ai didi

java - 使用 Flink DataStream 计算窗口持续时间的平均值

转载 作者:搜寻专家 更新时间:2023-10-31 20:09:31 28 4
gpt4 key购买 nike

我在有机架可用的地方使用 Flink DataStream API,我想通过机架 ID 计算温度组的“平均值”。我的窗口持续时间是 40 秒,我的窗口每 10 秒滑动一次......下面是我的代码,我每 10 秒为每个 rackID 计算温度的 sum,但现在我想计算 < strong>平均温度::

static Properties properties=new Properties();
public static Properties getProperties()
{
properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
//properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
//properties.setProperty("group.id", "akshay");
properties.setProperty("auto.offset.reset", "earliest");
return properties;
}

@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception
{
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties props=Program.getProperties();
DataStream<TemperatureEvent> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props)).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
DataStream<TemperatureEvent> ds1=dstream.keyBy("rackId").timeWindow(Time.seconds(40), Time.seconds(10)).sum("temperature");
env.execute("Temperature Consumer");
}

如何计算上述示例的平均温度??

最佳答案

据我所知,您需要自己编写平均函数。你可以在这里找到一个例子:

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java

在你的情况下,你可能会更换.sum("温度");

类似的东西.apply(new Avg());并实现 Avg 类:

public class Avg implements WindowFunction<TemperatureEvent,  TemperatureEvent, Long, org.apache.flink.streaming.api.windowing.windows.Window> {

@Override
public void apply(Long key, Window window, Iterable<TemperatureEvent> values, Collector<TemperatureEvent> out) {
long sum = 0L;
int count = 0;
for (TemperatureEvent value : values) {
sum += value.getTemperature();
count ++;
}

TemperatureEvent result = values.iterator().next();
result.setTemperature(sum / count);
out.collect(result);
}
}

注意:如果您的函数有可能在空窗口中被调用(例如,通过使用自定义触发器),您需要在访问 elements.head 之前进行检查

关于java - 使用 Flink DataStream 计算窗口持续时间的平均值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37946673/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com