作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
当至少其中一个表具有时间属性列时,我在使用 Flink 的 SQL 语法连接多个表时遇到了一些麻烦。
当 rowtime 用作 flink rowtime 时,我有一个使用架构(id、value1、rowtime)的表 Table1
。
我想将此表与使用架构 (id, value2) 的表 Table2
连接起来。连接必须在匹配的 id
上完成。
最后,我想通过使用翻滚时间窗口对这次连接的结果进行分组。
是否可以仅使用 SQL 语法来完成此操作?
这是我想做的一个例子:
SELECT
Table1.id as id,
TUMBLE_END(rowtime, INTERVAL '10' SECOND),
MAX(value1) as value1,
MAX(value2) as value2
FROM Table1 JOIN TABLE2 ON Table1.id = Table2.id
GROUP BY Table1.id, TUMBLE(rowtime, INTERVAL '10' SECOND)
但它给了我以下错误:
2019-11-12 16:37:57.191 [main] ERROR - Cannot generate a valid execution plan for the given query:
FlinkLogicalCalc(expr#0..6=[{inputs}], id=[$t0], EXPR$1=[$t4], value1=[$t1], value2=[$t2])
FlinkLogicalWindowAggregate(group=[{0}], value1=[MAX($2)], value2=[MAX($3)])
FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[0], proj#0..1=[{exprs}], value1=[$t3], value2=[$t3])
FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
FlinkLogicalTableSourceScan(table=[[Table1]], fields=[id, value1, rowtime], source=[KafkaTableSource(id, value1, rowtime)])
FlinkLogicalTableSourceScan(table=[[Table2]], fields=[id, value2], source=[Table2_Type(id, value2)])
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
FlinkLogicalCalc(expr#0..6=[{inputs}], id=[$t0], EXPR$1=[$t4], value1=[$t1], value2=[$t2])
FlinkLogicalWindowAggregate(group=[{0}], value1=[MAX($2)], value2=[MAX($3)])
FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[0], proj#0..1=[{exprs}], value1=[$t3], value2=[$t3])
FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
FlinkLogicalTableSourceScan(table=[[kafkaDataStream]], fields=[id, value1, rowtime], source=[KafkaTableSource(id, value1, rowtime)])
FlinkLogicalTableSourceScan(table=[[SensorConfigurationUpdateHTTP]], fields=[id, value2], source=[Table2_Type(id, value2)])
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:379)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
...
我还尝试将我的 rowtime
转换为 TIMESTAMP
类型(如错误消息所建议),但随后我无法再处理时间窗口。它会导致以下错误:
2019-11-12 16:44:52.473 [main] ERROR - Window can only be defined over a time attribute column.
org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:813)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:379)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
最佳答案
连接结果不能包含时间属性,因为连接不能保证保留时间戳的顺序。 Flink 假设两个表都是动态的并且可以在任何时间点发生变化。表 Table2
中的新记录可能会与 Table1
的第一条记录连接,以“随机”顺序生成带有时间戳的结果。
您可以通过向连接添加时间约束来更改这一点。您可以使用 time-windowed join 定义查询或者您将 Table2
建模为 temporal table and join Table1
与它。
关于java - 弗林克 SQL : Joining Tables with timestamp in pure SQL syntax,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58822169/
我是一名优秀的程序员,十分优秀!