
java - Flink: Rowtime attributes must not be in the input rows of a regular join


Using the Flink SQL API, I want to join several tables together and run some computations over a time window. I have three tables coming from CSV files and one coming from Kafka. The Kafka table has a field timestampMs, which I want to use for my time-window operations.

To do this, I wrote the following code:

StreamExecutionEnvironment env = ... ;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

TableSource table1 = CsvTableSource.builder()
        .path("path/to/file1.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id1", Types.STRING)
        .field("someInfo1", Types.FLOAT)
        .build();

TableSource table2 = CsvTableSource.builder()
        .path("path/to/file2.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id2", Types.STRING)
        .field("someInfo2", Types.STRING)
        .build();

TableSource table3 = CsvTableSource.builder()
        .path("path/to/file3.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id2", Types.STRING)
        .field("id1", Types.STRING)
        .field("someInfo3", Types.FLOAT)
        .build();

tableEnv.registerTableSource("Table1",table1);
tableEnv.registerTableSource("Table2",table2);
tableEnv.registerTableSource("Table3",table3);


Schema schemaExt = new Schema().schema(SOME_SCHEMA);
schemaExt = schemaExt.field("rowtime", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
                .timestampsFromField("timestampMs")
                .watermarksPeriodicBounded(40000));

tableEnv.connect(new Kafka()
            .version("universal")
            .topic(MY_TOPIC)
            .properties(MY_PROPERTIES)
            .sinkPartitionerRoundRobin())
        .withFormat(...)
        .withSchema(schemaExt)
        .inAppendMode()
        .registerTableSource("KafkaInput");

Table joined = tableEnv.sqlQuery("SELECT * FROM Table1 " +
        "JOIN Table3 ON Table1.id1 = Table3.id1 " +
        "JOIN Table2 ON Table3.id2 = Table2.id2 " +
        "JOIN KafkaInput ON Table3.id2 = KafkaInput.id2");

tableEnv.registerTable("Joined", joined);

int windowWidth = 5;
int frequency = 2;
Table processed = tableEnv.sqlQuery("SELECT id1 FROM Joined " +
        "GROUP BY id1, HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '30' SECOND)");



Sink s = createSink(this.esEndpoint, this.esPattern, this.schemaHandler.getSchemaStr());


tableEnv.registerTableSink("MySink", ...);

processed.insertInto("MySink");

env.execute();

But when I run this, I get the following error:

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

But I don't understand the workaround hint. How can I create a time attribute after joining the tables and then do some windowed computation on it?

--- EDIT ---

In the code above, I replaced the following lines:

Table joined = tableEnv.sqlQuery("SELECT * FROM Table1 " +
        "JOIN Table3 ON Table1.id1 = Table3.id1 " +
        "JOIN Table2 ON Table3.id2 = Table2.id2 " +
        "JOIN KafkaInput ON Table3.id2 = KafkaInput.id2");

tableEnv.registerTable("Joined", joined);

with:

Table staticJoined = tableEnv.sqlQuery("SELECT *, TIMESTAMP('1970-01-01 00:00:00') as rowtime FROM Table1 " +
        "JOIN Table3 ON Table1.id1 = Table3.id1 " +
        "JOIN Table2 ON Table3.id2 = Table2.id2");

TemporalTableFunction temporalFunction = staticJoined.createTemporalTableFunction("rowtime", "id2");
tableEnv.registerFunction("CSVData", temporalFunction);

tableEnv.registerTable("Joined",
        tableEnv.sqlQuery("SELECT * FROM KafkaInput, LATERAL TABLE (CSVData(KafkaInput.rowtime)) AS Statics " +
                "WHERE Statics.id2 = KafkaInput.id2"));

But now I get an error when using the TemporalTableFunction:

Exception in thread "main" java.lang.AssertionError: Cannot add expression of different type to set:
set type is RecordType(BIGINT genTimestampMs, BIGINT timestampMs, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkConfId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkId, FLOAT density, FLOAT count, FLOAT surface, TIMESTAMP(3) NOT NULL rowtime, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" cameraName, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId00, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId, FLOAT coefficient, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId0, FLOAT thresholdLow, FLOAT thresholdMedium, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId1, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" name, TIMESTAMP(3) rowtime0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" StationName) NOT NULL
expression type is RecordType(BIGINT genTimestampMs, BIGINT timestampMs, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkConfId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkId, FLOAT density, FLOAT count, FLOAT surface, TIMESTAMP(3) NOT NULL rowtime, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" cameraName, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId00, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId, FLOAT coefficient, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId0, FLOAT thresholdLow, FLOAT thresholdMedium, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId1, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" name, TIMESTAMP(0) NOT NULL rowtime0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" StationName) NOT NULL
set is rel#26:LogicalCorrelate.NONE(left=HepRelVertex#24,right=HepRelVertex#25,correlation=$cor0,joinType=inner,requiredColumns={8})
expression is LogicalTemporalTableJoin#32

Two fields do not match between the 'set type' and the 'expression type': TIMESTAMP(3) rowtime0 vs. TIMESTAMP(0) NOT NULL rowtime0.

The problem is that I don't have any field named rowtime0. It looks like an internal field. I really don't understand what is happening here.

Best Answer

Your query defines a regular join, i.e., a join without a temporal join constraint. Since Flink treats all tables as dynamic (i.e., assumes that they might change in the future), a regular join without a temporal constraint cannot guarantee that rows are emitted in (roughly) timestamp order. However, time attributes require timestamp order to ensure that subsequent operations (such as window aggregations) can be performed without fully materializing the stream. Therefore, Flink does not allow time attributes in the input (and hence the output) of a regular join, which does not preserve the time order.
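To make the hint in the error message concrete, here is a minimal sketch (reusing the table and field names from the question) of what the suggested cast looks like. Casting a time attribute to TIMESTAMP turns it into a regular column, which makes the regular join legal:

Table castJoined = tableEnv.sqlQuery(
        "SELECT KafkaInput.id2, " +
        // After the cast, ts is a plain TIMESTAMP column, not a time attribute.
        "CAST(KafkaInput.rowtime AS TIMESTAMP) AS ts, " +
        "Table3.someInfo3 " +
        "FROM KafkaInput " +
        "JOIN Table3 ON KafkaInput.id2 = Table3.id2");

However, because ts is then an ordinary TIMESTAMP column, a later HOP window on it would be rejected as well, so the cast alone does not help when event-time windows are still needed after the join; the temporal table approach described below does.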

The problem would not exist if Flink knew that the tables backed by CSV files are fixed rather than dynamic. However, this kind of reasoning is not supported yet.

As a workaround, you can model the CSV tables as temporal tables (which do not change over time) and join them with the Kafka table.
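For reference, a minimal sketch of that pattern, with the names from the question's edit (staticJoined being the pre-joined CSV data with a constant rowtime column): the static side is registered as a temporal table function keyed by id2 and probed with KafkaInput's rowtime. The output of this join keeps KafkaInput's rowtime as a valid event-time attribute, so a HOP aggregation afterwards still works:

// Expose the static CSV side as a temporal table function keyed by id2.
TemporalTableFunction csvData =
        staticJoined.createTemporalTableFunction("rowtime", "id2");
tableEnv.registerFunction("CSVData", csvData);

// Probe the temporal table with KafkaInput's rowtime; the rowtime column
// of the result is still a time attribute and can feed a HOP window.
Table enriched = tableEnv.sqlQuery(
        "SELECT KafkaInput.id2, KafkaInput.rowtime, Statics.someInfo3 " +
        "FROM KafkaInput, LATERAL TABLE (CSVData(KafkaInput.rowtime)) AS Statics " +
        "WHERE Statics.id2 = KafkaInput.id2");

Note that the constant timestamp on the static side has to have the same type as the probe-side time attribute; the TIMESTAMP(0) vs. TIMESTAMP(3) mismatch on rowtime0 in the edit's error message appears to come from exactly that difference.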

Regarding java - Flink: Rowtime attributes must not be in the input rows of a regular join, see the similar question on Stack Overflow: https://stackoverflow.com/questions/57181771/
