// Timed 是 Prometheus 的监控 @Timed("consul.watch-config-keys") publicvoidwatchConfigKeyValues(){ if (this.running.get()) { // 遍历所有的配置的 key for (String context : this.consulIndexes.keySet()) {
// turn the context into a Consul folder path (unless our config format // are FILES) if (this.properties.getFormat() != FILES && !context.endsWith("/")) { context = context + "/"; }
// 根据配置返回的 index 判断是否发生变化 try { Long currentIndex = this.consulIndexes.get(context); if (currentIndex == null) { currentIndex = -1L; }
log.trace("watching consul for context '" + context + "' with index " + currentIndex);
// use the consul ACL token if found String aclToken = this.properties.getAclToken(); if (StringUtils.isEmpty(aclToken)) { aclToken = null; }
// if response.value == null, response was a 404, otherwise it was a // 200 // reducing churn if there wasn't anything if (response.getValue() != null && !response.getValue().isEmpty()) { Long newIndex = response.getConsulIndex();
// 判断 key 的 index 是否相等,如果发生变化,则发出 RefreshEvent 事件 if (newIndex != null && !newIndex.equals(currentIndex)) { // don't publish the same index again, don't publish the first // time (-1) so index can be primed // 没有发布过这个 index 的事件,且不是第一次发布 if (!this.consulIndexes.containsValue(newIndex) && !currentIndex.equals(-1L)) { log.trace("Context " + context + " has new index " + newIndex); // 发送事件 RefreshEventData data = new RefreshEventData(context, currentIndex, newIndex); this.publisher.publishEvent(new RefreshEvent(this, data, data.toString())); } elseif (log.isTraceEnabled()) { log.trace("Event for index already published for context " + context); } this.consulIndexes.put(context, newIndex); } elseif (log.isTraceEnabled()) { log.trace("Same index for context " + context); } } elseif (log.isTraceEnabled()) { log.trace("No value for context " + context); }
} catch (Exception e) { // only fail fast on the initial query, otherwise just log the error if (this.firstTime && this.properties.isFailFast()) { log.error("Fail fast is set and there was an error reading configuration from consul."); ReflectionUtils.rethrowRuntimeException(e); } elseif (log.isTraceEnabled()) { log.trace("Error querying consul Key/Values for context '" + context + "'", e); } elseif (log.isWarnEnabled()) { // simplified one line log message in the event of an agent // failure log.warn("Error querying consul Key/Values for context '" + context + "'. Message: " + e.getMessage()); } } } } this.firstTime = false; }