gpt4 book ai didi

apache-kafka - 去抖kafka事件

转载 作者:行者123 更新时间:2023-12-05 03:40:08 24 4
gpt4 key购买 nike

我正计划设置一个 MySQL 到 Kafka 的流程,最终目标是安排一个进程根据更改的数据重新计算 mongoDB 文档。

这可能涉及直接修补 mongoDB 文档,或运行将重新创建整个文档的进程。

我的问题是,如果对MySQL数据库的一组更改都与一个mongoDB文档相关,那么我不想实时重新运行每个更改的重新计算过程,我想等待对“解决”的更改,以便我仅根据需要运行重新计算过程。

有没有办法对 Kafka 流进行“反跳”?例如。是否有针对 Kafka 消费者的定义明确的模式,我可以使用它来实现我想要的逻辑?

最佳答案

目前没有简单的方法去抖动事件。

简而言之,问题在于 Kafka 不会根据“挂钟时间”进行操作。处理通常由传入事件(以及其中包含的数据)触发,而不是由系统时间等任意触发器触发。

我将介绍 Suppressed 和 SessionWindows 不起作用的原因、KIP-242 中建议的解决方案以及未经测试的解决方法。

抑制

Suppressed有一个 untilTimeLimit() 函数,但它不适合去抖动。

If another record for the same key arrives in the mean time, it replaces the first record in the buffer but does not re-start the timer.

session 窗口

我认为使用 SessionWindows.ofInactivityGapAndGrace()可能有用。

首先,我对输入 KStream 进行了分组、窗口化、聚合和抑制:

  val windowedData: KTable<Windowed<Key>, Value> = 
inputTopicKStream
.groupByKey()
.windowedBy(
SessionWindows.ofInactivityGapAndGrace(
WINDOW_INACTIVITY_DURATION,
WINDOW_INACTIVITY_DURATION,
)
)
.aggregate(...)
.suppress(
Suppressed.untilWindowCloses(
Suppressed.BufferConfig.unbounded()
)
)

然后我聚合窗口,所以我可以有一个最终状态

  windowedData
.groupBy(...)
.reduce(
/* adder */
{ a, b -> a + b },
/* subtractor */
{ a, a -> a - a },
)

但是问题是 SessionWindows不会 在没有其他记录出现的情况下关闭。 Kafka 不会独立关闭窗口——它需要额外的记录来实现窗口可以关闭,并且 suppress() 可以转发一条新记录。

这在 Confluent 的博客中有说明 https://www.confluent.io/de-de/blog/kafka-streams-take-on-watermarks-and-triggers/

[I]f you stop getting new records wall-clock time will continue to advance, but stream time will freeze. Wall-clock time advances because that little quartz watch in your computer keeps ticking away, but stream time only advances when you get new records. With no new records, stream time is frozen.

KIP-424

KIP-424提出了一项改进,允许 Suppress 充当去抖动器,但几年来没有任何进展。

解决方法

Andrey Bratus 在 KIP-424 的 JIRA 票证中提供了一个简单的解决方法,KAFKA-7748 .我试过了,但它没有编译——我认为自解决方法发布以来,Kafka API 已经发展。我已经更新了代码,但我还没有测试它。

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

/**
* THIS PROCESSOR IS UNTESTED
* <br>
* This processor mirrors the source, but waits for an inactivity gap before forwarding records.
* <br>
* The suppression is key based. Newer values will replace previous values, and reset the inactivity
* gap.
*/
public class SuppressProcessor<K, V> implements Processor<K, V, K, V> {

private final String storeName;
private final Duration debounceCheckInterval;
private final long suppressTimeoutMillis;

private TimestampedKeyValueStore<K, V> stateStore;
private ProcessorContext<K, V> context;

/**
* @param storeName The name of the {@link TimestampedKeyValueStore} which will hold
* records while they are being debounced.
* @param suppressTimeout The duration of inactivity before records will be forwarded.
* @param debounceCheckInterval How regularly all records will be checked to see if they are
* eligible to be forwarded. The interval should be shorter than
* {@code suppressTimeout}.
*/
public SuppressProcessor(
String storeName,
Duration suppressTimeout,
Duration debounceCheckInterval
) {
this.storeName = storeName;
this.suppressTimeoutMillis = suppressTimeout.toMillis();
this.debounceCheckInterval = debounceCheckInterval;
}

@Override
public void init(ProcessorContext<K, V> context) {
this.context = context;

stateStore = context.getStateStore(storeName);

context.schedule(debounceCheckInterval, PunctuationType.WALL_CLOCK_TIME, this::punctuate);
}

@Override
public void process(Record<K, V> record) {

final var key = record.key();
final var value = record.value();

final var storedRecord = stateStore.get(key);

final var isNewRecord = storedRecord == null;

final var timestamp = isNewRecord ? System.currentTimeMillis() : storedRecord.timestamp();

stateStore.put(key, ValueAndTimestamp.make(value, timestamp));
}

private void punctuate(long timestamp) {
try (var iterator = stateStore.all()) {
while (iterator.hasNext()) {
KeyValue<K, ValueAndTimestamp<V>> storedRecord = iterator.next();
if (timestamp - storedRecord.value.timestamp() > suppressTimeoutMillis) {

final var record = new Record<>(
storedRecord.key,
storedRecord.value.value(),
storedRecord.value.timestamp()
);

context.forward(record);
stateStore.delete(storedRecord.key);
}
}
}
}
}

关于apache-kafka - 去抖kafka事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68273851/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com