RemoteConfigLongPollService:远程配置长轮询服务,负责对 ConfigServer 的变更通知进行长轮询,当有更新时立即通知对应的 RemoteConfigRepository 触发配置更新
第一步:RemoteConfigRepository 的构造方法中调用 scheduleLongPollingRefresh() 方法进入该类
/**
 * Registers this repository with the long-poll service under its own namespace.
 */
private void scheduleLongPollingRefresh() {
  remoteConfigLongPollService.submit(m_namespace, this);
}
进入 RemoteConfigLongPollService 的下面这个方法
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);//// 若未启动长轮询定时任务,进行启动if (!m_longPollStarted.get()) {startLongPolling();}return added;}
若尚未启动长轮询,则进行启动
private void startLongPolling() {if (!m_longPollStarted.compareAndSet(false, true)) {//already startedreturn;}try {final String appId = m_configUtil.getAppId();final String cluster = m_configUtil.getCluster();final String dataCenter = m_configUtil.getDataCenter();final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();m_longPollingService.submit(new Runnable() {@Overridepublic void run() {if (longPollingInitialDelayInMills > 0) {try {logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);} catch (InterruptedException e) {//ignore}}doLongPollingRefresh(appId, cluster, dataCenter);}});} catch (Throwable ex) {m_longPollStarted.set(false);ApolloConfigException exception =new ApolloConfigException("Schedule long polling refresh failed", ex);Tracer.logError(exception);logger.warn(ExceptionUtil.getDetailMessage(exception));}}
下面的方法持续执行长轮询
private void doLongPollingRefresh(String appId, String cluster, String dataCenter) {final Random random = new Random();ServiceDTO lastServiceDto = null;// 循环执行,直到停止或线程中断while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {//wait at most 5 secondstry {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {}}Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");String url = null;try {if (lastServiceDto == null) {//获取所有的ConfigServer地址List<ServiceDTO> configServices = getConfigServices();lastServiceDto = configServices.get(random.nextInt(configServices.size()));}// 组装url(notifications/v2)url =assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,m_notifications);logger.debug("Long polling from {}", url);HttpRequest request = new HttpRequest(url);request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);transaction.addData("Url", url);final HttpResponse<List<ApolloConfigNotification>> response =m_httpUtil.doGet(request, m_responseType);logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);if (response.getStatusCode() == 200 && response.getBody() != null) {updateNotifications(response.getBody());updateRemoteNotifications(response.getBody());transaction.addData("Result", response.getBody().toString());//通知对应的RemoteConfigRepositorynotify(lastServiceDto, response.getBody());}//try to load balanceif (response.getStatusCode() == 304 && random.nextBoolean()) {lastServiceDto = null;}m_longPollFailSchedulePolicyInSecond.success();transaction.addData("StatusCode", response.getStatusCode());transaction.setStatus(Transaction.SUCCESS);} catch (Throwable ex) {lastServiceDto = null;Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));transaction.setStatus(ex);long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();logger.warn("Long polling failed, will retry in {} 
seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));try {TimeUnit.SECONDS.sleep(sleepTimeInSecond);} catch (InterruptedException ie) {//ignore}} finally {transaction.complete();}}}
doLongPollingRefresh 中的 notify(lastServiceDto, response.getBody()) 调用(原文排版的第 41 行):通知第一步中注册的 RemoteConfigRepository
private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {if (notifications == null || notifications.isEmpty()) {return;}for (ApolloConfigNotification notification : notifications) {String namespaceName = notification.getNamespaceName();//create a new list to avoid ConcurrentModificationExceptionList<RemoteConfigRepository> toBeNotified =Lists.newArrayList(m_longPollNamespaces.get(namespaceName));ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();//since .properties are filtered out by default, so we need to check if there is any listener for ittoBeNotified.addAll(m_longPollNamespaces.get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {try {remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);} catch (Throwable ex) {Tracer.logError(ex);}}}}remoteConfigRepository.onLongPollNotifiedpublic void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {m_longPollServiceDto.set(longPollNotifiedServiceDto);m_remoteMessages.set(remoteMessages);m_executorService.submit(new Runnable() {@Overridepublic void run() {m_configNeedForceRefresh.set(true);trySync();}});}
trySync() 是不是很熟悉?到这里就进入了主流程的第 1 步……
文章转载自鲁班同学,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




