阅读背景:

Spring Cloud学习--Spring Cloud Eureka服务续约保活

来源:互联网 

    上一篇博客《Spring Cloud学习--Spring Cloud Eureka服务注册》中我们介绍了客户端注册到注册中心的操作,接下来我们了解学习一下Eureka client和server服务续约和保活机制。Eureka的服务续约保活使用注册者主动定期调用的,类似于hearbeant,每隔一段时间调用Eureka Server对外提供的保活接口,告诉注册中心注册者还是正常运行的。

服务续约保活

Renew操作会在Service Provider定时发起,用来通知Eureka Server自己还活着。 这里有两个比较重要的配置需要如下,可以在Run之前配置。
eureka.instance.leaseRenewalIntervalInSeconds
Renew频率。默认是30秒,也就是每30秒会向Eureka Server发起Renew操作。
eureka.instance.leaseExpirationDurationInSeconds
服务失效时间。默认是90秒,也就是如果Eureka Server在90秒内没有接收到来自Service Provider的Renew操作,就会把Service Provider剔除。

DiscoveryClient    初始化的时候会创建定时任务定时执行HeartbeatThread线程完成服务续约保活。

private void initScheduledTasks() {
       
            // Heartbeat timer
			//省略部分代码
			
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
					
			//省略部分代码

    }

接下来我们看看HeartbeatThread中到底实现了什么操作。

在HeartbeatThread中实现的操作还是很简单的,最终调用了DiscoveryClient.renew方法来实现服务续约保活操作。

 private class HeartbeatThread implements Runnable {

        public void run() {
	    //调用renew实现续约保活操作
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

在renew中调用续约保活接口完成续约保活操作

boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
	    //发送心跳信息
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

通过PUT协议调用接口https://localhost:1001/apps/COMPUTER-SERVICE/127.0.0.1:computer-service:3333进行服务保活操作。


注册中心服务续约保活

    Eureka Server对外提供PUT方法用于接收服务注册者的保活信息,更新服务注册者的相关状态信息。

    在InstanceResource中提供renewLease方法接收注册者的保活信息,进行服务信息保活操作。

@PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
		//根据服务信息进行服务保活操作
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
		//进行服务信息更新操作
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
        return response;
    }

在InstanceRegistry中调用renew方法,首先会通过spring 的Event机制通知本地进行更新,然后调用父类PeerAwareInstanceRegistryImpl的renew方法

@Override
	public boolean renew(final String appName, final String serverId,
			boolean isReplication) {
		log("renew " + appName + " serverId " + serverId + ", isReplication {}"
				+ isReplication);
		List<Application> applications = getSortedApplications();
		for (Application input : applications) {
			if (input.getName().equals(appName)) {
				InstanceInfo instance = null;
				for (InstanceInfo info : input.getInstances()) {
					if (info.getId().equals(serverId)) {
						instance = info;
						break;
					}
				}
				publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
						instance, isReplication));
				break;
			}
		}
		return super.renew(appName, serverId, isReplication);
	}

在父类PeerAwareInstanceRegistryImpl中调用renew方法更新状态信息并发送通知

public boolean renew(final String appName, final String id, final boolean isReplication) {
		//调用父类的renew更新状态信息
        if (super.renew(appName, id, isReplication)) {
			//给每个节点发送更新信息
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }

在父类AbstractInstanceRegistry中完成时间戳等信息的更新操作

public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
		//更加appName获取相同的app
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
			//根据id获取实例信息
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
					//更新实例信息
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
			//更新时间戳信息
            leaseToRenew.renew();
            return true;
        }
    }

总结:

    服务注册者定时主动调用注册中心的保活接口完成服务的保活工作,注册中心处理的逻辑也很简单。

  盗图:


    

分享到: