netty断线重连应该怎么实现(客户端netty断线重连的方法教程)

前言 在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题: 如何监听到客户端和服务端连接断开?如何实现断线后重新连接?netty客户端线程给多大比较合理? 其实上面都是笔者在做断线重连时所遇到的问题,而“netty客户端线程给多大比较合理?”这个问题更是笔者在做断线重连时因一个异常引发的思考。下面讲讲整个过程: 因为本节讲解内容主…

引言

在完成TCP长联网控制中,客户端断线重新连接是一个很普遍的问题,在我们应用netty完成断线重连时,是不是考虑到过如下所示好多个问题:

  • 怎样监视到客户端和服务端联接断掉 ?
  • 怎样完成断线后再次联接 ?
  • netty客户端进程给多少较为有效 ?

实际上上边全是笔者在做断线重连时所碰到的问题,而 “netty客户端进程给多少较为有效?” 这个问题也是笔者在做断线重新连接时易一个出现异常引起的思索。下边讲下整个过程:

由于这节解读內容具体涉及到在客户端,可是为了更好地阅读者可以运作全部程序流程,因此这儿先得出服务端及公共性的依赖和实体类。

服务端及common编码

maven依赖:

<dependencies>
    <!--仅仅使用了spring-boot的日志架构-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.4.1</version>
    </dependency>

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.56.Final</version>
    </dependency>

    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>2.0.10.Final</version>
    </dependency>
</dependencies>

服务端业务流程解决编码

主要运用于纪录打印出现阶段客户端线程数,当接受到客户端信息内容后回到“hello netty”字符串数组

