gpt4 book ai didi

java - Flink Autojoin 与行时间列

转载 作者:行者123 更新时间:2023-12-02 09:34:31 25 4
gpt4 key购买 nike

我有一个具有以下结构的 Flink 表:

Id1, Id2, myTimestamp, value

其中行时间基于 myTimestamp

我有以下效果很好的处理:

Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable " +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");

我想调整以前的代码,例如对于每个窗口,我仅使用每个 Id2 的最新记录。所以我认为按如下方式更改代码会起作用:

Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable, " +
"(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest" +
"WHERE MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.myTimestamp" +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");

但是当我这样做时,我收到以下错误:

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
at org.apache.flink.table.api.Table.insertInto(table.scala:1126)

看起来 Flink 不“理解”我要加入的两个表是同一个表。

我怎样才能做我想做的事?

最佳答案

您的查询不起作用的原因有几个。

SELECT 
Id1, MAX(myTimestamp) as myTimestamp, SUM(value) as value
FROM
MyTable,
(SELECT Id2, MAX(myTimestamp) as latestTimestamp
FROM MyTable
GROUP BY Id2
) as RecordsLatest
WHERE
MyTable.Id2 = RecordsLatest.Id2
AND MyTable.myTimestamp = RecordsLatest.myTimestamp
GROUP BY
Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)

有些是由于 Flink 的限制,有些则更为根本。

  1. latestTimestamp 不再是行时间属性。这是因为,它是经过计算的。一旦您在表达式中使用行时间属性(包括 MAX 之类的聚合函数),它们就会失去其行时间属性并成为常规的 TIMESTAMP 属性。
  2. 内部查询生成一个更新其结果的动态表。它不是一个仅附加表。一旦 Id2 的最大时间戳发生变化,就需要撤回之前的结果行并插入新的结果行。
  3. 由于 RecordsLatest 是更新表(而不是仅追加表)并且 latestTimestamp 不是行时间属性,因此 RecordsLatest 的联接> 和 MyTable 是“常规联接”(而不是时间窗口联接),它还生成更新结果而不是仅追加结果。常规联接无法生成任何行时间属性,因为无法保证输出行的顺序(这是行时间属性的先决条件,因为它们需要与水印对齐),并且结果可能需要在将来删除它们。这导致您看到错误消息。
  4. 外部查询的 GROUP BY 子句需要具有行时间属性 rowtime 的仅追加输入表。但是,连接的输出不是仅追加而是更新,并且 rowtime 属性不能是前面解释的 rowtime 属性。

遗憾的是,解决您的任务并不简单,但应该是可能的。

首先,您应该返回一个查询,该查询为每个 (Id1, Id2) 窗口返回具有最大时间戳的行的值:

SELECT 
Id1, Id2,
MAX(myTimestamp) AS maxT
ValOfMaxT(valX, myTimestamp) AS valXOfMaxT,
HOP_ROWTIME(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS rowtime
FROM
MyTable
GROUP BY
Id1, Id2, HOP(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND)

ValOfMaxT 函数是一个用户定义的聚合函数,用于识别最大时间戳的值并返回该值。 rowtime 是新的 rowtime 属性,位于窗口结束时间戳之前 1 毫秒。

给定这个表,我们将其称为Temp,您可以将下一个查询定义为:


SELECT
Id1, MAX(maxT) as myTimestamp, SUM(valXOfMaxT)
FROM Temp
GROUP BY
Id1, TUMBLE(rowtime, INTERVAL '10' SECONDS)

此查询仅对 Id1TUMBLE 窗口进行分组。这是一个 TUMBLE 窗口,因为第一个 HOP 窗口已经将每条记录分组到三个窗口中,我们不应该再这样做。相反,我们将第一个查询的结果分组为 10 个第二窗口,因为这是第一个查询中 HOP 窗口的滑动长度。

关于java - Flink Autojoin 与行时间列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57656776/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com