- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
使用 flink SQL API,我想将多个表连接在一起并在时间窗口内进行一些计算。我有 3 个表来自 CSV 文件,一个来自 Kafka。在 Kafka 表中,我有一个字段 timestampMs
,我想将其用于我的时间窗口操作。
为此我做了以下代码:
reamExecutionEnvironment env = ... ;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
TableSource table1 = CsvTableSource.builder()
.path("path/to/file1.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id1", Types.STRING)
.field("someInfo1", Types.FLOAT)
.build();
TableSource table2 = CsvTableSource.builder()
.path("path/to/file2.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id2", Types.STRING)
.field("someInfo2", Types.STRING)
.build();
TableSource table3 = CsvTableSource.builder()
.path("path/to/file3.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id2", Types.STRING)
.field("id1", Types.STRING)
.field("someInfo3", Types.FLOAT)
.build();
tableEnv.registerTableSource("Table1",table1);
tableEnv.registerTableSource("Table2",table2);
tableEnv.registerTableSource("Table3",table3);
Schema schemaExt = new Schema().schema(SOME_SCHEMA);
schemaExt = schemaExt.field("rowtime", Types.SQL_TIMESTAMP).rowtime(new Rowtime().timestampsFromField("timestampMs").watermarksPeriodicBounded(40000));
tableEnv.connect(new Kafka()
.version("universal")
.topic(MY_TOPIC)
.properties(MY_PROPERTIES)
.sinkPartitionerRoundRobin()
)
.withFormat(...)
.withSchema(schemaExt)
.inAppendMode()
.registerTableSource("KafkaInput");
Table joined = tableEnv.sqlQuery("SELECT * FROM table1 " +
"join table3 on table1.id2 = table3.id2 " +
"join table2 on table3.id1 = table2.id1 " +
"join KafkaInput on table3.id2 = KafkaInput.id2");
tableEnv.registerTable("Joined", joined);
int windowWidth = 5;
int frequency = 2;
Table processed = tableEnv.sqlQuery("SELECT id1 FROM Joined " +
"GROUP BY id1, HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '30' SECOND)");
Sink s = createSink(this.esEndpoint, this.esPattern, this.schemaHandler.getSchemaStr());
tableEnv.registerTableSink("MySink", ...);
processed.insertInto("MySink");
env.execute();
但是当我运行它时,出现以下错误:
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
但我不明白解决方法提示部分。如何在加入表后创建时间属性并进行一些窗口化计算。
--- 编辑 ---
在上面的代码中,我替换了以下几行:
Table joined = tableEnv.sqlQuery("SELECT * FROM table1 " +
"join table3 on table1.id2 = table3.id2 " +
"join table2 on table3.id1 = table2.id1 " +
"join KafkaInput on table3.id2 = KafkaInput.id2");
tableEnv.registerTable("Joined", joined);
作者:
Table staticJoined = tableEnv.sqlQuery("SELECT *, TIMESTAMP('1970-01-01 00:00:00') as rowtime FROM table1 " +
"join table3 on table1.id2 = table3.id2 " +
"join table2 on table3.id1 = table2.id1 ");
TemporalTableFunction temporalFunction = staticJoined.createTemporalTableFunction( "rowtime" , "id2");
tableEnv.registerFunction("CSVData", temporalFunction);
tableEnv.registerTable("Joined",
tableEnv.sqlQuery("SELECT * FROM KafkaInput, LATERAL TABLE (CSVData(KafkaInput.rowtime)) as Statics WHERE Statics.id2 = KafkaInput.id2")
);
但是我在使用 TemporalTableFunction 时遇到错误:
Exception in thread "main" java.lang.AssertionError: Cannot add expression of different type to set:
set type is RecordType(BIGINT genTimestampMs, BIGINT timestampMs, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkConfId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkId, FLOAT density, FLOAT count, FLOAT surface, TIMESTAMP(3) NOT NULL rowtime, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" cameraName, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId00, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId, FLOAT coefficient, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId0, FLOAT thresholdLow, FLOAT thresholdMedium, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId1, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" name, TIMESTAMP(3) rowtime0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" StationName) NOT NULL
expression type is RecordType(BIGINT genTimestampMs, BIGINT timestampMs, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkConfId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkId, FLOAT density, FLOAT count, FLOAT surface, TIMESTAMP(3) NOT NULL rowtime, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" cameraName, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId00, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId, FLOAT coefficient, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId0, FLOAT thresholdLow, FLOAT thresholdMedium, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId1, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" name, TIMESTAMP(0) NOT NULL rowtime0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" StationName) NOT NULL
set is rel#26:LogicalCorrelate.NONE(left=HepRelVertex#24,right=HepRelVertex#25,correlation=$cor0,joinType=inner,requiredColumns={8})
expression is LogicalTemporalTableJoin#32
其中两个字段在“集合类型”和“表达式类型”之间不匹配。TIMESTAMP(3) rowtime0
和 TIMESTAMP(0) NOT NULL rowtime0
问题是我没有名为 rowtime0
的字段。它看起来像是一个内部字段。我真的不明白这里发生了什么
最佳答案
您的查询定义了常规连接,即没有时间连接约束的连接。由于 Flink 将所有表视为动态的(即假设它们将来可能会发生变化),因此没有时间限制的常规连接不能保证行(大致)按时间戳顺序发出。但是,时间属性需要时间戳顺序,以确保可以在不完全具体化流的情况下执行后续操作(例如窗口聚合)。因此,Flink 不允许时间属性作为不保留时间顺序的常规连接的输入(因此也是输出)。
如果 Flink 知道 CSV 文件中的表是固定的而不是动态的,那么问题就不会存在。然而,这个推理还没有得到支持。
作为解决方法,您可以将 CSV 表建模为 temporal tables (没有改变)和 join them与 Kafka 表。
关于java - 弗林克 : Rowtime attributes must not be in the input rows of a regular join,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57181771/
我正在测试设置SQLAlchemy以映射现有数据库。这个数据库是很久以前自动建立的,它是由我们不再使用的先前的第三方应用程序创建的,因此 undefined 某些预期的事情,例如外键约束。该软件将管理
这个问题在这里已经有了答案: What is the difference between "INNER JOIN" and "OUTER JOIN"? (28 个答案) 关闭 7 年前。 INNE
这个问题在这里已经有了答案: What is the difference between "INNER JOIN" and "OUTER JOIN"? (29 个回答) 关闭7年前. INNER J
假设有两个表: table1.c1 table1.c2 1 1 A 2 1 B 3 1 C 4 2
假设有两个表: table1.c1 table1.c2 1 1 A 2 1 B 3 1 C 4 2
一.先看一些最简单的例子 例子 Table A aid adate 1 a1 2&nb
数据库操作语句 7. 外连接——交叉查询 7.1 查询 7.2 等值连接 7.3 右外
我有两个表 'users' 和 'lms_users' class LmsUser belongs_to :user end class User has_one :lms_user
我试图避免在 Rails 中对我的 joins 进行字符串插值,因为我注意到将查询器链接在一起时灵活性会降低。 也就是说,我觉得 joins(:table1) 比 joins('inner join
我有这个代码 User.find(:all, :limit => 10, :joins => :user_points, :select => "users.*, co
我刚刚开始探索 Symfony2,我很惊讶它拥有如此多的强大功能。我开始做博客教程在: http://tutorial.symblog.co.uk/ 但使用的是 2.1 版而不是 2.0 我的问题是我
什么是 SQL JOIN什么是不同的类型? 最佳答案 插图来自 W3schools : 关于SQL JOIN 和不同类型的 JOIN,我们在Stack Overflow上找到一个类似的问题: http
我有两个 Hive 表,我正在尝试加入它们。这些表没有被任何字段聚集或分区。尽管表包含公共(public)键字段的记录,但连接查询始终返回 0 条记录。所有数据类型都是“字符串”数据类型。 连接查询很
我正在使用 Solr 的(4.0.0-beta)连接功能来查询包含具有父/子关系的文档的索引。连接查询效果很好,但我只能在搜索结果中获得父文档。我相信这是预期的行为。 但是,是否有可能在搜索结果中同时
我正在使用可用的指南/api/书籍自学 Rails,但我无法理解通过三种方式/嵌套 has_many :through 关联进行的连接。 我有用户与组相关联:通过成员(member)资格。 我在多对多
什么是 SQL JOIN,有哪些不同的类型? 最佳答案 插图来自 W3schools : 关于SQL JOIN 和不同类型的 JOIN,我们在Stack Overflow上找到一个类似的问题: htt
我正在尝试访问数据库的两个表。在商店里,我保留了一个事件列表,其中包含 Table Event id, name,datei,houri, dateF,Hourf ,capacity, age ,de
我有 4 个表:booking、address、search_address 和 search_address_log 表:(相关列) 预订:(pickup_address_id, dropoff_a
我在YML中有以下结构:。我正试着创造一个这样的结构:。作业名称和脚本用~分隔,作业用;分隔。。我可以使用以下命令使其正常工作。然而,我想知道是否可以用一个yq表达式来完成,而不是通过管道再次使用yq
我在YML中有以下结构:。我正试着创造一个这样的结构:。作业名称和脚本用~分隔,作业用;分隔。。我可以使用以下命令使其正常工作。然而,我想知道是否可以用一个yq表达式来完成,而不是通过管道再次使用yq
我是一名优秀的程序员,十分优秀!