gpt4 book ai didi

java - Apache 弗林克 : can't use writeAsCsv() with a datastream of subclass tuple

转载 作者:行者123 更新时间:2023-12-02 11:57:22 29 4
gpt4 key购买 nike

如此处推荐:Best Practices - Naming large TupleX types 。我在数据流中使用 POJO 而不是 Tuple。

这就是我的 POJO 的定义方式:

public class PositionEvent extends Tuple8<Integer, String, Integer, 
Integer, Integer, Integer, Integer, Integer>

如果我尝试将 PositionEvent 的数据流保存到 csv 文件,则会引发异常:

source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED)
.writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE))

Exception in thread "main" java.lang.IllegalArgumentException: The writeAsCsv() method can only be used on data streams of tuples.

但是,如果我将 PositionEvent 显式转换为 Tuple8,它就会起作用:

source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED)
.map((PositionEvent e) ->
(Tuple8<Integer, String, Integer, Integer,
Integer, Integer, Integer, Integer>) e)
.writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE))

Flink 不应该检测数据流中的对象是 Tuple 子类吗?

====================

编辑:(感谢 twalthr)

好的,现在这是我的 POJO:

import org.apache.flink.api.java.tuple.Tuple8;

public class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer> {

public PositionEvent() {
}

public PositionEvent(int timestamp, String vid, int speed, int xway,
int lane, int dir, int seg, int pos) {
super(timestamp, vid, speed, xway, lane, dir, seg, pos);
}

public int getSpeed() {
return f2;
}
}

这是我之前的 POJO:

public class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer> {

public int timestamp;

public String vid;

public int speed;

public int xway;

public int lane;

public int dir;

public int seg;

public int pos;

public PositionEvent() {
}

public PositionEvent(int timestamp, String vid, int speed, int xway,
int lane, int dir, int seg, int pos) {
super(timestamp, vid, speed, xway, lane, dir, seg, pos);
}
}

现在我不需要显式转换我的 POJO。

最佳答案

看来您不仅扩展了 Tuple8,还添加了 e.speed 等附加字段。这隐式地使您的类型成为 POJO。为了命名字段并保持高效的元组类型,您可以简单地实现 getter,但不要添加其他字段。否则,您可以简单地使用 POJO 而不是元组。

也许也值得研究一下 Flink's Table & SQL API 。它的目的是通过自动处理所有类型来简化开发。

关于java - Apache 弗林克 : can't use writeAsCsv() with a datastream of subclass tuple,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47510250/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com