HelloWood

Using Kubernetes as a Configuration Center with Spring Cloud

2020-09-20

Spring Cloud can use Kubernetes as a configuration center: configuration stored in a ConfigMap or a Secret is loaded into the application.

Loading configuration

Configuration is loaded through PropertySourceLocator implementations: ConfigMaps are loaded by ConfigMapPropertySourceLocator, and Secrets by SecretsPropertySourceLocator.

Bean initialization

    @Bean
    @ConditionalOnProperty(name = "spring.cloud.kubernetes.config.enabled", matchIfMissing = true)
    public ConfigMapPropertySourceLocator configMapPropertySourceLocator(ConfigMapConfigProperties properties) {
        return new ConfigMapPropertySourceLocator(this.client, properties);
    }

    @Bean
    @ConditionalOnProperty(name = "spring.cloud.kubernetes.secrets.enabled", matchIfMissing = true)
    public SecretsPropertySourceLocator secretsPropertySourceLocator(SecretsConfigProperties properties) {
        return new SecretsPropertySourceLocator(this.client, properties);
    }

Fetching configuration

Configuration is fetched in PropertySourceLocator#locate, and the resulting properties are added to the environment.

ConfigMap

  • ConfigMapPropertySourceLocator#locate
    @Override
    public PropertySource locate(Environment environment) {
        if (environment instanceof ConfigurableEnvironment) {
            ConfigurableEnvironment env = (ConfigurableEnvironment) environment;

            List<ConfigMapConfigProperties.NormalizedSource> sources = this.properties.determineSources();
            CompositePropertySource composite = new CompositePropertySource("composite-configmap");
            if (this.properties.isEnableApi()) {
                sources.forEach(s -> composite.addFirstPropertySource(
                        getMapPropertySourceForSingleConfigMap(env, s)));
            }

            // Also add the property sources read from the configured paths to the composite
            addPropertySourcesFromPaths(environment, composite);

            return composite;
        }
        return null;
    }
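
The locate method adds each source with addFirstPropertySource, so a source added later is consulted before the ones added earlier. The following is a minimal sketch of that lookup order, assuming the composite returns the first non-null value it finds. CompositeSketch and its methods are hypothetical stand-ins written for illustration, not the Spring class.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

/** Hypothetical stand-in for a composite property source; not the Spring class. */
final class CompositeSketch {

    private final Deque<Map<String, String>> sources = new ArrayDeque<>();

    /** Puts the new source at the front, so it is consulted before older ones. */
    void addFirst(Map<String, String> source) {
        sources.addFirst(source);
    }

    /** Returns the value from the first source that defines the key, or null. */
    String get(String key) {
        for (Map<String, String> source : sources) {
            String value = source.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        CompositeSketch composite = new CompositeSketch();
        composite.addFirst(Map.of("greeting", "hello", "color", "blue"));
        composite.addFirst(Map.of("greeting", "hi"));
        // "greeting" comes from the source added last, "color" falls through to the earlier one
        System.out.println(composite.get("greeting"));
        System.out.println(composite.get("color"));
    }
}
```

In practice this means that when several ConfigMap sources define the same key, the order returned by determineSources decides which value wins.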

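The actual request to Kubernetes is made inside getMapPropertySourceForSingleConfigMap: when the ConfigMapPropertySource instance is created, its getData method fetches the ConfigMap and parses its entries into properties, which are then added to the environment.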

  • ConfigMapPropertySourceLocator#getMapPropertySourceForSingleConfigMap
    private MapPropertySource getMapPropertySourceForSingleConfigMap(
            ConfigurableEnvironment environment, NormalizedSource normalizedSource) {

        String configurationTarget = this.properties.getConfigurationTarget();
        // Create a new property source
        return new ConfigMapPropertySource(this.client,
                getApplicationName(environment, normalizedSource.getName(), configurationTarget),
                getApplicationNamespace(this.client, normalizedSource.getNamespace(), configurationTarget),
                environment);
    }

  • ConfigMapPropertySource
    public ConfigMapPropertySource(KubernetesClient client,
            String name,
            String namespace,
            Environment environment) {
        super(getName(client, name, namespace),
                asObjectMap(getData(client, name, namespace, environment)));
    }

  • ConfigMapPropertySource#getData
    private static Map<String, Object> getData(KubernetesClient client, String name,
            String namespace, Environment environment) {
        try {
            Map<String, Object> result = new LinkedHashMap<>();
            // Fetch the ConfigMap
            ConfigMap map = StringUtils.isEmpty(namespace)
                    ? client.configMaps().withName(name).get()
                    : client.configMaps().inNamespace(namespace).withName(name).get();

            // Add its entries to the result map
            if (map != null) {
                result.putAll(processAllEntries(map.getData(), environment));
            }

            if (environment != null) {
                // Load the profile-specific ConfigMaps
                for (String activeProfile : environment.getActiveProfiles()) {

                    String mapNameWithProfile = name + "-" + activeProfile;

                    ConfigMap mapWithProfile = StringUtils.isEmpty(namespace)
                            ? client.configMaps().withName(mapNameWithProfile).get()
                            : client.configMaps().inNamespace(namespace)
                                    .withName(mapNameWithProfile).get();

                    if (mapWithProfile != null) {
                        result.putAll(
                                processAllEntries(mapWithProfile.getData(), environment));
                    }

                }
            }

            return result;

        } catch (Exception e) {
            LOG.warn("Can't read configMap with name: [" + name + "] in namespace:["
                    + namespace + "]. Ignoring.", e);
        }

        return new LinkedHashMap<>();
    }
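
The profile handling in getData can be illustrated without a cluster. The sketch below assumes an in-memory map standing in for the Kubernetes API; ProfileOverlaySketch, load, and the sample names are hypothetical. It only shows the merge order: the base ConfigMap first, then each "name-profile" ConfigMap on top of it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ProfileOverlaySketch {

    /**
     * Merges the base ConfigMap with the "<name>-<profile>" ConfigMaps, in profile order.
     * The "cluster" map is a hypothetical in-memory stand-in for the Kubernetes API.
     */
    static Map<String, String> load(Map<String, Map<String, String>> cluster,
            String name, List<String> activeProfiles) {
        Map<String, String> result = new LinkedHashMap<>();
        Map<String, String> base = cluster.get(name);
        if (base != null) {
            result.putAll(base);
        }
        for (String profile : activeProfiles) {
            Map<String, String> overlay = cluster.get(name + "-" + profile);
            if (overlay != null) {
                // Profile-specific entries overwrite entries with the same key
                result.putAll(overlay);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> cluster = Map.of(
                "demo", Map.of("timeout", "30", "url", "http://base"),
                "demo-dev", Map.of("url", "http://dev"));
        // "url" is taken from demo-dev, "timeout" from demo
        System.out.println(load(cluster, "demo", List.of("dev")));
    }
}
```

A missing profile-specific ConfigMap is simply skipped, which matches the null checks in the listing above.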

Secret

  • SecretsPropertySourceLocator#locate
    @Override
    public PropertySource locate(Environment environment) {
        if (environment instanceof ConfigurableEnvironment) {
            ConfigurableEnvironment env = (ConfigurableEnvironment) environment;

            List<SecretsConfigProperties.NormalizedSource> sources = this.properties.determineSources();
            CompositePropertySource composite = new CompositePropertySource("composite-secrets");
            if (this.properties.isEnableApi()) {
                // Fetch the Secrets
                sources.forEach(s -> composite.addFirstPropertySource(
                        getKubernetesPropertySourceForSingleSecret(env, s)));
            }

            // read for secrets mount
            // Read the mounted secrets and add them to the composite
            putPathConfig(composite);

            return composite;
        }
        return null;
    }

  • SecretsPropertySourceLocator#getKubernetesPropertySourceForSingleSecret
    private MapPropertySource getKubernetesPropertySourceForSingleSecret(
            ConfigurableEnvironment environment,
            SecretsConfigProperties.NormalizedSource normalizedSource) {

        String configurationTarget = this.properties.getConfigurationTarget();
        // Load the Secret property source
        return new SecretsPropertySource(this.client,
                environment,
                getApplicationName(environment, normalizedSource.getName(), configurationTarget),
                getApplicationNamespace(this.client, normalizedSource.getNamespace(), configurationTarget),
                normalizedSource.getLabels());
    }

  • SecretsPropertySource
    public SecretsPropertySource(KubernetesClient client, Environment env, String name,
            String namespace, Map<String, String> labels) {
        super(getSourceName(client, env, name, namespace),
                getSourceData(client, env, name, namespace, labels));
    }

  • SecretsPropertySource#getSourceData

Fetching a Secret follows the same flow as fetching a ConfigMap, except that Secret values must be Base64-decoded before they are added to the environment.

    private static Map<String, Object> getSourceData(KubernetesClient client,
            Environment env, String name, String namespace, Map<String, String> labels) {
        Map<String, Object> result = new HashMap<>();

        try {
            // Read for secrets api (named)
            // Fetch the secret by name and namespace
            Secret secret;
            if (StringUtils.isEmpty(namespace)) {
                secret = client.secrets()
                        .withName(name)
                        .get();
            } else {
                secret = client.secrets()
                        .inNamespace(namespace)
                        .withName(name)
                        .get();
            }
            // Decode the values and add them to the result
            putAll(secret, result);

            // Read for secrets api (label)
            // Fetch the secrets that match the labels
            if (!labels.isEmpty()) {
                if (StringUtils.isEmpty(namespace)) {
                    client.secrets()
                            .withLabels(labels)
                            .list()
                            .getItems()
                            .forEach(s -> putAll(s, result));
                } else {
                    client.secrets()
                            .inNamespace(namespace)
                            .withLabels(labels)
                            .list()
                            .getItems()
                            .forEach(s -> putAll(s, result));
                }
            }
        } catch (Exception e) {
            LOG.warn("Can't read secret with name: [" + name + "] or labels [" + labels
                    + "] in namespace:[" + namespace + "] (cause: " + e.getMessage()
                    + "). Ignoring");
        }

        return result;
    }
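
The body of putAll is not shown in this post. As a rough sketch of the Base64 decoding step it performs, assuming the Secret data arrives as a map of key to Base64-encoded string, the decoding could look like this. SecretDecodeSketch and decode are hypothetical names, and the real method may differ in details such as null handling.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

final class SecretDecodeSketch {

    /** Decodes every Base64 value of a Secret's data map into a plain string. */
    static Map<String, Object> decode(Map<String, String> secretData) {
        Map<String, Object> result = new HashMap<>();
        if (secretData == null) {
            return result; // nothing to decode
        }
        secretData.forEach((key, encoded) -> {
            byte[] raw = Base64.getDecoder().decode(encoded);
            result.put(key, new String(raw, StandardCharsets.UTF_8));
        });
        return result;
    }

    public static void main(String[] args) {
        // "YWRtaW4=" and "cGFzc3dvcmQ=" are the Base64 forms of "admin" and "password"
        Map<String, Object> decoded = decode(Map.of(
                "username", "YWRtaW4=",
                "password", "cGFzc3dvcmQ="));
        System.out.println(decoded.get("username"));
        System.out.println(decoded.get("password"));
    }
}
```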

Watching for configuration changes

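Two ways of detecting configuration changes are supported. One keeps a long-lived connection (a watch) open to Kubernetes, so that changes are pushed as soon as they happen. The other polls periodically, driven by a scheduled task.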

Change detection must be enabled explicitly (by setting spring.cloud.kubernetes.reload.enabled=true).

Bean initialization

The beans are initialized in org.springframework.cloud.kubernetes.config.reload.ConfigReloadAutoConfiguration.ConfigReloadAutoConfigurationBeans.

  • Detecting configuration changes

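Depending on the configuration, detection is done either by polling or by listening for events; event-based detection is the default.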

    @Bean
    @ConditionalOnMissingBean
    public ConfigurationChangeDetector propertyChangeWatcher(ConfigReloadProperties properties, ConfigurationUpdateStrategy strategy) {
        switch (properties.getMode()) {
        case POLLING:
            return new PollingConfigurationChangeDetector(this.environment,
                    properties,
                    this.kubernetesClient,
                    strategy,
                    this.configMapPropertySourceLocator,
                    this.secretsPropertySourceLocator);
        case EVENT:
            return new EventBasedConfigurationChangeDetector(this.environment,
                    properties,
                    this.kubernetesClient,
                    strategy,
                    this.configMapPropertySourceLocator,
                    this.secretsPropertySourceLocator);
        }
        throw new IllegalStateException("Unsupported configuration reload mode: " + properties.getMode());
    }

  • Configuration update strategy

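Three update strategies are supported: restarting the context, refreshing it, and shutting it down. Shutting down relies on health checks: once the application is found to be down, Kubernetes has to start it again.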

The default strategy is to refresh the context.

    @Bean
    @ConditionalOnMissingBean
    public ConfigurationUpdateStrategy configurationUpdateStrategy(
            ConfigReloadProperties properties,
            ConfigurableApplicationContext ctx,
            @Autowired(required = false) RestartEndpoint restarter,
            ContextRefresher refresher) {
        switch (properties.getStrategy()) {
        // Restart
        case RESTART_CONTEXT:
            Assert.notNull(restarter, "Restart endpoint is not enabled");
            return new ConfigurationUpdateStrategy(properties.getStrategy().name(),
                    () -> {
                        wait(properties);
                        restarter.restart();
                    });
        // Refresh
        case REFRESH:
            return new ConfigurationUpdateStrategy(properties.getStrategy().name(),
                    refresher::refresh);
        // Shut down
        case SHUTDOWN:
            return new ConfigurationUpdateStrategy(properties.getStrategy().name(),
                    () -> {
                        wait(properties);
                        ctx.close();
                    });
        }
        throw new IllegalStateException("Unsupported configuration update strategy: "
                + properties.getStrategy());
    }
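
The strategy bean above pairs a name with a callback. A self-contained sketch of that shape, with the Spring collaborators replaced by log entries, might look as follows. UpdateStrategySketch, UpdateStrategy, and forStrategy are hypothetical names; the real actions restart, refresh, or close the application context rather than appending to a list.

```java
import java.util.ArrayList;
import java.util.List;

final class UpdateStrategySketch {

    enum Strategy { RESTART_CONTEXT, REFRESH, SHUTDOWN }

    /** A named reload action: the name is used for logging, the callback does the work. */
    static final class UpdateStrategy {
        private final String name;
        private final Runnable reloadProcedure;

        UpdateStrategy(String name, Runnable reloadProcedure) {
            this.name = name;
            this.reloadProcedure = reloadProcedure;
        }

        String getName() {
            return name;
        }

        void reload() {
            reloadProcedure.run();
        }
    }

    /** Picks the callback for a strategy; each callback only records what it would do. */
    static UpdateStrategy forStrategy(Strategy strategy, List<String> log) {
        switch (strategy) {
        case RESTART_CONTEXT:
            return new UpdateStrategy(strategy.name(), () -> log.add("restart context"));
        case REFRESH:
            return new UpdateStrategy(strategy.name(), () -> log.add("refresh context"));
        case SHUTDOWN:
            return new UpdateStrategy(strategy.name(), () -> log.add("close context"));
        default:
            throw new IllegalStateException("Unsupported strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        UpdateStrategy strategy = forStrategy(Strategy.REFRESH, log);
        strategy.reload();
        System.out.println(strategy.getName() + " -> " + log);
    }
}
```

Keeping the action behind a Runnable lets the change detectors call reload without knowing which strategy was configured.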

Detector implementations

[Figure: spring-cloud-kubernetes-configuration-change-detector.png]

PollingConfigurationChangeDetector and EventBasedConfigurationChangeDetector are both subclasses of ConfigurationChangeDetector.

Polling

By default, the configuration is pulled every 15 seconds.
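
To make the timing concrete, here is a minimal sketch of a fixed-delay polling loop built on the JDK scheduler instead of Spring's @Scheduled. It assumes the period is read from a plain map under the key spring.cloud.kubernetes.reload.period with a fallback of 15000 milliseconds; PollingSketch, resolvePeriodMillis, and start are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class PollingSketch {

    static final String PERIOD_KEY = "spring.cloud.kubernetes.reload.period";

    /** Reads the polling period in milliseconds, falling back to 15000. */
    static long resolvePeriodMillis(Map<String, String> props) {
        return Long.parseLong(props.getOrDefault(PERIOD_KEY, "15000"));
    }

    /** Runs the cycle with the period as both the initial delay and the fixed delay. */
    static ScheduledExecutorService start(Runnable cycle, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(cycle, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        // A short period is used here so the demo finishes quickly
        CountDownLatch twoCycles = new CountDownLatch(2);
        ScheduledExecutorService scheduler = start(twoCycles::countDown, 20);
        boolean ran = twoCycles.await(5, TimeUnit.SECONDS);
        scheduler.shutdownNow();
        System.out.println("two cycles completed: " + ran);
    }
}
```

With a fixed delay, the next cycle starts one period after the previous cycle finishes, so a slow call to Kubernetes does not cause cycles to pile up.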

  • PollingConfigurationChangeDetector#executeCycle
    @Scheduled(initialDelayString = "${spring.cloud.kubernetes.reload.period:15000}",
            fixedDelayString = "${spring.cloud.kubernetes.reload.period:15000}")
    public void executeCycle() {

        boolean changedConfigMap = false;
        // Monitor ConfigMaps
        if (this.properties.isMonitoringConfigMaps()) {
            List<? extends MapPropertySource> currentConfigMapSources = findPropertySources(ConfigMapPropertySource.class);

            if (!currentConfigMapSources.isEmpty()) {
                changedConfigMap = changed(
                        locateMapPropertySources(this.configMapPropertySourceLocator, this.environment),
                        currentConfigMapSources);
            }
        }

        boolean changedSecrets = false;
        // Monitor Secrets
        if (this.properties.isMonitoringSecrets()) {
            List<MapPropertySource> currentSecretSources = locateMapPropertySources(this.secretsPropertySourceLocator, this.environment);

            if (currentSecretSources != null && !currentSecretSources.isEmpty()) {
                List<SecretsPropertySource> propertySources = findPropertySources(SecretsPropertySource.class);
                changedSecrets = changed(currentSecretSources, propertySources);
            }
        }

        // Reload the properties when something changed
        if (changedConfigMap || changedSecrets) {
            reloadProperties();
        }
    }

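Each cycle pulls the latest configuration and compares it with the current one by calling the changed method; if anything changed, reloadProperties is called to refresh.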
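
The changed method itself is not shown in this post. The sketch below illustrates the compare-then-reload step with plain maps in place of property sources. ChangeDetectionSketch, changed, and pollOnce are hypothetical names, and treating a different number of sources as a change is an assumption of this sketch that may not match the library.

```java
import java.util.List;
import java.util.Map;

final class ChangeDetectionSketch {

    /** Compares freshly located sources with the ones currently in the environment. */
    static boolean changed(List<? extends Map<String, ?>> fresh,
            List<? extends Map<String, ?>> current) {
        if (fresh.size() != current.size()) {
            return true; // assumption of this sketch: a size mismatch counts as a change
        }
        for (int i = 0; i < fresh.size(); i++) {
            if (!fresh.get(i).equals(current.get(i))) {
                return true;
            }
        }
        return false;
    }

    /** One cycle: run the reload callback only when a difference is found. */
    static boolean pollOnce(List<? extends Map<String, ?>> fresh,
            List<? extends Map<String, ?>> current, Runnable reload) {
        boolean hasChanged = changed(fresh, current);
        if (hasChanged) {
            reload.run();
        }
        return hasChanged;
    }

    public static void main(String[] args) {
        List<Map<String, String>> current = List.of(Map.of("timeout", "30"));
        List<Map<String, String>> fresh = List.of(Map.of("timeout", "60"));
        boolean reloaded = pollOnce(fresh, current, () -> System.out.println("reloading"));
        System.out.println("changed: " + reloaded);
    }
}
```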

  • ConfigurationChangeDetector#reloadProperties
    public void reloadProperties() {
        this.log.info("Reloading using strategy: " + this.strategy.getName());
        this.strategy.reload();
    }

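This ends up calling the reload method of the configured update strategy, which reloads the configuration. Note that classes whose properties should be refreshed need to be annotated with @RefreshScope or @ConfigurationProperties so that they pick up the context change.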

Event

  • EventBasedConfigurationChangeDetector#watch
    @PostConstruct
    public void watch() {
        boolean activated = false;

        // If ConfigMaps are monitored
        if (this.properties.isMonitoringConfigMaps()) {
            try {
                String name = "config-maps-watch";
                // The watch is implemented over WebSocket
                this.watches.put(name, this.kubernetesClient.configMaps()
                        .watch(new Watcher<ConfigMap>() {
                            @Override
                            public void eventReceived(Action action,
                                    ConfigMap configMap) {
                                onEvent(configMap);
                            }

                            @Override
                            public void onClose(KubernetesClientException e) {
                            }
                        }));
                activated = true;
                this.log.info("Added new Kubernetes watch: " + name);
            } catch (Exception e) {
                this.log.error(
                        "Error while establishing a connection to watch config maps: configuration may remain stale",
                        e);
            }
        }

        // If Secrets are monitored
        if (this.properties.isMonitoringSecrets()) {
            try {
                activated = false;
                String name = "secrets-watch";
                this.watches.put(name,
                        this.kubernetesClient.secrets().watch(new Watcher<Secret>() {
                            @Override
                            public void eventReceived(Action action, Secret secret) {
                                onEvent(secret);
                            }

                            @Override
                            public void onClose(KubernetesClientException e) {
                            }
                        }));
                activated = true;
                this.log.info("Added new Kubernetes watch: " + name);
            } catch (Exception e) {
                this.log.error(
                        "Error while establishing a connection to watch secrets: configuration may remain stale",
                        e);
            }
        }

        if (activated) {
            this.log.info(
                    "Kubernetes event-based configuration change detector activated");
        }
    }

When an event is received, the onEvent method is called to handle it.

  • EventBasedConfigurationChangeDetector#onEvent
    private void onEvent(ConfigMap configMap) {
        // Load the configuration and check whether it changed
        boolean changed = changed(
                locateMapPropertySources(this.configMapPropertySourceLocator, this.environment),
                findPropertySources(ConfigMapPropertySource.class)
        );
        // If it changed, reload the properties
        if (changed) {
            this.log.info("Detected change in config maps");
            reloadProperties();
        }
    }

    private void onEvent(Secret secret) {
        boolean changed = changed(
                locateMapPropertySources(this.secretsPropertySourceLocator, this.environment),
                findPropertySources(SecretsPropertySource.class));
        if (changed) {
            this.log.info("Detected change in secrets");
            reloadProperties();
        }
    }

The configuration is then reloaded: when a change is detected, reloadProperties is called to update it.