gpt4 book ai didi

java - Apache Flink writeAsCsv() 方法写入对象元组

转载 作者:行者123 更新时间:2023-12-02 11:52:59 25 4
gpt4 key购买 nike

我正在按照 Apache Flink 教程来清理 TaxiRide 事件流。生成的流将打印到控制台。现在我想将其写入 csv 文件。

        // configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
new TaxiRideSource(path, maxEventDelay, servingSpeedFactor));

DataStream<TaxiRide> filteredRides = rides
// filter out rides that do not start or stop in NYC
.filter(new RideCleansing.NYCFilter());

filteredRides.print();

我已尝试以下操作,但收到错误:java.lang.IllegalArgumentException: The writeAsCsv() method can only be used on data streams of tuples.

DataStreamSink<TaxiRide> rides = filteredRides.writeAsCsv("/resources").setParallelism(1);

当我制作DataSet<Tuple1<TaxiRide>> rides1 = filteredRides.writeAsCsv("/resources").setParallelism(1);时它会导致编译器错误。

我应该怎样做才能将清理后的 TaxiRide 对象流写入 csv 文件?

最佳答案

DataStreamDataSet属于单独的 API,不能混合使用。因此,编译错误。

错误消息“writeAsCsv() 方法只能用于元组的数据流。”意味着您必须转换 DataStream<TaxiRide>对象变成 DataStream元组以将其写入 CSV 文件。这可以通过简单的 MapFunction 来完成:

DataStream<Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short>> rideTuples = filteredRides
.map(new TupleConverter());

TupleConverter被定义为

class TupleConverter implements MapFunction<TaxiRide, Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short>> {

public Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short> map(TaxiRide ride) {
return Tuple9.of(ride.rideId, ride.isStart, ...);
}
}

一旦您拥有DataStream rideTuples ,您可以将其写入 CSV 文件。

关于java - Apache Flink writeAsCsv() 方法写入对象元组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47762834/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com