Eureka Client Source Code Analysis
Published: 2019-05-26


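What the Eureka client can do

  1. It can fetch information about other clients, which it uses to send requests to them.
  2. It can register itself with the registry, so that other clients can discover it and call it directly.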

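Dependencies

The application must first be a Spring Boot application.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>

I am using version Brixton.SR5 here; the implementation may differ in other versions.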

Detailed walkthrough

The startup class needs the @EnableDiscoveryClient annotation. Let's open the annotation and look at its code.

/**
 * Annotation to enable a DiscoveryClient implementation.
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
}

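As the comment says, this annotation enables an implementation of DiscoveryClient. How is it enabled? Roughly, through Spring Boot's starter mechanism: the configuration class EurekaClientAutoConfiguration is picked up, and that class creates the DiscoveryClient bean. Here is a short excerpt.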

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config) {
    return new CloudEurekaClient(manager, config, this.optionalArgs,
            this.context);
}

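CloudEurekaClient itself contains very little code. Almost everything is implemented by its parent class DiscoveryClient, so we go straight to the parent's methods.

Eventually the following constructor is called. The full code is pasted here for reference; there is no need to study it in detail.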

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    // Health check setup; skipped here
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
    }

    // Mainly obtains the information about this application
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;

    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    // Some monitors
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    // If the client neither registers with Eureka nor reads from it, return right away.
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();

        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
        return;  // no need to setup up an network tasks and we are done
    }

    // Thread pools used by the tasks below:
    // heartbeatExecutor sends heartbeats, cacheRefreshExecutor re-fetches registry information
    try {
        scheduler = Executors.newScheduledThreadPool(3,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    // Key point: cache refresh and heartbeat are both set up inside this method
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}
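The two executors in the constructor are built on a SynchronousQueue, which the source comments call "direct handoff". As a rough standalone illustration of what that implies (the class name, method name and the pool size of 1 are hypothetical, not taken from Eureka), the sketch below shows that such a pool never queues work: once every allowed thread is busy, a new task is rejected rather than buffered.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DirectHandoffDemo {

    /** Returns true if a second task is rejected while the only worker is busy. */
    static boolean secondTaskRejectedWhileBusy() {
        // Same shape as heartbeatExecutor, but with a hypothetical maximum of 1 thread.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            executor.execute(() -> {
                started.countDown();
                try {
                    release.await(); // keep the worker occupied
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await(); // the single worker is now busy
            try {
                executor.execute(() -> { });
                return false; // the task was accepted, so it was not a direct handoff
            } catch (RejectedExecutionException expected) {
                return true; // no idle thread and no queue capacity
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejectedWhileBusy());
    }
}
```

With a direct handoff, a slow heartbeat or cache refresh cannot pile up a backlog of stale runs behind it.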

The main logic lives in initScheduledTasks(), so let's go straight to its implementation.

/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
    // If registry information should be fetched, schedule the task below.
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // Periodically fetch registry information; the actual work is done by CacheRefreshThread
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    // If this instance should register itself with the registry, run the logic below
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Send a heartbeat to the registry at a fixed interval; the actual work is done by HeartbeatThread.
        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // Task that updates the local InstanceInfo and replicates it to the remote server
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

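There are three pieces of logic here:

  1. Fetching registry information
  2. Heartbeat
  3. Updating local information and syncing it to the remote server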
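Both timers pass an expBackOffBound value to TimedSupervisorTask, whose code the article does not show. The sketch below is only an assumption about how such a bound might be used: the delay doubles after each timed-out run, is capped at the base interval times the bound, and resets after a successful run. The class and method names are hypothetical.

```java
final class BackoffDelay {
    private final long baseDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    BackoffDelay(long baseDelayMs, int expBackOffBound) {
        this.baseDelayMs = baseDelayMs;
        // Assumed cap: the delay never grows beyond base * bound.
        this.maxDelayMs = baseDelayMs * expBackOffBound;
        this.currentDelayMs = baseDelayMs;
    }

    /** A run finished in time: go back to the base interval. */
    long onSuccess() {
        currentDelayMs = baseDelayMs;
        return currentDelayMs;
    }

    /** A run timed out: double the delay, up to the cap. */
    long onTimeout() {
        currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        return currentDelayMs;
    }

    public static void main(String[] args) {
        BackoffDelay delay = new BackoffDelay(30_000L, 10);
        for (int i = 0; i < 5; i++) {
            System.out.println("delay after timeout " + (i + 1) + ": " + delay.onTimeout() + " ms");
        }
        System.out.println("delay after success: " + delay.onSuccess() + " ms");
    }
}
```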

Updating local information and syncing it to the remote server

Let's look at the run method of InstanceInfoReplicator.

public void run() {
    try {
        // Refresh the local instance information
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        // dirtyTimestamp is non-null only if the local info changed and has not been synced to the remote server yet
        if (dirtyTimestamp != null) {
            // Register with the remote server
            discoveryClient.register();

            // Already synced; clear the dirty state so the next run does not register again
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // Keep polling: whenever the local info changes, it is synced to the remote server
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
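The run method reads a dirty timestamp before registering and hands the same timestamp back to unsetIsDirty afterwards. A minimal sketch of why a timestamp is useful here, using a hypothetical DirtyFlag class as a simplified stand-in (not the actual InstanceInfo code): if the local info changes again while the registration is in flight, clearing with the older timestamp must not wipe out the newer change.

```java
final class DirtyFlag {
    private boolean dirty;
    private long lastDirtyTimestamp;

    /** Marks the local info as changed at the given time. */
    synchronized void setIsDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    /** Returns the time of the last change if it is still unsynced, otherwise null. */
    synchronized Long isDirtyWithTime() {
        if (dirty) {
            return lastDirtyTimestamp;
        }
        return null;
    }

    /** Clears the flag only if nothing changed after the given timestamp. */
    synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
        if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
            dirty = false;
        }
    }

    public static void main(String[] args) {
        DirtyFlag flag = new DirtyFlag();
        flag.setIsDirty(100L);
        Long seen = flag.isDirtyWithTime(); // the replicator snapshots the timestamp
        flag.setIsDirty(200L);              // a change arrives during registration
        flag.unsetIsDirty(seen);            // the older timestamp cannot clear the newer change
        System.out.println("still dirty: " + (flag.isDirtyWithTime() != null));
    }
}
```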

Next, the register method. It is simple: it just issues an HTTP request that tells the remote server about this instance.

boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

Fetching registry information

Here we mainly look at the run method of CacheRefreshThread, which ends up calling fetchRegistry.

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // The information previously fetched from the registry
        Applications applications = getApplications();

        // Full-fetch case (the "new" case)
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            // Update case (delta)
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
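The condition at the top of fetchRegistry decides between a full fetch and a delta fetch. As a reading aid, the sketch below restates that condition as a pure function. The class, method and parameter names are hypothetical; a null cachedAppCount stands in for "no cached Applications object".

```java
final class FetchModeSketch {

    /** Returns true when a full registry fetch is needed, false when a delta is enough. */
    static boolean needsFullFetch(boolean deltaDisabled,
                                  String singleVipAddress,
                                  boolean forceFullRegistryFetch,
                                  Integer cachedAppCount,
                                  long cachedVersion) {
        return deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFullRegistryFetch
                || cachedAppCount == null   // nothing cached yet
                || cachedAppCount == 0      // cache exists but is empty
                || cachedVersion == -1;     // cache has no usable version
    }

    public static void main(String[] args) {
        System.out.println("warm cache: " + needsFullFetch(false, null, false, 3, 1L));
        System.out.println("empty cache: " + needsFullFetch(false, null, false, 0, 1L));
    }
}
```

In practice this means the first fetch after startup is always a full fetch, and later refreshes use deltas unless one of the other conditions applies.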

Here we look directly at the update case, getAndUpdateDelta.

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    // Call the remote server to fetch the delta
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }

    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // This call applies the delta to the cached registry information.
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}
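getAndUpdateDelta reads fetchRegistryGeneration before the remote call and applies the delta only if compareAndSet on that same value succeeds. The sketch below isolates this guard in a hypothetical GenerationGuard class, with a plain String standing in for the cached registry: of two fetches that started from the same generation, only the first to finish is applied.

```java
import java.util.concurrent.atomic.AtomicLong;

final class GenerationGuard {
    private final AtomicLong generation = new AtomicLong(0);
    private volatile String cache = "initial";

    /** Taken before the (slow) remote call. */
    long snapshot() {
        return generation.get();
    }

    /** Applies the result only if no other update was applied since the snapshot. */
    boolean applyIfCurrent(long snapshotGeneration, String fetchedValue) {
        if (generation.compareAndSet(snapshotGeneration, snapshotGeneration + 1)) {
            cache = fetchedValue;
            return true;
        }
        return false; // a competing update won; this result is stale
    }

    String cache() {
        return cache;
    }

    public static void main(String[] args) {
        GenerationGuard guard = new GenerationGuard();
        long first = guard.snapshot();
        long second = guard.snapshot(); // a second fetch starts from the same generation
        System.out.println("first applied: " + guard.applyIfCurrent(first, "delta-A"));
        System.out.println("second applied: " + guard.applyIfCurrent(second, "delta-B"));
        System.out.println("cache: " + guard.cache());
    }
}
```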

Heartbeat

Now the heartbeat. The place to look is the run method of HeartbeatThread, which ends up calling renew.

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // Send the HTTP request; instanceInfo holds this client's information.
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            return register();
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}
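The control flow of renew can be exercised without a server. In the sketch below the two HTTP calls are replaced by hypothetical IntSupplier stubs that return a status code. The 200, 404 and 204 checks mirror the code shown in the article; everything else is illustrative.

```java
import java.util.function.IntSupplier;

final class HeartbeatSketch {

    /**
     * Sends a heartbeat; if the server does not know this instance (404),
     * falls back to registering again. Any failure counts as "not renewed".
     */
    static boolean renew(IntSupplier sendHeartbeat, IntSupplier register) {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == 404) {
                return register.getAsInt() == 204; // re-registration succeeded
            }
            return status == 200;
        } catch (RuntimeException e) {
            return false; // network or server error
        }
    }

    public static void main(String[] args) {
        System.out.println("known instance: " + renew(() -> 200, () -> 204));
        System.out.println("unknown instance, re-registered: " + renew(() -> 404, () -> 204));
        System.out.println("server error: " + renew(() -> 500, () -> 204));
    }
}
```

The 404 branch is what lets a client recover after the server has dropped its lease: the next heartbeat registers the instance again.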

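Registry information

Here is the code of the Applications class, which holds the information returned by the registry. To see the actual contents, set a breakpoint and inspect it yourself.

It keeps several data structures, mainly so that the data is more convenient to use later.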

/**
 * The class that wraps all the registry information returned by eureka server.
 *
 */
@Serializer("com.netflix.discovery.converters.EntityBodyConverter")
@XStreamAlias("applications")
@JsonRootName("applications")
public class Applications {
    private static final String APP_INSTANCEID_DELIMITER = "$$";
    private static final Logger logger = LoggerFactory.getLogger(Applications.class);
    private static final String STATUS_DELIMITER = "_";

    private Long versionDelta = Long.valueOf(-1);

    @XStreamImplicit
    private AbstractQueue<Application> applications;

    private Map<String, Application> appNameApplicationMap = new ConcurrentHashMap<String, Application>();
    private Map<String, AbstractQueue<InstanceInfo>> virtualHostNameAppMap = new ConcurrentHashMap<String, AbstractQueue<InstanceInfo>>();
    private Map<String, AbstractQueue<InstanceInfo>> secureVirtualHostNameAppMap = new ConcurrentHashMap<String, AbstractQueue<InstanceInfo>>();
    private Map<String, AtomicLong> virtualHostNameIndexMap = new ConcurrentHashMap<String, AtomicLong>();
    private Map<String, AtomicLong> secureVirtualHostNameIndexMap = new ConcurrentHashMap<String, AtomicLong>();

    private Map<String, AtomicReference<List<InstanceInfo>>> shuffleVirtualHostNameMap = new ConcurrentHashMap<String, AtomicReference<List<InstanceInfo>>>();
    private Map<String, AtomicReference<List<InstanceInfo>>> shuffledSecureVirtualHostNameMap = new ConcurrentHashMap<String, AtomicReference<List<InstanceInfo>>>();

    // Remaining code omitted.
}
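The article does not show how these maps are used. One plausible reading, offered purely as an assumption, is that a per-virtual-host counter such as virtualHostNameIndexMap supports round-robin selection over the instance list of that virtual host. The sketch below illustrates the idea with a hypothetical VipRoundRobin class and plain strings instead of InstanceInfo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class VipRoundRobin {
    // vip name -> instances, and vip name -> running counter (hypothetical layout)
    private final Map<String, List<String>> instancesByVip = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> indexByVip = new ConcurrentHashMap<>();

    void put(String vip, List<String> instances) {
        instancesByVip.put(vip, new ArrayList<>(instances));
    }

    /** Returns the next instance for the vip in round-robin order, or null if none is known. */
    String next(String vip) {
        List<String> instances = instancesByVip.get(vip);
        if (instances == null || instances.isEmpty()) {
            return null;
        }
        AtomicLong counter = indexByVip.computeIfAbsent(vip, key -> new AtomicLong(0));
        int index = (int) (counter.getAndIncrement() % instances.size());
        return instances.get(index);
    }

    public static void main(String[] args) {
        VipRoundRobin registry = new VipRoundRobin();
        registry.put("orders", Arrays.asList("host-a", "host-b"));
        for (int i = 0; i < 3; i++) {
            System.out.println(registry.next("orders"));
        }
    }
}
```

Keeping the counter in its own map means lookups for different virtual hosts never contend on a shared index.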

Reposted from: http://asjqi.baihongyu.com/
