- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如此处推荐:Best Practices - Naming large TupleX types 。我在数据流中使用 POJO 而不是 Tuple。
这就是我的 POJO 的定义方式:
public class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer>
如果我尝试将 PositionEvent
的数据流保存到 csv 文件,则会引发异常:
source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED)
.writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE))
Exception in thread "main" java.lang.IllegalArgumentException: The writeAsCsv() method can only be used on data streams of tuples.
但是,如果我将 PositionEvent
显式转换为 Tuple8,它就会起作用:
source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED)
.map((PositionEvent e) ->
(Tuple8<Integer, String, Integer, Integer,
Integer, Integer, Integer, Integer>) e)
.writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE))
Flink 不应该检测数据流中的对象是 Tuple 子类吗?
====================
编辑:(感谢 twalthr)
好的,现在这是我的 POJO:
import org.apache.flink.api.java.tuple.Tuple8;
public class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer> {
public PositionEvent() {
}
public PositionEvent(int timestamp, String vid, int speed, int xway,
int lane, int dir, int seg, int pos) {
super(timestamp, vid, speed, xway, lane, dir, seg, pos);
}
public int getSpeed() {
return f2;
}
}
这是我之前的 POJO:
public class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer> {
public int timestamp;
public String vid;
public int speed;
public int xway;
public int lane;
public int dir;
public int seg;
public int pos;
public PositionEvent() {
}
public PositionEvent(int timestamp, String vid, int speed, int xway,
int lane, int dir, int seg, int pos) {
super(timestamp, vid, speed, xway, lane, dir, seg, pos);
}
}
现在我不需要显式转换我的 POJO。
最佳答案
看来您不仅扩展了 Tuple8
,还添加了 e.speed
等附加字段。这隐式地使您的类型成为 POJO。为了命名字段并保持高效的元组类型,您可以简单地实现 getter,但不要添加其他字段。否则,您可以简单地使用 POJO 而不是元组。
也许也值得研究一下 Flink's Table & SQL API 。它的目的是通过自动处理所有类型来简化开发。
关于java - Apache 弗林克 : can't use writeAsCsv() with a datastream of subclass tuple,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47510250/
Environment(执行环境) --> Source(数据源) --> Transform(转换操作) --> Sink(输出) 创建环境之后,就可以构建数据处理的业务逻
概述 Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持,如图所示,本节将主要讲解Flink中的Sink操作。之前已经了解Flink程序如何对数据进行读取、转换
我创建了一个带有配置的流媒体环境,并尝试在 RichMapFunction 的 open() 方法中访问此配置。 例子: Configuration conf = new Configurat
假设我有一个 Flink SourceFunction叫RequestsSource 。 对于来自该源的每个请求,我想订阅一个外部数据源(出于示例的目的,它可以启动一个单独的线程并开始在该线程上生成数
首先,感谢您阅读我的问题! 我目前正在研究 Hadoop 的复制模型,但我已无路可走。我从“Oreilly Hadoop 权威指南第 3 版 2012 年 1 月”一书中学习。要提出这个问题,我首先需
我只是在线程中使用普通的 DataInputStream 和 DataOutputStream 来接收、发送(在服务器上接受)来制作游戏,但它真的很慢。 >5 秒延迟。 这是我的制作方法(大部分看起来
我在启用持久性的 Kubernetes 集群中运行 Ignite。每台机器都有一个 24GB 的 Java 堆,其中 20GB 专门用于持久内存,内存限制为 110GB。我的相关 JVM 选项是 -X
执行环境(Execution Environment) Flink 程序可以在各种上下文环境中运行:可以在本地 JVM 中执行程序,也可以提交到远程集群上运行。 不同的环境,代码的提交运行的过程会有
转换算子 数据源读入数据之后,就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream,如上所示。一个 Flink 程序的核心,其实就是所有的转换操作,它们决定了处理
这个问题已经有答案了: StreamCorruptedException: invalid type code: AC (1 个回答) 已关闭 9 年前。 我正在构建一个 4 人网络扑克游戏。我首先创
我有大量的 IgniteRunnable,我将使用 IgniteCompute 执行它们。它们看起来如下: public class MyIgniteRunnable implements Ignit
我在有机架可用的地方使用 Flink DataStream API,我想通过机架 ID 计算温度组的“平均值”。我的窗口持续时间是 40 秒,我的窗口每 10 秒滑动一次......下面是我的代码,我
我是 Flink 的新手,试图了解如何最有效地使用它。 我正在尝试使用 Window API,从 CSV 文件中读取。读取的行被转换为案例类,因此 case class IncomingDataUni
我有一个来自 Kafka 的消息流,如下所示 DataStream messageStream = env .addSource(new FlinkKafkaConsumer09<>(topic,
我是 Flink 的新手,试图了解如何最有效地使用它。 我正在尝试使用 Window API,从 CSV 文件中读取。读取的行被转换为案例类,因此 case class IncomingDataUni
我有一个来自 Kafka 的消息流,如下所示 DataStream messageStream = env .addSource(new FlinkKafkaConsumer09<>(topic,
我刚刚开始使用 Scala 来使用 Apache Flink。有人可以告诉我如何从我拥有的当前数据流创建滞后流(滞后于 k 个事件或 k 个时间单位)吗? 基本上,我想在数据流上实现一个自动回归模型(
我一直在为自己和一些 friend 开发聊天客户端,并决定尝试添加功能以允许客户端之间的文件传输。我可以发送文件,但它到达时的状态与发送的文件不同。例如,这里是一个比较发送图像之前和之后的链接: /i
我想实现一个具有两个输入流并从每个流中获取一个项目以同时处理两个输入流的运算符,例如加入。此外,如果两个输入之一没有任何数据,运算符(operator)将阻塞并等待它。 如果我必须这样做,涉及哪些类(
我正在使用 Royal Mail Shipping API 来“创建发货请求”和创建“打印标签”。 打印标签请求可以通过多种方式完成,我想以 PNG 格式获取此打印标签,使用 API 通过将“PNG”
我是一名优秀的程序员,十分优秀!