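The previous article covered the design ideas, design principles, and a concrete implementation of a high-performance RPC server built on Netty. At the end of that article I mentioned that the RPC server designed this way still had room for performance tuning. So I spent a weekend optimizing and refactoring the original NettyRPC framework. The main changes are as follows:
1. NettyRPC previously encoded and decoded RPC messages with Netty's built-in ObjectEncoder and ObjectDecoder, which rely on Java's native serialization. Existing articles and test data indicate that Java native serialization is slow and produces a large binary stream, so this round of optimization introduces the notion of an RPC message serialization protocol: the serialization and deserialization of RPC messages is customized by plugging in a third-party codec framework. The frameworks introduced here are Kryo and Hessian. To recap what serialization and deserialization mean: in a remote service call the message object has to travel over the network, so the sender serializes it into a byte stream and the receiver deserializes that stream back into a message object.
2. Google Guava's concurrency utilities are introduced to reorganize and wrap NettyRPC's NIO thread pool and business thread pool.
3. When a third-party codec framework (Kryo, Hessian) is used under high concurrency, frequently creating and destroying serializer objects consumes a lot of JVM memory and hurts the throughput of the whole RPC server, so object pooling is introduced. Creating and initializing a new object can take considerable time, and when a large number of objects is needed this can affect performance. Apart from better hardware, object pooling is a well-established remedy, and the Apache Commons Pool framework is a good implementation of it (project page: http://commons.apache.org/proper/commons-pool/download_pool.cgi). The Hessian pooling in this article is a wrapper built on Apache Commons Pool.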
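The idea behind object pooling in item 3 can be sketched with the JDK alone. The `SimplePool` class below is a hypothetical illustration, not the Apache Commons Pool API: it hands out an idle instance when one is available and only falls back to the factory when the pool is empty.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical minimal pool for illustration; not the Apache Commons Pool API.
final class SimplePool<T> {
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    SimplePool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Reuse an idle instance if one exists, otherwise create a new one.
    T borrow() {
        T obj = idle.poll();
        return obj != null ? obj : factory.get();
    }

    // Hand the instance back so that a later borrow() can reuse it.
    void release(T obj) {
        idle.offer(obj);
    }

    public static void main(String[] args) {
        SimplePool<StringBuilder> pool = new SimplePool<>(StringBuilder::new);
        StringBuilder first = pool.borrow();
        pool.release(first);
        StringBuilder second = pool.borrow();
        System.out.println("reused: " + (first == second));
    }
}
```

A production pool additionally bounds its size, validates objects, and evicts idle ones, which is what a library such as Apache Commons Pool provides.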
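This article focuses on these three areas and explains how the refactored NettyRPC server is designed and implemented. First, take a quick look at the serialization protocols the optimized NettyRPC server supports, shown in the figure below:
As the figure shows, the optimized NettyRPC supports three message serialization methods: Kryo, Hessian, and Java native serialization. Java native serialization should be familiar, so it is not covered again here. The other two are described below:
1. Kryo serialization. Kryo is an efficient object serialization framework built specifically for Java. Compared with Java native serialization it is considerably better in both processing speed and serialized size. Many well-known open-source projects have adopted it, for example Alibaba's open-source Dubbo RPC. This article uses kryo-3.0.3 by default; it can be downloaded from https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-3.0.3. Why this version? As noted above, under high concurrency, frequently creating and destroying serializer objects costs a lot of JVM memory and time. This Kryo release ships with a built-in object pool (KryoFactory, KryoPool), so there is no need to wrap it with Apache Commons Pool.
2. Hessian serialization. Hessian is itself a serialization protocol; it serializes and deserializes faster than Java native serialization and produces smaller output. It transmits data in a binary format and is available for multiple languages. This article uses hessian-4.0.37, which can be downloaded from http://hessian.caucho.com/#Java.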
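As a baseline for comparing the two frameworks above, here is a minimal sketch of the JDK native round trip. The `Ping` class is a hypothetical stand-in for an RPC message, not the article's MessageRequest; printing the byte count makes it easy to see how much overhead native serialization adds for a tiny payload.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class JdkRoundTrip {
    // Hypothetical stand-in for an RPC message.
    static final class Ping implements Serializable {
        private static final long serialVersionUID = 1L;
        final String messageId;

        Ping(String messageId) {
            this.messageId = messageId;
        }
    }

    // Object -> byte stream, using Java native serialization.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Byte stream -> object.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new Ping("id-1"));
        Ping copy = (Ping) deserialize(data);
        System.out.println(copy.messageId + " serialized to " + data.length + " bytes");
    }
}
```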
Next, look at the structure of the optimized NettyRPC message codec packages (newlandframework.netty.rpc.serialize.support, newlandframework.netty.rpc.serialize.support.kryo, newlandframework.netty.rpc.serialize.support.hessian), shown in the figure below:
The RPC request message structure is as follows:
/**
* @filename:MessageRequest.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC service request structure
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.model;
import java.io.Serializable;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;
public class MessageRequest implements Serializable {
private String messageId;
private String className;
private String methodName;
private Class<?>[] typeParameters;
private Object[] parametersVal;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getTypeParameters() {
return typeParameters;
}
public void setTypeParameters(Class<?>[] typeParameters) {
this.typeParameters = typeParameters;
}
public Object[] getParameters() {
return parametersVal;
}
public void setParameters(Object[] parametersVal) {
this.parametersVal = parametersVal;
}
public String toString() {
return ReflectionToStringBuilder.toStringExclude(this, new String[]{"typeParameters", "parametersVal"});
}
}
The RPC response message structure is as follows:
/**
* @filename:MessageResponse.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC service response structure
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.model;
import java.io.Serializable;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;
public class MessageResponse implements Serializable {
private String messageId;
private String error;
private Object resultDesc;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public Object getResult() {
return resultDesc;
}
public void setResult(Object resultDesc) {
this.resultDesc = resultDesc;
}
public String toString() {
return ReflectionToStringBuilder.toString(this);
}
}
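Now let us design the codec framework for the RPC request and response messages above. NettyRPC currently supports Kryo, Hessian, and Java native serialization. For extensibility, the serialization protocol type is abstracted into its own type (RpcSerializeProtocol), implemented as follows: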
/**
* @filename:RpcSerializeProtocol.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC message serialization protocol type
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
public enum RpcSerializeProtocol {
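//No cross-language RPC mechanism has been introduced yet, so for now only third-party plugins for Java-to-Java serialization/deserialization are supported
//Serialization plugins currently known to NettyRPC: Java native serialization, Kryo, Hessian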
JDKSERIALIZE("jdknative"), KRYOSERIALIZE("kryo"), HESSIANSERIALIZE("hessian");
private String serializeProtocol;
private RpcSerializeProtocol(String serializeProtocol) {
this.serializeProtocol = serializeProtocol;
}
public String toString() {
//Pass the style per call instead of mutating the global default style
return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
public String getProtocol() {
return serializeProtocol;
}
}
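Note that `Enum.valueOf` resolves a constant by its name (for example `KRYOSERIALIZE`), not by the short string stored in it (for example `kryo`). If configuration should accept the short string, a lookup helper is one option. The sketch below uses a reduced copy of the enum, and `fromProtocol` is a hypothetical method that is not part of the original code.

```java
final class ProtocolLookup {
    // Reduced, hypothetical copy of RpcSerializeProtocol for illustration.
    enum Protocol {
        JDKSERIALIZE("jdknative"), KRYOSERIALIZE("kryo"), HESSIANSERIALIZE("hessian");

        private final String protocol;

        Protocol(String protocol) {
            this.protocol = protocol;
        }

        String getProtocol() {
            return protocol;
        }

        // Resolve a constant from its short protocol string, ignoring case.
        static Protocol fromProtocol(String value) {
            for (Protocol p : values()) {
                if (p.protocol.equalsIgnoreCase(value)) {
                    return p;
                }
            }
            throw new IllegalArgumentException("Unknown serialize protocol: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(Protocol.fromProtocol("kryo"));
    }
}
```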
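For the different codec frameworks (mainly Kryo and Hessian here), two further interfaces are extracted: an RPC message serialization/deserialization interface (RpcSerialize) and an RPC message codec interface (MessageCodecUtil).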
/**
* @filename:RpcSerialize.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC message serialization/deserialization interface
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public interface RpcSerialize {
void serialize(OutputStream output, Object object) throws IOException;
Object deserialize(InputStream input) throws IOException;
}
/**
* @filename:MessageCodecUtil.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC message codec interface
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
public interface MessageCodecUtil {
//The RPC message header is 4 bytes long
final public static int MESSAGE_LENGTH = 4;
public void encode(final ByteBuf out, final Object message) throws IOException;
public Object decode(byte[] body) throws IOException;
}
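The 4-byte header defined by MESSAGE_LENGTH implies a simple frame layout: a big-endian length followed by the body. The sketch below reproduces that layout with `java.nio.ByteBuffer` instead of Netty's ByteBuf so it runs without dependencies; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LengthPrefixedFrame {
    // Header size in bytes, mirroring MessageCodecUtil.MESSAGE_LENGTH.
    static final int MESSAGE_LENGTH = 4;

    // Frame layout: [4-byte big-endian body length][body bytes].
    static byte[] frame(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(MESSAGE_LENGTH + body.length);
        buf.putInt(body.length);
        buf.put(body);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] framed = frame("ping".getBytes(StandardCharsets.UTF_8));
        System.out.println(framed.length); // prints 8: 4-byte header + 4-byte body
    }
}
```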
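Finally, the NettyRPC framework should let both the RPC server and the RPC client freely choose which serialization method is used to transmit RPC message objects over the network. This calls for one more abstraction, an RPC message serialization protocol selector interface (RpcSerializeFrame), implemented as follows: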
/**
* @filename:RpcSerializeFrame.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC message serialization protocol selector interface
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support;
import io.netty.channel.ChannelPipeline;
public interface RpcSerializeFrame {
public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline);
}
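With the interfaces above in place, we can implement the Kryo-based and Hessian-based RPC message serialization and deserialization modules. First, the overall class diagram:
The first piece is the RPC message encoder, MessageEncoder, which extends Netty's MessageToByteEncoder. Its job is to encode RPC message objects into a binary stream. The implementation is as follows: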
/**
* @filename:MessageEncoder.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC message encoder
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MessageEncoder extends MessageToByteEncoder<Object> {
private MessageCodecUtil util = null;
public MessageEncoder(final MessageCodecUtil util) {
this.util = util;
}
protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception {
util.encode(out, msg);
}
}
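Next is the RPC message decoder, MessageDecoder, which extends Netty's ByteToMessageDecoder and deserializes the binary stream back into message objects. As mentioned in the previous article, NettyRPC runs over TCP, and because TCP is a byte stream, messages can arrive stuck together or split across reads (the so-called "sticky packet" problem). MessageDecoder therefore checks the length of the RPC message body: if fewer bytes have arrived than the length declared in the message header, it resets the ByteBuf reader index and waits for more data. The protocol parsing looks like this: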
/**
* @filename:MessageDecoder.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC message decoder
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.IOException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
public class MessageDecoder extends ByteToMessageDecoder {
final public static int MESSAGE_LENGTH = MessageCodecUtil.MESSAGE_LENGTH;
private MessageCodecUtil util = null;
public MessageDecoder(final MessageCodecUtil util) {
this.util = util;
}
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
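//Fewer bytes than the length header itself have arrived (partial read); wait for more data
if (in.readableBytes() < MessageDecoder.MESSAGE_LENGTH) {
return;
}
in.markReaderIndex();
//Read the length of the message body
int messageLength = in.readInt();
if (messageLength < 0) {
//A negative length means the stream is corrupt; close the channel and stop decoding
ctx.close();
return;
}
//The body has not fully arrived yet; reset the ByteBuf reader index and wait for more data
if (in.readableBytes() < messageLength) {
in.resetReaderIndex();
return;
} else {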
byte[] messageBody = new byte[messageLength];
in.readBytes(messageBody);
try {
Object obj = util.decode(messageBody);
out.add(obj);
} catch (IOException ex) {
Logger.getLogger(MessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
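The mark/reset logic in the decoder can be tried out in isolation. The following sketch applies the same steps to a `java.nio.ByteBuffer` (an assumption made so that it runs without Netty); `FrameSplitter` and `drain` are hypothetical names. It extracts every complete frame and leaves an incomplete trailing frame untouched for the next read.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FrameSplitter {
    static final int HEADER = 4;

    // Extracts every complete [length][body] frame from buf (already in read mode)
    // and leaves the position at the start of the first incomplete frame.
    static List<byte[]> drain(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= HEADER) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // body not fully arrived yet; rewind to the header
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);
            frames.add(body);
        }
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(32);
        buf.putInt(3).put(new byte[]{1, 2, 3}); // one complete frame
        buf.putInt(5).put(new byte[]{9, 9});    // header plus 2 of 5 body bytes
        buf.flip();
        List<byte[]> frames = drain(buf);
        System.out.println(frames.size() + " complete frame(s), " + buf.remaining() + " byte(s) pending");
    }
}
```

Netty's ByteToMessageDecoder keeps the unread bytes and calls `decode` again when more data arrives, which corresponds to calling `drain` again after appending to the buffer.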
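Now we implement the module that encodes and decodes RPC messages with Kryo serialization. The first step is to implement the methods of the NettyRPC message serialization interface (RpcSerialize).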
/**
* @filename:KryoSerialize.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Kryo serialization/deserialization implementation
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.kryo;
import newlandframework.netty.rpc.serialize.support.RpcSerialize;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.pool.KryoPool;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class KryoSerialize implements RpcSerialize {
private KryoPool pool = null;
public KryoSerialize(final KryoPool pool) {
this.pool = pool;
}
public void serialize(OutputStream output, Object object) throws IOException {
Kryo kryo = pool.borrow();
try {
Output out = new Output(output);
kryo.writeClassAndObject(out, object);
out.close();
} finally {
//Always return the Kryo instance to the pool, even if serialization fails
pool.release(kryo);
}
}
public Object deserialize(InputStream input) throws IOException {
Kryo kryo = pool.borrow();
try {
Input in = new Input(input);
Object result = kryo.readClassAndObject(in);
in.close();
return result;
} finally {
//Always return the Kryo instance to the pool, even if deserialization fails
pool.release(kryo);
}
}
}
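Next, the object pool that ships with the Kryo library is used for encoding and decoding RPC message objects. First comes the Kryo pool factory (KryoPoolFactory), which is also the reason for choosing kryo-3.0.3. The code is as follows: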
/**
* @filename:KryoPoolFactory.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Kryo object pool factory
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.kryo;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.objenesis.strategy.StdInstantiatorStrategy;
public class KryoPoolFactory {
//volatile is required for the double-checked locking in getKryoPoolInstance() to be safe
private static volatile KryoPoolFactory poolFactory = null;
private KryoFactory factory = new KryoFactory() {
public Kryo create() {
Kryo kryo = new Kryo();
kryo.setReferences(false);
//Register the known message classes with Kryo to speed up serialization/deserialization
kryo.register(MessageRequest.class);
kryo.register(MessageResponse.class);
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
return kryo;
}
};
private KryoPool pool = new KryoPool.Builder(factory).build();
private KryoPoolFactory() {
}
public static KryoPool getKryoPoolInstance() {
if (poolFactory == null) {
synchronized (KryoPoolFactory.class) {
if (poolFactory == null) {
poolFactory = new KryoPoolFactory();
}
}
}
return poolFactory.getPool();
}
public KryoPool getPool() {
return pool;
}
}
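The factory above uses double-checked locking for its lazy singleton. An alternative that needs no explicit locking is the initialization-on-demand holder idiom, sketched below. `Resource` is a hypothetical stand-in for an expensive shared object such as a pool; this is not how the original class is written.

```java
final class LazyHolder {
    // Hypothetical stand-in for an expensive shared resource such as a pool.
    static final class Resource {
        final long createdAt = System.nanoTime();
    }

    private LazyHolder() {
    }

    // The JVM initializes Holder only on first access, and class initialization
    // is thread-safe, so no synchronized block or volatile field is needed.
    private static final class Holder {
        static final Resource INSTANCE = new Resource();
    }

    static Resource getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance());
    }
}
```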
KryoCodecUtil is the utility class that encodes and decodes RPC messages with Kryo. It implements the RPC message codec interface (MessageCodecUtil) as follows:
/**
* @filename:KryoCodecUtil.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Kryo codec utility class
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.kryo;
import com.esotericsoftware.kryo.pool.KryoPool;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import com.google.common.io.Closer;
public class KryoCodecUtil implements MessageCodecUtil {
private KryoPool pool;
public KryoCodecUtil(KryoPool pool) {
this.pool = pool;
}
public void encode(final ByteBuf out, final Object message) throws IOException {
//Closer is not thread-safe, so create one per call instead of sharing a static instance
Closer closer = Closer.create();
try {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
closer.register(byteArrayOutputStream);
KryoSerialize kryoSerialization = new KryoSerialize(pool);
kryoSerialization.serialize(byteArrayOutputStream, message);
byte[] body = byteArrayOutputStream.toByteArray();
int dataLength = body.length;
//Write the 4-byte length header, then the serialized body
out.writeInt(dataLength);
out.writeBytes(body);
} finally {
closer.close();
}
}
public Object decode(byte[] body) throws IOException {
Closer closer = Closer.create();
try {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
closer.register(byteArrayInputStream);
KryoSerialize kryoSerialization = new KryoSerialize(pool);
Object obj = kryoSerialization.deserialize(byteArrayInputStream);
return obj;
} finally {
closer.close();
}
}
}
Last come Kryo's own encoder and decoder, which only need to call the encode and decode methods of the Kryo codec utility class (KryoCodecUtil). The code is as follows:
/**
* @filename:KryoDecoder.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Kryo decoder
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.kryo;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageDecoder;
public class KryoDecoder extends MessageDecoder {
public KryoDecoder(MessageCodecUtil util) {
super(util);
}
}
/**
* @filename:KryoEncoder.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Kryo encoder
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.kryo;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageEncoder;
public class KryoEncoder extends MessageEncoder {
public KryoEncoder(MessageCodecUtil util) {
super(util);
}
}
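Finally, we implement the encoder and decoder modules that use Hessian for RPC messages. Again the starting point is the Hessian serialization/deserialization implementation (HessianSerialize), which also implements the RPC message serialization/deserialization interface (RpcSerialize). The code is as follows: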
/**
* @filename:HessianSerialize.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Hessian serialization/deserialization implementation
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.hessian;
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import newlandframework.netty.rpc.serialize.support.RpcSerialize;
public class HessianSerialize implements RpcSerialize {
//Let IOException propagate, as declared by RpcSerialize, instead of swallowing it
public void serialize(OutputStream output, Object object) throws IOException {
Hessian2Output ho = new Hessian2Output(output);
ho.startMessage();
ho.writeObject(object);
ho.completeMessage();
ho.close();
output.close();
}
//Let IOException propagate so that callers never receive a silent null result
public Object deserialize(InputStream input) throws IOException {
Hessian2Input hi = new Hessian2Input(input);
hi.startMessage();
Object result = hi.readObject();
hi.completeMessage();
hi.close();
return result;
}
}
Now object pooling is applied to the Hessian serialization/deserialization class (HessianSerialize). The code is as follows:
/**
* @filename:HessianSerializeFactory.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Factory for pooled Hessian serialization/deserialization objects
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.hessian;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
public class HessianSerializeFactory extends BasePooledObjectFactory<HessianSerialize> {
public HessianSerialize create() throws Exception {
return createHessian();
}
public PooledObject<HessianSerialize> wrap(HessianSerialize hessian) {
return new DefaultPooledObject<HessianSerialize>(hessian);
}
private HessianSerialize createHessian() {
return new HessianSerialize();
}
}
/**
* @filename:HessianSerializePool.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Hessian serialization/deserialization pool
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.hessian;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
public class HessianSerializePool {
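//When Netty uses Hessian serialization/deserialization, an object pool avoids creating objects repeatedly and improves JVM memory usage.
//In the author's tests, serialization efficiency improved noticeably under highly concurrent serialization/deserialization.
private GenericObjectPool<HessianSerialize> hessianPool;
//volatile is required for the double-checked locking in getHessianPoolInstance() to be safe
private static volatile HessianSerializePool poolFactory = null;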
private HessianSerializePool() {
hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory());
}
public static HessianSerializePool getHessianPoolInstance() {
if (poolFactory == null) {
synchronized (HessianSerializePool.class) {
if (poolFactory == null) {
poolFactory = new HessianSerializePool();
}
}
}
return poolFactory;
}
//Reserved constructor; the parameters can later be injected via a Spring property placeholder
public HessianSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) {
hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory());
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
//Maximum total number of pooled objects
config.setMaxTotal(maxTotal);
//Minimum number of idle objects
config.setMinIdle(minIdle);
//Maximum wait time when borrowing; the default of -1 means wait indefinitely
config.setMaxWaitMillis(maxWaitMillis);
//Minimum time an object may sit idle before it becomes eligible for eviction; default 1800000 ms
config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
hessianPool.setConfig(config);
}
public HessianSerialize borrow() {
try {
return getHessianPool().borrowObject();
} catch (final Exception ex) {
ex.printStackTrace();
return null;
}
}
public void restore(final HessianSerialize object) {
getHessianPool().returnObject(object);
}
public GenericObjectPool<HessianSerialize> getHessianPool() {
return hessianPool;
}
}
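To make the meaning of `maxTotal` and `maxWaitMillis` concrete, here is a JDK-only sketch of a bounded pool. `BoundedPool` is a hypothetical class, not the Commons Pool implementation, and it simplifies one point: on timeout it returns null, whereas the real pool may signal this differently.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical bounded pool illustrating maxTotal and maxWaitMillis semantics.
final class BoundedPool<T> {
    private final BlockingQueue<T> idle;

    // Pre-creates maxTotal instances, so the pool can never exceed that size.
    BoundedPool(int maxTotal, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(maxTotal);
        for (int i = 0; i < maxTotal; i++) {
            idle.add(factory.get());
        }
    }

    // Waits up to maxWaitMillis for an idle instance; returns null on timeout.
    T borrow(long maxWaitMillis) {
        try {
            return idle.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    // Returns a previously borrowed instance to the pool.
    void restore(T obj) {
        idle.offer(obj);
    }

    public static void main(String[] args) {
        BoundedPool<StringBuilder> pool = new BoundedPool<>(1, StringBuilder::new);
        StringBuilder only = pool.borrow(10);
        System.out.println("second borrow timed out: " + (pool.borrow(10) == null));
        pool.restore(only);
    }
}
```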
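Once the Hessian serializer objects are pooled, the Hessian codec utility class "borrows" a serializer (HessianSerialize) from the pool, and whatever is borrowed must of course be returned. The utility class is implemented as follows: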
/**
* @filename:HessianCodecUtil.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Hessian codec utility class
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.hessian;
import com.google.common.io.Closer;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
public class HessianCodecUtil implements MessageCodecUtil {
HessianSerializePool pool = HessianSerializePool.getHessianPoolInstance();
public HessianCodecUtil() {
}
public void encode(final ByteBuf out, final Object message) throws IOException {
//Closer is not thread-safe, so create one per call instead of sharing a static instance
Closer closer = Closer.create();
HessianSerialize hessianSerialization = pool.borrow();
try {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
closer.register(byteArrayOutputStream);
hessianSerialization.serialize(byteArrayOutputStream, message);
byte[] body = byteArrayOutputStream.toByteArray();
int dataLength = body.length;
//Write the 4-byte length header, then the serialized body
out.writeInt(dataLength);
out.writeBytes(body);
} finally {
//Return the borrowed serializer even if encoding fails
pool.restore(hessianSerialization);
closer.close();
}
}
public Object decode(byte[] body) throws IOException {
Closer closer = Closer.create();
HessianSerialize hessianSerialization = pool.borrow();
try {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
closer.register(byteArrayInputStream);
Object object = hessianSerialization.deserialize(byteArrayInputStream);
return object;
} finally {
//Return the borrowed serializer even if decoding fails
pool.restore(hessianSerialization);
closer.close();
}
}
}
Finally, a reference implementation of the Hessian encoder and decoder for RPC messages:
/**
* @filename:HessianDecoder.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Hessian decoder
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.hessian;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageDecoder;
public class HessianDecoder extends MessageDecoder {
public HessianDecoder(MessageCodecUtil util) {
super(util);
}
}
/**
* @filename:HessianEncoder.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Hessian encoder
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.serialize.support.hessian;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageEncoder;
public class HessianEncoder extends MessageEncoder {
public HessianEncoder(MessageCodecUtil util) {
super(util);
}
}
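At this point the Kryo and Hessian serialization protocol modules for NettyRPC are complete. The next step is to plug them into the NettyRPC core package (newlandframework.netty.rpc.core). Only the code that changed is shown below; for the other modules, see my previous article, "How to Build a High-Performance RPC Server with Netty". First, the layout of the NettyRPC core package (newlandframework.netty.rpc.core):
We start with the NettyRPC server side. The RPC server pipeline initializer (MessageRecvChannelInitializer) differs from the previous version mainly in that it now takes a serialization protocol (RpcSerializeProtocol). The implementation is as follows: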
/**
* @filename:MessageRecvChannelInitializer.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC server pipeline initializer
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import java.util.Map;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {
private RpcSerializeProtocol protocol;
private RpcRecvSerializeFrame frame = null;
MessageRecvChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
this.protocol = protocol;
return this;
}
MessageRecvChannelInitializer(Map<String, Object> handlerMap) {
frame = new RpcRecvSerializeFrame(handlerMap);
}
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
frame.select(protocol, pipeline);
}
}
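In the RPC server execution module (MessageRecvExecutor), the default serialization is Java native serialization, and the layering of the asynchronous thread-pool calls has been streamlined. The code is as follows: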
/**
* @filename:MessageRecvExecutor.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC server execution module
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import newlandframework.netty.rpc.model.MessageKeyVal;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {
private String serverAddress;
//Defaults to the JDK native serialization protocol
private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;
private final static String DELIMITER = ":";
private Map<String, Object> handlerMap = new ConcurrentHashMap<String, Object>();
//volatile is required for the double-checked locking in submit() to be safe
private static volatile ListeningExecutorService threadPoolExecutor;
public MessageRecvExecutor(String serverAddress, String serializeProtocol) {
this.serverAddress = serverAddress;
this.serializeProtocol = Enum.valueOf(RpcSerializeProtocol.class, serializeProtocol);
}
public static void submit(Callable<Boolean> task, final ChannelHandlerContext ctx, final MessageRequest request, final MessageResponse response) {
if (threadPoolExecutor == null) {
synchronized (MessageRecvExecutor.class) {
if (threadPoolExecutor == null) {
threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
}
}
}
ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(task);
//The Netty server returns the computed result asynchronously
Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
public void onSuccess(Boolean result) {
ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("RPC Server Send message-id response:" + request.getMessageId());
}
});
}
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, threadPoolExecutor);
}
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
try {
MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();
Set<Map.Entry<String, Object>> s = rpcServiceObject.entrySet();
Iterator<Map.Entry<String, Object>> it = s.iterator();
Map.Entry<String, Object> entry;
while (it.hasNext()) {
entry = it.next();
handlerMap.put(entry.getKey(), entry.getValue());
}
} catch (ClassNotFoundException ex) {
java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void afterPropertiesSet() throws Exception {
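//Netty's thread model is configured as boss/worker thread pools to cope with highly concurrent requests
//Netty also supports single-threaded and multi-threaded network I/O models, which can be chosen to suit the workload
ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");
//availableProcessors() returns the number of processors available to the JVM
int parallel = Runtime.getRuntime().availableProcessors() * 2;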
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup(parallel, threadRpcFactory, SelectorProvider.provider());
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new MessageRecvChannelInitializer(handlerMap).buildRpcSerializeProtocol(serializeProtocol))
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);
if (ipAddr.length == 2) {
String host = ipAddr[0];
int port = Integer.parseInt(ipAddr[1]);
ChannelFuture future = bootstrap.bind(host, port).sync();
System.out.printf("[author tangjie] Netty RPC Server start success!\nip:%s\nport:%d\nprotocol:%s\n\n", host, port, serializeProtocol);
future.channel().closeFuture().sync();
} else {
System.out.printf("[author tangjie] Netty RPC Server start fail!\n");
}
} finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
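The pattern used in `submit` (run the task on a business pool, then react to its outcome in a callback without blocking the I/O thread) can also be expressed with the JDK's `CompletableFuture`. The sketch below is an illustrative equivalent, not the Guava-based code above; `AsyncReply` is a hypothetical name and the returned string stands in for writing a response to the channel.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class AsyncReply {
    // Runs the task on the business pool, then maps its outcome to a reply on the
    // same pool, so the submitting (I/O) thread is never blocked.
    static <T> CompletableFuture<String> submit(ExecutorService pool, Supplier<T> task) {
        return CompletableFuture.supplyAsync(task, pool)
                .handleAsync((result, error) -> {
                    if (error == null) {
                        return "ok:" + result;
                    }
                    // Failures of the upstream stage arrive wrapped in CompletionException.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause() : error;
                    return "error:" + cause.getMessage();
                }, pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(submit(pool, () -> 6 * 7).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

Unlike the `onFailure` branch above, which only prints the stack trace, this sketch turns a failure into a reply so that the caller is not left waiting.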
The RPC server message handler (MessageRecvHandler) is adjusted accordingly:
/**
* @filename:MessageRecvHandler.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC server message handler
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
public class MessageRecvHandler extends ChannelInboundHandlerAdapter {
private final Map<String, Object> handlerMap;
public MessageRecvHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MessageRequest request = (MessageRequest) msg;
MessageResponse response = new MessageResponse();
MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap);
//Do not block the NIO thread; hand complex business logic to a dedicated thread pool
MessageRecvExecutor.submit(recvTask, ctx, request, response);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//Close the channel when a network exception occurs
ctx.close();
}
}
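The RPC server message task (MessageRecvInitializeTask) now has a single, simpler job: based on the RPC request message, it uses reflection to compute the result and writes it into the RPC response structure. The code is as follows: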
/**
* @filename:MessageRecvInitializeTask.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC server message task
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.concurrent.Callable;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.apache.commons.lang.reflect.MethodUtils;
public class MessageRecvInitializeTask implements Callable<Boolean> {
private MessageRequest request = null;
private MessageResponse response = null;
private Map<String, Object> handlerMap = null;
public MessageResponse getResponse() {
return response;
}
public MessageRequest getRequest() {
return request;
}
public void setRequest(MessageRequest request) {
this.request = request;
}
//The task no longer holds a ChannelHandlerContext; the executor writes the response
MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap) {
this.request = request;
this.response = response;
this.handlerMap = handlerMap;
}
public Boolean call() {
response.setMessageId(request.getMessageId());
try {
Object result = reflect(request);
response.setResult(result);
return Boolean.TRUE;
} catch (Throwable t) {
response.setError(t.toString());
t.printStackTrace();
System.err.printf("RPC Server invoke error!\n");
return Boolean.FALSE;
}
}
private Object reflect(MessageRequest request) throws Throwable {
String className = request.getClassName();
Object serviceBean = handlerMap.get(className);
String methodName = request.getMethodName();
Object[] parameters = request.getParameters();
return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
}
}
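The `reflect` method relies on Commons Lang's `MethodUtils` to find a matching method from the argument values. For comparison, the sketch below does the dispatch with plain `java.lang.reflect` and explicit parameter types, which is the kind of information the `typeParameters` field of the request carries. `ReflectiveDispatch` and `Calculator` are hypothetical names used only for this illustration.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

final class ReflectiveDispatch {
    // Hypothetical service used only for this illustration.
    public static final class Calculator {
        public int add(int a, int b) {
            return a + b;
        }
    }

    // Looks up the target by class name and invokes the public method whose
    // name and declared parameter types match exactly.
    static Object invoke(Map<String, Object> handlers, String className, String methodName,
                         Class<?>[] parameterTypes, Object[] args) {
        Object target = handlers.get(className);
        if (target == null) {
            throw new IllegalArgumentException("No service registered for " + className);
        }
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Invocation of " + methodName + " failed", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> handlers = new HashMap<>();
        handlers.put(Calculator.class.getName(), new Calculator());
        Object result = invoke(handlers, Calculator.class.getName(), "add",
                new Class<?>[]{int.class, int.class}, new Object[]{1, 2});
        System.out.println(result);
    }
}
```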
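As mentioned, the NettyRPC server can choose a specific serialization protocol; at present the selection is hard-coded. Later it could be supplied through Spring IoC dependency injection. The corresponding code is as follows: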
/**
* @filename:RpcRecvSerializeFrame.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:RPC server message serialization protocol selector
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.Map;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.RpcSerializeFrame;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import newlandframework.netty.rpc.serialize.support.hessian.HessianCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianDecoder;
import newlandframework.netty.rpc.serialize.support.hessian.HessianEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoCodecUtil;
import newlandframework.netty.rpc.serialize.support.kryo.KryoDecoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoPoolFactory;
public class RpcRecvSerializeFrame implements RpcSerializeFrame {
private Map<String, Object> handlerMap = null;
public RpcRecvSerializeFrame(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
//Could later be refactored to be injected via Spring IoC
public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
switch (protocol) {
case JDKSERIALIZE: {
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new MessageRecvHandler(handlerMap));
break;
}
case KRYOSERIALIZE: {
KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
pipeline.addLast(new KryoEncoder(util));
pipeline.addLast(new KryoDecoder(util));
pipeline.addLast(new MessageRecvHandler(handlerMap));
break;
}
case HESSIANSERIALIZE: {
HessianCodecUtil util = new HessianCodecUtil();
pipeline.addLast(new HessianEncoder(util));
pipeline.addLast(new HessianDecoder(util));
pipeline.addLast(new MessageRecvHandler(handlerMap));
break;
}
}
}
}
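As a rough sketch of how the hard-coded switch could become pluggable, the protocol-to-codec mapping can be kept in a lookup table that a container (or plain code) fills in. Everything in this sketch is a hypothetical stand-in and not part of NettyRPC: `Protocol` mirrors RpcSerializeProtocol, and the "pipeline" is modelled as a list of handler names instead of a real Netty ChannelPipeline, so that the example runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: a table-driven replacement for the switch statement.
// The "pipeline" is a List<String> of handler names, not a Netty ChannelPipeline.
class SerializeFrameRegistry {
    // Stand-in for RpcSerializeProtocol.
    enum Protocol { JDKSERIALIZE, KRYOSERIALIZE, HESSIANSERIALIZE }

    private final Map<Protocol, Consumer<List<String>>> configurers =
            new EnumMap<>(Protocol.class);

    // Registers the codec setup for one protocol; a DI container could call this.
    void register(Protocol protocol, Consumer<List<String>> configurer) {
        configurers.put(protocol, configurer);
    }

    // Applies the registered codec setup, then appends the business handler.
    void select(Protocol protocol, List<String> pipeline) {
        Consumer<List<String>> configurer = configurers.get(protocol);
        if (configurer == null) {
            throw new IllegalArgumentException("No codec registered for " + protocol);
        }
        configurer.accept(pipeline);
        pipeline.add("MessageRecvHandler");
    }

    // Registers two of the three protocols, to show that unknown ones fail fast.
    static SerializeFrameRegistry defaults() {
        SerializeFrameRegistry registry = new SerializeFrameRegistry();
        registry.register(Protocol.KRYOSERIALIZE, p -> {
            p.add("KryoEncoder");
            p.add("KryoDecoder");
        });
        registry.register(Protocol.HESSIANSERIALIZE, p -> {
            p.add("HessianEncoder");
            p.add("HessianDecoder");
        });
        return registry;
    }

    public static void main(String[] args) {
        List<String> pipeline = new ArrayList<>();
        defaults().select(Protocol.KRYOSERIALIZE, pipeline);
        System.out.println(pipeline); // [KryoEncoder, KryoDecoder, MessageRecvHandler]
    }
}
```

With this shape, adding a new serialization protocol means registering one more entry rather than editing the switch on both the server and the client side.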
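At this point, the design and implementation of the NettyRPC server side is complete.
Next, let's implement the NettyRPC client module. After the adjustment, the RPC client channel initializer (MessageSendChannelInitializer) also supports selecting a specific message serialization protocol (RpcSerializeProtocol). The code is as follows: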
/**
* @filename:MessageSendChannelInitializer.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC client channel pipeline initialization
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {
private RpcSerializeProtocol protocol;
private RpcSendSerializeFrame frame = new RpcSendSerializeFrame();
MessageSendChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
this.protocol = protocol;
return this;
}
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
frame.select(protocol, pipeline);
}
}
The RPC client execution module (MessageSendExecutor) is implemented as follows:
/**
* @filename:MessageSendExecutor.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC client execution module
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import com.google.common.reflect.Reflection;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
public class MessageSendExecutor {
private RpcServerLoader loader = RpcServerLoader.getInstance();
public MessageSendExecutor() {
}
public MessageSendExecutor(String serverAddress, RpcSerializeProtocol serializeProtocol) {
loader.load(serverAddress, serializeProtocol);
}
public void setRpcServerLoader(String serverAddress, RpcSerializeProtocol serializeProtocol) {
loader.load(serverAddress, serializeProtocol);
}
public void stop() {
loader.unLoad();
}
public static <T> T execute(Class<T> rpcInterface) {
return (T) Reflection.newProxy(rpcInterface, new MessageSendProxy<T>());
}
}
The RPC client thread task (MessageSendInitializeTask) now takes an additional parameter for the protocol type (RpcSerializeProtocol). The code is as follows:
/**
* @filename:MessageSendInitializeTask.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC client thread task
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
public class MessageSendInitializeTask implements Callable<Boolean> {
private EventLoopGroup eventLoopGroup = null;
private InetSocketAddress serverAddress = null;
private RpcSerializeProtocol protocol;
MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcSerializeProtocol protocol) {
this.eventLoopGroup = eventLoopGroup;
this.serverAddress = serverAddress;
this.protocol = protocol;
}
public Boolean call() {
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new MessageSendChannelInitializer().buildRpcSerializeProtocol(protocol));
ChannelFuture channelFuture = b.connect(serverAddress);
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(final ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
RpcServerLoader.getInstance().setMessageSendHandler(handler);
}
}
});
return Boolean.TRUE;
}
}
After refactoring, the RPC client message handling (MessageSendProxy) is implemented as follows:
/**
* @filename:MessageSendProxy.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC client message handling
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import java.lang.reflect.Method;
import java.util.UUID;
import newlandframework.netty.rpc.model.MessageRequest;
import com.google.common.reflect.AbstractInvocationHandler;
public class MessageSendProxy<T> extends AbstractInvocationHandler {
public Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
MessageRequest request = new MessageRequest();
request.setMessageId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setTypeParameters(method.getParameterTypes());
request.setParameters(args);
MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
MessageCallBack callBack = handler.sendRequest(request);
return callBack.start();
}
}
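To make the proxy step more concrete, here is a minimal sketch of the same idea using only the JDK's java.lang.reflect.Proxy (Guava's Reflection.newProxy is a thin wrapper around it). The `Calculate` interface, the `captured` list, and the canned reply are hypothetical, introduced only for illustration: instead of sending a MessageRequest over Netty, the handler records which method was called and returns a fixed value.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: turning an interface call into a "request" via a dynamic proxy.
class ProxyCaptureSketch {
    // Hypothetical service interface, for illustration only.
    interface Calculate {
        int add(int a, int b);
    }

    // Records "Interface#method/argCount" for every proxied call.
    static final List<String> captured = new ArrayList<>();

    // Returns a proxy whose calls are recorded and answered with cannedReply.
    // Unlike Guava's AbstractInvocationHandler, this bare handler gives no special
    // treatment to toString/equals/hashCode, so only call interface methods on it.
    @SuppressWarnings("unchecked")
    static <T> T execute(Class<T> rpcInterface, Object cannedReply) {
        InvocationHandler handler = (proxy, method, args) -> {
            int argCount = (args == null) ? 0 : args.length;
            captured.add(method.getDeclaringClass().getSimpleName()
                    + "#" + method.getName() + "/" + argCount);
            // A real client would send the request and block for the server's reply here.
            return cannedReply;
        };
        return (T) Proxy.newProxyInstance(
                rpcInterface.getClassLoader(), new Class<?>[] {rpcInterface}, handler);
    }

    public static void main(String[] args) {
        Calculate calc = execute(Calculate.class, 3);
        System.out.println(calc.add(1, 2)); // 3 (the canned reply)
        System.out.println(captured);       // [Calculate#add/2]
    }
}
```

The caller only sees the interface; the class name, method name, and arguments needed to build the request are all available inside the handler.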
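Likewise, the NettyRPC client can choose its protocol type. Note that the client and the server must use the same protocol type, otherwise they cannot communicate with each other. The client-side message serialization protocol framework is implemented as follows: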
/**
* @filename:RpcSendSerializeFrame.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC client-side message serialization protocol framework
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianDecoder;
import newlandframework.netty.rpc.serialize.support.hessian.HessianEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoCodecUtil;
import newlandframework.netty.rpc.serialize.support.kryo.KryoDecoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoPoolFactory;
import newlandframework.netty.rpc.serialize.support.RpcSerializeFrame;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
public class RpcSendSerializeFrame implements RpcSerializeFrame {
//Could later be refactored so that this is injected via Spring IoC
public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
switch (protocol) {
case JDKSERIALIZE: {
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new MessageSendHandler());
break;
}
case KRYOSERIALIZE: {
KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
pipeline.addLast(new KryoEncoder(util));
pipeline.addLast(new KryoDecoder(util));
pipeline.addLast(new MessageSendHandler());
break;
}
case HESSIANSERIALIZE: {
HessianCodecUtil util = new HessianCodecUtil();
pipeline.addLast(new HessianEncoder(util));
pipeline.addLast(new HessianDecoder(util));
pipeline.addLast(new MessageSendHandler());
break;
}
}
}
}
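The JDKSERIALIZE branch relies on length-prefixed framing: LengthFieldPrepender writes the body length in front of each message, and LengthFieldBasedFrameDecoder reads that length, waits for the whole body, and strips the prefix. The following is a simplified sketch of that framing with java.nio.ByteBuffer, not the Netty implementation. It assumes a 4-byte length field; the actual value of MessageCodecUtil.MESSAGE_LENGTH is not shown in this section.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified sketch of length-prefixed framing (not the Netty classes themselves).
class LengthFrameSketch {
    // Assumption: 4-byte big-endian length field.
    static final int MESSAGE_LENGTH = 4;

    // Prepends the body length, as a length prepender would.
    static byte[] encode(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(MESSAGE_LENGTH + body.length);
        buf.putInt(body.length); // ByteBuffer is big-endian by default
        buf.put(body);
        return buf.array();
    }

    // Returns the body of the next complete frame with the prefix stripped,
    // or null (leaving the buffer position untouched) if more bytes are needed.
    static byte[] decode(ByteBuffer in) {
        if (in.remaining() < MESSAGE_LENGTH) {
            return null;
        }
        in.mark();
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("Negative frame length: " + length);
        }
        if (in.remaining() < length) {
            in.reset(); // incomplete frame: rewind and wait for more data
            return null;
        }
        byte[] body = new byte[length];
        in.get(body);
        return body;
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] body = decode(ByteBuffer.wrap(frame));
        System.out.println(new String(body, StandardCharsets.UTF_8)); // hello
    }
}
```

Both ends have to agree on the size and meaning of the length field, which is one reason the client and server must be configured with the same protocol.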
Finally, the NettyRPC client needs to load some context information about the NettyRPC server. The RPC server configuration loader (RpcServerLoader) is therefore refactored as follows:
/**
* @filename:RpcServerLoader.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC server configuration loader
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
public class RpcServerLoader {
private volatile static RpcServerLoader rpcServerLoader;
private final static String DELIMITER = ":";
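//By default, RPC messages are transmitted using Java native serialization
private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;
//NIO thread count: twice the number of processors available to the Java virtual machine
private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
//Netty NIO thread pool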
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
private static ListeningExecutorService threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
private MessageSendHandler messageSendHandler = null;
//Signals used to wait for notification that the link to the Netty server has been established
private Lock lock = new ReentrantLock();
private Condition connectStatus = lock.newCondition();
private Condition handlerStatus = lock.newCondition();
private RpcServerLoader() {
}
//Double-checked locking for the lazily created singleton
public static RpcServerLoader getInstance() {
if (rpcServerLoader == null) {
synchronized (RpcServerLoader.class) {
if (rpcServerLoader == null) {
rpcServerLoader = new RpcServerLoader();
}
}
}
return rpcServerLoader;
}
public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) {
String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
if (ipAddr.length == 2) {
String host = ipAddr[0];
int port = Integer.parseInt(ipAddr[1]);
final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, serializeProtocol));
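//Listen for the asynchronous result of the thread-pool task, then decide whether to wake all waiting client RPC threads
Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
public void onSuccess(Boolean result) {
lock.lock();
try {
//Loop instead of testing once, to guard against spurious wakeups
while (messageSendHandler == null) {
handlerStatus.await();
}
//Futures asynchronous callback: wake all waiting RPC threads
if (Boolean.TRUE.equals(result) && messageSendHandler != null) {
connectStatus.signalAll();
}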
} catch (InterruptedException ex) {
Logger.getLogger(RpcServerLoader.class.getName()).log(Level.SEVERE, null, ex);
} finally {
lock.unlock();
}
}
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, threadPoolExecutor);
}
}
public void setMessageSendHandler(MessageSendHandler messageInHandler) {
lock.lock();
try {
this.messageSendHandler = messageInHandler;
handlerStatus.signal();
} finally {
lock.unlock();
}
}
public MessageSendHandler getMessageSendHandler() throws InterruptedException {
lock.lock();
try {
//Suspend and wait until the link to the Netty server has been established
while (messageSendHandler == null) {
connectStatus.await();
}
return messageSendHandler;
} finally {
lock.unlock();
}
}
public void unLoad() {
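//The handler is still null if no connection was ever established
if (messageSendHandler != null) {
messageSendHandler.close();
}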
threadPoolExecutor.shutdown();
eventLoopGroup.shutdownGracefully();
}
public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) {
this.serializeProtocol = serializeProtocol;
}
}
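The waiting logic in RpcServerLoader follows the usual Lock/Condition pattern: callers block until the handler has been published, and the publisher signals them. The sketch below isolates that pattern in a small, hypothetical `HandlerGate` class (not part of NettyRPC). It deliberately simplifies the two conditions in the original down to one, and it re-checks the state in a `while` loop because `Condition.await()` is allowed to wake up spuriously.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: block readers until a value has been published.
class HandlerGate<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private T handler; // guarded by lock

    // Publishes the handler and wakes every waiting thread.
    void set(T value) {
        lock.lock();
        try {
            handler = value;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Blocks until a handler is available, then returns it.
    T get() throws InterruptedException {
        lock.lock();
        try {
            // Re-check after every wakeup: await() may return spuriously.
            while (handler == null) {
                ready.await();
            }
            return handler;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        HandlerGate<String> gate = new HandlerGate<>();
        Thread connector = new Thread(() -> gate.set("connected"));
        connector.start();
        System.out.println(gate.get()); // connected
        connector.join();
    }
}
```

Acquiring the lock before entering `try` ensures that `unlock()` in the `finally` block only runs when the lock is actually held.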
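At this point, the code for all of NettyRPC's main core modules has been presented. So how does the NettyRPC server actually perform after the refactoring? As the saying goes, practice is the sole criterion for testing truth. Let's now start three NettyRPC servers to verify it. The server-side configuration is as follows:
1. NettyRPC server using Java native serialization, listening on 127.0.0.1:18887.
2. NettyRPC server using Kryo serialization, listening on 127.0.0.1:18888.
3. NettyRPC server using Hessian serialization, listening on 127.0.0.1:18889.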