- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想将 DataStream 转换为带有架构信息的 DataStream
输入
args[0] 数据流
{"fields":["China","Beijing"]}
args[1] 架构
message spark_schema {
optional binary country (UTF8);
optional binary city (UTF8);
}
预期输出
{"country":"china", "city":"beijing"}
我的代码是这样的
public DataStream<String> convert(DataStream source, MessageType messageType) {
SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
JSONObject data = new JSONObject();
this.fields = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
for (int i = 0; i < fields.size(); i++) {
data.put(fields.get(i), row.getField(i));
}
return data.toJSONString();
});
return dataWithSchema;
}
异常错误
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.xxxx.ParquetDataSourceReader$$Lambda$64/1174881426@d78795 is not serializable
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:180)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1823)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)
但是下面的代码工作正常
public DataStream<String> convert(DataStream source, MessageType messageType) {
if (this.fields == null) {
throw new RuntimeException("The schema of AbstractRowStreamReader is null");
}
List<String> field = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
JSONObject data = new JSONObject();
for (int i = 0; i < field.size(); i++) {
data.put(field.get(i), row.getField(i));
}
return data.toJSONString();
});
return dataWithSchema;
}
Flink的map算子如何组合外部复杂的POJO?
最佳答案
为了让 Flink 跨任务分发代码,代码需要完整 Serializable
。在你的第一个例子中,它不是;第二个是。特别是Type::getName
将生成一个不是 Serializable
的 lambda .
获得一个 lambda Serializable
,您需要将其显式转换为可序列化接口(interface)(例如 Flink MapFunction
)或将其与 (Serializable & Function)
一起使用
由于第二个也可以节省计算量,因此无论如何它都会更好。 Convert 在作业编译期间只会执行一次,而 DataStream#map
为每条记录调用。如果不清楚,我建议在 IDE 中执行它并使用断点。
关于java - Flink DataStream如何将自定义的POJO合并到另一个DataStream中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60573782/
Environment(执行环境) --> Source(数据源) --> Transform(转换操作) --> Sink(输出) 创建环境之后,就可以构建数据处理的业务逻
概述 Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持,如图所示,本节将主要讲解Flink中的Sink操作。之前已经了解Flink程序如何对数据进行读取、转换
我创建了一个带有配置的流媒体环境,并尝试在 RichMapFunction 的 open() 方法中访问此配置。 例子: Configuration conf = new Configurat
假设我有一个 Flink SourceFunction叫RequestsSource 。 对于来自该源的每个请求,我想订阅一个外部数据源(出于示例的目的,它可以启动一个单独的线程并开始在该线程上生成数
首先,感谢您阅读我的问题! 我目前正在研究 Hadoop 的复制模型,但我已无路可走。我从“Oreilly Hadoop 权威指南第 3 版 2012 年 1 月”一书中学习。要提出这个问题,我首先需
我只是在线程中使用普通的 DataInputStream 和 DataOutputStream 来接收、发送(在服务器上接受)来制作游戏,但它真的很慢。 >5 秒延迟。 这是我的制作方法(大部分看起来
我在启用持久性的 Kubernetes 集群中运行 Ignite。每台机器都有一个 24GB 的 Java 堆,其中 20GB 专门用于持久内存,内存限制为 110GB。我的相关 JVM 选项是 -X
执行环境(Execution Environment) Flink 程序可以在各种上下文环境中运行:可以在本地 JVM 中执行程序,也可以提交到远程集群上运行。 不同的环境,代码的提交运行的过程会有
转换算子 数据源读入数据之后,就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream,如上所示。一个 Flink 程序的核心,其实就是所有的转换操作,它们决定了处理
这个问题已经有答案了: StreamCorruptedException: invalid type code: AC (1 个回答) 已关闭 9 年前。 我正在构建一个 4 人网络扑克游戏。我首先创
我有大量的 IgniteRunnable,我将使用 IgniteCompute 执行它们。它们看起来如下: public class MyIgniteRunnable implements Ignit
我在有机架可用的地方使用 Flink DataStream API,我想通过机架 ID 计算温度组的“平均值”。我的窗口持续时间是 40 秒,我的窗口每 10 秒滑动一次......下面是我的代码,我
我是 Flink 的新手,试图了解如何最有效地使用它。 我正在尝试使用 Window API,从 CSV 文件中读取。读取的行被转换为案例类,因此 case class IncomingDataUni
我有一个来自 Kafka 的消息流,如下所示 DataStream messageStream = env .addSource(new FlinkKafkaConsumer09<>(topic,
我是 Flink 的新手,试图了解如何最有效地使用它。 我正在尝试使用 Window API,从 CSV 文件中读取。读取的行被转换为案例类,因此 case class IncomingDataUni
我有一个来自 Kafka 的消息流,如下所示 DataStream messageStream = env .addSource(new FlinkKafkaConsumer09<>(topic,
我刚刚开始使用 Scala 来使用 Apache Flink。有人可以告诉我如何从我拥有的当前数据流创建滞后流(滞后于 k 个事件或 k 个时间单位)吗? 基本上,我想在数据流上实现一个自动回归模型(
我一直在为自己和一些 friend 开发聊天客户端,并决定尝试添加功能以允许客户端之间的文件传输。我可以发送文件,但它到达时的状态与发送的文件不同。例如,这里是一个比较发送图像之前和之后的链接: /i
我想实现一个具有两个输入流并从每个流中获取一个项目以同时处理两个输入流的运算符,例如加入。此外,如果两个输入之一没有任何数据,运算符(operator)将阻塞并等待它。 如果我必须这样做,涉及哪些类(
我正在使用 Royal Mail Shipping API 来“创建发货请求”和创建“打印标签”。 打印标签请求可以通过多种方式完成,我想以 PNG 格式获取此打印标签,使用 API 通过将“PNG”
我是一名优秀的程序员,十分优秀!