- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
RPC 是远过程调用(Remote Procedure Call)的缩写形式,其区别于一个程序内部基本的过程调用(或者叫函数/方法调用).
随着应用程序变得越来越复杂,在单个机器上中仅通过一个进程来运行整个应用程序的方式已经难以满足现实中日益增长的需求。 开发者对应用程序进行模块化的拆分,以分布式部署的方式来降低程序整体的复杂度和提升性能方面的可拓展性(分而治之的思想).
拆分后部署在不同机器上的各个模块无法像之前那样通过内存寻址的方式来互相访问,而是需要通过网络来进行通信。 RPC最主要的功能就是在提供不同模块服务间的网络通信能力的同时,又尽可能的不丢失本地调用时语义的简洁性。rpc可以认为是分布式系统中类似人体经络一样的基础设施,因此有必要对其工作原理有一定的了解.
要学习rpc的原理,理论上最好的办法就是去看流行的开源框架源码。但dubbo这样成熟的rpc框架由于已经迭代了很多年,为了满足多样的需求而有着复杂的架构和庞大的代码量。对于普通初学者来说往往很难从层层抽象封装中把握住关于rpc框架最核心的内容.
MyRpc是我最近在学习MIT6.824分布式系统公开课时,使用java并基于netty实现的一个简易rpc框架,实现的过程中许多地方都参考了dubbo以及一些demo级别的rpc框架。 MyRpc是demo级别的框架,理解起来会轻松不少。在对基础的rpc实现原理有一定了解后,能对后续研究dubbo等开源rpc框架带来很大的帮助.
目前MyRpc实现了以下功能 。
限于篇幅,以上功能会拆分为两篇博客分别介绍。其中前3个功能实现了基本的点对点通信的rpc功能,将在本篇博客中结合源码详细分析.
MyRpc架构图 。
MyRpc是以netty为基础的,下面展示一个最基础的netty客户端/服务端交互的demo.
netty服务端:
/**
* 最原始的netty服务端demo
* */
public class PureNettyServer {
public static void main(String[] args) throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
EventLoopGroup workerGroup = new NioEventLoopGroup(8,new DefaultThreadFactory("NettyServerWorker", true));
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// 实际调用业务方法的处理器
.addLast("serverHandler", new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf requestByteBuf) {
String requestStr = requestByteBuf.toString(CharsetUtil.UTF_8);
System.out.println("PureNettyServer read request=" + JsonUtil.json2Obj(requestStr, User.class));
// 服务端响应echo
ByteBuf byteBuf = Unpooled.copiedBuffer("echo:" + requestStr,CharsetUtil.UTF_8);
channelHandlerContext.writeAndFlush(byteBuf);
}
})
;
}
});
ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 8888).sync();
System.out.println("netty server started!");
// 一直阻塞在这里
channelFuture.channel().closeFuture().sync();
}
}
netty客户端:
/**
* 最原始的netty客户端demo
* */
public class PureNettyClient {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8,
new DefaultThreadFactory("NettyClientWorker", true));
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
.addLast("clientHandler", new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf responseByteBuf) {
String responseStr = responseByteBuf.toString(CharsetUtil.UTF_8);
System.out.println("PureNettyClient received response=" + responseStr);
}
})
;
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
Channel channel = channelFuture.sync().channel();
// 发送一个user对象的json串
User user = new User("Tom",10);
ByteBuf requestByteBuf = Unpooled.copiedBuffer(JsonUtil.obj2Str(user), CharsetUtil.UTF_8);
channel.writeAndFlush(requestByteBuf);
System.out.println("netty client send request success!");
channelFuture.channel().closeFuture().sync();
}
}
上面展示了一个最基础的netty网络通信的demo,似乎一个点对点的传输功能已经得到了良好的实现。 但作为一个rpc框架,还需要解决tcp传输层基于字节流的消息黏包/拆包问题.
操作系统实现的传输层tcp协议中,向上层的应用保证尽最大可能的(best effort delivery)、可靠的传输字节流,但并不关心实际传输的数据包是否总是符合应用层的要求.
一个数据包中可能同时存在黏包问题和拆包问题(如下图所示) 黏包拆包示意图 。
解决黏包/拆包问题最核心的思路是,如何知道一个应用层完整请求的边界。 对于黏包问题,基于边界可以独立的拆分出每一个请求;对于拆包问题,如果发现收到的数据包末尾没有边界,则继续等待新的数据包,直到发现边界后再一并上交给应用程序.
主流的解决黏包拆包的应用层协议设计方案有三种:
介绍 | 优点 | 缺点 | |
---|---|---|---|
1.基于固定长度的协议 | 每个消息都是固定的大小,如果实际上小于固定值,则需要填充 | 简单;易于实现 | 固定值过大,填充会浪费大量传输带宽;固定值过小则限制了可用的消息体大小 |
2.基于特殊分隔符的协议 | 约定一个特殊的分隔符,以这个分割符为消息边界 | 简单;且消息体长度是可变的,性能好 | 消息体的业务数据不允许包含这个特殊分隔符,否则会错误的拆分数据包。因此兼容性较差 |
3.基于业务数据长度编码的协议 | 设计一个固定大小的消息请求头(比如固定16字节、20字节大小),在消息请求头中包含实际的业务消息体长度 | 消息体长度可变,性能好;对业务数据内容无限制,兼容性也好 | 实现起来稍显复杂 |
对于流行的rpc框架,一般都是选用性能与兼容性皆有的方案3:即自己设计一个固定大小的、包含了请求体长度字段的请求头。MyRpc参考dubbo,也设计了一个固定16字节大小的请求头(里面有几个字段暂时没用上).
请求头: MessageHeader 。
/**
* 共16字节的请求头
* */
public class MessageHeader implements Serializable {
public static final int MESSAGE_HEADER_LENGTH = 16;
public static final int MESSAGE_SERIALIZE_TYPE_LENGTH = 5;
public static final short MAGIC = (short)0x2233;
// ================================ 消息头 =================================
/**
* 魔数(占2字节)
* */
private short magicNumber = MAGIC;
/**
* 消息标识(0代表请求事件;1代表响应事件, 占1位)
* @see MessageFlagEnums
* */
private Boolean messageFlag;
/**
* 是否是双向请求(0代表oneWay请求;1代表twoWay请求)
* (双向代表客户端会等待服务端的响应,单向则请求发送完成后即向上层返回成功)
* */
private Boolean twoWayFlag;
/**
* 是否是心跳消息(0代表正常消息;1代表心跳消息, 占1位)
* */
private Boolean eventFlag;
/**
* 消息体序列化类型(占5位,即所支持的序列化类型不得超过2的5次方,32种)
* @see MessageSerializeType
* */
private Boolean[] serializeType;
/**
* 响应状态(占1字节)
* */
private byte responseStatus;
/**
* 消息的唯一id(占8字节)
* */
private long messageId;
/**
* 业务数据长度(占4字节)
* */
private int bizDataLength;
}
完整的消息对象: MessageProtocol 。
public class MessageProtocol<T> implements Serializable {
/**
* 请求头
* */
private MessageHeader messageHeader;
/**
* 请求体(实际的业务消息对象)
* */
private T bizDataBody;
}
MyRpc消息示例图 。
/**
* rpc请求对象
* */
public class RpcRequest implements Serializable {
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
/**
* 消息的唯一id(占8字节)
* */
private final long messageId;
/**
* 接口名
* */
private String interfaceName;
/**
* 方法名
* */
private String methodName;
/**
* 参数类型数组(每个参数一项)
* */
private Class<?>[] parameterClasses;
/**
* 实际参数对象数组(每个参数一项)
* */
private Object[] params;
public RpcRequest() {
// 每个请求对象生成时都自动生成单机全局唯一的自增id
this.messageId = INVOKE_ID.getAndIncrement();
}
}
/**
* rpc响应对象
* */
public class RpcResponse implements Serializable {
/**
* 消息的唯一id(占8字节)
* */
private long messageId;
/**
* 返回值
*/
private Object returnValue;
/**
* 异常值
*/
private Exception exceptionValue;
}
在上一节的netty demo中的消息处理器中,一共做了两件事情;一是将原始数据包的字节流转化成了应用程序所需的String对象;二是拿到String对象后进行响应的业务处理(比如打印在控制台上)。 而netty框架允许配置多个消息处理器组成链条,按约定的顺序处理出站/入站的消息;因此从模块化的出发,应该将编码/解码的逻辑和实际业务的处理拆分成多个处理器.
在自定义的消息编码器、解码器中进行应用层请求/响应数据的序列化/反序列化,同时处理上述的黏包/拆包问题.
编解码工具类 。
public class MessageCodecUtil {
/**
* 报文协议编码
* */
public static <T> void messageEncode(MessageProtocol<T> messageProtocol, ByteBuf byteBuf) {
MessageHeader messageHeader = messageProtocol.getMessageHeader();
// 写入魔数
byteBuf.writeShort(MessageHeader.MAGIC);
// 写入消息标识
byteBuf.writeBoolean(messageHeader.getMessageFlag());
// 写入单/双向标识
byteBuf.writeBoolean(messageHeader.getTwoWayFlag());
// 写入消息事件标识
byteBuf.writeBoolean(messageHeader.getEventFlag());
// 写入序列化类型
for(boolean b : messageHeader.getSerializeType()){
byteBuf.writeBoolean(b);
}
// 写入响应状态
byteBuf.writeByte(messageHeader.getResponseStatus());
// 写入消息uuid
byteBuf.writeLong(messageHeader.getMessageId());
// 序列化消息体
MyRpcSerializer myRpcSerializer = MyRpcSerializerManager.getSerializer(messageHeader.getSerializeType());
byte[] bizMessageBytes = myRpcSerializer.serialize(messageProtocol.getBizDataBody());
// 获得并写入消息正文长度
byteBuf.writeInt(bizMessageBytes.length);
// 写入消息正文内容
byteBuf.writeBytes(bizMessageBytes);
}
/**
* 报文协议header头解码
* */
public static MessageHeader messageHeaderDecode(ByteBuf byteBuf){
MessageHeader messageHeader = new MessageHeader();
// 读取魔数
messageHeader.setMagicNumber(byteBuf.readShort());
// 读取消息标识
messageHeader.setMessageFlag(byteBuf.readBoolean());
// 读取单/双向标识
messageHeader.setTwoWayFlag(byteBuf.readBoolean());
// 读取消息事件标识
messageHeader.setEventFlag(byteBuf.readBoolean());
// 读取序列化类型
Boolean[] serializeTypeBytes = new Boolean[MessageHeader.MESSAGE_SERIALIZE_TYPE_LENGTH];
for(int i=0; i<MessageHeader.MESSAGE_SERIALIZE_TYPE_LENGTH; i++){
serializeTypeBytes[i] = byteBuf.readBoolean();
}
messageHeader.setSerializeType(serializeTypeBytes);
// 读取响应状态
messageHeader.setResponseStatus(byteBuf.readByte());
// 读取消息uuid
messageHeader.setMessageId(byteBuf.readLong());
// 读取消息正文长度
int bizDataLength = byteBuf.readInt();
messageHeader.setBizDataLength(bizDataLength);
return messageHeader;
}
/**
* 报文协议正文body解码
* */
public static <T> T messageBizDataDecode(MessageHeader messageHeader, ByteBuf byteBuf, Class<T> messageBizDataType){
// 读取消息正文
byte[] bizDataBytes = new byte[messageHeader.getBizDataLength()];
byteBuf.readBytes(bizDataBytes);
// 反序列化消息体
MyRpcSerializer myRpcSerializer = MyRpcSerializerManager.getSerializer(messageHeader.getSerializeType());
return (T) myRpcSerializer.deserialize(bizDataBytes,messageBizDataType);
}
}
自定义编码器: NettyEncoder 。
public class NettyEncoder<T> extends MessageToByteEncoder<MessageProtocol<T>> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol<T> messageProtocol, ByteBuf byteBuf) {
// 继承自MessageToByteEncoder中,只需要将编码后的数据写入参数中指定的byteBuf中即可
// MessageToByteEncoder源码逻辑中会自己去将byteBuf写入channel的
MessageCodecUtil.messageEncode(messageProtocol,byteBuf);
}
}
自定义解码器: NettyDecoder 。
/**
* netty 解码器
*/
public class NettyDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(NettyDecoder.class);
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list){
// 保存读取前的读指针
int beforeReadIndex = byteBuf.readerIndex();
do{
try {
MessageDecodeResult messageDecodeResult = decodeHeader(byteBuf);
if (messageDecodeResult.isNeedMoreData()) {
// 出现拆包没有读取到一个完整的rpc请求,还原byteBuf读指针,等待下一次读事件
byteBuf.readerIndex(beforeReadIndex);
break;
} else {
// 正常解析完一个完整的message,交给后面的handler处理
list.add(messageDecodeResult.getMessageProtocol());
}
}catch (Exception e){
// 比如decodeHeader里json序列化失败了等等.直接跳过这个数据包不还原了
logger.error("NettyDecoder error!",e);
}
// 循环,直到整个ByteBuf读取完
}while(byteBuf.isReadable());
}
private MessageDecodeResult decodeHeader(ByteBuf byteBuf){
int readable = byteBuf.readableBytes();
if(readable < MessageHeader.MESSAGE_HEADER_LENGTH){
// 无法读取到一个完整的header,说明出现了拆包,等待更多的数据
return MessageDecodeResult.needMoreData();
}
// 读取header头
MessageHeader messageHeader = MessageCodecUtil.messageHeaderDecode(byteBuf);
int bizDataLength = messageHeader.getBizDataLength();
if(byteBuf.readableBytes() < bizDataLength){
// 无法读取到一个完整的正文内容,说明出现了拆包,等待更多的数据
return MessageDecodeResult.needMoreData();
}
// 基于消息类型标识,解析rpc正文对象
boolean messageFlag = messageHeader.getMessageFlag();
if(messageFlag == MessageFlagEnums.REQUEST.getCode()){
RpcRequest rpcRequest = MessageCodecUtil.messageBizDataDecode(messageHeader,byteBuf,RpcRequest.class);
MessageProtocol<RpcRequest> messageProtocol = new MessageProtocol<>(messageHeader,rpcRequest);
// 正确的解析完一个rpc请求消息
return MessageDecodeResult.decodeSuccess(messageProtocol);
}else{
RpcResponse rpcResponse = MessageCodecUtil.messageBizDataDecode(messageHeader,byteBuf,RpcResponse.class);
MessageProtocol<RpcResponse> messageProtocol = new MessageProtocol<>(messageHeader,rpcResponse);
// 正确的解析完一个rpc响应消息
return MessageDecodeResult.decodeSuccess(messageProtocol);
}
}
}
demo的服务示例
public class User implements Serializable {
private String name;
private Integer age;
}
public interface UserService {
User getUserFriend(User user, String message);
}
public class UserServiceImpl implements UserService {
@Override
public User getUserFriend(User user, String message) {
System.out.println("execute getUserFriend, user=" + user + ",message=" + message);
// demo返回一个不同的user对象回去
return new User(user.getName() + ".friend", user.getAge() + 1);
}
}
netty服务端:
public class RpcServer {
private static final Map<String,Object> interfaceImplMap = new HashMap<>();
static{
/**
* 简单一点配置死实现
* */
interfaceImplMap.put(UserService.class.getName(), new UserServiceImpl());
}
public static void main(String[] args) throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
EventLoopGroup workerGroup = new NioEventLoopGroup(8,new DefaultThreadFactory("NettyServerWorker", true));
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// 编码、解码处理器
.addLast("encoder", new NettyEncoder<>())
.addLast("decoder", new NettyDecoder())
// 实际调用业务方法的处理器
.addLast("serverHandler", new SimpleChannelInboundHandler<MessageProtocol<RpcRequest>>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol<RpcRequest> msg) {
// 找到本地的方法进行调用,并获得返回值(demo,简单起见直接同步调用)
MessageProtocol<RpcResponse> result = handlerRpcRequest(msg);
// 将返回值响应给客户端
ctx.writeAndFlush(result);
}
});
}
});
ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 8888).sync();
System.out.println("netty server started!");
// 一直阻塞在这里
channelFuture.channel().closeFuture().sync();
}
private static MessageProtocol<RpcResponse> handlerRpcRequest(MessageProtocol<RpcRequest> rpcRequestMessageProtocol){
long requestMessageId = rpcRequestMessageProtocol.getMessageHeader().getMessageId();
MessageHeader messageHeader = new MessageHeader();
messageHeader.setMessageId(requestMessageId);
messageHeader.setMessageFlag(MessageFlagEnums.RESPONSE.getCode());
messageHeader.setTwoWayFlag(false);
messageHeader.setEventFlag(false);
messageHeader.setSerializeType(rpcRequestMessageProtocol.getMessageHeader().getSerializeType());
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setMessageId(requestMessageId);
try {
// 反射调用具体的实现方法
Object result = invokeTargetService(rpcRequestMessageProtocol.getBizDataBody());
// 设置返回值
rpcResponse.setReturnValue(result);
}catch (Exception e){
// 调用具体实现类时,出现异常,设置异常的值
rpcResponse.setExceptionValue(e);
}
return new MessageProtocol<>(messageHeader,rpcResponse);
}
private static Object invokeTargetService(RpcRequest rpcRequest) throws Exception {
String interfaceName = rpcRequest.getInterfaceName();
Object serviceImpl = interfaceImplMap.get(interfaceName);
// 按照请求里的方法名和参数列表找到对应的方法
final Method method = serviceImpl.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterClasses());
// 传递参数,反射调用该方法并返回结果
return method.invoke(serviceImpl, rpcRequest.getParams());
}
}
netty客户端:
public class RpcClientNoProxy {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8,
new DefaultThreadFactory("NettyClientWorker", true));
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// 编码、解码处理器
.addLast("encoder", new NettyEncoder<>())
.addLast("decoder", new NettyDecoder())
.addLast("clientHandler", new SimpleChannelInboundHandler<MessageProtocol>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol) {
System.out.println("PureNettyClient received messageProtocol=" + messageProtocol);
}
})
;
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
Channel channel = channelFuture.sync().channel();
// 构造消息对象
MessageProtocol<RpcRequest> messageProtocol = buildMessage();
// 发送消息
channel.writeAndFlush(messageProtocol);
System.out.println("RpcClientNoProxy send request success!");
channelFuture.channel().closeFuture().sync();
}
private static MessageProtocol<RpcRequest> buildMessage(){
// 构造请求
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setInterfaceName("myrpc.demo.common.service.UserService");
rpcRequest.setMethodName("getUserFriend");
rpcRequest.setParameterClasses(new Class[]{User.class,String.class});
User user = new User("Jerry",10);
String message = "hello hello!";
rpcRequest.setParams(new Object[]{user,message});
// 构造协议头
MessageHeader messageHeader = new MessageHeader();
messageHeader.setMessageFlag(MessageFlagEnums.REQUEST.getCode());
messageHeader.setTwoWayFlag(false);
messageHeader.setEventFlag(true);
messageHeader.setSerializeType(MessageSerializeType.JSON.getCode());
messageHeader.setMessageId(rpcRequest.getMessageId());
return new MessageProtocol<>(messageHeader,rpcRequest);
}
}
netty处理流程图 。
截止目前,我们已经实现了一个点对点rpc客户端/服务端交互的功能,但是客户端这边的逻辑依然比较复杂(buildMessage方法)。 前面提到,rpc中很重要的功能就是保持本地调用时语义的简洁性,即客户端实际使用时是希望直接用以下这种方式来进行调用,而不是去繁琐的处理底层的网络交互逻辑.
User user = new User("Jerry",10);
String message = "hello hello!";
// 发起rpc调用并获得返回值
User userFriend = userService.getUserFriend(user,message);
System.out.println("userService.getUserFriend result=" + userFriend);
rpc框架需要屏蔽掉构造底层消息发送/接受,序列化/反序列化相关的复杂性,而这时候就需要引入代理模式(动态代理)了。 在MyRpc的底层,我们将客户端需要调用的一个服务(比如UserService)抽象为Consumer对象,服务端的一个具体服务实现抽象为Provider对象。 其中包含了对应的服务的类以及对应的服务地址,客户端这边使用jdk的动态代理生成代理对象,将复杂的、需要屏蔽的消息处理/网络交互等逻辑都封装在这个代理对象中.
public class Consumer<T> {
private Class<?> interfaceClass;
private T proxy;
private Bootstrap bootstrap;
private URLAddress urlAddress;
public Consumer(Class<?> interfaceClass, Bootstrap bootstrap, URLAddress urlAddress) {
this.interfaceClass = interfaceClass;
this.bootstrap = bootstrap;
this.urlAddress = urlAddress;
ClientDynamicProxy clientDynamicProxy = new ClientDynamicProxy(bootstrap,urlAddress);
this.proxy = (T) Proxy.newProxyInstance(
clientDynamicProxy.getClass().getClassLoader(),
new Class[]{interfaceClass},
clientDynamicProxy);
}
public T getProxy() {
return proxy;
}
public Class<?> getInterfaceClass() {
return interfaceClass;
}
}
public class ConsumerBootstrap {
private final Map<Class<?>,Consumer<?>> consumerMap = new HashMap<>();
private final Bootstrap bootstrap;
private final URLAddress urlAddress;
public ConsumerBootstrap(Bootstrap bootstrap, URLAddress urlAddress) {
this.bootstrap = bootstrap;
this.urlAddress = urlAddress;
}
public <T> Consumer<T> registerConsumer(Class<T> clazz){
if(!consumerMap.containsKey(clazz)){
Consumer<T> consumer = new Consumer<>(clazz,this.bootstrap,this.urlAddress);
consumerMap.put(clazz,consumer);
return consumer;
}
throw new MyRpcException("duplicate consumer! clazz=" + clazz);
}
}
public class Provider<T> {
private Class<?> interfaceClass;
private T ref;
private URLAddress urlAddress;
}
/**
* 客户端动态代理
* */
public class ClientDynamicProxy implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(ClientDynamicProxy.class);
private final Bootstrap bootstrap;
private final URLAddress urlAddress;
public ClientDynamicProxy(Bootstrap bootstrap, URLAddress urlAddress) {
this.bootstrap = bootstrap;
this.urlAddress = urlAddress;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object localMethodResult = processLocalMethod(proxy,method,args);
if(localMethodResult != null){
// 处理toString等对象自带方法,不发起rpc调用
return localMethodResult;
}
logger.debug("ClientDynamicProxy before: methodName=" + method.getName());
// 构造请求和协议头
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setInterfaceName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterClasses(method.getParameterTypes());
rpcRequest.setParams(args);
MessageHeader messageHeader = new MessageHeader();
messageHeader.setMessageFlag(MessageFlagEnums.REQUEST.getCode());
messageHeader.setTwoWayFlag(false);
messageHeader.setEventFlag(true);
messageHeader.setSerializeType(GlobalConfig.messageSerializeType.getCode());
messageHeader.setResponseStatus((byte)'a');
messageHeader.setMessageId(rpcRequest.getMessageId());
logger.debug("ClientDynamicProxy rpcRequest={}", JsonUtil.obj2Str(rpcRequest));
ChannelFuture channelFuture = bootstrap.connect(urlAddress.getHost(),urlAddress.getPort()).sync();
Channel channel = channelFuture.sync().channel();
// 通过Promise,将netty的异步转为同步,参考dubbo DefaultFuture
DefaultFuture<RpcResponse> defaultFuture = DefaultFutureManager.createNewFuture(channel,rpcRequest);
channel.writeAndFlush(new MessageProtocol<>(messageHeader,rpcRequest));
logger.debug("ClientDynamicProxy writeAndFlush success, wait result");
// 调用方阻塞在这里
RpcResponse rpcResponse = defaultFuture.get();
logger.debug("ClientDynamicProxy defaultFuture.get() rpcResponse={}",rpcResponse);
return processRpcResponse(rpcResponse);
}
private Object processLocalMethod(Object proxy, Method method, Object[] args) throws Exception {
// 处理toString等对象自带方法,不发起rpc调用
if (method.getDeclaringClass() == Object.class) {
return method.invoke(proxy, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return proxy.toString();
} else if ("hashCode".equals(methodName)) {
return proxy.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return proxy.equals(args[0]);
}
// 返回null标识非本地方法,需要进行rpc调用
return null;
}
private Object processRpcResponse(RpcResponse rpcResponse){
if(rpcResponse.getExceptionValue() == null){
// 没有异常,return正常的返回值
return rpcResponse.getReturnValue();
}else{
// 有异常,往外抛出去
throw new MyRpcRemotingException(rpcResponse.getExceptionValue());
}
}
}
/**
* 客户端 rpc响应处理器
*/
public class NettyRpcResponseHandler extends SimpleChannelInboundHandler<MessageProtocol<RpcResponse>> {
private static final Logger logger = LoggerFactory.getLogger(NettyRpcResponseHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol<RpcResponse> rpcResponseMessageProtocol) throws Exception {
logger.debug("NettyRpcResponseHandler channelRead0={}",JsonUtil.obj2Str(rpcResponseMessageProtocol));
// 触发客户端的future,令其同步阻塞的线程得到结果
DefaultFutureManager.received(rpcResponseMessageProtocol.getBizDataBody());
}
}
public class DefaultFutureManager {
private static Logger logger = LoggerFactory.getLogger(DefaultFutureManager.class);
public static final Map<Long,DefaultFuture> DEFAULT_FUTURE_CACHE = new ConcurrentHashMap<>();
public static void received(RpcResponse rpcResponse){
Long messageId = rpcResponse.getMessageId();
logger.debug("received rpcResponse={},DEFAULT_FUTURE_CACHE={}",rpcResponse,DEFAULT_FUTURE_CACHE);
DefaultFuture defaultFuture = DEFAULT_FUTURE_CACHE.remove(messageId);
if(defaultFuture != null){
logger.debug("remove defaultFuture success");
if(rpcResponse.getExceptionValue() != null){
// 异常处理
defaultFuture.completeExceptionally(rpcResponse.getExceptionValue());
}else{
// 正常返回
defaultFuture.complete(rpcResponse);
}
}else{
logger.debug("remove defaultFuture fail");
}
}
public static DefaultFuture createNewFuture(Channel channel, RpcRequest rpcRequest){
DefaultFuture defaultFuture = new DefaultFuture(channel,rpcRequest);
return defaultFuture;
}
}
public class RpcClientProxy {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8, new DefaultThreadFactory("NettyClientWorker", true));
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// 编码、解码处理器
.addLast("encoder", new NettyEncoder<>())
.addLast("decoder", new NettyDecoder())
// 响应处理器
.addLast("clientHandler", new NettyRpcResponseHandler())
;
}
});
ConsumerBootstrap consumerBootstrap = new ConsumerBootstrap(bootstrap, new URLAddress("127.0.0.1", 8888));
Consumer<UserService> userServiceConsumer = consumerBootstrap.registerConsumer(UserService.class);
// 获得UserService的代理对象
UserService userService = userServiceConsumer.getProxy();
User user = new User("Jerry", 10);
String message = "hello hello!";
// 发起rpc调用并获得返回值
User userFriend = userService.getUserFriend(user, message);
System.out.println("userService.getUserFriend result=" + userFriend);
}
}
可以看到,引入了代理模式后的使用方式就变得简单很多了。 到这一步,我们已经实现了一个点对点的rpc通信的能力,并且如博客开头中所提到的,没有丧失本地调用语义的简洁性.
最后此篇关于自己动手实现rpc框架(一)实现点对点的rpc通信的文章就讲到这里了,如果你想了解更多关于自己动手实现rpc框架(一)实现点对点的rpc通信的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我需要开发一个简单的网站,我通常使用 bootstrap CSS 框架,但是我想使用 Gumbyn,它允许我使用 16 列而不是 12 列。 我想知道是否: 我可以轻松地改变绿色吗? 如何使用固定布局
这个问题在这里已经有了答案: 关闭 13 年前。 与直接编写 PHP 代码相比,使用 PHP 框架有哪些优点/缺点?
我开发了一个 Spring/JPA 应用程序:服务、存储库和域层即将完成。 唯一缺少的层是网络层。我正在考虑将 Playframework 2.0 用于 Web 层,但我不确定是否可以在我的 Play
我现有的 struts Web 应用程序具有单点登录功能。然后我将使用 spring 框架创建一个不同的 Web 应用程序。然后想要使用从 struts 应用程序登录的用户来链接新的 spring 应
我首先使用Spark框架和ORMLite处理网页上表单提交的数据,在提交中文字符时看到了unicode问题。我首先想到问题可能是由于ORMLite,因为我的MySQL数据库的字符集已设置为使用utf8
我有一个使用 .Net 4.5 功能的模块,我们的应用程序也适用于 XP 用户。所以我正在考虑将这个 .net 4.5 依赖模块移动到单独的项目中。我怎样才能有一个解决方案,其中有两个项目针对不同的版
我知道这是一个非常笼统的问题,但我想我并不是真的在寻找明确的答案。作为 PHP 框架的新手,我很难理解它。 Javascript 框架,尤其是带有 UI 扩展的框架,似乎通过将 JS 代码与设计分开来
我需要收集一些关于现有 ORM 解决方案的信息。 请随意编写任何编程语言。 你能谈谈你用过的最好的 ORM 框架吗?为什么它比其他的更好? 最佳答案 我使用了 NHibernate 和 Entity
除了 Apple 的 SDK 之外,还有什么强大的 iPhone 框架可供开始开发?有没有可以加快开发时间的方法? 最佳答案 此类框架最大的是Three20 。 Facebook 和许多其他公司都使用
有人可以启发我使用 NodeJS 的 Web 框架吗?我最近开始从免费代码营学习express js,虽然一切进展顺利,但我对express到底是什么感到困惑。是全栈框架吗?纯粹是为了后端吗?我发现您
您可以推荐哪种 Ajax 框架/工具包来构建使用 struts 的 Web 应用程序的 GUI? 最佳答案 我会说你的 AJAX/javascript 库选择应该较少取决于你的后端是如何实现的,而更多
我有生成以下错误的 python 代码: objc[36554]: Class TKApplication is implemented in both /Library/Frameworks/Tk.
首先,很抱歉,如果我问的问题很明显,因为我没有编程背景,那我去吧: 我想运行一系列测试场景并在背景部分声明了几个变量(我打印它们以仔细检查它们是否已正确声明),第一个是整数,另外两个字符串为你可以看到
在我们承担的一个项目中,我们正在寻找一个视频捕获和录制库。我们的基础工作(基于 google 搜索)表明 vlc (libvlc)、ffmpeg (libavcodec) 和 gstreamer 是三
我试过没有运气的情况下寻找某种功能来杀死/中断Play中的正常工作!框架。 我想念什么吗?还是玩了!实际没有添加此功能? 最佳答案 Java stop类中没有像Thread方法那样的东西,由于种种原因
我们希望在我们的系统中保留所有重大事件的记录。例如,在数据库可能存储当前用户状态的地方,事件日志应记录对该状态的所有更改以及更改发生的时间。 事件记录工具应该尽可能接近于事件引发器的零开销,应该容纳结
那里有 ActionScript 2.0/3.0 的测试框架列表吗? 最佳答案 2010-05-18 更新 由于这篇文章有点旧,而且我刚刚收到了赞成票,因此可能值得提供一些更新的信息,这样人们就不会追
我有一个巨大的 numpy 数组列表(一维),它们是不同事件的时间序列。每个点都有一个标签,我想根据其标签对 numpy 数组进行窗口化。我的标签是 0、1 和 2。每个窗口都有一个固定的大小 M。
我是 Play 的新手!并编写了我的第一个应用程序。这个应用程序有一组它依赖的 URL,从 XML 响应中提取数据并返回有效的 URL。 此应用程序需要在不同的环境(Dev、Staging 和 Pro
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 4年前关闭。 Improve thi
我是一名优秀的程序员,十分优秀!