【Nacos源码之配置管理 十一】服务端LongPollingService推送变更数据到客户端

进击的老码农 2019年9月14日 11:48 阅读量:4706

前言


上一篇 【Nacos源码之配置管理 十】客户端长轮询监听服务端变更数据 介绍了客户端会像服务端发起长轮询来获取变更数据, 其实在客户端发起长轮询的请求相当于向服务端发起了一个订阅; 因为服务端接受到客户端的请求之后如果没有查询到变更数据是不会返回的;而是会等待29.5s(当然时间可配),在这个29.5s时间内,服务端如果检测到有数据变更,会立马像客户端发起响应请求,因为这个时间内服务端还是有hold住客户端发过来的请求,所以能发回响应数据; hold住request是用的AsyncContext异步
这边文章就具体来讲一讲

  • 服务端是怎么通知到客户端数据变更的
  • 如何以 拉模式 长轮询服务端

LongPollingService


LongPollingService 是一个长轮询服务,但是它是处理客户端的长轮询;
LongPollingService 还处理服务端本地数据变更之后的事情

服务端数据变更事件

LongPollingService实现了AbstractEventListener的onEvent方法; 这是一个发布订阅模式; 可以看 【Nacos源码之配置管理 二】Nacos中的事件发布与订阅–观察者模式;

在这里插入图片描述
之前在文章 Nacos源码之配置管理 七】服务端增删改配置数据之后如何通知集群中的其他机器 中有介绍修改数据之后的一些流程,就有讲到这里;这个是一个本地数据变更事件 ;
其中讲到DataChangeTask的时候留下了2个问号,前面挖的坑现在是填的时候了;
在这里插入图片描述
这次我们好好讲一讲数据变更任务DataChangeTask

DataChangeTask 服务端数据变更任务

在这里插入图片描述
配置数据有变更的时候执行这个方法; 遍历allSubs.iterator();得到对象 ClientLongPolling ;这是一个客户端长轮询的对象;里面保存了一些例如ip、clientMd5Map、asyncContext等等还有其他一些数据;

asyncContext :Servlet 3.0新增了异步处理, 这个对象持有客户端的 requestresponse; 就是通过这个对象,在服务端有了数据变更的情况下,能够里面的将变更数据返回响应给客户端; AsyncContext异步请求的用法

那么就剩下一个很重要的问题就是 allSubs 是什么时候订阅上的?

客户端发起长轮询


