gpt4 book ai didi

java - 计算给定窗口的流统计信息

转载 作者:行者123 更新时间:2023-11-30 06:21:49 25 4
gpt4 key购买 nike

我有一个经常滴答的 KStream(想想秒),我想计算 24 小时窗口内的各种统计数据。例如,24 小时变化,即给定点与 24 小时前的点之间的价格差异。

我所需输入的输出是:

t1 -> t1c1
t2 -> t1c2
t3 -> t1c3

其中 t1 是输入股票代码,t1c1 是输入股票代码以及针对其之前的 24 小时窗口计算的附加统计数据。

我考虑过一些方法但没有奏效:* 按 24 小时大小和 1 秒跃点对我的股票流进行窗口化。

builder.stream(rawPriceTickerTopic, ...)
.groupByKey()
.windowedBy(
TimeWindows.of(TimeUnit.DAYS.toMillis(1))
.advanceBy(TimeUnit.SECONDS.toMillis(1))
.reduce((value1, value2) ->
value1.tickerWithStatsFrom(value2), ...)
.toStream();

但是,这会生成大量输出点,因为每个输入代码都会为其所属的每个窗口生成一个输出代码。

  • 保持某种时间序列存储的最新状态,从存储中获取 24 小时前的值,并据此计算我的统计数据,但这似乎违背了流的观点。

最佳答案

我的最终解决方案是放弃窗口并简单地聚合我的股票,在聚合器中维护我自己的 24 小时窗口。这仍然不是最好的方法,而且有一种挥之不去的感觉,我可以用 Kafka 内置的窗口概念来解决它。

如上所述,我在聚合器中使用简单聚合:

streamBuilder.stream(tickerTopic, Consumed.with(...)
.groupByKey()
.aggregate(MyAggregator::new,
(key, value, aggregate) -> aggregate.addTicker(value),
Materialized.with(...)
.toStream()

结果是,对于原始股票流中的每条记录,我都会在输出流中获得一个聚合值。我的聚合器逻辑很简单:

  • 将新的代码添加到已排序的集合中。
  • 丢弃所有比这个新的最新股票代码早 24 小时以上的股票代码。
  • 计算新的 24 小时变化。

(此技术可用于给定窗口上的任何类型的计算,例如移动平均值。)

聚合器的示例代码:

public class MyAggregator {

private BigDecimal change;

private TreeSet<Ticker> orderedTickers = new TreeSet<>(MyAggregator::tickerTimeComparator);

public MyAggregator () {
this.windowMilis = 86400000;
}

public MyAggregator addTicker(Ticker ticker) {
orderedTickers.add(ticker);
cleanOldTickers();
change = getLatest().getAsk().subtract(getEarliest().getAsk());
return this;
}

public BigDecimal getChange() {
return change;
}

public Ticker getEarliest() {
return orderedTickers.first();
}

public Ticker getLatest() {
return orderedTickers.last();
}

private void cleanOldTickers() {
Date endOfWindow = latestWindow();

Iterator<Ticker> iterator = orderedTickers.iterator();
while(iterator.hasNext()) {
Ticker next = iterator.next();
if (next.getTimestamp().before(endOfWindow)) {
iterator.remove();
}
// The collection is sorted by time so if we get here we can break.
break;
}
}

private Date latestWindow() {
return new Date(getLatest().getTimestamp().getTime() - windowMilis);
}

private static int tickerTimeComparator(Ticker t1, Ticker t2) {
return t1.getTimestamp().compareTo(t2.getTimestamp());
}

}

关于java - 计算给定窗口的流统计信息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47990151/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com