@ChannelHandler.Sharable
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class);
    public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
        log.info(\"客户端联接取得成功: client address :{}\", ctx.channel().remoteAddress());
        log.info(\"现阶段一共有{}个客户端联接\", channels.size());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info(\"server channelRead:{}\", msg);
        ctx.channel().writeAndFlush(\"hello netty\");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info(\"channelInactive: client close\");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof java.io.IOException) {
            log.warn(\"exceptionCaught: client close\");
        } else {
            cause.printStackTrace();
        }
    }
}

服务端心率查验编码

当接受心率”ping”信息内容后,回到客户端’’pong”信息内容。假如客户端在指定的时间段内沒有推送一切信息内容则关掉客户端。

public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(ServerHeartbeatHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info(\"server channelRead:{}\", msg);
        if (msg.equals(\"ping\")) {
            ctx.channel().writeAndFlush(\"pong\");
        } else {
            //由下一个handler解决,实例中则为SimpleServerHandler
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //该情况必须相互配合 io.netty.handler.timeout.IdleStateHandler应用
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                //超出特定時间沒有读事情,关掉联接
                log.info(\"超出心率時间,关掉和服务端的联接:{}\", ctx.channel().remoteAddress());
                //ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

编解码java工具

关键应用jboss-marshalling-serial编解码专用工具,可自主查看其优点和缺点,这儿仅仅实例应用。

public final class MarshallingCodeFactory {
    /** 建立Jboss marshalling 视频解码器 */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //主要参数serial表明建立的是Java编码序列化工企业目标,由jboss-marshalling-serial给予
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(\"serial\");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
        return new MarshallingDecoder(provider, 1024);
    }

    /** 建立Jboss marshalling 伺服电机 */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(\"serial\");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
        return new MarshallingEncoder(provider);
    }
}

公共性实体类

public class UserInfo implements Serializable {
    private static final long serialVersionUID = 6271330872494117382L;
    private String username;
    private int age;

    public UserInfo() {
    }

    public UserInfo(String username, int age) {
        this.username = username;
        this.age = age;
    }
   //省去getter/setter/toString
}

下边逐渐文中的关键,客户端断开重新连接及其问题思索。

客户端完成

  • 一开始运作时必须开展同歩连接,特定连接频次内没有用根据则抛出异常,过程撤出。
  • 客户端运行后,打开计划任务,仿真模拟客户端数据信息推送。

客户端业务流程解决handler,接受到信息后,根据日志打印出。

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    private NettyClient client;

    public SimpleClientHandler(NettyClient client) {
        this.client = client;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info(\"client receive:{}\", msg);
    }
}

封装形式连接方式、断掉连接方式、getChannel()返回io.netty.channel.Channel用以向服务器端传送数据。boolean connect()是一个同歩连接方式,假如连接取得成功返回true,连接不成功返回false。

public class NettyClient {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);

    private EventLoopGroup workerGroup;
    private Bootstrap bootstrap;
    private volatile Channel clientChannel;

    public NettyClient() {
        this(-1);
    }

    public NettyClient(int threads) {
        workerGroup = threads > 0 ? new NioEventLoopGroup(threads) : new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
                .handler(new ClientHandlerInitializer(this));
    }

    public boolean connect() {
        log.info(\"试着连接到服务器端: 127.0.0.1:8088\");
        try {
            ChannelFuture channelFuture = bootstrap.connect(\"127.0.0.1\", 8088);

            boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);
            clientChannel = channelFuture.channel();
            if (notTimeout) {
                if (clientChannel != null && clientChannel.isActive()) {
                    log.info(\"netty client started !!! {} connect to server\", clientChannel.localAddress());
                    return true;
                }
                Throwable cause = channelFuture.cause();
                if (cause != null) {
                    exceptionHandler(cause);
                }
            } else {
                log.warn(\"connect remote host[{}] timeout {}s\", clientChannel.remoteAddress(), 30);
            }
        } catch (Exception e) {
            exceptionHandler(e);
        }
        clientChannel.close();
        return false;
    }

    private void exceptionHandler(Throwable cause) {
        if (cause instanceof ConnectException) {
            log.error(\"连接出现异常:{}\", cause.getMessage());
        } else if (cause instanceof ClosedChannelException) {
            log.error(\"connect error:{}\", \"client has destroy\");
        } else {
            log.error(\"connect error:\", cause);
        }
    }

    public void close() {
        if (clientChannel != null) {
            clientChannel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    public Channel getChannel() {
        return clientChannel;
    }

    static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {
        private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
        private NettyClient client;

        public ClientHandlerInitializer(NettyClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
            pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
            //pipeline.addLast(new IdleStateHandler(25, 0, 10));
            //pipeline.addLast(new ClientHeartbeatHandler());
            pipeline.addLast(new SimpleClientHandler(client));
        }
    }
}

客户端运行类

public class NettyClientMain {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClientMain.class);
    private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

    public static void main(String[] args) {
        NettyClient nettyClient = new NettyClient();
        boolean connect = false;
        //刚运作时试着连接10次,都没法创建连接则没有在试着
        //假如想在刚起动后,一直试着连接,必须放到进程中,多线程实行,避免堵塞程序流程for (int i = 0; i < 10; i  ) {
            connect = nettyClient.connect();
            if (connect) {
                break;
            }
            //联接失败,隔5s以后再次试着联接
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        if (connect) {
            log.info(\"定时发送数据信息\");
            send(nettyClient);
        } else {
            nettyClient.close();
            log.info(\"过程撤出\");
        }
    }

    /** 定时发送数据信息 */
    static void send(NettyClient client) {
        scheduledExecutor.schedule(new SendTask(client,scheduledExecutor), 2, TimeUnit.SECONDS);
    }
}

手机客户端断开重新连接

断开重新连接要求:

  • 服务器端和手机客户端中间网络连接超时,或回应请求超时(例如有一个很长期的fullGC),手机客户端必须积极重新连接别的连接点。
  • 服务器端服务器宕机时或是和手机客户端中间产生一切出现异常时,手机客户端必须积极重新连接别的连接点。
  • 服务器端积极向手机客户端推送(服务器端)退出通告时,手机客户端必须积极重新连接别的连接点。

怎样监视到手机客户端和服务器端联接断掉 ?

netty的io.netty.channel.ChannelInboundHandler插口中为我们带来了很多关键的插口方式。为了防止完成所有的插口方式,可以利用承继io.netty.channel.ChannelInboundHandlerAdapter来重新写过相对应的方式就可以。

1.void channelInactive(ChannelHandlerContext ctx);在手机客户端关掉时被启用,表明手机客户端断开。当有下列几类状况产生的时候会开启:

  • 手机客户端在正常的active情况下,积极启用channel或是ctx的close方式。
  • 服务器端积极启用channel或是ctx的close方式关掉手机客户端的联接 。
  • 产生java.io.IOException(一般情形下是彼此联接断掉)或是java.lang.OutOfMemoryError(4.1.52版本号中新增加)时

2.void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;则是在入栈产生一切出现异常时被启用。假如出现异常是java.io.IOException或是java.lang.OutOfMemoryError(4.1.52版本号新增加)时,还会继续开启channelInactive方式,也就是上边channelInactive被开启的第3条状况。

3.心率查验也是检查手机客户端与服务器端中间联接情况的必需方法,由于在一些情况下,两边事实上已经断开,但手机客户端没法认知,此刻就要根据心率来分辨两边的联接情况。心率可以是手机客户端心率和服务器端心率。

  • 手机客户端信跳:即是手机客户端推送心跳ping信息内容,服务器端回应pong信息内容。那样在指定的时间段内,彼此有数据信息互动则以为是正常的联接情况。
  • 服务器端信息内容:则是服务器端向手机客户端推送ping信息内容,手机客户端回应pong信息内容。在指定的时间段内没接到回应,则觉得另一方退出。

netty给大家带来了比较简单的心率查验方法,只要在channel的handler链上,加上io.netty.handler.timeout.IdleStateHandler就可以完成。

IdleStateHandler有以下好多个关键的主要参数:

  • readerIdleTimeSeconds, 读请求超时. 即如在特定的间隔时间内沒有从 Channel 载入到数据信息时, 会激发一个READER_IDLE的IdleStateEvent 事情.
  • writerIdleTimeSeconds, 写请求超时. 即如在特定的间隔时间内沒有数据信息载入到 Channel 时, 会激发一个WRITER_IDLE的IdleStateEvent 事情.
  • allIdleTimeSeconds, 读/写请求超时. 当在规定的间隔时间内沒有读或写实际操作时, 会激发一个ALL_IDLE的IdleStateEvent 事情.

为了更好地可以监视到这种事情的开启,还必须重新写过ChannelInboundHandler#userEventTriggered(ChannelHandlerContext ctx, Object evt)方式,根据主要参数evt分辨事情种类。在规定的時间内要是没有读写能力则推送一条心率的ping要求,在指定的时间段内没接到读实际操作则每日任务已经和服务器端断开。则启用channel或是ctx的close方式,使手机客户端Handler实行channelInactive方式。

到这儿来看大家只需要在channelInactive和exceptionCaught2个办法中保持自身的重新连接逻辑性就可以,可是小编碰到了第一个坑,重新连接方式实行了2次。

首先看实例编码和結果,在com.bruce.netty.rpc.client.SimpleClientHandler中加上如下所示编码:

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    //省去一部分编码......
    /** 手机客户端正常的退出时实行该方式 */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn(\"channelInactive:{}\", ctx.channel().localAddress());
        reconnection(ctx);
    }

    /** 入栈产生出现异常时实行exceptionCaught */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn(\"exceptionCaught:手机客户端[{}]和远程控制断开\", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        reconnection(ctx);
    }

    private void reconnection(ChannelHandlerContext ctx) {
        log.info(\"5s以后再次创建联接\");
        //临时为空完成
    }
}

ClientHandlerInitializer 中加上io.netty.handler.timeout.IdleStateHandler用以心率查验,ClientHeartbeatHandler用以监视心率事情,接受心率pong回应。

static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
    private NettyClient client;

    public ClientHandlerInitializer(NettyClient client) {
        this.client = client;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
        pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
        //25s内沒有read实际操作则开启READER_IDLE事情
        //10s内既沒有read又没有write实际操作则开启ALL_IDLE事情
        pipeline.addLast(new IdleStateHandler(25, 0, 10));
        pipeline.addLast(new ClientHeartbeatHandler());
        pipeline.addLast(new SimpleClientHandler(client));
    }
}

com.bruce.netty.rpc.client.ClientHeartbeatHandler

public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(ClientHeartbeatHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg.equals(\"pong\")) {
            log.info(\"接到心率回应\");
        } else {
            super.channelRead(ctx, msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //该情况必须相互配合 io.netty.handler.timeout.IdleStateHandler应用
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.ALL_IDLE) {
                //向服务器端推送心跳检测
                ctx.writeAndFlush(\"ping\");
                log.info(\"发送心率数据信息\");
            } else if (idleStateEvent.state() == IdleState.READER_IDLE) {
                //超出特定時间沒有读事情,关掉连接
                log.info(\"超出心率時间,关掉和服务器端的连接:{}\", ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

先运行server端,重新启动client端,待连接取得成功以后kill掉 server端过程。

Netty客户端断线重连实现及问题思考

根据客户端日志可以看得出,起先执行了exceptionCaught方法随后执行了channelInactive方法,可是这两个方法里都调用了reconnection方法,造成与此同时执行了2次重新连接。

为什么执行了exceptionCaught方法又执行了channelInactive方法呢?

我们可以在exceptionCaught和channelInactive方法加上中断点一步步查询源代码

Netty客户端断线重连实现及问题思考

当NioEventLoop执行select实际操作以后,解决对应的SelectionKey,产生异常情况后,会调用AbstractNioByteChannel.NioByteUnsafe#handleReadException方法开展解决,并开启pipeline.fireExceptionCaught(cause),最后调用到客户handler的fireExceptionCaught方法。

private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
  RecvByteBufAllocator.Handle allocHandle) {
 if (byteBuf != null) {
  if (byteBuf.isReadable()) {
   readPending = false;
   pipeline.fireChannelRead(byteBuf);
  } else {
   byteBuf.release();
  }
 }
 allocHandle.readComplete();
 pipeline.fireChannelReadComplete();
 pipeline.fireExceptionCaught(cause);

 // If oom will close the read event, release connection.
 // See https://github.com/netty/netty/issues/10434
 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
  closeOnRead(pipeline);
 }
}

该方法最终会分辨出现异常种类,执行close连接的方法。在连接断线的情景中,这儿即是java.io.IOException,因此执行了close方法,当debug到AbstractChannel.AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, notify)方法中会看到最终又调用了AbstractUnsafe#fireChannelInactiveAndDeregister方法,再次debug最终则会执行自定的fireChannelInactive方法。

到这儿可以汇总一个知识要点:netty中当执行到handler地fireExceptionCaught方法时,很有可能会再次开启到fireChannelInactive,也有可能不容易开启fireChannelInactive。

除开netty依据出现异常种类分辨是不是执行close方法外,实际上开发者还可以自身根据ctx或是channel去调用close方法,编码如下所示:

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    if (cause instanceof IOException) {
        log.warn(\"exceptionCaught:客户端[{}]和远程控制断掉连接\", ctx.channel().localAddress());
    } else {
        log.error(cause);
    }
    //ctx.close();
    ctx.channel().close();
}

但这类表明调用close方法,是不是一定会开启调用fireChannelInactive呢?

如果是,那麼只要在exceptionCaught中调用close方法,fireChannelInactive中做重新连接的逻辑性就可以!!

在小编根据日志观查到,在exceptionCaught中调用close方法每一次都是会调用fireChannelInactive方法。可是查询源代码,小编觉得这也是不一定的,由于在AbstractChannel.AbstractUnsafe#close(ChannelPromise,Throwable, ClosedChannelException, notify)中会调用io.netty.channel.Channel#isActive开展分辨,仅有为true,才会执行fireChannelInactive方法。

//io.netty.channel.socket.nio.NioSocketChannel#isActive
@Override
public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}

如何解决与此同时执行2次问题呢?

在netty复位时,大家都是会加上一系列的handlerCPU,这种handler事实上会在netty建立Channel目标(NioSocketChannel)时,被装封在DefaultChannelPipeline中,而DefaultChannelPipeline事实上是一个双向链表,头连接点为TailContext,尾节点为TailContext,而里面的连接点则是大家加上的一个个handler(被封装形式成DefaultChannelHandlerContext),当执行Pipeline上的方法时,会从单链表上解析xmlhandler执行,因而当执行exceptionCaught方法时,大家只要提早清除单链表上自定的Handler则没法执行fireChannelInactive方法。

Netty客户端断线重连实现及问题思考

最终完成源代码如下所示:

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn(\"channelInactive:{}\", ctx.channel().localAddress());
  ctx.pipeline().remove(this);
        ctx.channel().close();
        reconnection(ctx);
    }
   @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn(\"exceptionCaught:客户端[{}]和远程控制断开\", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        ctx.pipeline().remove(this);
        //ctx.close();
        ctx.channel().close();
        reconnection(ctx);
    }
}

实行实际效果如下所示,能够看见当产生出现异常时,仅仅实行了exceptionCaught方式,而且根据channel关掉了上一次联接資源,都没有实行现阶段handler的fireChannelInactive方式。

Netty客户端断线重连实现及问题思考

怎样完成断开后再次联接 ?

根据上边剖析,大家已经了解在什么方法中保持自身的重新连接逻辑性,可是实际该怎么完成呢,满怀好奇心的心理状态检索了一下各加大码友的建立计划方案。大多数作法是根据ctx.channel().eventLoop().schedule加上一个计划任务启用客户端的衔接方式。小编也参照该方法完成源代码如下所示:

private void reconnection(ChannelHandlerContext ctx) {
 log.info(\"5s以后再次创建联接\");
 ctx.channel().eventLoop().schedule(new Runnable() {
  @Override
  public void run() {
   boolean connect = client.connect();
   if (connect) {
    log.info(\"再次联接取得成功\");
   } else {
    reconnection(ctx);
   }
  }
 }, 5, TimeUnit.SECONDS);
}

检测:先运行server端,重新启动client端,待联接取得成功以后kill掉 server端过程。客户端按期定时执行重新连接,但也便去餐区倒杯茶的時间,回家后发觉了如下所示出现异常。

......省去14条同样的再试日志
[2021-01-17 18:46:45.032] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s以后再次创建联接
[2021-01-17 18:46:48.032] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 试着联接到服务器端: 127.0.0.1:8088
[2021-01-17 18:46:50.038] ERROR   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 联接出现异常:Connection refused: no further information: /127.0.0.1:8088
[2021-01-17 18:46:50.038] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s以后再次创建联接
[2021-01-17 18:46:53.040] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 试着联接到服务器端: 127.0.0.1:8088
[2021-01-17 18:46:53.048] ERROR   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : connect error:
io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@10122121(incomplete)
 at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:462)
 at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:159)
 at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:667)
 at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:305)
 at com.bruce.netty.rpc.client.NettyClient.connect(NettyClient.java:49)
 at com.bruce.netty.rpc.client.SimpleClientHandler$1.run(SimpleClientHandler.java:65)
 at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
 at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
 at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
 at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
 at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)

依据出现异常栈,可以发觉是com.bruce.netty.rpc.client.NettyClient#connect方式中获取了等候方式

boolean notTimeout = channelFuture.awaitUninterruptibly(20, TimeUnit.SECONDS);

而该方法里面会开展检验,是不是在io进程上实行了同歩等候,这会造成抛出异常BlockingOperationException。

@Override
protected void checkDeadLock() {
    if (channel().isRegistered()) {
        super.checkDeadLock();
    }
}
protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
        throw new BlockingOperationException(toString());
    }
}

令人费解的是为什么并不是每一次试着重新连接都抛出去该出现异常,反而是每过16次抛出去一次呢?

这要我想象到自身的笔记本电脑是8核CPU,而netty默认设置线程池是2 * c,便是16条进程,这中间好像有一些关系。

事实上在启用ChannelFuture channelFuture = bootstrap.connect(“127.0.0.1”, 8088);,netty最先会建立一个io.netty.channel.Channel(实例中是NioSocketChannel),随后根据io.netty.util.concurrent.EventExecutorChooserFactory.EventExecutorChooser先后挑选一个NioEventLoop,将Channel关联到NioEventLoop上。

Netty客户端断线重连实现及问题思考

io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop

//Return true if the given Thread is executed in the event loop, false otherwise.
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

重新连接的办法是在一个NioEventLoop(也就是io进程)上被启用,第1次重新连接事实上是挑选了第2个NioEventLoop,第2次重新连接事实上是挑选了第3个NioEventLoop,依此类推,当一轮挑选之后,再次挑到第一个NioEventLoop时,boolean inEventLoop()回到true,则提出了BlockingOperationException。

计划方案1

不要在netty的io线程上执行同歩联接,应用独立的线程池按时执行再试,该线程还能够执行自身重新连接的业务逻辑操作,不堵塞io线程。(假如不用业务流程操作以后消毁线程池)。

com.bruce.netty.rpc.client.SimpleClientHandler 改动reconnection方式

private static ScheduledExecutorService SCHEDULED_EXECUTOR;

private void initScheduledExecutor() {
 if (SCHEDULED_EXECUTOR == null) {
  synchronized (SimpleClientHandler.class) {
   if (SCHEDULED_EXECUTOR == null) {
    SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor(r -> {
     Thread t = new Thread(r, \"Client-Reconnect-1\");
     t.setDaemon(true);
     return t;
    });
   }
  }
 }
}

private void reconnection(ChannelHandlerContext ctx) {
 log.info(\"5s以后再次创建联接\");
 initScheduledExecutor();

 SCHEDULED_EXECUTOR.schedule(() -> {
  boolean connect = client.connect();
  if (connect) {
   //连接取得成功,关掉线程池
   SCHEDULED_EXECUTOR.shutdown();
   log.info(\"再次联接取得成功\");
  } else {
   reconnection(ctx);
  }
 }, 3, TimeUnit.SECONDS);
}

计划方案2

可以在io线程上应用多线程重新连接:

com.bruce.netty.rpc.client.NettyClient加上方式connectAsync方法,二者的差异取决于connectAsync方式中沒有启用channelFuture的同歩等候方式。反而是改为窃听器(ChannelFutureListener)的方法,事实上这一窃听器是运作在io线程上。

 public void connectAsync() {
    log.info(\"试着联接到服务器端: 127.0.0.1:8088\");
    ChannelFuture channelFuture = bootstrap.connect(\"127.0.0.1\", 8088);
    channelFuture.addListener((ChannelFutureListener) future -> {
        Throwable cause = future.cause();
        if (cause != null) {
            exceptionHandler(cause);
            log.info(\"等候下一次重新连接\");
            channelFuture.channel().eventLoop().schedule(this::connectAsync, 5, TimeUnit.SECONDS);
        } else {
            clientChannel = channelFuture.channel();
            if (clientChannel != null && clientChannel.isActive()) {
                log.info(\"Netty client started !!! {} connect to server\", clientChannel.localAddress());
            }
        }
    });
}

com.bruce.netty.rpc.client.SimpleClientHandler

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    private NettyClient client;

    public SimpleClientHandler(NettyClient client) {
        this.client = client;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info(\"client receive:{}\", msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn(\"channelInactive:{}\", ctx.channel().localAddress());
        ctx.pipeline().remove(this);
        ctx.channel().close();
        reconnectionAsync(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn(\"exceptionCaught:手机客户端[{}]和远程控制断开\", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        ctx.pipeline().remove(this);
        ctx.close();
        reconnectionAsync(ctx);
    }

    private void reconnectionAsync(ChannelHandlerContext ctx) {
        log.info(\"5s以后再次创建联接\");
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                client.connectAsync();
            }
        }, 5, TimeUnit.SECONDS);
    }
}

netty手机客户端线程给多少较为有效 ?

netty中一个NioEventLoopGroup默认设置建立的线程数是cpu核心数 * 2 ,这种线程全是用以io操作,那麼针对手机客户端应用软件而言确实必须这么多io线程么?

根据上边剖析BlockingOperationException出现异常时大家剖析到,事实上netty在建立一个Channel目标后只能从NioEventLoopGroup中选取一个NioEventLoop来关联,仅有建立好几个Channel才会先后挑选下一个NioEventLoop,换句话说一个Channel只能相匹配一个NioEventLoop,而NioEventLoop可以关联好几个Channel。

1.针对手机客户端而言,假如仅仅联接的一个server连接点,那麼只需设定1条线程就可以。即使发生了断开重新连接,在联接断掉以后,以前的Channel会从NioEventLoop清除。重新连接以后,依然只能在仅有的一个NioEventLoop申请注册一个新的Channel。

2.假如手机客户端与此同时如下所示方法多次启用io.netty.bootstrap.Bootstrap#connect(String inetHost, int inetPort)联接好几个Server连接点,那麼线程可以设定大一点,但不能高于2*c,并且只需发生断开重新连接,一样不可以确保每一个NioEventLoop都是会关联一个手机客户端Channel。

 public boolean connect() {
      try {
          ChannelFuture channelFuture1 = bootstrap.connect(\"127.0.0.1\", 8088);
          ChannelFuture channelFuture2 = bootstrap.connect(\"127.0.0.1\", 8088);
          ChannelFuture channelFuture3 = bootstrap.connect(\"127.0.0.1\", 8088);
      } catch (Exception e) {
          exceptionHandler(e);
      }
      clientChannel.close();
      return false;
  }

3.假如netty手机客户端线程数设定超过1有哪些不良影响么?

显著的出现异常肯定是不可能有的,可是导致网络资源消耗,最先会建立好几个NioEventLoop目标,但NioEventLoop是处在非运作情况。一旦发生断开重新连接,那麼再次接入时,下一个NioEventLoop则会被选定,并建立/运行线程一直处在runnable情况。而上一个NioEventLoop也是一直处在runnable情况,因为上一个Channel已经被close,因此会导致每一次select結果全是空的,没有意义的空轮询。

如下所示则是netty手机客户端应用默认设置线程数,4次断开重新连接后一共建立的5条NioEventLoop线程,可是事实上仅有第5条线程在执行读写能力操作。

Netty客户端断线重新连接完成及问题思考
Netty客户端断线重连实现及问题思考

4.假如客户端存有耗时的业务逻辑,应当直接应用业务流程线程池,防止在netty的io进程中实行耗时逻辑性解决。

汇总

这篇关键解读了,netty断线重新连接的二种完成计划方案,及其建立流程中遇上的不正常问题,根据分析问题,让我们掌握netty的完成关键点。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

(0)
上一篇 2022年5月8日 上午11:16
下一篇 2022年5月8日 上午11:18

相关推荐

  • 怎么制作超链接,先带你了解什么是超链接

    接下来,我们来讲讲公众号怎么做超链接? 之前没有接触过互联网或者说是你是刚入门的朋友,可能还不知道什么叫超链接。之前如果有做超链接的朋友,那么恭喜你,这篇文章可能就不用看了。经常会有刚入门的朋友,来问我,像文章、网页中这样的蓝色的链接是怎样做出来的?相信大家很多都是看到过这些蓝色链接的吧,其实你一点就可以进入到他链接到的页面,对吧?那很多朋友就问了,说这样的链接是怎么做出来的? ​ 这个超链接的制…

    2022年7月2日
    740
  • seo网站编辑是做什么的,seo网站编辑的主要工作内容介绍

    网站编辑是企业开展网络SEO优化的一个重要岗位,网站编辑通过撰写企业品牌、产品信息、推广软文等进行网站营销,这其中需要提供良好的用户体验,解决用户的需求,再者就是需要站在搜索引擎的角度,把撰写的文章得到更好的排名,让企业产品得到更好的曝光,挖掘更多的潜在的客户。网站文案该怎么结合网站SEO优化技巧进行撰写文案呢?小编在这里给大家分享一下。 1、围绕用户需求撰写文案 做网站的目的就是为了更好的吸引客…

    2022年8月10日
    450
  • 怎么下载网页上的图片到电脑,掌握这个方法下载其实很简单

    有一些朋友因为工作需要或者个人爱好,总是需要下载各种图片,比如壁纸啦、设计素材啦,有时候耗费大量时间终于在网页里找到了。 下载图片很简单,或者右键另存为,或者直接拖到桌面,但是这只适合几张图片的下载,如果一个页面上有几十兆,甚至几百兆上千张的图片,显然用鼠标来一张一张下载太慢了,有没有办法快速下载呢? 当然有的,小迷虫可是一直使用的这种批量下载图片的方法呢,这个方法就是浏览器配合插件:图片助手,就…

    2022年9月9日
    480
  • seo技术有哪些,seo技术的基本技术和应用

    SEO是什么?百度百科上的解释如下: 图一:什么是SEO 简单的来说,就是通过优化手段来提升网站的排名。我们来举个例子说明一下,例如在百度搜索框中搜索SEO,如下图所示。 图二:SEO的搜素结果 我们看到,百度为您找到相关结果约100,000,000个,那么这么多页面展示在我们面前,不可能一下全部展现出来,他们的展示一定是有个先后顺序的。如上图所示,排在第一位的是SEO的百度百科,第二位,第三位分…

    2022年7月14日
    730
  • 电动工具排名有哪些(国际电动工具品牌)

    博世BOSCH 说到电动工具,世界排名第一的品牌毋庸置疑,就是德国博世集团。旗下的电动工具品牌包括BOSCH蓝、BOSCH绿、BOSCH-T、skill。在国内能见到的有BOSCH蓝、BOSCH-T。skill世纪是BOSCH收购的美国品牌。 其中BOSCH蓝基本上全品类电动工具都棒棒哒。 挑出来一个代表性产品有些困难,不过一般电动工具行业倾向于用电锤技术代表公司整体技术实力,所以随便挑了一个博世…

    2022年10月19日
    5040
  • 产品概念是什么,产品概念的产生方法介绍

    RWW,即Real-Win-Worth,分别表示的是:真实、胜利、值得。如何利用RWW法评估市场机会的潜力和风险呢? RWW概述 Real-Win-Worth,简称RWW,起源于3M公司。 自20世纪80年代以来,越来越多的公司,包括3M,通用电气,霍尼韦尔,诺华等知名公司,都在使用Real-Win-Worth来评估市场机会或产品概念的商业潜力和风险大小,目前3M已在超过1500多个项目中使用RW…

    2022年7月24日
    1610
  • 手机银行与网上银行有什么区别,这些区别你都了解吗

    互联网的发展使得我们的生活变得越来越方便。我们需要购买什么东西,足不出户就可以在网上消费购买。那么我们就需要用到网银和手机银行。可是网银和手机银行到底有什么区别呢? 区别: 手机银行: 通过银行开发的手机APP进行登录,主要用于小额支付,每天转账额度不得大于100万,且跨行取款免手续费,相对网上银行来说携带方便、操作简单。但是不能通过个人手机登录手机银行办理以上的业务。 网上银行: 需要通过电脑P…

    2022年10月8日
    710
  • 新品推广活动方案怎么写,一个优秀推广方案离不开这四步

    做亚马逊在选择一款产品上架时,必先做好前期的调研准备,无论是市场调查、还是listing优化、推广计划等等,都要面面俱到,那么在帮助新品进行快速成长之前,如何利用现有的匹配工具做好准备和规划呢? 一、选品/数据分析 方案: 1、通过CPC数据分析同类目排名 2、选择对标排名 3、参考对标排名的相关广告数据等,筹划自己新品广告投入大致费用,匹配工具: 店铺CPC数据:参考竞争对手怎么做?研究下竞争对…

    2022年9月18日
    650
  • 免费照片做成视频的软件叫什么(把照片编辑成视频的技巧)

    短视频普及的时代,每个人都是视频的生产者。大家开始用手机拍摄视频,拍视频简单,大家都会,但会剪辑的人就少了,今天小蚁给大家介绍5款实用又简单的手机剪辑APP,人人都能学会,心动不?学会这些工具,你也可以轻松玩转后期! 1、剪映 推荐指数:★★★★★ 优点:设计简洁,操作无任何难度,还可以拍摄抖音同款热门视频 素材全面,而且都是免费的,不花一分钱,就能出大片 剪辑好的视频可以直接同步到抖音,这个非常…

    2022年5月12日
    730
  • 注册表修复软件(系统注册表修复工具)

    PCSleekErrorCleaner是windows上的一款既简单又好用的注册表修复工具,功能上软件为用户们提供了扫描注册表下的所有的问题,并一键修复服务能力。其中为了便于一些用户们的特定需求,还提供支持选择要扫描的注册表目录哦,里面就包含了注册表完整性、ActiveX/COM、旧软件、应用程序、应用程序路径、文件关联、MRU文件、设备驱动程序…内容。至于效率上还是相当快的,在选中全部注册表…

    2022年5月10日
    990

发表回复

登录后才能评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信