暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

dubbo-------心跳机制

Lord Lean Notes 2020-01-05
1613

dubbo的心跳机制是用来检测provider和consumer之间的connection链接是否还存在,如果不存在做什么处理。心跳机制的原理是:

提供者:如果60s内没有接收到消息,则发送心跳消息,如果连续三次(180s)没有接收到消息,则关闭channel。

消费者:如果在60s内没有接收到消息,则发送心跳消息,如果连着三次没有收到心跳消息,消费者就会重连。

我们在学习服务暴露时,其中会创建netty客户端来进行通信,其中有一个createServer方法,在这个方法中会给url设置心跳时间,默认是60s。设置方法是:

现在我们来看下提供端心跳机制的调用路径:

可以看到最终执行到了new HeaderExchangeServer方法中,现在我们来看下构造此类的代码:

我们可以看到如果server为null就抛出异常,不为null的话就获取server心跳时间默认为0,心跳超时时间是心跳时间的三倍,如果心跳超时时间<心跳时间的两倍就抛出异常,否则执行startHeatbeatTimer方法。现在来看下这个方法的代码:

我们可以看到,在这个方法中,创建了一个给定初始延迟的间隔性scheduleWithFixedDelay线程池,而这个线程池在这个方法中的作用就是当我们创建完心跳线程[线程名---dubbo-remoting-server-heartbeat]之后,延迟60s执行任务,当执行完一次任务之后,在60s之后在执行一次任务。如果在创建心跳线程任务时,会获取到NettyServer中全部的channel链接。对于已经存在心跳线程,则首先关闭心跳线程,然后再创建心跳线程。
现在我们来看HeartBeatTask线程任务的代码:
    public void run() {
    try {
    long now = System.currentTimeMillis();
    Iterator i$ = this.channelProvider.getChannels().iterator();


    while(true) {
    Channel channel;
                /* channel是不可以被关闭的 */
    do {
    if (!i$.hasNext()) {
    return;
    }


    channel = (Channel)i$.next();
    } while(channel.isClosed());


    try {
    /* 获取最后一次读写的时间 */
    Long lastRead = (Long)channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
    Long lastWrite = (Long)channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                    /* 如果最后一次读写的时间距离现在时间已经超过心跳时间,则发送心跳请求 */
    if (lastRead != null && now - lastRead > (long)this.heartbeat || lastWrite != null && now - lastWrite > (long)this.heartbeat) {
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setEvent(Request.HEARTBEAT_EVENT);
    channel.send(req);
    if (logger.isDebugEnabled()) {
    logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + this.heartbeat + "ms");
    }
    }
                    /* 正常消息和心跳消息都未接收到 */
    if (lastRead != null && now - lastRead > (long)this.heartbeatTimeout) {
    logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + this.heartbeatTimeout + "ms");
    /* 如果是consumer端则重连,provider端则关闭链接 */
    if (channel instanceof Client) {
                            try {
    ((Client)channel).reconnect();
    } catch (Exception var8) {
    }
    } else {
    channel.close();
    }
    }
    } catch (Throwable var9) {
    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), var9);
    }
    }
    } catch (Throwable var10) {
    logger.warn("Unhandled exception when heartbeat, cause: " + var10.getMessage(), var10);
    }
    }
    我们可以看到在HeartBeatTask类中把心跳机制的重试机制和当心跳和正常消息都未接收到的处理方案。可以看到还是比较简单的。
    在了解完提供者心跳机制的创建之后,我们消费端的心跳机制:

    通过请求和响应的调用顺序图,我们可以看到consumer端是建立了长连接,最后的实现是在HeaderExchangeClient类,现在就让我们来看一下:

    我们可以看到创建这个类时的步骤和提供者是相似的,下面我们来看下startHeatTimer方法,代码如下: 

    可以看到会先暂停现有的心跳检测器,然后创建一个新的,但是和提供者创建心跳检测器不同的是consumer只会获取当前实例。

    通过对提供者和服务者的心跳机制的学习,我们对dubbo的心跳机制就会有一个更深的理解。心跳机制主要是通过心跳线程来进行检测,并且当最后一次的读写时间超过心跳时间后,就会进行重连。如果最后一次写的时间超过心跳超时时间就会提供者就会执行关闭操作,而消费者就会进行重连操作。

    文章转载自Lord Lean Notes,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论