gpt4 book ai didi

apache-kafka-streams - KStream-KTable join 写入 KTable : How to sync the join with the ktable write?

转载 作者:行者123 更新时间:2023-12-04 08:20:08 24 4
gpt4 key购买 nike

我对以下拓扑的行为方式有一些问题:

String topic = config.topic();

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
.peek((k, v) -> L.info("Event:"+v.action))
// join the event with the according entry in the KTable and apply the state mutation
.leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
.peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

当我同时收到不同的事件时,就会发生我的问题。因为我的状态突变是由 leftJoin 完成的然后由 to写的方法。如果使用相同的键同时收到事件 1 和 2,我可能会发生以下情况:
event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic

因此,状态 Y 没有来自 event1 的更改。 ,所以我丢失了数据。

这是我所看到的日志( Processing:... 部分是从值连接器内部记录的):
Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1
Event1可以认为是创建事件:它将在KTable中创建条目,因此状态是否为空都没有关系。 Event2虽然需要将其更改应用于现有状态,但没有找到任何更改,因为第一个状态更改仍未写入 KTable(它仍未被 to 方法处理)

有没有办法确保我的 leftJoin 和我对 ktable 的写入是原子完成的?

谢谢

更新和当前解决方案

感谢@Matthias 的回应,我能够使用 Transformer 找到解决方案.

代码如下所示:

那是变压器

public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {

private final String stateName;
private final ValueJoiner<V1, V2, V2> joiner;
private final boolean updateState;

private KeyValueStore<K, V2> state;

public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
this.stateName = stateName;
this.joiner = joiner;
this.updateState = updateState;
}

@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
}

@Override
public KeyValue<K, V2> transform(K key, V1 value) {
V2 stateValue = this.state.get(key); // Get current state
V2 updatedValue = joiner.apply(value, stateValue); // Apply join
if (updateState) {
this.state.put(key, updatedValue); // write new state
}
return new KeyValue<>(key, updatedValue);
}

@Override
public KeyValue<K, V2> punctuate(long timestamp) {
return null;
}

@Override
public void close() {}
}

这是调整后的拓扑:

String topic = config.topic();
String store = topic + "-store";

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);

// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
// join the event with the according entry in the KTable and apply the state mutation
.transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

因为我们正在使用 KTable 的 KV StateStore 并通过 put 直接在其中应用更改。方法事件应该总是选择更新的状态。
我仍然想知道的一件事是:如果我有持续的高事件吞吐量怎么办。

我们对 KTable 的 KV 存储所做的 put 与在 KTable 的主题中完成的写入之间是否仍然存在竞争条件?

最佳答案

一个 KTable分片到多个物理存储中,每个存储仅由单个线程更新。因此,您描述的场景不会发生。如果您有 2 条具有相同时间戳的记录,它们都更新同一个分片,则它们将一个接一个地进行处理(按偏移顺序)。因此,第二次更新将看到第一次更新后的状态。

所以也许你只是没有正确描述你的场景?

更新

进行连接时不能改变状态。因此,期望

event1 joins with state A => state A mutated to state X

是错的。独立于任何处理顺序,当 event1加入 state A ,它将访问 state A处于只读模式和 state A不会被修改。

因此,当 event2加入,它将看到与 event1 相同的状态.对于流表连接,表状态仅在从表输入主题读取新数据时更新。

如果您希望从两个输入更新共享状态,则需要使用 transform() 构建自定义解决方案。 :

builder.addStore(..., "store-name");
builder.stream("table-topic").transform(..., "store-name"); // will not emit anything downstream
KStream result = builder.stream("stream-topic").transform(..., "store-name");

这将创建一个由两个处理器共享的存储,并且两者都可以根据需要进行读/写。因此,对于表输入,您可以只更新状态而不向下游发送任何内容,而对于流输入,您可以进行连接、更新状态并向下游发送结果。

更新 2

关于解决方案,更新 Transformer 之间不会出现竞争条件。适用于状态并记录 Transformer状态更新后的处理。这部分将在单个线程中执行,并且记录将按照输入主题的偏移顺序进行处理。因此,可以确保状态更新可用于以后的记录。

关于apache-kafka-streams - KStream-KTable join 写入 KTable : How to sync the join with the ktable write?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46220663/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com