Spring Cloud Kubernetes: Service Registration and Discovery

To use Spring Cloud Kubernetes, add the org.springframework.cloud:spring-cloud-starter-kubernetes dependency. This starter depends on org.springframework.cloud:spring-cloud-kubernetes-core and org.springframework.cloud:spring-cloud-kubernetes-discovery.
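Initializing the Kubernetes Client

Initializing the environment configuration

Environment initialization is implemented by org.springframework.cloud.kubernetes.profile.KubernetesProfileEnvironmentPostProcessor. Once the environment has been prepared, it checks whether Kubernetes support is enabled. If it is enabled and the application is running inside Kubernetes, it checks whether the Kubernetes profile is already active and activates it if not.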
```java
@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
    // Kubernetes support is enabled unless it is switched off explicitly.
    final boolean kubernetesEnabled = environment.getProperty("spring.cloud.kubernetes.enabled", Boolean.class, true);
    if (!kubernetesEnabled) {
        return;
    }

    if (isInsideKubernetes()) {
        if (hasKubernetesProfile(environment)) {
            // The profile is already active; nothing to do.
        }
        else {
            environment.addActiveProfile(KUBERNETES_PROFILE);
        }
    }
    else {
        // Not running inside Kubernetes; the profile is not activated.
    }
}
```
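The decision made by the post-processor can be sketched in plain Java, without Spring. This is only an illustration: the class `KubernetesProfileSketch`, the method `resolveProfiles`, and the profile name `"kubernetes"` are assumptions made for the example, and the two boolean parameters stand in for the `spring.cloud.kubernetes.enabled` property and the result of `isInsideKubernetes()`.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

final class KubernetesProfileSketch {

    // Assumed profile name, used here for illustration only.
    static final String KUBERNETES_PROFILE = "kubernetes";

    /** Returns the active profiles after the post-processor logic has been applied. */
    static Set<String> resolveProfiles(boolean enabled, boolean insideKubernetes, Set<String> activeProfiles) {
        Set<String> result = new LinkedHashSet<>(activeProfiles);
        if (!enabled) {
            return result; // Kubernetes support is switched off: leave profiles untouched.
        }
        if (insideKubernetes) {
            // Adding to a set is a no-op when the profile is already active.
            result.add(KUBERNETES_PROFILE);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> active = new LinkedHashSet<>(Arrays.asList("dev"));
        System.out.println(resolveProfiles(true, true, active));
        System.out.println(resolveProfiles(true, false, active));
        System.out.println(resolveProfiles(false, true, active));
    }
}
```

Only the first call adds the profile; the other two leave the active profiles unchanged.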
Initializing the Kubernetes dependencies

The core Kubernetes dependencies are initialized by org.springframework.cloud.kubernetes.KubernetesAutoConfiguration.
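```java
@Bean
@ConditionalOnMissingBean(Config.class)
public Config kubernetesClientConfig(KubernetesClientProperties kubernetesClientProperties) {
    // Start from the configuration that the client detects automatically.
    Config base = Config.autoConfigure(null);
    Config properties = new ConfigBuilder(base)
            // The builder calls that apply kubernetesClientProperties are omitted in this excerpt.
            .build();
    return properties;
}
```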
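When the configuration is loaded, the client first looks for a local configuration under ~/.kube. Settings that the application does not specify are filled in from that local configuration, and the resulting Config is returned as a bean.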
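The fill-in behaviour described above amounts to "use the application's value if present, otherwise the auto-detected one". A minimal plain-Java sketch of that rule follows. The class `ConfigFallbackSketch`, the helper `or`, the map-based representation, and the sample keys and values are all hypothetical stand-ins, not the actual client API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ConfigFallbackSketch {

    /** Prefers the value set by the application and falls back to the auto-detected one. */
    static <T> T or(T configured, T autoDetected) {
        return configured != null ? configured : autoDetected;
    }

    /** Merges two settings maps; a null configured value means "not set by the application". */
    static Map<String, String> merge(Map<String, String> configured, Map<String, String> autoDetected) {
        Map<String, String> merged = new LinkedHashMap<>(autoDetected);
        for (String key : configured.keySet()) {
            merged.put(key, or(configured.get(key), autoDetected.get(key)));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical values standing in for what the local kubeconfig provides.
        Map<String, String> autoDetected = new LinkedHashMap<>();
        autoDetected.put("masterUrl", "https://10.0.0.1:6443");
        autoDetected.put("namespace", "default");

        // The application only overrides the namespace.
        Map<String, String> configured = new LinkedHashMap<>();
        configured.put("namespace", "orders");
        configured.put("masterUrl", null);

        System.out.println(merge(configured, autoDetected));
    }
}
```

In this example the namespace comes from the application while the master URL is taken from the auto-detected settings.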
```java
@Bean
@ConditionalOnMissingBean
public KubernetesClient kubernetesClient(Config config) {
    return new DefaultKubernetesClient(config);
}
```
The KubernetesClient is then created from the resulting configuration.
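Service Registration

Service registration is implemented by org.springframework.cloud.kubernetes.registry.KubernetesAutoServiceRegistration. The class is marked as deprecated in 2.x: services deployed to Kubernetes are already recorded in etcd, so no registration is actually performed.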
Initializing the beans

The related beans are created in org.springframework.cloud.kubernetes.discovery.KubernetesDiscoveryClientAutoConfiguration.
```java
@Bean
public KubernetesServiceRegistry getServiceRegistry() {
    return new KubernetesServiceRegistry();
}

@Bean
public KubernetesRegistration getRegistration(KubernetesClient client,
        KubernetesDiscoveryProperties properties) {
    return new KubernetesRegistration(client, properties);
}

@Bean
public KubernetesDiscoveryProperties getKubernetesDiscoveryProperties() {
    return new KubernetesDiscoveryProperties();
}
```
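Registration flow

The KubernetesAutoServiceRegistration class is instantiated.

The class implements the SmartLifecycle interface. Registration starts once the application has started and the ServletWebServerInitializedEvent is received.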
```java
@EventListener(ServletWebServerInitializedEvent.class)
public void onApplicationEvent(ServletWebServerInitializedEvent event) {
    int localPort = event.getWebServer().getPort();
    // Register only once, the first time a port becomes known.
    if (this.port.get() == 0) {
        this.port.compareAndSet(0, localPort);
        start();
    }
}

@Override
public void start() {
    this.serviceRegistry.register(this.registration);

    this.context.publishEvent(
            new InstanceRegisteredEvent<>(this, this.registration.getProperties()));
    this.running.set(true);
}
```
Once registration completes, an InstanceRegisteredEvent is published.
The registration logic lives in KubernetesServiceRegistry, but it does not actually perform any registration; it only logs the request.
```java
@Override
public void register(KubernetesRegistration registration) {
    log.info("Registering : " + registration);
}
```
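The registration flow above can be modelled without Spring to show why `start()` runs only once. This is a rough sketch under stated assumptions: `AutoRegistrationSketch` and `onWebServerInitialized` are hypothetical names, and the `events` list stands in for both the log output of the registry and the published `InstanceRegisteredEvent`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class AutoRegistrationSketch {

    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Records what would be logged or published; stands in for the registry and the event publisher.
    final List<String> events = new ArrayList<>();

    /** Stand-in for the listener that handles the web server initialized event. */
    void onWebServerInitialized(int localPort) {
        // Only the first event finds the port still at 0, so start() runs once.
        if (port.get() == 0) {
            port.compareAndSet(0, localPort);
            start();
        }
    }

    void start() {
        events.add("Registering on port " + port.get()); // the registry only logs
        events.add("InstanceRegisteredEvent");            // the event is still published
        running.set(true);
    }

    int port() {
        return port.get();
    }

    boolean isRunning() {
        return running.get();
    }

    public static void main(String[] args) {
        AutoRegistrationSketch registration = new AutoRegistrationSketch();
        registration.onWebServerInitialized(8080);
        registration.onWebServerInitialized(9090); // ignored: the port is already set
        System.out.println(registration.events);
    }
}
```

The second event is ignored because the port has already been recorded, so the registry call and the event are each emitted once.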
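Deregistration flow

When the ContextClosedEvent is received, the stop method is called to run the shutdown logic; stop in turn calls deregister to cancel the registration.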
```java
@EventListener(ContextClosedEvent.class)
public void onApplicationEvent(ContextClosedEvent event) {
    // React only to the shutdown of this application's own context.
    if (event.getApplicationContext() == this.context) {
        stop();
    }
}

@Override
public void stop() {
    this.serviceRegistry.deregister(this.registration);
    this.running.set(false);
}
```
KubernetesServiceRegistry implements the deregistration logic.
```java
@Override
public void deregister(KubernetesRegistration registration) {
    log.info("DeRegistering : " + registration);
}
```
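Service Discovery

Initializing the beans

The related bean, org.springframework.cloud.kubernetes.discovery.KubernetesDiscoveryClient, is created in org.springframework.cloud.kubernetes.discovery.KubernetesDiscoveryClientAutoConfiguration.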
```java
@Bean
@ConditionalOnMissingBean
public KubernetesDiscoveryClient kubernetesDiscoveryClient(
        KubernetesClient client,
        KubernetesDiscoveryProperties properties,
        KubernetesClientServicesFunction kubernetesClientServicesFunction,
        DefaultIsServicePortSecureResolver isServicePortSecureResolver) {
    return new KubernetesDiscoveryClient(client, properties,
            kubernetesClientServicesFunction, isServicePortSecureResolver);
}
```
Getting services

Call org.springframework.cloud.kubernetes.discovery.KubernetesDiscoveryClient#getServices() to get the names of the services that match the configured filter.
```java
@Override
public List<String> getServices() {
    String spelExpression = this.properties.getFilter();
    Predicate<Service> filteredServices;
    if (spelExpression == null || spelExpression.isEmpty()) {
        // No filter configured: accept every service.
        filteredServices = (Service instance) -> true;
    }
    else {
        Expression filterExpr = this.parser.parseExpression(spelExpression);
        filteredServices = (Service instance) -> {
            Boolean include = filterExpr.getValue(this.evalCtxt, instance, Boolean.class);
            // A null result is treated as "do not include".
            if (include == null) {
                return false;
            }
            return include;
        };
    }
    return getServices(filteredServices);
}

public List<String> getServices(Predicate<Service> filter) {
    return this.kubernetesClientServicesFunction.apply(this.client)
            .list()
            .getItems()
            .stream()
            .filter(filter)
            .map(s -> s.getMetadata().getName())
            .collect(Collectors.toList());
}
```
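The filter handling in `getServices()` can be illustrated with JDK types only. In the sketch below a `Function<Svc, Boolean>` stands in for the parsed SpEL expression, and `Svc`, `ServiceFilterSketch`, and `toPredicate` are hypothetical names introduced for the example; the point is the three cases: no filter, a filter that returns a value, and a filter that returns null.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class ServiceFilterSketch {

    /** Minimal stand-in for a Kubernetes Service: a name plus labels. */
    static final class Svc {
        final String name;
        final Map<String, String> labels;

        Svc(String name, Map<String, String> labels) {
            this.name = name;
            this.labels = labels;
        }
    }

    /** Turns an optional expression into a predicate; a null result excludes the service. */
    static Predicate<Svc> toPredicate(Function<Svc, Boolean> expression) {
        if (expression == null) {
            return svc -> true; // no filter configured: accept everything
        }
        return svc -> {
            Boolean include = expression.apply(svc);
            return include != null && include;
        };
    }

    static List<String> getServices(List<Svc> all, Predicate<Svc> filter) {
        return all.stream().filter(filter).map(svc -> svc.name).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Svc> all = Arrays.asList(
                new Svc("orders", Collections.singletonMap("env", "prod")),
                new Svc("billing", Collections.singletonMap("env", "dev")),
                new Svc("legacy", Collections.<String, String>emptyMap()));

        // Returns null when the label is missing, mimicking an expression without a result.
        Function<Svc, Boolean> prodOnly =
                svc -> svc.labels.containsKey("env") ? "prod".equals(svc.labels.get("env")) : null;

        System.out.println(getServices(all, toPredicate(null)));
        System.out.println(getServices(all, toPredicate(prodOnly)));
    }
}
```

With no filter all three names are returned; with the example expression only `orders` remains, because the service without the label yields null and is excluded.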
Getting instances

Call org.springframework.cloud.kubernetes.discovery.KubernetesDiscoveryClient#getInstances to get the list of instances for a given service name.
```java
@Override
public List<ServiceInstance> getInstances(String serviceId) {
    Assert.notNull(serviceId, "[Assertion failed] - the object argument must not be null");

    // Look up the Endpoints named after the service, in all namespaces or only the current one.
    List<Endpoints> endpointsList = this.properties.isAllNamespaces()
            ? this.client.endpoints()
                    .inAnyNamespace()
                    .withField("metadata.name", serviceId)
                    .list()
                    .getItems()
            : Collections.singletonList(this.client.endpoints().withName(serviceId).get());

    List<EndpointSubsetNS> subsetsNS = endpointsList.stream()
            .map(this::getSubsetsFromEndpoints)
            .collect(Collectors.toList());

    List<ServiceInstance> instances = new ArrayList<>();
    if (!subsetsNS.isEmpty()) {
        for (EndpointSubsetNS es : subsetsNS) {
            instances.addAll(this.getNamespaceServiceInstances(es, serviceId));
        }
    }

    return instances;
}

private List<ServiceInstance> getNamespaceServiceInstances(EndpointSubsetNS es, String serviceId) {
    String namespace = es.getNamespace();
    List<EndpointSubset> subsets = es.getEndpointSubset();
    List<ServiceInstance> instances = new ArrayList<>();
    if (!subsets.isEmpty()) {
        final Service service = this.client.services()
                .inNamespace(namespace)
                .withName(serviceId)
                .get();
        final Map<String, String> serviceMetadata = this.getServiceMetadata(service);
        KubernetesDiscoveryProperties.Metadata metadataProps = this.properties.getMetadata();

        for (EndpointSubset s : subsets) {
            // Each subset starts from the service metadata and optionally adds its named ports.
            Map<String, String> endpointMetadata = new HashMap<>(serviceMetadata);
            if (metadataProps.isAddPorts()) {
                Map<String, String> ports = s.getPorts()
                        .stream()
                        .filter(port -> !StringUtils.isEmpty(port.getName()))
                        .collect(toMap(EndpointPort::getName, port -> Integer.toString(port.getPort())));
                Map<String, String> portMetadata = getMapWithPrefixedKeys(ports, metadataProps.getPortsPrefix());
                if (log.isDebugEnabled()) {
                    log.debug("Adding port metadata: " + portMetadata);
                }
                endpointMetadata.putAll(portMetadata);
            }

            // One ServiceInstance per endpoint address; the pod UID serves as the instance id.
            List<EndpointAddress> addresses = s.getAddresses();
            for (EndpointAddress endpointAddress : addresses) {
                String instanceId = null;
                if (endpointAddress.getTargetRef() != null) {
                    instanceId = endpointAddress.getTargetRef().getUid();
                }

                EndpointPort endpointPort = findEndpointPort(s);
                instances.add(new KubernetesServiceInstance(instanceId, serviceId,
                        endpointAddress, endpointPort, endpointMetadata,
                        this.isServicePortSecureResolver
                                .resolve(new DefaultIsServicePortSecureResolver.Input(
                                        endpointPort.getPort(),
                                        service.getMetadata().getName(),
                                        service.getMetadata().getLabels(),
                                        service.getMetadata().getAnnotations()))));
            }
        }
    }

    return instances;
}
```
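The port-metadata step in `getNamespaceServiceInstances` is easier to follow in isolation. The sketch below is a plain-Java approximation, not the library code: `PortMetadataSketch`, `withPrefixedKeys`, and `endpointMetadata` are hypothetical names, the maps replace the fabric8 model objects, and the prefix `"port."` is just an example value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PortMetadataSketch {

    /** Returns a copy of the map whose keys carry the given prefix (assumed behaviour). */
    static Map<String, String> withPrefixedKeys(Map<String, String> map, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        if (map == null) {
            return result;
        }
        String effectivePrefix = prefix == null ? "" : prefix;
        map.forEach((key, value) -> result.put(effectivePrefix + key, value));
        return result;
    }

    /** Combines service metadata with the named ports of one endpoint subset. */
    static Map<String, String> endpointMetadata(Map<String, String> serviceMetadata,
            Map<String, Integer> portsByName, String portsPrefix) {
        Map<String, String> metadata = new LinkedHashMap<>(serviceMetadata);
        Map<String, String> namedPorts = new LinkedHashMap<>();
        portsByName.forEach((name, port) -> {
            // Ports without a name are skipped, as in the filter of the original code.
            if (name != null && !name.isEmpty()) {
                namedPorts.put(name, Integer.toString(port));
            }
        });
        metadata.putAll(withPrefixedKeys(namedPorts, portsPrefix));
        return metadata;
    }

    public static void main(String[] args) {
        Map<String, String> serviceMetadata = new LinkedHashMap<>();
        serviceMetadata.put("app", "orders");

        Map<String, Integer> ports = new LinkedHashMap<>();
        ports.put("http", 8080);
        ports.put("", 9090); // unnamed port, ignored

        System.out.println(endpointMetadata(serviceMetadata, ports, "port."));
    }
}
```

Copying the service metadata into a fresh map per subset keeps the port entries of one subset from leaking into another.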
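Updating the Service List

The service list is refreshed by a scheduled task; the core class is KubernetesCatalogWatch.

Updates are not tracked per service instance. Calls are addressed to the Service by name and Kubernetes performs the load balancing, so there is no need to watch individual instances.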
Instance initialization

```java
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.cloud.kubernetes.discovery.catalog-services-watch.enabled", matchIfMissing = true)
public KubernetesCatalogWatch kubernetesCatalogWatch(KubernetesClient client) {
    return new KubernetesCatalogWatch(client);
}
```
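Watch implementation

KubernetesCatalogWatch implements the ApplicationEventPublisherAware interface so that it can publish an event when it detects that the service list has changed.

The polling task runs with a default delay of 30 s. Note that the task relies on scheduling being enabled with the @EnableScheduling annotation; without it, the task does not run.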
```java
@Scheduled(fixedDelayString = "${spring.cloud.kubernetes.discovery.catalogServicesWatchDelay:30000}")
public void catalogServicesWatch() {
    try {
        List<String> previousState = this.catalogEndpointsState.get();

        List<Endpoints> endpoints = this.kubernetesClient.endpoints()
                .list()
                .getItems();
        // Flatten all endpoints into a sorted list of the pod names they point to.
        List<String> endpointsPodNames = endpoints.stream()
                .map(Endpoints::getSubsets)
                .filter(Objects::nonNull)
                .flatMap(Collection::stream)
                .map(EndpointSubset::getAddresses)
                .filter(Objects::nonNull)
                .flatMap(Collection::stream)
                .map(EndpointAddress::getTargetRef)
                .filter(Objects::nonNull)
                .map(ObjectReference::getName)
                .sorted(String::compareTo)
                .collect(Collectors.toList());

        this.catalogEndpointsState.set(endpointsPodNames);

        // Publish a heartbeat only when the set of pods has changed since the last poll.
        if (!endpointsPodNames.equals(previousState)) {
            logger.trace("Received endpoints update from kubernetesClient: {}", endpointsPodNames);
            this.publisher.publishEvent(new HeartbeatEvent(this, endpointsPodNames));
        }
    }
    catch (Exception e) {
        logger.error("Error watching Kubernetes Services", e);
    }
}
```
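The change detection used by the watch (sort the pod names, compare with the previous snapshot, publish only on a difference) can be sketched with JDK classes alone. `CatalogWatchSketch` and `poll` are hypothetical names, and the `published` list stands in for the event publisher; the Kubernetes client call is replaced by a collection passed in by the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

final class CatalogWatchSketch {

    private final AtomicReference<List<String>> state = new AtomicReference<>();

    // Stands in for the event publisher: one entry per heartbeat that would be published.
    final List<List<String>> published = new ArrayList<>();

    /** One polling round; podNames replaces the result of listing endpoints. */
    void poll(Collection<String> podNames) {
        List<String> previous = state.get();
        // Sorting makes the comparison independent of the order in which pods are listed.
        List<String> current = podNames.stream()
                .filter(Objects::nonNull)
                .sorted()
                .collect(Collectors.toList());
        state.set(current);
        if (!current.equals(previous)) {
            published.add(current);
        }
    }

    public static void main(String[] args) {
        CatalogWatchSketch watch = new CatalogWatchSketch();
        watch.poll(Arrays.asList("pod-b", "pod-a")); // first poll: always differs from null
        watch.poll(Arrays.asList("pod-a", "pod-b")); // same pods, different order: no event
        watch.poll(Arrays.asList("pod-a"));          // a pod disappeared: event
        System.out.println(watch.published);
    }
}
```

Because the previous state starts out as null, the first poll always publishes an event, even when no pods are found.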