gpt4 book ai didi

java - Apache Flink 将数据流(源)转换为列表?

转载 作者:行者123 更新时间:2023-12-05 06:34:49 25 4
gpt4 key购买 nike

我的问题是如何从 DataStream 转换为 List,例如为了能够遍历它。

代码如下:

package flinkoracle;

//imports
//....

public class FlinkOracle {

final static Logger LOG = LoggerFactory.getLogger(FlinkOracle.class);

public static void main(String[] args) {
LOG.info("Starting...");
// get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO};

RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
//get the source from Oracle DB
DataStream<?> source = env
.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("oracle.jdbc.driver.OracleDriver")
.setDBUrl("jdbc:oracle:thin:@localhost:1521")
.setUsername("user")
.setPassword("password")
.setQuery("select * from table1")
.setRowTypeInfo(rowTypeInfo)
.finish());

source.print().setParallelism(1);

try {
LOG.info("----------BEGIN----------");
env.execute();
LOG.info("----------END----------");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LOG.info("End...");
}

}

非常感谢。溴化物塔玛斯

最佳答案

Flink 提供了一个迭代器接收器来收集 DataStream 结果以用于测试和调试目的。它可以按如下方式使用:

import org.apache.flink.contrib.streaming.DataStreamUtils;

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

您可以像这样将迭代器复制到新列表:

while (iter.hasNext())
list.add(iter.next());

Flink 还在 DataStream 上提供了一堆简单的 write*() 方法,主要用于调试目的。将数据刷新到目标系统取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立即显示在目标系统中。注意:这些 write*() 方法不参与 Flink 的检查点,在失败的情况下,这些记录可能会丢失。

writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket

来源:link .

您可能需要添加以下依赖项才能使用 DataStreamUtils:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-contrib</artifactId>
<version>0.10.2</version>
</dependency>

关于java - Apache Flink 将数据流(源)转换为列表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50022950/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com