gpt4 book ai didi

java - 弗林克 SQL : Use changelog stream to update rows in Dynamic Table

转载 作者:行者123 更新时间:2023-12-02 09:11:40 25 4
gpt4 key购买 nike

我有一个包含 JSON 消息的流,如下所示:

{"operation":"CREATE","data":{"id":"id-1", "value":"value-1"}}
{"operation":"CREATE","data":{"id":"id-2", "value":"value-2"}}
{"operation":"DELETE","data":{"id":"id-1"}}
{"operation":"UPDATE","data":{"id":"id-2", "value":"value-3"}}

此流在 DataStream<Row> 中处理注册为TableSource .

我想使用这个流作为变更日志流来更新 Flink 表的内容,但我找不到方法来做到这一点。

我定义了一个StreamTableSource如:

public class MyTableSource implements StreamTableSource<Row>, ... {

@Override
public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
DataStream<Row> stream = getDataStream(env) // Retrieve changelog stream
.keyBy([SOME KEY]) // Aggregate by key
.map(new MyMapFunction()); // Map the update message with the correct encoding ?

return stream;
}

...
}

还有这个TableSource用于

public void process(final StreamExecutionEnvironment env) {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.registerTableSource("MyTableSource", new MyTableSource());

Table result = tableEnv.sqlQuery("SELECT * FROM MyTableSource"); // This table content should be updated according to operation described in the changelog stream.

result.insertInto([SOME SINK]);
}

执行此操作的好方法是什么? (更具体地说,如何使用流从表中删除行?)

最佳答案

目前,内部变更日志处理功能未通过 API 公开。因此,没有可用的来源可以让您将传入的变更日志解释为表格。计划用于Flink 1.11 .

在那之前,您可以考虑使用用户定义的聚合函数来应用此处建议的更新:

Apache Flink: How to enable "upsert mode" for dynamic tables?

关于java - 弗林克 SQL : Use changelog stream to update rows in Dynamic Table,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59360243/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com