- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个包含 JSON 消息的流,如下所示:
{"operation":"CREATE","data":{"id":"id-1", "value":"value-1"}}
{"operation":"CREATE","data":{"id":"id-2", "value":"value-2"}}
{"operation":"DELETE","data":{"id":"id-1"}}
{"operation":"UPDATE","data":{"id":"id-2", "value":"value-3"}}
此流在 DataStream<Row>
中处理注册为TableSource
.
我想使用这个流作为变更日志流来更新 Flink 表的内容,但我找不到方法来做到这一点。
我定义了一个StreamTableSource
如:
public class MyTableSource implements StreamTableSource<Row>, ... {
@Override
public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
DataStream<Row> stream = getDataStream(env) // Retrieve changelog stream
.keyBy([SOME KEY]) // Aggregate by key
.map(new MyMapFunction()); // Map the update message with the correct encoding ?
return stream;
}
...
}
还有这个TableSource
用于
public void process(final StreamExecutionEnvironment env) {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.registerTableSource("MyTableSource", new MyTableSource());
Table result = tableEnv.sqlQuery("SELECT * FROM MyTableSource"); // This table content should be updated according to operation described in the changelog stream.
result.insertInto([SOME SINK]);
}
执行此操作的好方法是什么? (更具体地说,如何使用流从表中删除行?)
最佳答案
目前,内部变更日志处理功能未通过 API 公开。因此,没有可用的来源可以让您将传入的变更日志解释为表格。计划用于Flink 1.11 .
在那之前,您可以考虑使用用户定义的聚合函数来应用此处建议的更新:
Apache Flink: How to enable "upsert mode" for dynamic tables?
关于java - 弗林克 SQL : Use changelog stream to update rows in Dynamic Table,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59360243/
首先我要说的是,我主要是一名 iOS 开发人员,最近才开始探索全栈开发的惊心动魄的冒险。我是 PHP 和 MySQL 的新手。话虽如此,我有一个问题,我希望需要一个直接的解决方案。 我正在为一个拥有预
我的表单上有 GridEx 对象并且... 我想用 for...next 循环在其中添加一些项目。实际上我找不到任何方法来添加带有自定义数据的新行。 我想在那个 GridEx 对象中选择一个特定的行。
我有以下数据框 df1 = DataFrame([['OBJ1', 10, 'BX', 'pool1', 'OBJ2'],['OBJ2', 0, '', '', 'OBJ1'],['OBJ3', 1
所以我有以下成员(member)历史表 User_ID | Start date | End Date | Type(0-7) | ---------------------------
我不擅长sql。我查看了 stackoverflow,但似乎对我没有任何用处。所以,我正在寻求帮助。是否可以根据行中的第一个字段将 2 行合并为 1 行。 我在 mysql 中执行此操作。我将展示示例
我了解 flex sm、md、lg 列的概念,但不了解应用于行的概念。弹性行有什么作用? sm、md、lg 尺寸应用于弹性行时意味着什么? 最佳答案 简答 - .row只是网格的容器 col .然而,
我遇到麻烦的地方 我相信我需要使用 COUNT;但是,我不知道如何将一行与同一列中的每一行进行比较,然后计算有多少行比相应行少/便宜。提前谢谢您! 这是我试图解决的官方问题: “使用示例架构,编写一条
我有以下 3 个相关表 Schools Departments Classes --------------- ------------------ --
我有代码: g, g_err = data[:, 4:6].T 我不知道[:, 4:6]的含义 尤其是第一个: .T 表示转置吗? 最佳答案 您有一个名为 data 的二维矩阵,您的代码从第一维获取所
我在行单击上附加了一个事件,在行内的复选框更改上附加了一个事件。 如何防止首先触发行单击? $(document).on('click', 'table tr', function() { con
我有一个场景,我需要连接两个 SQL 表并且正在为如何做而苦恼。假设在表 A 中我有这个: ColA ColB ColC ColD 45 55 17 45 45
我用谷歌表格记录我们俱乐部的出席率。该表格链接到另一个谷歌表格,因此可以自动创建一个名字列表,并按字母顺序排序。在这张表格中,我还根据我们所做的活动,在人的名字旁边手动输入点数。。问题是,由于姓名列表
这个问题在这里已经有了答案: What is a NullPointerException, and how do I fix it? (12 个回答) 5年前关闭。 编辑:我正在使用此代码读取 Ex
我是 R 的初学者。我希望你能帮助我解决我的问题。我的数据集中的文件名包含大量信息。我必须提取这些信息来创建单独的变量。 首先我使用 splits <- t(as.data.frame(strspli
假设我有一个 CSR 格式的矩阵,将一行(或多行)设置为零的最有效方法是什么? 下面的代码运行得很慢: A = A.tolil() A[indices, :] = 0 A = A.tocsr() 我不
我遇到了一个相当复杂的问题。我有一个包含三行的数据框:id、info 和 rownum。数据如下所示: id info row 1 a 1 1 b 2
我正在阅读learnjsdata.com并遇到了这种不熟悉的 JavaScript 语法。语法如下: lookupIndex[row[lookupKey]] = row; 有人知道这里发生了什么吗?我
我有一个表格,收集每周足球比赛的结果。 根据每场比赛的结果,我根据 ID 为玩家输入 3、1 或 0 分,具体取决于他们是赢、平还是输。 id Name A1 B1 C1 A2 B2
我有一个 mysql 表: 在表1中,我有3列:Bidang、Keahlian、Nilai。在 saran 列的表结果中,我想根据 bidang 组合 keahlian,但如果 nilai > 0,则
我有一个看起来像这样的表 | ID | item1 | item 2 | | 1 | A1 | B1 | | 2 | A2 |
我是一名优秀的程序员,十分优秀!