引言
在完成TCP长联网控制中,客户端断线重新连接是一个很普遍的问题,在我们应用netty完成断线重连时,是不是考虑到过如下所示好多个问题:
- 怎样监视到客户端和服务端联接断掉 ?
- 怎样完成断线后再次联接 ?
- netty客户端进程给多少较为有效 ?
实际上上边全是笔者在做断线重连时所碰到的问题,而 “netty客户端进程给多少较为有效?” 这个问题也是笔者在做断线重新连接时易一个出现异常引起的思索。下边讲下整个过程:
由于这节解读內容具体涉及到在客户端,可是为了更好地阅读者可以运作全部程序流程,因此这儿先得出服务端及公共性的依赖和实体类。
服务端及common编码
maven依赖:
<dependencies>
<!--仅仅使用了spring-boot的日志架构-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.56.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.10.Final</version>
</dependency>
</dependencies>
服务端业务流程解决编码
主要运用于纪录打印出现阶段客户端线程数,当接受到客户端信息内容后回到“hello netty”字符串数组
@ChannelHandler.Sharable
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class);
public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel());
log.info(\"客户端联接取得成功: client address :{}\", ctx.channel().remoteAddress());
log.info(\"现阶段一共有{}个客户端联接\", channels.size());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info(\"server channelRead:{}\", msg);
ctx.channel().writeAndFlush(\"hello netty\");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info(\"channelInactive: client close\");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof java.io.IOException) {
log.warn(\"exceptionCaught: client close\");
} else {
cause.printStackTrace();
}
}
}
服务端心率查验编码
当接受心率”ping”信息内容后,回到客户端’’pong”信息内容。假如客户端在指定的时间段内沒有推送一切信息内容则关掉客户端。
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger log = InternalLoggerFactory.getInstance(ServerHeartbeatHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info(\"server channelRead:{}\", msg);
if (msg.equals(\"ping\")) {
ctx.channel().writeAndFlush(\"pong\");
} else {
//由下一个handler解决,实例中则为SimpleServerHandler
ctx.fireChannelRead(msg);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
//该情况必须相互配合 io.netty.handler.timeout.IdleStateHandler应用
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.READER_IDLE) {
//超出特定時间沒有读事情,关掉联接
log.info(\"超出心率時间,关掉和服务端的联接:{}\", ctx.channel().remoteAddress());
//ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
编解码java工具
关键应用jboss-marshalling-serial编解码专用工具,可自主查看其优点和缺点,这儿仅仅实例应用。
public final class MarshallingCodeFactory {
/** 建立Jboss marshalling 视频解码器 */
public static MarshallingDecoder buildMarshallingDecoder() {
//主要参数serial表明建立的是Java编码序列化工企业目标,由jboss-marshalling-serial给予
MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(\"serial\");
MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
return new MarshallingDecoder(provider, 1024);
}
/** 建立Jboss marshalling 伺服电机 */
public static MarshallingEncoder buildMarshallingEncoder() {
MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(\"serial\");
MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
return new MarshallingEncoder(provider);
}
}
公共性实体类
public class UserInfo implements Serializable {
private static final long serialVersionUID = 6271330872494117382L;
private String username;
private int age;
public UserInfo() {
}
public UserInfo(String username, int age) {
this.username = username;
this.age = age;
}
//省去getter/setter/toString
}
下边逐渐文中的关键,客户端断开重新连接及其问题思索。
客户端完成
- 一开始运作时必须开展同歩连接,特定连接频次内没有用根据则抛出异常,过程撤出。
- 客户端运行后,打开计划任务,仿真模拟客户端数据信息推送。
客户端业务流程解决handler,接受到信息后,根据日志打印出。
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
private NettyClient client;
public SimpleClientHandler(NettyClient client) {
this.client = client;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info(\"client receive:{}\", msg);
}
}
封装形式连接方式、断掉连接方式、getChannel()返回io.netty.channel.Channel用以向服务器端传送数据。boolean connect()是一个同歩连接方式,假如连接取得成功返回true,连接不成功返回false。
public class NettyClient {
private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
private EventLoopGroup workerGroup;
private Bootstrap bootstrap;
private volatile Channel clientChannel;
public NettyClient() {
this(-1);
}
public NettyClient(int threads) {
workerGroup = threads > 0 ? new NioEventLoopGroup(threads) : new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
.handler(new ClientHandlerInitializer(this));
}
public boolean connect() {
log.info(\"试着连接到服务器端: 127.0.0.1:8088\");
try {
ChannelFuture channelFuture = bootstrap.connect(\"127.0.0.1\", 8088);
boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);
clientChannel = channelFuture.channel();
if (notTimeout) {
if (clientChannel != null && clientChannel.isActive()) {
log.info(\"netty client started !!! {} connect to server\", clientChannel.localAddress());
return true;
}
Throwable cause = channelFuture.cause();
if (cause != null) {
exceptionHandler(cause);
}
} else {
log.warn(\"connect remote host[{}] timeout {}s\", clientChannel.remoteAddress(), 30);
}
} catch (Exception e) {
exceptionHandler(e);
}
clientChannel.close();
return false;
}
private void exceptionHandler(Throwable cause) {
if (cause instanceof ConnectException) {
log.error(\"连接出现异常:{}\", cause.getMessage());
} else if (cause instanceof ClosedChannelException) {
log.error(\"connect error:{}\", \"client has destroy\");
} else {
log.error(\"connect error:\", cause);
}
}
public void close() {
if (clientChannel != null) {
clientChannel.close();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
public Channel getChannel() {
return clientChannel;
}
static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {
private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
private NettyClient client;
public ClientHandlerInitializer(NettyClient client) {
this.client = client;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
//pipeline.addLast(new IdleStateHandler(25, 0, 10));
//pipeline.addLast(new ClientHeartbeatHandler());
pipeline.addLast(new SimpleClientHandler(client));
}
}
}
客户端运行类
public class NettyClientMain {
private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClientMain.class);
private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient();
boolean connect = false;
//刚运作时试着连接10次,都没法创建连接则没有在试着
//假如想在刚起动后,一直试着连接,必须放到进程中,多线程实行,避免堵塞程序流程for (int i = 0; i < 10; i ) {
connect = nettyClient.connect();
if (connect) {
break;
}
//联接失败,隔5s以后再次试着联接
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (connect) {
log.info(\"定时发送数据信息\");
send(nettyClient);
} else {
nettyClient.close();
log.info(\"过程撤出\");
}
}
/** 定时发送数据信息 */
static void send(NettyClient client) {
scheduledExecutor.schedule(new SendTask(client,scheduledExecutor), 2, TimeUnit.SECONDS);
}
}
手机客户端断开重新连接
断开重新连接要求:
- 服务器端和手机客户端中间网络连接超时,或回应请求超时(例如有一个很长期的fullGC),手机客户端必须积极重新连接别的连接点。
- 服务器端服务器宕机时或是和手机客户端中间产生一切出现异常时,手机客户端必须积极重新连接别的连接点。
- 服务器端积极向手机客户端推送(服务器端)退出通告时,手机客户端必须积极重新连接别的连接点。
怎样监视到手机客户端和服务器端联接断掉 ?
netty的io.netty.channel.ChannelInboundHandler插口中为我们带来了很多关键的插口方式。为了防止完成所有的插口方式,可以利用承继io.netty.channel.ChannelInboundHandlerAdapter来重新写过相对应的方式就可以。
1.void channelInactive(ChannelHandlerContext ctx);在手机客户端关掉时被启用,表明手机客户端断开。当有下列几类状况产生的时候会开启:
- 手机客户端在正常的active情况下,积极启用channel或是ctx的close方式。
- 服务器端积极启用channel或是ctx的close方式关掉手机客户端的联接 。
- 产生java.io.IOException(一般情形下是彼此联接断掉)或是java.lang.OutOfMemoryError(4.1.52版本号中新增加)时
2.void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;则是在入栈产生一切出现异常时被启用。假如出现异常是java.io.IOException或是java.lang.OutOfMemoryError(4.1.52版本号新增加)时,还会继续开启channelInactive方式,也就是上边channelInactive被开启的第3条状况。
3.心率查验也是检查手机客户端与服务器端中间联接情况的必需方法,由于在一些情况下,两边事实上已经断开,但手机客户端没法认知,此刻就要根据心率来分辨两边的联接情况。心率可以是手机客户端心率和服务器端心率。
- 手机客户端信跳:即是手机客户端推送心跳ping信息内容,服务器端回应pong信息内容。那样在指定的时间段内,彼此有数据信息互动则以为是正常的联接情况。
- 服务器端信息内容:则是服务器端向手机客户端推送ping信息内容,手机客户端回应pong信息内容。在指定的时间段内没接到回应,则觉得另一方退出。
netty给大家带来了比较简单的心率查验方法,只要在channel的handler链上,加上io.netty.handler.timeout.IdleStateHandler就可以完成。
IdleStateHandler有以下好多个关键的主要参数:
- readerIdleTimeSeconds, 读请求超时. 即如在特定的间隔时间内沒有从 Channel 载入到数据信息时, 会激发一个READER_IDLE的IdleStateEvent 事情.
- writerIdleTimeSeconds, 写请求超时. 即如在特定的间隔时间内沒有数据信息载入到 Channel 时, 会激发一个WRITER_IDLE的IdleStateEvent 事情.
- allIdleTimeSeconds, 读/写请求超时. 当在规定的间隔时间内沒有读或写实际操作时, 会激发一个ALL_IDLE的IdleStateEvent 事情.
为了更好地可以监视到这种事情的开启,还必须重新写过ChannelInboundHandler#userEventTriggered(ChannelHandlerContext ctx, Object evt)方式,根据主要参数evt分辨事情种类。在规定的時间内要是没有读写能力则推送一条心率的ping要求,在指定的时间段内没接到读实际操作则每日任务已经和服务器端断开。则启用channel或是ctx的close方式,使手机客户端Handler实行channelInactive方式。
到这儿来看大家只需要在channelInactive和exceptionCaught2个办法中保持自身的重新连接逻辑性就可以,可是小编碰到了第一个坑,重新连接方式实行了2次。
首先看实例编码和結果,在com.bruce.netty.rpc.client.SimpleClientHandler中加上如下所示编码:
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
//省去一部分编码......
/** 手机客户端正常的退出时实行该方式 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn(\"channelInactive:{}\", ctx.channel().localAddress());
reconnection(ctx);
}
/** 入栈产生出现异常时实行exceptionCaught */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof IOException) {
log.warn(\"exceptionCaught:手机客户端[{}]和远程控制断开\", ctx.channel().localAddress());
} else {
log.error(cause);
}
reconnection(ctx);
}
private void reconnection(ChannelHandlerContext ctx) {
log.info(\"5s以后再次创建联接\");
//临时为空完成
}
}
ClientHandlerInitializer 中加上io.netty.handler.timeout.IdleStateHandler用以心率查验,ClientHeartbeatHandler用以监视心率事情,接受心率pong回应。
static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {
private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
private NettyClient client;
public ClientHandlerInitializer(NettyClient client) {
this.client = client;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
//25s内沒有read实际操作则开启READER_IDLE事情
//10s内既沒有read又没有write实际操作则开启ALL_IDLE事情
pipeline.addLast(new IdleStateHandler(25, 0, 10));
pipeline.addLast(new ClientHeartbeatHandler());
pipeline.addLast(new SimpleClientHandler(client));
}
}
com.bruce.netty.rpc.client.ClientHeartbeatHandler
public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger log = InternalLoggerFactory.getInstance(ClientHeartbeatHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg.equals(\"pong\")) {
log.info(\"接到心率回应\");
} else {
super.channelRead(ctx, msg);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
//该情况必须相互配合 io.netty.handler.timeout.IdleStateHandler应用
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.ALL_IDLE) {
//向服务器端推送心跳检测
ctx.writeAndFlush(\"ping\");
log.info(\"发送心率数据信息\");
} else if (idleStateEvent.state() == IdleState.READER_IDLE) {
//超出特定時间沒有读事情,关掉连接
log.info(\"超出心率時间,关掉和服务器端的连接:{}\", ctx.channel().remoteAddress());
ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
先运行server端,重新启动client端,待连接取得成功以后kill掉 server端过程。
根据客户端日志可以看得出,起先执行了exceptionCaught方法随后执行了channelInactive方法,可是这两个方法里都调用了reconnection方法,造成与此同时执行了2次重新连接。
为什么执行了exceptionCaught方法又执行了channelInactive方法呢?
我们可以在exceptionCaught和channelInactive方法加上中断点一步步查询源代码
当NioEventLoop执行select实际操作以后,解决对应的SelectionKey,产生异常情况后,会调用AbstractNioByteChannel.NioByteUnsafe#handleReadException方法开展解决,并开启pipeline.fireExceptionCaught(cause),最后调用到客户handler的fireExceptionCaught方法。
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
// If oom will close the read event, release connection.
// See https://github.com/netty/netty/issues/10434
if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
该方法最终会分辨出现异常种类,执行close连接的方法。在连接断线的情景中,这儿即是java.io.IOException,因此执行了close方法,当debug到AbstractChannel.AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, notify)方法中会看到最终又调用了AbstractUnsafe#fireChannelInactiveAndDeregister方法,再次debug最终则会执行自定的fireChannelInactive方法。
到这儿可以汇总一个知识要点:netty中当执行到handler地fireExceptionCaught方法时,很有可能会再次开启到fireChannelInactive,也有可能不容易开启fireChannelInactive。
除开netty依据出现异常种类分辨是不是执行close方法外,实际上开发者还可以自身根据ctx或是channel去调用close方法,编码如下所示:
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof IOException) {
log.warn(\"exceptionCaught:客户端[{}]和远程控制断掉连接\", ctx.channel().localAddress());
} else {
log.error(cause);
}
//ctx.close();
ctx.channel().close();
}
但这类表明调用close方法,是不是一定会开启调用fireChannelInactive呢?
如果是,那麼只要在exceptionCaught中调用close方法,fireChannelInactive中做重新连接的逻辑性就可以!!
在小编根据日志观查到,在exceptionCaught中调用close方法每一次都是会调用fireChannelInactive方法。可是查询源代码,小编觉得这也是不一定的,由于在AbstractChannel.AbstractUnsafe#close(ChannelPromise,Throwable, ClosedChannelException, notify)中会调用io.netty.channel.Channel#isActive开展分辨,仅有为true,才会执行fireChannelInactive方法。
//io.netty.channel.socket.nio.NioSocketChannel#isActive
@Override
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
如何解决与此同时执行2次问题呢?
在netty复位时,大家都是会加上一系列的handlerCPU,这种handler事实上会在netty建立Channel目标(NioSocketChannel)时,被装封在DefaultChannelPipeline中,而DefaultChannelPipeline事实上是一个双向链表,头连接点为TailContext,尾节点为TailContext,而里面的连接点则是大家加上的一个个handler(被封装形式成DefaultChannelHandlerContext),当执行Pipeline上的方法时,会从单链表上解析xmlhandler执行,因而当执行exceptionCaught方法时,大家只要提早清除单链表上自定的Handler则没法执行fireChannelInactive方法。
最终完成源代码如下所示:
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn(\"channelInactive:{}\", ctx.channel().localAddress());
ctx.pipeline().remove(this);
ctx.channel().close();
reconnection(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof IOException) {
log.warn(\"exceptionCaught:客户端[{}]和远程控制断开\", ctx.channel().localAddress());
} else {
log.error(cause);
}
ctx.pipeline().remove(this);
//ctx.close();
ctx.channel().close();
reconnection(ctx);
}
}
实行实际效果如下所示,能够看见当产生出现异常时,仅仅实行了exceptionCaught方式,而且根据channel关掉了上一次联接資源,都没有实行现阶段handler的fireChannelInactive方式。
怎样完成断开后再次联接 ?
根据上边剖析,大家已经了解在什么方法中保持自身的重新连接逻辑性,可是实际该怎么完成呢,满怀好奇心的心理状态检索了一下各加大码友的建立计划方案。大多数作法是根据ctx.channel().eventLoop().schedule加上一个计划任务启用客户端的衔接方式。小编也参照该方法完成源代码如下所示:
private void reconnection(ChannelHandlerContext ctx) {
log.info(\"5s以后再次创建联接\");
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
boolean connect = client.connect();
if (connect) {
log.info(\"再次联接取得成功\");
} else {
reconnection(ctx);
}
}
}, 5, TimeUnit.SECONDS);
}
检测:先运行server端,重新启动client端,待联接取得成功以后kill掉 server端过程。客户端按期定时执行重新连接,但也便去餐区倒杯茶的時间,回家后发觉了如下所示出现异常。
......省去14条同样的再试日志
[2021-01-17 18:46:45.032] INFO [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s以后再次创建联接
[2021-01-17 18:46:48.032] INFO [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 试着联接到服务器端: 127.0.0.1:8088
[2021-01-17 18:46:50.038] ERROR [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 联接出现异常:Connection refused: no further information: /127.0.0.1:8088
[2021-01-17 18:46:50.038] INFO [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s以后再次创建联接
[2021-01-17 18:46:53.040] INFO [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 试着联接到服务器端: 127.0.0.1:8088
[2021-01-17 18:46:53.048] ERROR [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : connect error:
io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@10122121(incomplete)
at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:462)
at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:159)
at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:667)
at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:305)
at com.bruce.netty.rpc.client.NettyClient.connect(NettyClient.java:49)
at com.bruce.netty.rpc.client.SimpleClientHandler$1.run(SimpleClientHandler.java:65)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
依据出现异常栈,可以发觉是com.bruce.netty.rpc.client.NettyClient#connect方式中获取了等候方式
boolean notTimeout = channelFuture.awaitUninterruptibly(20, TimeUnit.SECONDS);
而该方法里面会开展检验,是不是在io进程上实行了同歩等候,这会造成抛出异常BlockingOperationException。
@Override
protected void checkDeadLock() {
if (channel().isRegistered()) {
super.checkDeadLock();
}
}
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}
令人费解的是为什么并不是每一次试着重新连接都抛出去该出现异常,反而是每过16次抛出去一次呢?
这要我想象到自身的笔记本电脑是8核CPU,而netty默认设置线程池是2 * c,便是16条进程,这中间好像有一些关系。
事实上在启用ChannelFuture channelFuture = bootstrap.connect(“127.0.0.1”, 8088);,netty最先会建立一个io.netty.channel.Channel(实例中是NioSocketChannel),随后根据io.netty.util.concurrent.EventExecutorChooserFactory.EventExecutorChooser先后挑选一个NioEventLoop,将Channel关联到NioEventLoop上。
io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop
//Return true if the given Thread is executed in the event loop, false otherwise.
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
重新连接的办法是在一个NioEventLoop(也就是io进程)上被启用,第1次重新连接事实上是挑选了第2个NioEventLoop,第2次重新连接事实上是挑选了第3个NioEventLoop,依此类推,当一轮挑选之后,再次挑到第一个NioEventLoop时,boolean inEventLoop()回到true,则提出了BlockingOperationException。
计划方案1
不要在netty的io线程上执行同歩联接,应用独立的线程池按时执行再试,该线程还能够执行自身重新连接的业务逻辑操作,不堵塞io线程。(假如不用业务流程操作以后消毁线程池)。
com.bruce.netty.rpc.client.SimpleClientHandler 改动reconnection方式
private static ScheduledExecutorService SCHEDULED_EXECUTOR;
private void initScheduledExecutor() {
if (SCHEDULED_EXECUTOR == null) {
synchronized (SimpleClientHandler.class) {
if (SCHEDULED_EXECUTOR == null) {
SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, \"Client-Reconnect-1\");
t.setDaemon(true);
return t;
});
}
}
}
}
private void reconnection(ChannelHandlerContext ctx) {
log.info(\"5s以后再次创建联接\");
initScheduledExecutor();
SCHEDULED_EXECUTOR.schedule(() -> {
boolean connect = client.connect();
if (connect) {
//连接取得成功,关掉线程池
SCHEDULED_EXECUTOR.shutdown();
log.info(\"再次联接取得成功\");
} else {
reconnection(ctx);
}
}, 3, TimeUnit.SECONDS);
}
计划方案2
可以在io线程上应用多线程重新连接:
com.bruce.netty.rpc.client.NettyClient加上方式connectAsync方法,二者的差异取决于connectAsync方式中沒有启用channelFuture的同歩等候方式。反而是改为窃听器(ChannelFutureListener)的方法,事实上这一窃听器是运作在io线程上。
public void connectAsync() {
log.info(\"试着联接到服务器端: 127.0.0.1:8088\");
ChannelFuture channelFuture = bootstrap.connect(\"127.0.0.1\", 8088);
channelFuture.addListener((ChannelFutureListener) future -> {
Throwable cause = future.cause();
if (cause != null) {
exceptionHandler(cause);
log.info(\"等候下一次重新连接\");
channelFuture.channel().eventLoop().schedule(this::connectAsync, 5, TimeUnit.SECONDS);
} else {
clientChannel = channelFuture.channel();
if (clientChannel != null && clientChannel.isActive()) {
log.info(\"Netty client started !!! {} connect to server\", clientChannel.localAddress());
}
}
});
}
com.bruce.netty.rpc.client.SimpleClientHandler
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
private NettyClient client;
public SimpleClientHandler(NettyClient client) {
this.client = client;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info(\"client receive:{}\", msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn(\"channelInactive:{}\", ctx.channel().localAddress());
ctx.pipeline().remove(this);
ctx.channel().close();
reconnectionAsync(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof IOException) {
log.warn(\"exceptionCaught:手机客户端[{}]和远程控制断开\", ctx.channel().localAddress());
} else {
log.error(cause);
}
ctx.pipeline().remove(this);
ctx.close();
reconnectionAsync(ctx);
}
private void reconnectionAsync(ChannelHandlerContext ctx) {
log.info(\"5s以后再次创建联接\");
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
client.connectAsync();
}
}, 5, TimeUnit.SECONDS);
}
}
netty手机客户端线程给多少较为有效 ?
netty中一个NioEventLoopGroup默认设置建立的线程数是cpu核心数 * 2 ,这种线程全是用以io操作,那麼针对手机客户端应用软件而言确实必须这么多io线程么?
根据上边剖析BlockingOperationException出现异常时大家剖析到,事实上netty在建立一个Channel目标后只能从NioEventLoopGroup中选取一个NioEventLoop来关联,仅有建立好几个Channel才会先后挑选下一个NioEventLoop,换句话说一个Channel只能相匹配一个NioEventLoop,而NioEventLoop可以关联好几个Channel。
1.针对手机客户端而言,假如仅仅联接的一个server连接点,那麼只需设定1条线程就可以。即使发生了断开重新连接,在联接断掉以后,以前的Channel会从NioEventLoop清除。重新连接以后,依然只能在仅有的一个NioEventLoop申请注册一个新的Channel。
2.假如手机客户端与此同时如下所示方法多次启用io.netty.bootstrap.Bootstrap#connect(String inetHost, int inetPort)联接好几个Server连接点,那麼线程可以设定大一点,但不能高于2*c,并且只需发生断开重新连接,一样不可以确保每一个NioEventLoop都是会关联一个手机客户端Channel。
public boolean connect() {
try {
ChannelFuture channelFuture1 = bootstrap.connect(\"127.0.0.1\", 8088);
ChannelFuture channelFuture2 = bootstrap.connect(\"127.0.0.1\", 8088);
ChannelFuture channelFuture3 = bootstrap.connect(\"127.0.0.1\", 8088);
} catch (Exception e) {
exceptionHandler(e);
}
clientChannel.close();
return false;
}
3.假如netty手机客户端线程数设定超过1有哪些不良影响么?
显著的出现异常肯定是不可能有的,可是导致网络资源消耗,最先会建立好几个NioEventLoop目标,但NioEventLoop是处在非运作情况。一旦发生断开重新连接,那麼再次接入时,下一个NioEventLoop则会被选定,并建立/运行线程一直处在runnable情况。而上一个NioEventLoop也是一直处在runnable情况,因为上一个Channel已经被close,因此会导致每一次select結果全是空的,没有意义的空轮询。
如下所示则是netty手机客户端应用默认设置线程数,4次断开重新连接后一共建立的5条NioEventLoop线程,可是事实上仅有第5条线程在执行读写能力操作。
4.假如客户端存有耗时的业务逻辑,应当直接应用业务流程线程池,防止在netty的io进程中实行耗时逻辑性解决。
汇总
这篇关键解读了,netty断线重新连接的二种完成计划方案,及其建立流程中遇上的不正常问题,根据分析问题,让我们掌握netty的完成关键点。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。