上一篇文章 【Nacos源码之配置管理 十】客户端长轮询监听服务端变更数据 分析了客户端发起长轮询的请求;如下
在这里插入图片描述
那么看看服务端这个listener做了什么
在这里插入图片描述
客户端发起了请求,高版本是支持长轮询,同时也兼容了老版本的短轮询; 短轮询就不分析了;

  public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                     int probeRequestSize) {

        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        /**
         * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance
         */
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // do nothing but set fix polling timeout
        } else {
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                    clientMd5Map.size(), probeRequestSize, changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        // 一定要由HTTP线程调用,否则离开后容器会立即发送响应
        final AsyncContext asyncContext = req.startAsync();
        // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
        asyncContext.setTimeout(0L);

        scheduler.execute(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }
  1. 获取客户端的请求参数;
    ①.str: 长轮询的超时时间,默认30s;详细可见上一篇文章
    ②.noHangUpFlag:不挂起标识,这个标识为false的时候,会把客户端的请求挂起;等待超时或者数据变更通知;如果客户端监听的数据是首次初始化,这个标识为true;
    ③.delayTime:延时时间;为了避免客户端请求超时,需要提前这个时间返回响应;这个数据是在配置管理中配置的,默认500毫秒,详细见 【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中
    在这里插入图片描述
    在这里插入图片描述
  2. 对比客户端和服务端MD5是否相同,有不同则直接返回不同的dataid+group响应;
  3. 如果2中没有不同,并且如果noHangUpFlag=true 则直接返回,不挂起请求;
  4. 2和3都不满足,则使用AsyncContext,将请求异步化,直接挂起; 超时时间为
    str-delayTIme,str是客户端设置的时间如下所示,delayTime默认500毫秒,可以在管理后台配置(见上面具体如何配置),如果都不主动配置,那么超时时间是30000-500=29500; 29.5秒;
    在这里插入图片描述
  5. 执行ClientLongPolling任务

ClientLongPolling任务

这个类有如下属性
在这里插入图片描述
asyncContext中持有客户端的请求;clientMd5Map包含了客户端所要监听的数据的MD5;
在这里插入图片描述
ClientLongPolling这个任务类执行的时候,是把 一个任务延迟了timeoutTime之后再执行的,并且返回asyncTimeoutFuture,这个timeoutTime就是上面说到的超时时间,例如29.5s;
最重要的是,把这个当前实例放到了allSubs中; 等待有数据变更的时候,可以通知到这个客户端,因为当前实例有asyncContext ,可以相应客户端的请求;

29.5s之后做了什么事情

看看timeoutTime之后执行的方法体做了什么
在这里插入图片描述

isFixedPolling()下面再讲,暂时忽略;我们看到最终执行的是

  1. 删除订阅关系,为啥要删除,因为这个allSubs在配置数据有变更的时候会遍历这个来进行通知,这里相当于本次请求要结束了,所以删除,不让通知了
  2. 执行sendResponse(null);方法;

在这里插入图片描述
看完方法之后了解到,超时时间到了之后,服务端会直接完成客户端的本次请求;客户端没有获取到变更数据,然后会又立马进行下一次请求;重复这个过程;

timeoutTime超时时间到了之后,服务端会直接结束本次请求;然后客户端又会立马重新发起新一轮请求,重复这个过程;相当于是说客户端每隔timeoutTime时间之后,就发起一次请求判断服务端是否有变更数据;

那么问题来了,如果只是这样的话,那么就是服务端纯粹的使用 拉模式, 并没有服务端的推模式呀?

服务端变更数据使用推模式推送数据

还记得文章一开头就说到一个事件吗,LocalDataChangeEvent 事件,服务端中修改了配置数据之后,就通知这个事件,这个事件最终会执行DataChangeTask任务;
这个任务上面已经分析过了,我们再回头分析一下关键点;

   /**下面删除了部分代码;保留关键点**/
  class DataChangeTask implements Runnable {
        @Override
        public void run() {
            try {
                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPolling clientSub = iter.next();
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {                  
                        iter.remove(); // 删除订阅关系                     
                      clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
            } catch (Throwable t) {            
            }
        }

  1. 遍历的allSubs; 这个allSubs是上面介绍过,客户端发起长轮询的请求的时候注册上的;
  2. 比较客户端订阅的配置数据MD5与当前是否一致,这个时候基本是不一致的,因为有修改嘛;
  3. 做了一些过滤操作之后,sendResponse(Arrays.asList(groupKey));
    这个操作,就是向客户端发送已经变更的配置项;发送了之后,本次请求也就结束了;客户端又会重新再发起新的一轮请求;

客户端+服务端

上面分析完了之后,我们总结一下;

  1. 客户端发起订阅请求;

  2. 服务端接收到请求之后,立马去查询一次数据是否变更;
    ①.如果有变更立马返回;然后客户端又回到步骤1;
    ②.如果没有变更,则把当前请求hold住一定的时间(默认29.5s)

  3. ①.如果这期间客户端所监听的数据都一直没有变更,怎时间到达之后,结束客户端的本次请求;客户端又回到步骤1;
    ②.如果期间有变更; 服务端会轮询所有监听了这个变更了配置项的客户端;
    然后立马返回响应变更了的配置项;本次请求也结束;客户端又回到步骤1;

就是这样一个不停的轮询的过程; 但是注意,服务端返回的只是哪些配置项有变更(只返回dataid+group等等,并没有返回content),客户端拿到这些变更配置项之后,还有主动请求配置项的content,来更新自己的缓存

isFixedPolling 固定长轮询

上面在介绍的时候,我们选择性忽略了这个固定长轮询;现在来介绍一下
固定长轮询并不是上介绍的 拉+推 的模式;而是客户端纯粹的
客户端发起长轮询,服务端立马查询是否有变更,若没有变更会挂起请求; 等待时间到达之后,会再次查询一次;然后返回结果; 如果期间有配置变更;是不会立马推送给客户端的,所以客户端是有一定的延迟的; 默认情况这个时间是10s;

如何设置固定长轮询

新增一个元数据配置项, DataId是 com.alibaba.nacos.meta.switch ; Group是DEFAULT_GROUP ;注意这个配置是内置的,一定要这样配置;

在这里插入图片描述

  1. 将配置isFixedPolling=true 打开固定长轮询
  2. fixedPollingInertval=10000;固定长轮询的间隔时间
  3. fixedDelayTime=500 延迟时间; 例如间隔时间是10s,延迟时间是0.5秒; 那么每隔9.5s执行一次轮询;0.5s的时间是为了防止请求超时的;

两种模式的比较

拉+推 的模式具有时效性;
纯粹的拉 会有一定的延迟;
推荐使用 拉+推模式

总结

在这里插入图片描述

进击的老码农

原文链接:https://blog.csdn.net/u010634066/article/details/100811030

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。