- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个具有以下结构的 Flink 表:
Id1, Id2, myTimestamp, value
其中行时间基于 myTimestamp
。
我有以下效果很好的处理:
Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable " +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");
我想调整以前的代码,例如对于每个窗口,我仅使用每个 Id2
的最新记录。所以我认为按如下方式更改代码会起作用:
Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable, " +
"(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest" +
"WHERE MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.myTimestamp" +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");
但是当我这样做时,我收到以下错误:
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
看起来 Flink 不“理解”我要加入的两个表是同一个表。
我怎样才能做我想做的事?
最佳答案
您的查询不起作用的原因有几个。
SELECT
Id1, MAX(myTimestamp) as myTimestamp, SUM(value) as value
FROM
MyTable,
(SELECT Id2, MAX(myTimestamp) as latestTimestamp
FROM MyTable
GROUP BY Id2
) as RecordsLatest
WHERE
MyTable.Id2 = RecordsLatest.Id2
AND MyTable.myTimestamp = RecordsLatest.myTimestamp
GROUP BY
Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)
有些是由于 Flink 的限制,有些则更为根本。
latestTimestamp
不再是行时间属性。这是因为,它是经过计算的。一旦您在表达式中使用行时间属性(包括 MAX
之类的聚合函数),它们就会失去其行时间属性并成为常规的 TIMESTAMP
属性。Id2
的最大时间戳发生变化,就需要撤回之前的结果行并插入新的结果行。RecordsLatest
是更新表(而不是仅追加表)并且 latestTimestamp
不是行时间属性,因此 RecordsLatest
的联接> 和 MyTable
是“常规联接”(而不是时间窗口联接),它还生成更新结果而不是仅追加结果。常规联接无法生成任何行时间属性,因为无法保证输出行的顺序(这是行时间属性的先决条件,因为它们需要与水印对齐),并且结果可能需要在将来删除它们。这导致您看到错误消息。GROUP BY
子句需要具有行时间属性 rowtime
的仅追加输入表。但是,连接的输出不是仅追加而是更新,并且 rowtime
属性不能是前面解释的 rowtime 属性。遗憾的是,解决您的任务并不简单,但应该是可能的。
首先,您应该返回一个查询,该查询为每个 (Id1, Id2
) 窗口返回具有最大时间戳的行的值:
SELECT
Id1, Id2,
MAX(myTimestamp) AS maxT
ValOfMaxT(valX, myTimestamp) AS valXOfMaxT,
HOP_ROWTIME(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS rowtime
FROM
MyTable
GROUP BY
Id1, Id2, HOP(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND)
ValOfMaxT
函数是一个用户定义的聚合函数,用于识别最大时间戳的值并返回该值。 rowtime
是新的 rowtime 属性,位于窗口结束时间戳之前 1 毫秒。
给定这个表,我们将其称为Temp
,您可以将下一个查询定义为:
SELECT
Id1, MAX(maxT) as myTimestamp, SUM(valXOfMaxT)
FROM Temp
GROUP BY
Id1, TUMBLE(rowtime, INTERVAL '10' SECONDS)
此查询仅对 Id1
和 TUMBLE
窗口进行分组。这是一个 TUMBLE
窗口,因为第一个 HOP
窗口已经将每条记录分组到三个窗口中,我们不应该再这样做。相反,我们将第一个查询的结果分组为 10 个第二窗口,因为这是第一个查询中 HOP
窗口的滑动长度。
关于java - Flink Autojoin 与行时间列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57656776/
猫f1.txt阿曼维沙尔阿杰贾伊维杰拉胡尔曼尼什肖比特批评塔夫林现在输出应该符合上面给定的条件 最佳答案 您可以在文件读取循环中设置一个计数器并打印它, 计数=0 读取行时做 让我们数一数++ if
我正在尝试查找文件 1 和文件 2 中的共同行。如果公共(public)行存在,我想写入文件 2 中的行,否则打印文件 1 中的非公共(public)行。fin1 和 fin2 是这里的文件句柄。它读
我有这个 SQL 脚本: CREATE TABLE `table_1` ( `IDTable_1` int(11) NOT NULL, PRIMARY KEY (`IDTable_1`) );
我有 512 行要插入到数据库中。我想知道提交多个插入内容是否比提交一个大插入内容有任何优势。例如 1x 512 行插入 -- INSERT INTO mydb.mytable (id, phonen
如何从用户中选择user_id,SUB(row, row - 1),其中user_id=@userid我的表用户,id 为 1、3、4、10、11、23...(不是++) --id---------u
我曾尝试四处寻找解决此问题的最佳方法,但我找不到此类问题的任何先前示例。 我正在构建一个基于超本地化的互联网购物中心,该区域分为大约 3000 个区域。每个区域包含大约 300 个项目。它们是相似的项
preg_match('|phpVersion = (.*)\n|',$wampConfFileContents,$result); $phpVersion = str_replace('"','',
我正在尝试创建一个正则表达式,使用“搜索并替换全部”删除 200 个 txt 文件的第一行和最后 10 行 我尝试 (\s*^(\h*\S.*)){10} 删除包含的前 10 行空白,但效果不佳。 最
下面的代码从数据库中获取我需要的信息,但没有打印出所有信息。首先,我知道它从表中获取了所有正确的信息,因为我已经在 sql Developer 中尝试过查询。 public static void m
很难说出这里问的是什么。这个问题是含糊的、模糊的、不完整的、过于宽泛的或修辞性的,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开它,visit the help center 。 已关
我试图在两个表中插入记录,但出现异常。您能帮我解决这个问题吗? 首先我尝试了下面的代码。 await _testRepository.InsertAsync(test); await _xyzRepo
这个基本的 bootstrap CSS 显示 1 行 4 列: Text Text Text
如果我想从表中检索前 10 行,我将使用以下代码: SELECT * FROM Persons LIMIT 10 我想知道的是如何检索前 10 个结果之后的 10 个结果。 如果我在下面执行这段代码,
今天我开始使用 JexcelApi 并遇到了这个:当您尝试从特定位置获取元素时,不是像您通常期望的那样使用sheet.getCell(row,col),而是使用sheet.getCell(col,ro
我正在尝试在我的网站上开发一个用户个人资料系统,其中包含用户之前发布的 3 个帖子。我可以让它选择前 3 条记录,但它只会显示其中一条。我是不是因为凌晨 2 点就想编码而变得愚蠢? query($q)
我在互联网上寻找答案,但找不到任何答案。 (我可能问错了?)我有一个看起来像这样的表: 我一直在使用查询: SELECT title, date, SUM(money) FROM payments W
我有以下查询,我想从数据库中获取 100 个项目,但 host_id 多次出现在 urls 表中,我想每个 host_id 从该表中最多获取 10 个唯一行。 select * from urls j
我的数据库表中有超过 500 行具有特定日期。 查询特定日期的行。 select * from msgtable where cdate='18/07/2012' 这将返回 500 行。 如何逐行查询
我想使用 sed 从某一行开始打印 n 行、跳过 n 行、打印 n 行等,直到文本文件结束。例如在第 4 行声明,打印 5-9,跳过 10-14,打印 15-19 等 来自文件 1 2 3 4 5 6
我目前正在执行验证过程来检查用户的旧密码,但问题是我无法理解为什么我的查询返回零行,而预期它有 1 行。另一件事是,即使我不将密码文本转换为 md5,哈希密码仍然得到正确的答案,但我不知道为什么会发生
我是一名优秀的程序员,十分优秀!