- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
系列文章目录和关于我 。
最近笔者使用flink实现一些实时数据清洗(从kafka清洗数据写入到clickhouse)的功能,在编写flink作业后进行上传,发现运行的时候抛出:java.io.NotSerializableException,错误消息可能类似于 “org.apache.flink.streaming.api.functions.MapFunction implementation is not serializable”的错误。该错误引起了我的好奇:
Apache Flink 是一个开源的分布式流批一体化处理框架。它能高效地处理无界(例如:前端埋点数据,只要用户在使用那么会源源不断的产生数据)和有界(例如:2024年的所有交易数据)数据流,并且提供了准确的结果,即使在面对乱序或者延迟的数据时也能很好地应对。Flink 在大数据处理领域应用广泛,可用于实时数据分析、事件驱动型应用、数据管道等多种场景.
如下是一个典型数据管道应用 。
public class SimpleDataPipelineExample {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 定义数据源,这里简单模拟一个包含字符串的集合作为数据源
// 可以想象这里是从kafka中读取数据
DataStream<String> inputDataStream = env.fromElements("hello", "world", "flink");
// 3. 对数据进行转换操作,这里将每个字符串转换为大写形式
// 这里要去map(xxx),filter(xxx) 可以序列化
DataStream<String> transformedDataStream = inputDataStream
.map(String::toUpperCase)
.filter(s->s.length(s)>0);
// 4. 定义输出,
// flink中addSink就是定义数据最终存储到何处
transformedDataStream.addSink(new org.apache.flink.streaming.api.functions.sink.PrintSinkFunction<>());
// 5. 执行任务
env.execute("Simple Data Pipeline Example");
}
}
可以看到flink中的编程方式有点类似于java8中的stream,但是我们编写stream流代码的时候,并不需要刻意关注流中的function interface对象是否要序列化,那么flink为什么强制要求能序列化呢?
分布式环境下的任务分发与执行需求 。
DataStream
或DataSet
上应用map
、filter
等操作时,这些操作对应的函数(如MapFunction
、FilterFunction
)定义了具体的数据处理逻辑。DataStream
并应用了map
操作,其map
函数是对输入数据进行某种复杂的转换。这个map
函数需要被序列化,以便可以传输到其他节点,从而在整个集群中正确地执行数据转换任务。解释完为什么flink要序列化map,filter这些function interface对象,接下来用一个简单例子来分析下方法引用和lambda如何被序列化 。
public class SimpleTest {
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3);
list.stream().filter(e -> e % 2 == 0)// 这是一个lambda表式
.map(String::valueOf)//这是一个方法引用
.forEach(System.out::println);
}
}
如下是一个Java对象使用ObjectOutputStream进行序列化,并打印序列化内容的例子 。
import java.io.*;
import java.util.Arrays;
import java.util.List;
public class SimpleTest2 {
static class Test implements Serializable {
private int a;
public int getA() {
return a;
}
public void setA(int a) {
this.a = a;
}
}
public static void main(String[] args) throws Exception {
Test t = new Test();
t.setA(1);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(10000);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(t);
objectOutputStream.flush();
objectOutputStream.close();
System.out.println(byteArrayOutputStream.toString());
}
}
可以看到会判断对象是不是实现了Serializable,没有实现会抛出异常 。
如果实现了那么先写类的描述信息(类名,是否可序列化,字段个数等等)进一步判断是否实现了Externalizable,Externalizable支持我们自定义序列化和反序列化的方法,接着会写每一个字段的值:
可以看到本质上类似于JSON序列化,有自己的对象序列化协议.
Java中一切皆对象,虽然方法引用和lambda看似和对象不同(没有被new出来)但是本质上仍然是一个对象。可以通过下面两张方式验证:
idea断点 。
可以看到是一个SimpleTest$$Lambda$xx类的实例对象 。
字节码层面 。
可以看到filter对应的lamda最终会调用SimpleTest.lambda$main$0(Ljava/lang/Integer;)Z,方法引用则有所不同调用并没有生成一个独特的方法?这是为什么呢?
main
方法(或其他非实例方法)内部定义的 Lambda 表达式,它会生成一个静态私有方法来实现 Lambda 表达式的逻辑。这是因为在这个场景下,没有合适的实例来关联这个 Lambda 表达式的逻辑。以filter(e -> e % 2 == 0)
为例,这个 Lambda 表达式的逻辑需要一个独立的方法来承载。lambda$main$0
,其中main
表示所在的主方法,0
表示这是在main
方法中生成的第一个 Lambda 表达式对应的方法。这种命名方式有助于编译器在内部管理和引用这些自动生成的方法。String::valueOf
),它不需要像 Lambda 表达式那样生成一个新的静态方法。这是因为方法引用本身就是指向一个已经存在的方法。在字节码生成过程中,字节码指令会直接利用这个已有的方法。INVOKEDYNAMIC apply()Ljava/util/function/Function;
部分为例,字节码通过java/lang/String.valueOf(Ljava/lang/Object;)Ljava/lang/String;
直接指向了String
类中已有的valueOf
方法,这个方法会在map
操作的实际执行过程中被调用,用于将流中的元素转换为字符串。它不需要像 Lambda 表达式那样额外生成一个新的方法来承载逻辑,因为方法引用所引用的方法已经有了明确的定义和实现。至此我们明白了方法引用和lambda是如何执行的——Lambda 表达式生成静态方法,方法引用则是调用INVOKESTATIC指令调用到对应的方法 。
那么lambda和方法引用对应生成的对象在哪里呢?
INVOKEDYNAMIC apply()Ljava/util/function/Function; [
// handle kind 0x6 : INVOKESTATIC
java/lang/invoke/LambdaMetafactory.metafactory(Ljava/lang/invoke/MethodHandles$Lookup;Ljava/lang/String;Ljava/lang/invoke/MethodType;Ljava/lang/invoke/MethodType;Ljava/lang/invoke/MethodHandle;Ljava/lang/invoke/MethodType;)Ljava/lang/invoke/CallSite;
// arguments:
(Ljava/lang/Object;)Ljava/lang/Object;,
// handle kind 0x6 : INVOKESTATIC
java/lang/String.valueOf(Ljava/lang/Object;)Ljava/lang/String;,
(Ljava/lang/Integer;)Ljava/lang/String;
]
如上的字节码对应stream中的map执行 。
INVOKEDYNAMIC 指令的核心作用之一就是在运行时动态地生成对象(准确说是生成调用点 CallSite 以及对应的可调用对象等相关机制来实现类似生成对象的效果),用于适配相应的函数式接口,比如这里的 Function 接口.
java/lang/invoke/LambdaMetafactory.metafactory 方法在这个过程中起着关键作用,下面来详细解析一下它相关参数对应的逻辑以及整体是如何实现生成符合要求对象的:
MethodHandles$Lookup
参数:它提供了一种查找和访问方法的机制,决定了可以访问哪些类以及这些类中的哪些方法等权限相关内容。简单来说,它用于定位后续所涉及方法的 “查找上下文”,确保能够正确找到要使用的方法。String
参数:通常是一个名称,用于标识生成的这个调用点(CallSite
)相关的逻辑等,不过在实际常见使用场景下,它的作用相对不是特别直观地体现给开发者。MethodType
参数(多个):
MethodType
描述了所生成的函数式接口实现的方法整体的类型签名,比如对于 Function
接口对应的这里就是 (Ljava/lang/Object;)Ljava/lang/Object;
,意味着生成的实现 Function
接口的对象其 apply
方法接收一个 Object
类型的对象作为输入,然后返回一个 Object
类型的对象作为输出(这是从通用、抽象层面描述的接口方法签名情况)。MethodType
对应着具体实现逻辑的方法(也就是实际指向的那个已有方法或者对应的 Lambda 表达式转化后的方法等)的类型签名,像此处指向 java/lang/String.valueOf
方法,其签名是 (Ljava/lang/Object;)Ljava/lang/String;
,表明它接收一个 Object
类型的输入并返回一个 String
类型的输出。MethodType
则再次强调了在具体使用场景下(结合当前流中元素类型等实际情况)的方法签名,比如这里针对 map
操作中流里是 Integer
类型元素,所以是 (Ljava/lang/Integer;)Ljava/lang/String;
,也就是说明这个动态生成的 Function
接口实现对象在应用于当前 map
操作时,其 apply
方法接收 Integer
类型的输入并返回 String
类型的输出。MethodHandle
参数:它用于指向具体实现逻辑的方法,在这个例子中就是指向 java/lang/String.valueOf
这个已有的静态方法,相当于告诉 LambdaMetafactory
具体通过调用哪个方法来实现 Function
接口的 apply
方法所要求的逻辑。LambdaMetafactory.metafactory
方法基于这些参数,在运行时会根据函数式接口(这里是 Function
接口)的定义以及所指定的具体实现逻辑(通过 String.valueOf
方法),动态地构造出一个符合该接口要求的对象(也就是实现了 Function
接口,并且其 apply
方法在调用时会按照指向的 String.valueOf
方法来执行相应逻辑)。这个生成的对象随后就能被用于像 map
操作这样的场景中,作为 Stream
中 map
方法的参数,使得流里的元素可以按照这个 Function
接口实现对象所定义的逻辑进行转换。类似地,对于 filter 操作对应的 Predicate 接口,也是通过同样的机制,只是具体的参数(比如方法签名、指向的实现逻辑对应的方法等)会根据对应的 Lambda 表达式或具体实现方法有所不同,来生成符合 Predicate 接口要求的对象,进而用于流元素的筛选操作。 所以说,INVOKEDYNAMIC 结合 LambdaMetafactory.metafactory 的这套机制就是在字节码层面实现了在运行时动态生成适配函数式接口对象的关键所在.
import java.lang.invoke.CallSite;
import java.lang.invoke.LambdaMetafactory;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.function.Function;
public class LambdaMetafactoryCallSiteExample {
public static void main(String[] args) throws Throwable {
// 1. 获取查找上下文(caller),代表调用者的查找上下文及访问权限
MethodHandles.Lookup lookup = MethodHandles.lookup();
// 2. 定义invokedName,即要实现的方法名称,这里对应Function接口的apply方法名
String invokedName = "apply";
// 3. 定义invokedType,CallSite预期的签名,返回类型是要实现的接口(这里是Function接口)
// 参数类型(这里无捕获变量,所以为空),返回类型为Function接口类型
MethodType invokedType = MethodType.methodType(Function.class);
// 4. 定义samMethodType,函数对象要实现的方法的签名和返回类型
// 对于Function接口的apply方法,接收Object类型参数,返回Object类型结果
MethodType samMethodType = MethodType.methodType(Object.class, Object.class);
// 5. 定义implMethod,指向具体实现逻辑的方法句柄,即String类的静态方法valueOf
MethodHandle implMethodHandle = lookup.findStatic(String.class, "valueOf", MethodType.methodType(String.class, Object.class));
// 6. 定义instantiatedMethodType,调用时动态强制执行的签名和返回类型,这里和samMethodType保持一致
MethodType instantiatedMethodType = samMethodType;
// 7. 使用LambdaMetafactory.metafactory生成CallSite
CallSite callSite = LambdaMetafactory.metafactory(
lookup,
invokedName,
invokedType,
samMethodType,
implMethodHandle,
instantiatedMethodType
);
// 8. 获取生成的函数式接口实例(这里是Function接口实例)
Function<Object, String> function = (Function<Object, String>) callSite.getTarget().invoke();
// 9. 使用生成的函数式接口实例进行操作
String result = function.apply(42);
System.out.println("Result: " + result);
}
}
至此我们明白了Stream.map传入方法引用的时候,其实是使用LambdaMetafactory.metafactory生成callSite然后生成Function,这个Function保存在流的内部,当流开始执行的时候会调用Function对应的方法 。
如上字节码对应filter的执行逻辑 。
可以看到这里其实是用了INVOKESTATIC来调用SimpleTest.lambda$main$0方法,也就说说filter的执行类似map,也是用LambdaMetafactory.metafactory生成callSite然后生成Function,但是这个Function的执行是使用INVOKESTATIC来执行生成的SimpleTest.lambda$main$0方法.
INVOKESTATIC指令的核心功能就是发起对一个类中静态方法的调用操作。它允许在字节码层面直接指定要调用的类以及对应的静态方法,并且按照方法定义传递相应的参数,执行完该静态方法后,根据方法的返回类型获取返回结果(如果有返回值的话) 。
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
public class SimpleTest {
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3);
list.stream().filter(e -> e % 2 == 0)
.map(String::valueOf)
.forEach(System.out::println);
for (Method method : SimpleTest.class.getDeclaredMethods()) {
System.out.println(method.getName());
}
}
}
执行这段程序可以看到输出了 。
2//流的打印
main//SimpleTest中有main方法
lambda$main$0//还有个叫lambda$main$0的方法
该类的字节码也可以看到存在lambda$main$0(表示是main方法中第一个lambda) 。
在 Java 中,Lambda 表达式本质上是一种匿名函数的语法糖,编译器会将其转换为一个对应的方法,并在合适的地方生成相应的字节码来调用这个方法.
具体是如何生成方法对应字节码的,这就是JVM对应功能实现了,笔者还没有进一步查看JVM源码.
lambda和方法引用是Java8新增的语法糖,针对Java开发者来说提供了函数式编程更加简洁的写法,虽然看起来和原来面向命令编程有很大的区别,但是底层还是Java 方法调用那一套.
新语法糖的引入并没有打破底层原有逻辑,而是通过引入新的INVOKEDYNAMIC和LambdaMetafactory.metafactory 将新语法糖嫁接到原来的方法调用实现上,这也是一种开闭原则的体现,这样实现的好处是:影响面可控,如果开发一个新功能要打破原有架构,原有代码,那么回归覆盖测试的范围将不可控。另外lambda和方法底层的使用对开发者完全透明,对开发者友好.
最后此篇关于方法引用与lambda底层原理&Java方法引用、lambda能被序列化么?的文章就讲到这里了,如果你想了解更多关于方法引用与lambda底层原理&Java方法引用、lambda能被序列化么?的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
本文全面深入地探讨了Docker容器通信技术,从基础概念、网络模型、核心组件到实战应用。详细介绍了不同网络模式及其实现,提供了容器通信的技术细节和实用案例,旨在为专业从业者提供深入的技术洞见和实
📒博客首页:崇尚学技术的科班人 🍣今天给大家带来的文章是《Dubbo快速上手 -- 带你了解Dubbo使用、原理》🍣 🍣希望各位小伙伴们能够耐心的读完这篇文章🍣 🙏博主也在学习阶段,如若发
一、写在前面 我们经常使用npm install ,但是你是否思考过它内部的原理是什么? 1、执行npm install 它背后帮助我们完成了什么操作? 2、我们会发现还有一个成为package-lo
Base64 Base64 是什么?是将字节流转换成可打印字符、将可打印字符转换为字节流的一种算法。Base64 使用 64 个可打印字符来表示转换后的数据。 准确的来说,Base64 不算
目录 协程定义 生成器和yield语义 Future类 IOLoop类 coroutine函数装饰器 总结 tornado中的
切片,这是一个在go语言中引入的新的理念。它有一些特征如下: 对数组抽象 数组长度不固定 可追加元素 切片容量可增大 容量大小成片增加 我们先把上面的理念整理在这
文章来源:https://sourl.cn/HpZHvy 引 言 本文主要论述的是“RPC 实现原理”,那么首先明确一个问题什么是 RPC 呢?RPC 是 Remote Procedure Call
源码地址(包含所有与springmvc相关的,静态文件路径设置,request请求入参接受,返回值处理converter设置等等): spring-framework/WebMvcConfigurat
请通过简单的java类向我展示一个依赖注入(inject)原理的小例子虽然我已经了解了spring,但是如果我需要用简单的java类术语来解释它,那么你能通过一个简单的例子向我展示一下吗?提前致谢。
1、背景 我们平常使用手机和电脑上网,需要访问公网上的网络资源,如逛淘宝和刷视频,那么手机和电脑是怎么知道去哪里去拿到这个网络资源来下载到本地的呢? 就比如我去食堂拿吃的,我需要
大家好,我是飞哥! 现在 iptables 这个工具的应用似乎是越来越广了。不仅仅是在传统的防火墙、NAT 等功能出现,在今天流行的的 Docker、Kubernets、Istio 项目中也经
本篇涉及到的所有接口在公开文档中均无,需要下载 GitHub 上的源码,自己创建私有类的文档。 npm run generateDocumentation -- --private yarn gene
我最近在很多代码中注意到人们将硬编码的配置(如端口号等)值放在类/方法的深处,使其难以找到,也无法配置。 这是否违反了 SOLID 原则?如果不是,我是否可以向我的团队成员引用另一个“原则”来说明为什
我是 C#、WPF 和 MVVM 模式的新手。很抱歉这篇很长的帖子,我试图设定我所有的理解点(或不理解点)。 在研究了很多关于 WPF 提供的命令机制和 MVVM 模式的文本之后,我在弄清楚如何使用这
可比较的 jQuery 函数 $.post("/example/handler", {foo: 1, bar: 2}); 将创建一个带有 post 参数 foo=1&bar=2 的请求。鉴于 $htt
如果Django不使用“延迟查询执行”原则,主要问题是什么? q = Entry.objects.filter(headline__startswith="What") q = q.filter(
我今天发现.NET框架在做计算时遵循BODMAS操作顺序。即计算按以下顺序进行: 括号 订单 部门 乘法 添加 减法 但是我四处搜索并找不到任何文档确认 .NET 绝对 遵循此原则,是否有此类文档?如
已结束。此问题不符合 Stack Overflow guidelines .它目前不接受答案。 我们不允许提出有关书籍、工具、软件库等方面的建议的问题。您可以编辑问题,以便用事实和引用来回答它。 关闭
API 回顾 在创建 Viewer 时可以直接指定 影像供给器(ImageryProvider),官方提供了一个非常简单的例子,即离屏例子(搜 offline): new Cesium.Viewer(
As it currently stands, this question is not a good fit for our Q&A format. We expect answers to be
我是一名优秀的程序员,十分优秀!