- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我对以下拓扑的行为方式有一些问题:
String topic = config.topic();
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
.peek((k, v) -> L.info("Event:"+v.action))
// join the event with the according entry in the KTable and apply the state mutation
.leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
.peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
leftJoin
完成的然后由
to
写的方法。如果使用相同的键同时收到事件 1 和 2,我可能会发生以下情况:
event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic
event1
的更改。 ,所以我丢失了数据。
Processing:...
部分是从值连接器内部记录的):
Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1
Event1
可以认为是创建事件:它将在KTable中创建条目,因此状态是否为空都没有关系。
Event2
虽然需要将其更改应用于现有状态,但没有找到任何更改,因为第一个状态更改仍未写入 KTable(它仍未被
to
方法处理)
Transformer
找到解决方案.
public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {
private final String stateName;
private final ValueJoiner<V1, V2, V2> joiner;
private final boolean updateState;
private KeyValueStore<K, V2> state;
public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
this.stateName = stateName;
this.joiner = joiner;
this.updateState = updateState;
}
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
}
@Override
public KeyValue<K, V2> transform(K key, V1 value) {
V2 stateValue = this.state.get(key); // Get current state
V2 updatedValue = joiner.apply(value, stateValue); // Apply join
if (updateState) {
this.state.put(key, updatedValue); // write new state
}
return new KeyValue<>(key, updatedValue);
}
@Override
public KeyValue<K, V2> punctuate(long timestamp) {
return null;
}
@Override
public void close() {}
}
String topic = config.topic();
String store = topic + "-store";
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);
// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
// join the event with the according entry in the KTable and apply the state mutation
.transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
put
直接在其中应用更改。方法事件应该总是选择更新的状态。
最佳答案
一个 KTable
分片到多个物理存储中,每个存储仅由单个线程更新。因此,您描述的场景不会发生。如果您有 2 条具有相同时间戳的记录,它们都更新同一个分片,则它们将一个接一个地进行处理(按偏移顺序)。因此,第二次更新将看到第一次更新后的状态。
所以也许你只是没有正确描述你的场景?
更新
进行连接时不能改变状态。因此,期望
event1 joins with state A => state A mutated to state X
event1
加入
state A
,它将访问
state A
处于只读模式和
state A
不会被修改。
event2
加入,它将看到与
event1
相同的状态.对于流表连接,表状态仅在从表输入主题读取新数据时更新。
transform()
构建自定义解决方案。 :
builder.addStore(..., "store-name");
builder.stream("table-topic").transform(..., "store-name"); // will not emit anything downstream
KStream result = builder.stream("stream-topic").transform(..., "store-name");
Transformer
之间不会出现竞争条件。适用于状态并记录
Transformer
状态更新后的处理。这部分将在单个线程中执行,并且记录将按照输入主题的偏移顺序进行处理。因此,可以确保状态更新可用于以后的记录。
关于apache-kafka-streams - KStream-KTable join 写入 KTable : How to sync the join with the ktable write?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46220663/
我正在尝试在 KStream-KStream 之间执行内部连接。我观察到当来自两个 KStreams 的消息都具有复合键(例如具有许多属性的 java pojo)时,连接不起作用,即使使用了 pojo
在 @StreamListener 的处理方法中,我将 school KStream 映射到 person KStream,并通过 .through() 方法填充主题“person”,我从中生成一个
我想用 KStream 接口(interface)批处理消息。 我有一个带有键/值的流 我试图在一个翻滚的窗口中收集它们,然后我想立即处理整个窗口。 builder.stream(longSerde,
希望有人知道这一点或者能给我指出正确的方向...... 我有一个通过 API REST 请求创建的数据主题。 REST 请求中收到的字段之一是记录 EventTime 的时间戳。这些记录将生成到 Ka
我遇到了基于KStreams的应用程序的问题:它将运行一次,当我停止/重新启动时,它会“卡住”并且不会再继续,直到我删除它所具有的各种主题创建的。这种情况不会每次发生,但往往会发生。 通常,当我将新版
我正在使用 KStream.to("outputtopic"); 编写输出主题在 apache 文档中提到它将自动创建传递给 to() 的主题。如何使用来自该主题的消息? 我可以使用 consumer
我的流是键/值对,我想将其作为“原始”并通过 60 秒聚合保存到数据库中。最初我是这样做的: ->foreach
我想在 Kafka 中使用 Java KStream 来过滤掉所有超过特定值的值。值以 JSON 格式交换,例如: ConsumerRecord(topic=u'test', partition=0,
由于我的基于 KStream 的应用程序不遵循传统的 Kafka 消费者路线,我应该如何跟踪消费者延迟?通常我会使用 ConsumerOffsetChecker (或类似的东西)但它需要一个消费者组名
我正在使用 Kafka Streams 开发 PoC。现在我需要获取流消费者中的偏移值,并使用它为每条消息生成一个唯一的键 (topic-offset)->hash。原因是:生产者是系统日志,只有少数
我创建了一个 Kafka 主题并向其推送了一条消息。 所以 bin/kafka-console-consumer --bootstrap-server abc.xyz.com:9092 --topic
我想创建一个基于 Kafka 流的应用程序,它处理一个主题并以大小为 X(即 50)的批处理获取消息,但如果流的流量较低,则在 Y 秒内(即5). 因此,我没有一条一条地处理消息,而是处理一个 Lis
有没有一种方法可以进行类似分支的操作,但将记录放在谓词计算结果为真的每个输出流中? Brach 将记录放入第一个匹配项(文档:在第一个匹配项中,一条记录被放置到一个且只有一个输出流中)。 最佳答案
我正在使用 Spring 云流,并且想稍微摆弄一下 KStreams/KTables。 我正在寻找从标准 Kafka 主题将其转变为流的方法。 我已经在 KSQL 中完成了此操作,但我试图弄清楚是否有
有没有办法计算 Java/scala 应用程序中 KTable/KStream 随着时间的推移大约将使用多少堆(或任何其他)内存? 我有一些具体的假设,我想知道它们是否正确: Kafka 流仅使用内部
有没有办法进行类似分支的操作,但将记录放置在谓词评估为 true 的每个输出流中? Brach 将记录放置到第一个匹配项(文档:在第一个匹配项上将记录放置到一个且仅一个输出流)。 最佳答案 您可以
似乎我的基于 KStream 的应用程序已经堆积了许多 GB 的文件(.sst、Log.old. 等)。 这些会自行清理还是我需要留意?设置一些参数来剔除它们? 最佳答案 关于这些本地/临时文件:其中
我希望将来自 KStream 的窗口化批处理输出组合在一起,并将它们写入辅助存储。 我期待看到 .punctuate() 大约每 30 秒被调用一次。我得到的反而被保存了here . (原文件几千行)
我读了,但我不能理解太多。我读到我可以使用 KTable 而不是日志压缩。或者它有更多的功能。但是,我找不到这方面的好例子。我也无法在解释工作逻辑的好来源中看到它。你能解释一个 ktable 和 ks
我有以下拓扑: 创建状态存储 根据 SOME_CONDITION 过滤记录,将其值映射到新实体,最后将这些记录发布到另一个主题 STATIONS_LOW_CAPACITY_TOPIC 但是我在 STA
我是一名优秀的程序员,十分优秀!