HelloWood

Spring Cloud Consul 服务注册和发现

2020-09-20

Spring Cloud Consul 服务注册和发现

Spring Cloud Kubernetes 使用,可以通过引入 org.springframework.cloud:spring-cloud-starter-consul-discovery,这个 starter 依赖于 org.springframework.cloud:spring-cloud-consul-coreorg.springframework.cloud:spring-cloud-consul-discovery

Consul 的核心概念

  • server
    集群的核心节点,用于和 agent 通讯,保存服务的信息

  • agent
    集群节点的守护进程,用于服务注册等行为,但不保存数据

  • catalog
    集群服务通信的接口

初始化 Kubernetes Client

初始化 Consul 依赖

相关 Consul 核心依赖的初始化是通过 org.springframework.cloud.consul.ConsulAutoConfiguration实现的

  • 初始化 ConsulClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Bean
@ConditionalOnMissingBean
public ConsulClient consulClient(ConsulProperties consulProperties) {
final int agentPort = consulProperties.getPort();
final String agentHost = !StringUtils.isEmpty(consulProperties.getScheme())
? consulProperties.getScheme() + "://" + consulProperties.getHost()
: consulProperties.getHost();

if (consulProperties.getTls() != null) {
ConsulProperties.TLSConfig tls = consulProperties.getTls();
TLSConfig tlsConfig = new TLSConfig(tls.getKeyStoreInstanceType(),
tls.getCertificatePath(), tls.getCertificatePassword(),
tls.getKeyStorePath(), tls.getKeyStorePassword());
return new ConsulClient(agentHost, agentPort, tlsConfig);
}
return new ConsulClient(agentHost, agentPort);
}

服务注册

初始化 Bean

相关 Bean 的初始化是在 org.springframework.cloud.consul.serviceregistry.ConsulAutoServiceRegistrationAutoConfiguration 中完成的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
   // 自动注册
@Bean
@ConditionalOnMissingBean
public ConsulAutoServiceRegistration consulAutoServiceRegistration(
ConsulServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
ConsulDiscoveryProperties properties,
ConsulAutoRegistration consulRegistration) {
return new ConsulAutoServiceRegistration(registry, autoServiceRegistrationProperties, properties, consulRegistration);
}

// 启动事件监听器
@Bean
public ConsulAutoServiceRegistrationListener consulAutoServiceRegistrationListener(
ConsulAutoServiceRegistration registration) {
return new ConsulAutoServiceRegistrationListener(registration);
}
@Bean
@ConditionalOnMissingBean
public ConsulAutoRegistration consulRegistration(
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
ConsulDiscoveryProperties properties, ApplicationContext applicationContext,
ObjectProvider<List<ConsulRegistrationCustomizer>> registrationCustomizers,
ObjectProvider<List<ConsulManagementRegistrationCustomizer>> managementRegistrationCustomizers,
HeartbeatProperties heartbeatProperties) {
return ConsulAutoRegistration.registration(autoServiceRegistrationProperties,
properties, applicationContext, registrationCustomizers.getIfAvailable(),
managementRegistrationCustomizers.getIfAvailable(), heartbeatProperties);
}

注册流程

  • 当监听到 WebServerInitializedEvent 事件时触发注册

ConsulAutoServiceRegistrationListener 类实现了 SmartApplicationListener接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void onApplicationEvent(ApplicationEvent applicationEvent) {
// 判断是否是 web server 初始化事件
if (applicationEvent instanceof WebServerInitializedEvent) {
WebServerInitializedEvent event = (WebServerInitializedEvent) applicationEvent;

ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
return;
}
}
this.autoServiceRegistration.setPortIfNeeded(event.getWebServer().getPort());
// 真正触发服务注册
this.autoServiceRegistration.start();
}
}

调用 org.springframework.cloud.consul.serviceregistry.ConsulAutoServiceRegistration#register 注册

1
2
3
4
5
6
7
8
9
@Override
protected void register() {
if (!this.properties.isRegister()) {
log.debug("Registration disabled.");
return;
}

super.register();
}

然后调用 org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void start() {
if (!this.isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}

} else {
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
this.register();
if (this.shouldRegisterManagement()) {
this.registerManagement();
}

this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
this.running.compareAndSet(false, true);
}

}
}

protected void register() {
this.serviceRegistry.register(this.getRegistration());
}
  • 最终在 ConsulServiceRegistry 实现注册逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void register(ConsulRegistration reg) {
log.info("Registering service with consul: " + reg.getService());
try {
// 将服务注册到 Consul
this.client.agentServiceRegister(reg.getService(), this.properties.getAclToken());
NewService service = reg.getService();

// 添加到心跳检查中
if (this.heartbeatProperties.isEnabled() &&
this.ttlScheduler != null &&
service.getCheck() != null &&
service.getCheck().getTtl() != null) {
this.ttlScheduler.add(reg.getInstanceId());
}
} catch (ConsulException e) {
if (this.properties.isFailFast()) {
log.error("Error registering service with consul: " + reg.getService(), e);
ReflectionUtils.rethrowRuntimeException(e);
}
log.warn("Failfast is false. Error registering service with consul: " + reg.getService(), e);
}
}

最后发出服务注册事件

