心跳体制
什么是心跳
所说心跳, 即在 TCP 长连接中, 手机客户端和服务器进行按时发送的一种特有的数据, 通告别人自身还线上, 以保证 TCP 联接的实效性.
注:心跳包也有另一个功效,常常被忽视,即:一个联接假如长期不用,服务器防火墙或是无线路由器便会断掉该联接。
怎样完成
关键Handler —— IdleStateHandler
在 Netty 中, 完成心跳体制的关键是 IdleStateHandler, 那麼这一 Handler 怎么使用呢? 首先看下它的构造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS); }
这儿表述下三个主要参数的含意:
- readerIdleTimeSeconds: 读请求超时. 即如在指定的时间间隔内沒有从 Channel 载入到数据信息时, 会激发一个 READER_IDLE 的 IdleStateEvent 事件.
- writerIdleTimeSeconds: 写请求超时. 即如在指定的时间间隔内沒有数据信息载入到 Channel 时, 会激发一个 WRITER_IDLE 的 IdleStateEvent 事件.
- allIdleTimeSeconds: 读/写请求超时. 即如在指定的时间间隔内沒有读或写实际操作时, 会激发一个ALL_IDLE 的 IdleStateEvent 事件.
注:这三个主要参数默认设置的时间单位是秒。若必须指定别的时间单位,可以应用另一个构造函数:IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
在看下面的完成以前,提议先了解一下IdleStateHandler的建立基本原理。
下边立即上编码,必须留意的地区,会在源代码中根据注解开展表明。
应用IdleStateHandler完成心跳
下边将应用IdleStateHandler来完成心跳,Client端接入到Server端后,会循环系统实行一个每日任务:任意等候几秒,随后ping一下Server端,即发送一个心跳包。当等待时间超出規定時间,可能发送不成功,认为Server端在这之前已经积极断开了。编码如下所示:
Client端
ClientIdleStateTrigger —— 心跳触发器原理
类ClientIdleStateTrigger也是一个Handler,仅仅重新写过了userEventTriggered方式,用以捕获IdleState.WRITER_IDLE事件(未在指定時间内向型网络服务器发送数据信息),随后向Server端发送一个心跳包。
/** * <p> * 用以捕获{@link IdleState#WRITER_IDLE}事件(未在指定時间内向型网络服务器发送数据信息),随后向<code>Server</code>端发送一个心跳包。 * </p> */ public class ClientIdleStateTrigger extends ChannelInboundHandlerAdapter { public static final String HEART_BEAT = \"heart beat!\"; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.WRITER_IDLE) { // write heartbeat to server ctx.writeAndFlush(HEART_BEAT); } } else { super.userEventTriggered(ctx, evt); } } }
Pinger —— 心跳发射装置
/** * <p>手机客户端联接到服务端后,会循环系统实行一个每日任务:任意等候几秒,随后ping一下Server端,即发送一个心跳包。</p> */ public class Pinger extends ChannelInboundHandlerAdapter { private Random random = new Random(); private int baseRandom = 8; private Channel channel; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); this.channel = ctx.channel(); ping(ctx.channel()); } private void ping(Channel channel) { int second = Math.max(1, random.nextInt(baseRandom)); System.out.println(\"next heart beat will send after \" second \"s.\"); ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() { @Override public void run() { if (channel.isActive()) { System.out.println(\"sending heart beat to the server...\"); channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT); } else { System.err.println(\"The connection had broken, cancel the task that will send a heart beat.\"); channel.closeFuture(); throw new RuntimeException(); } } }, second, TimeUnit.SECONDS); future.addListener(new GenericFutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { ping(channel); } } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 当Channel已经中断的情形下, 依然发送数据信息, 会抛出现异常, 该方式会被启用. cause.printStackTrace(); ctx.close(); } }
ClientHandlersInitializer —— 手机客户端CPU结合的复位类
public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> { private ReconnectHandler reconnectHandler; private EchoHandler echoHandler; public ClientHandlersInitializer(TcpClient tcpClient) { Assert.notNull(tcpClient, \"TcpClient can not be null.\"); this.reconnectHandler = new ReconnectHandler(tcpClient); this.echoHandler = new EchoHandler(); } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new Pinger()); } }
注: 上边的Handler结合,除开Pinger,别的全是转码软件和处理粘包,可以忽视。
TcpClient —— TCP联接的手机客户端
public class TcpClient { private String host; private int port; private Bootstrap bootstrap; /** 将<code>Channel</code>保存, 可用来在别的非handler的地区传送数据 */ private Channel channel; public TcpClient(String host, int port) { this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000)); } public TcpClient(String host, int port, RetryPolicy retryPolicy) { this.host = host; this.port = port; init(); } /** * 向远程控制TCP网络服务器要求联接 */ public void connect() { synchronized (bootstrap) { ChannelFuture future = bootstrap.connect(host, port); this.channel = future.channel(); } } private void init() { EventLoopGroup group = new NioEventLoopGroup(); // bootstrap 可器重, 只需在TcpClient创建对象的情况下复位就可以. bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ClientHandlersInitializer(TcpClient.this)); } public static void main(String[] args) { TcpClient tcpClient = new TcpClient(\"localhost\", 2222); tcpClient.connect(); } }
Server端
ServerIdleStateTrigger —— 断连触发器原理
/** * <p>在要求时间段内未接到服务端的一切数据, 将积极断掉该联接</p> */ public class ServerIdleStateTrigger extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.READER_IDLE) { // 在要求时间段内没接到服务端的上涨数据信息, 积极断开 ctx.disconnect(); } } else { super.userEventTriggered(ctx, evt); } } }
ServerBizHandler —— 服务端的业务流程CPU
/** * <p>接到来源于手机客户端的数据后, 立即在控制面板打印出出去.</p> */ @ChannelHandler.Sharable public class ServerBizHandler extends SimpleChannelInboundHandler<String> { private final String REC_HEART_BEAT = \"I had received the heart beat!\"; @Override protected void channelRead0(ChannelHandlerContext ctx, String data) throws Exception { try { System.out.println(\"receive data: \" data); // ctx.writeAndFlush(REC_HEART_BEAT); } catch (Exception e) { e.printStackTrace(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(\"Established connection with the remote client.\"); // do something ctx.fireChannelActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(\"Disconnected with the remote client.\"); // do something ctx.fireChannelInactive(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
ServerHandlerInitializer —— 服务端CPU结合的复位类
/** * <p>用以复位服务端牵涉到的全部<code>Handler</code></p> */ public class ServerHandlerInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(\"idleStateHandler\", new IdleStateHandler(5, 0, 0)); ch.pipeline().addLast(\"idleStateTrigger\", new ServerIdleStateTrigger()); ch.pipeline().addLast(\"frameDecoder\", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); ch.pipeline().addLast(\"frameEncoder\", new LengthFieldPrepender(4)); ch.pipeline().addLast(\"decoder\", new StringDecoder()); ch.pipeline().addLast(\"encoder\", new StringEncoder()); ch.pipeline().addLast(\"bizHandler\", new ServerBizHandler()); } }
注:new IdleStateHandler(5, 0, 0)该handler意味着假如在5秒内没得到来源于手机客户端的一切数据(包含但是不限于心跳包),可能积极断掉与该手机客户端的联接。
TcpServer —— 服务端
public class TcpServer { private int port; private ServerHandlerInitializer serverHandlerInitializer; public TcpServer(int port) { this.port = port; this.serverHandlerInitializer = new ServerHandlerInitializer(); } public void start() { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(this.serverHandlerInitializer); // 关联端口号,逐渐接受进去的联接 ChannelFuture future = bootstrap.bind(port).sync(); System.out.println(\"Server start listen at \" port); future.channel().closeFuture().sync(); } catch (Exception e) { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); e.printStackTrace(); } } public static void main(String[] args) throws Exception { int port = 2222; new TcpServer(port).start(); } }
至此,全部编码已经撰写结束。
检测
最先运行手机客户端,重新启动服务端。运行进行后,在手机客户端的操纵台子上,能够看见打印出如下所示相近日志:
手机客户端控制面板輸出的日志
在服务端能够看见控制面板輸出了相近如下的日志:
服务器端控制面板輸出的日志
能够看见,客户端在发送4个心跳包后,第5个包由于等待的时间较长,直到真真正正发送的情况下,发觉连接已断开了;而服务器端接到客户端的4个心率数据后,一直等不到下一个数据,因此坚决断开该连接。
异常现象
在检测环节中,有可能会产生如下所示状况:
异常现象
发生这种现象的因素是:在连接已断开的情形下,依然向服务器端发送心跳包。尽管在发送心跳包以前会应用channel.isActive()分辨连接是不是可以用,但也是有很有可能上一刻分辨結果为可以用,但下一刻发送数据以前,连接就断掉。
现阶段并未寻找雅致解决这样的情况的计划方案,各位看官如果有好的解决方法,还望鼎力相助。叩谢!!!
断线重新连接
断线重连这儿就但是多详细介绍,坚信诸位都了解是什么原因。这儿只说大概构思,随后立即上编码。
完成构思
客户端在检测到与服务器端的连接断开后,或是一开始就没法连接的情形下,应用特定的重新连接对策开展重新连接实际操作,直到再次创建连接或再试频次耗光。
针对怎样检测连接是不是断开,则是根据重新写过ChannelInboundHandler#channelInactive来完成,但连接不能用,该方式会被开启,因此只要在该方式搞好重新连接工作中就可以。
编码完成
注:下列编码全是在上面心率体制的基本上改动/加上的。
由于断线重新连接是客户端的工作中,因此只需对客户端编码开展改动。
再试对策
RetryPolicy —— 再试对策插口
public interface RetryPolicy { /** * Called when an operation has failed for some reason. This method should return * true to make another attempt. * * @param retryCount the number of times retried so far (0 the first time) * @return true/false */ boolean allowRetry(int retryCount); /** * get sleep time in ms of current retry count. * * @param retryCount current retry count * @return the time to sleep */ long getSleepTimeMs(int retryCount); }
ExponentialBackOffRetry —— 重新连接对策的默认设置完成
/** * <p>Retry policy that retries a set number of times with increasing sleep time between retries</p> */ public class ExponentialBackOffRetry implements RetryPolicy { private static final int MAX_RETRIES_LIMIT = 29; private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE; private final Random random = new Random(); private final long baseSleepTimeMs; private final int maxRetries; private final int maxSleepMs; public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries) { this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS); } public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) { this.maxRetries = maxRetries; this.baseSleepTimeMs = baseSleepTimeMs; this.maxSleepMs = maxSleepMs; } @Override public boolean allowRetry(int retryCount) { if (retryCount < maxRetries) { return true; } return false; } @Override public long getSleepTimeMs(int retryCount) { if (retryCount < 0) { throw new IllegalArgumentException(\"retries count must greater than 0.\"); } if (retryCount > MAX_RETRIES_LIMIT) { System.out.println(String.format(\"maxRetries too large (%d). Pinning to %d\", maxRetries, MAX_RETRIES_LIMIT)); retryCount = MAX_RETRIES_LIMIT; } long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << retryCount)); if (sleepMs > maxSleepMs) { System.out.println(String.format(\"Sleep extension too large (%d). Pinning to %d\", sleepMs, maxSleepMs)); sleepMs = maxSleepMs; } return sleepMs; } }
ReconnectHandler—— 重新连接CPU
@ChannelHandler.Sharable public class ReconnectHandler extends ChannelInboundHandlerAdapter { private int retries = 0; private RetryPolicy retryPolicy; private TcpClient tcpClient; public ReconnectHandler(TcpClient tcpClient) { this.tcpClient = tcpClient; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(\"Successfully established a connection to the server.\"); retries = 0; ctx.fireChannelActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (retries == 0) { System.err.println(\"Lost the TCP connection with the server.\"); ctx.close(); } boolean allowRetry = getRetryPolicy().allowRetry(retries); if (allowRetry) { long sleepTimeMs = getRetryPolicy().getSleepTimeMs(retries); System.out.println(String.format(\"Try to reconnect to the server after %dms. Retry count: %d.\", sleepTimeMs, retries)); final EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(() -> { System.out.println(\"Reconnecting ...\"); tcpClient.connect(); }, sleepTimeMs, TimeUnit.MILLISECONDS); } ctx.fireChannelInactive(); } private RetryPolicy getRetryPolicy() { if (this.retryPolicy == null) { this.retryPolicy = tcpClient.getRetryPolicy(); } return this.retryPolicy; } }
ClientHandlersInitializer
在以前的基本上,加上了重新连接CPUReconnectHandler。
public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> { private ReconnectHandler reconnectHandler; private EchoHandler echoHandler; public ClientHandlersInitializer(TcpClient tcpClient) { Assert.notNull(tcpClient, \"TcpClient can not be null.\"); this.reconnectHandler = new ReconnectHandler(tcpClient); this.echoHandler = new EchoHandler(); } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(this.reconnectHandler); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new Pinger()); } }
TcpClient
在以前的基础上添加重新连接、重连策略的支持。
public class TcpClient { private String host; private int port; private Bootstrap bootstrap; /** 重新连接策略 */ private RetryPolicy retryPolicy; /** 将<code>Channel</code>保存, 可用来在别的非handler的地区传送数据 */ private Channel channel; public TcpClient(String host, int port) { this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000)); } public TcpClient(String host, int port, RetryPolicy retryPolicy) { this.host = host; this.port = port; this.retryPolicy = retryPolicy; init(); } /** * 向远程控制TCP网络服务器要求联接 */ public void connect() { synchronized (bootstrap) { ChannelFuture future = bootstrap.connect(host, port); future.addListener(getConnectionListener()); this.channel = future.channel(); } } public RetryPolicy getRetryPolicy() { return retryPolicy; } private void init() { EventLoopGroup group = new NioEventLoopGroup(); // bootstrap 可器重, 只需在TcpClient创建对象的情况下复位就可以. bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ClientHandlersInitializer(TcpClient.this)); } private ChannelFutureListener getConnectionListener() { return new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { future.channel().pipeline().fireChannelInactive(); } } }; } public static void main(String[] args) { TcpClient tcpClient = new TcpClient(\"localhost\", 2222); tcpClient.connect(); } }
检测
在测试以前,为了更好地绕开 Connection reset by peer 出现异常,可以略微改动Pinger的ping()方式,添加if (second == 5)的标准分辨。如下所示:
private void ping(Channel channel) { int second = Math.max(1, random.nextInt(baseRandom)); if (second == 5) { second = 6; } System.out.println(\"next heart beat will send after \" second \"s.\"); ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() { @Override public void run() { if (channel.isActive()) { System.out.println(\"sending heart beat to the server...\"); channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT); } else { System.err.println(\"The connection had broken, cancel the task that will send a heart beat.\"); channel.closeFuture(); throw new RuntimeException(); } } }, second, TimeUnit.SECONDS); future.addListener(new GenericFutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { ping(channel); } } }); }
运行手机客户端
先只运行手机客户端,观查控制面板輸出,能够看见相近如下所示日志:
断开重新连接检测——手机客户端控制面板輸出
能够看见,当手机客户端发觉无法连接到服务端,因此一直试着重新连接。伴随着再试频次提升,再试间隔时间越大,但又不愿无尽扩大下来,因此必须定一个阀值,例如60s。如上图所述所显示,时下一次再试時间超出60s时,会打印出Sleep extension too large(*). Pinning to 60000,企业为ms。发生这样的话的意思是,推算出来的时长超出阀值(60s),因此把真真正正睡眠质量的时间重置为阀值(60s)。
运行服务端
然后运行服务端,随后再次观查手机客户端控制面板輸出。
断开重新连接检测——服务端运行后手机客户端控制面板輸出
能够看见,在第9次再试不成功后,第10次再试以前,运行的网络服务器,因此第10次重新连接的效果为Successfully established a connection to the server.,即取得成功联接到网络服务器。下面由于或是不按时ping网络服务器,因此发生断开重新连接、断开重新连接的循环系统。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。