
apache-flink - Flink SQL : How can use a Long type column to Rowtime

Reposted. Author: 行者123. Updated: 2023-12-04 10:57:53

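Flink 1.9.1
I read a CSV file and want to use a Long-typed column for TUMBLE.
I use a UDF to convert the Long value to a Timestamp, but it does not work.
Error message: Window can only be defined over a time attribute column.

I tried debugging. TimeIndicatorRelDataType is not the same as Timestamp, and I don't know how to convert one to the other. Why does this happen?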

def isTimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
case ti: TimeIndicatorRelDataType => true
case _ => false
}

Code
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    // read csv
    URL fileUrl = HotItemsSql.class.getClassLoader().getResource("UserBehavior-less.csv");
    CsvTableSource csvTableSource = CsvTableSource.builder().path(fileUrl.getPath())
        .field("userId", BasicTypeInfo.LONG_TYPE_INFO)
        .field("itemId", BasicTypeInfo.LONG_TYPE_INFO)
        .field("categoryId", BasicTypeInfo.LONG_TYPE_INFO)
        .field("behavior", BasicTypeInfo.LONG_TYPE_INFO)
        .field("optime", BasicTypeInfo.LONG_TYPE_INFO)
        .build();

    // trans to stream
    DataStream<Row> csvDataStream = csvTableSource.getDataStream(env)
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Row>() {
            @Override
            public long extractAscendingTimestamp(Row element) {
                return Timestamp.valueOf(element.getField(5).toString()).getTime();
            }
        }).broadcast();

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.registerDataStream("T_UserBehavior", csvDataStream, "userId,itemId,categoryId,behavior,optime");
    tableEnv.registerFunction("Long2DateTime", new DateTransFunction());

    Table result = tableEnv.sqlQuery("select userId," +
        "TUMBLE_START(Long2DateTime(optime), INTERVAL '10' SECOND) as window_start," +
        "TUMBLE_END(Long2DateTime(optime), INTERVAL '10' SECOND) as window_end " +
        "from T_UserBehavior " +
        "group by TUMBLE(Long2DateTime(optime),INTERVAL '10' SECOND),userId");

    tableEnv.toRetractStream(result, Row.class).print();

UDF
import java.sql.Timestamp;

public class DateTransFunction extends ScalarFunction {
    public Timestamp eval(Long longTime) {
        try {
            Timestamp t = new Timestamp(longTime);
            return t;
        } catch (Exception e) {
            return null;
        }
    }
}

Error stack trace
Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:68)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
at org.apache.flink.table.plan.Optimizer.runHepPlannerSequentially(Optimizer.scala:194)
at org.apache.flink.table.plan.Optimizer.optimizeNormalizeLogicalPlan(Optimizer.scala:150)
at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:65)
at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)

Best answer

Since you have already managed to assign timestamps in the DataStream API, you should be able to call:

tableEnv.registerDataStream(
    "T_UserBehavior",
    csvDataStream,
    "userId, itemId, categoryId, behavior, rt.rowtime");
The .rowtime suffix instructs the API to create a column from the timestamp stored in every stream record coming from the DataStream API.
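
Once rt is registered this way, the windowed query from the question would presumably reference rt directly instead of the result of the Long2DateTime UDF. The following is a minimal sketch, assuming the table and field names used above. The class RowtimeQuery and its helper method are hypothetical and only assemble the SQL text, which would then be passed to tableEnv.sqlQuery(...):

```java
// Hypothetical helper, not part of Flink: it only builds the SQL string.
final class RowtimeQuery {

    // Builds the tumbling-window query from the question, but windows on the
    // rowtime attribute (e.g. "rt") rather than on a UDF-converted column.
    static String tumbleQuery(String table, String rowtimeField, int seconds) {
        String window = rowtimeField + ", INTERVAL '" + seconds + "' SECOND";
        return "SELECT userId, "
            + "TUMBLE_START(" + window + ") AS window_start, "
            + "TUMBLE_END(" + window + ") AS window_end "
            + "FROM " + table + " "
            + "GROUP BY TUMBLE(" + window + "), userId";
    }

    public static void main(String[] args) {
        // Assumed names: table "T_UserBehavior" and rowtime column "rt".
        System.out.println(tumbleQuery("T_UserBehavior", "rt", 10));
    }
}
```

Because rt is a time attribute, the planner should accept it as the first argument of TUMBLE, and the UDF is no longer needed for windowing. A group-window aggregation on a rowtime attribute produces append-only results, so tableEnv.toAppendStream(result, Row.class) is likely sufficient in place of toRetractStream.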

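The community is currently working on making programs like yours easier to write. In Flink 1.10, you should be able to define a CSV-backed table with a rowtime attribute directly in SQL DDL.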

Regarding apache-flink - Flink SQL : How can use a Long type column to Rowtime, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/59061872/