取消注册流程

  • 在 Bean org.springframework.cloud.consul.serviceregistry.ConsulAutoServiceRegistration 销毁的时候调用 stop 方法,执行关闭逻辑;在 stop 方法中调用 deregister 方法,取消注册
1
2
3
4
5
6
7
8
9
10
11
public void stop() {
if (this.getRunning().compareAndSet(true, false) && this.isEnabled()) {
this.deregister();
if (this.shouldRegisterManagement()) {
this.deregisterManagement();
}

this.serviceRegistry.close();
}

}
  • ConsulServiceRegistry 实现取消注册逻辑
1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void deregister(ConsulRegistration reg) {
if (this.ttlScheduler != null) {
this.ttlScheduler.remove(reg.getInstanceId());
}
if (log.isInfoEnabled()) {
log.info("Deregistering service with consul: " + reg.getInstanceId());
}
// 将实例从 Consul 中移除
this.client.agentServiceDeregister(reg.getInstanceId(), this.properties.getAclToken());
}

服务发现

初始化 Bean

相关 Bean 的初始化在 org.springframework.cloud.consul.discovery.ConsulDiscoveryClientConfiguration 中完成

1
2
3
4
5
6
@Bean
@ConditionalOnMissingBean
public ConsulDiscoveryClient consulDiscoveryClient(ConsulClient consulClient,
ConsulDiscoveryProperties discoveryProperties) {
return new ConsulDiscoveryClient(consulClient, discoveryProperties);
}

获取服务

  • getService

调用 org.springframework.cloud.consul.discovery.ConsulDiscoveryClient#getServices 方法获取指定条件下的服务名称

1
2
3
4
5
6
7
8
9
@Override
public List<String> getServices() {
String aclToken = this.properties.getAclToken();

CatalogServicesRequest request = CatalogServicesRequest.newBuilder()
.setQueryParams(QueryParams.DEFAULT)
.setToken(this.properties.getAclToken()).build();
return new ArrayList<>(this.client.getCatalogServices(request).getValue().keySet());
}

最终是调用了 Consul 的 /v1/catalog/services接口

获取实例

  • getInstance

调用 org.springframework.cloud.consul.discovery.ConsulDiscoveryClient#getInstances(java.lang.String, com.ecwid.consul.v1.QueryParams),根据服务的名称获取相应的实例列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public List<ServiceInstance> getInstances(final String serviceId,
final QueryParams queryParams) {
List<ServiceInstance> instances = new ArrayList<>();

addInstancesToList(instances, serviceId, queryParams);

return instances;
}


private void addInstancesToList(List<ServiceInstance> instances, String serviceId,
QueryParams queryParams) {

// 请求参数
HealthServicesRequest request = HealthServicesRequest.newBuilder()
.setTag(this.properties.getDefaultQueryTag())
.setPassing(this.properties.isQueryPassing())
.setQueryParams(queryParams)
.setToken(this.properties.getAclToken()).build();
Response<List<HealthService>> services = this.client.getHealthServices(serviceId,
request);

for (HealthService service : services.getValue()) {
String host = findHost(service);

Map<String, String> metadata = getMetadata(service, this.properties.isTagsAsMetadata());
boolean secure = false;
if (metadata.containsKey("secure")) {
secure = Boolean.parseBoolean(metadata.get("secure"));
}
instances.add(
new DefaultServiceInstance(
service.getService().getId(),
serviceId,
host,
service.getService().getPort(),
secure,
metadata)
);
}
}

服务列表更新

Consul 的实例监听是通过定时任务,默认每秒都会拉取服务列表,如果发现返回的 Index 发生变化,则说明服务发生变化,发出 HeartbeatEvent 事件

实例初始化

1
2
3
4
5
6
7
8
9
@Bean
@ConditionalOnMissingBean
public ConsulCatalogWatch consulCatalogWatch(
ConsulDiscoveryProperties discoveryProperties,
ConsulClient consulClient,
@Qualifier(CATALOG_WATCH_TASK_SCHEDULER_NAME) TaskScheduler taskScheduler) {
return new ConsulCatalogWatch(discoveryProperties, consulClient, taskScheduler);
}

监听实现

是在 org.springframework.cloud.consul.discovery.ConsulCatalogWatch#catalogServicesWatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Timed("consul.watch-catalog-services")
public void catalogServicesWatch() {
try {
long index = -1;
if (this.catalogServicesIndex.get() != null) {
index = this.catalogServicesIndex.get().longValue();
}

// 获取服务信息
CatalogServicesRequest request = CatalogServicesRequest.newBuilder()
.setQueryParams(new QueryParams(this.properties.getCatalogServicesWatchTimeout(), index))
.setToken(this.properties.getAclToken()).build();
Response<Map<String, List<String>>> response = this.consul.getCatalogServices(request);
// 获取位点并发送事件
Long consulIndex = response.getConsulIndex();
if (consulIndex != null) {
this.catalogServicesIndex.set(BigInteger.valueOf(consulIndex));
}

if (log.isTraceEnabled()) {
log.trace("Received services update from consul: " + response.getValue() + ", index: " + consulIndex);
}
this.publisher.publishEvent(new HeartbeatEvent(this, consulIndex));
} catch (Exception e) {
log.error("Error watching Consul CatalogServices", e);
}
}