gpt4 book ai didi

unit-testing - 为 Flink SQL 添加单元测试

转载 作者:行者123 更新时间:2023-12-05 05:12:08 25 4
gpt4 key购买 nike

我正在使用 Flink v1.7.1。当我使用 tableSource、SQL 和 tableSink 完成 Flink 流式作业时,我不知道如何为其添加单元测试。

最佳答案

在用户邮件列表的帮助下,我找到了一个关于如何测试 Flink SQL 的很好的例子,代码如下。

package org.apache.flink.table.runtime.stream.sql;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.JavaStreamTestData;
import org.apache.flink.table.runtime.utils.StreamITCase;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
 * Integration test for streaming SQL: feeds an in-memory {@link Row} collection through a
 * SQL projection and checks the rows that reach the sink.
 */
public class JavaSqlITCase extends AbstractTestBase {

    /**
     * Registers a three-column row stream ({@code a}, {@code b}, {@code c}) as the table
     * {@code MyTableRow}, projects columns {@code a} and {@code c} with SQL, and compares the
     * emitted rows with the expected values.
     *
     * @throws Exception if the job execution or the result comparison fails
     */
    @Test
    public void testRowRegisterRowWithNames() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // NOTE(review): presumably resets results collected by earlier tests -- confirm in StreamITCase.
        StreamITCase.clear();

        // Build the source stream and attach the named row type explicitly.
        DataStream<Row> source = env.fromCollection(inputRows()).returns(namedRowType());

        // Expose the stream to SQL under the name used by the query below.
        tableEnv.registerTable("MyTableRow", tableEnv.fromDataStream(source, "a,b,c"));

        Table projected = tableEnv.sqlQuery("SELECT a,c FROM MyTableRow");

        // Convert the query result back into a stream and route it to the test sink.
        DataStream<Row> output = tableEnv.toAppendStream(projected, Row.class);
        output.addSink(new StreamITCase.StringSink<Row>());
        env.execute();

        List<String> expectedRows = new ArrayList<>();
        expectedRows.add("1,Hi");
        expectedRows.add("2,Hello");
        expectedRows.add("3,Hello world");

        StreamITCase.compareWithList(expectedRows);
    }

    /** Returns the three input rows fed into the source stream. */
    private static List<Row> inputRows() {
        List<Row> rows = new ArrayList<>();
        rows.add(Row.of(1, 1L, "Hi"));
        rows.add(Row.of(2, 2L, "Hello"));
        rows.add(Row.of(3, 2L, "Hello world"));
        return rows;
    }

    /** Returns the row type describing the (INT a, LONG b, STRING c) input schema. */
    private static RowTypeInfo namedRowType() {
        TypeInformation<?>[] fieldTypes = {
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.LONG_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO
        };
        String[] fieldNames = {"a", "b", "c"};
        return new RowTypeInfo(fieldTypes, fieldNames);
    }
}

相关代码请参见原回答中给出的链接。

关于unit-testing - 为 Flink SQL 添加单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54900843/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com