Preface
This article takes a look at spring-cloud-kubernetes-client-loadbalancer.
ServiceInstanceListSupplier
org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplier.java
public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {
    String getServiceId();

    default Flux<List<ServiceInstance>> get(Request request) {
        return get();
    }

    static ServiceInstanceListSupplierBuilder builder() {
        return new ServiceInstanceListSupplierBuilder();
    }
}
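spring-cloud-loadbalancer defines ServiceInstanceListSupplier. It extends Supplier with the type argument Flux<List<ServiceInstance>>, declares getServiceId and get(Request), where get(Request) simply delegates to get() by default, and provides a static builder method.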
Request
org/springframework/cloud/client/loadbalancer/Request.java
public interface Request<C> {
    // Avoid breaking backward compatibility
    default C getContext() {
        return null;
    }

    // TODO: define contents
}
Request declares a getContext method that returns null by default.
DefaultRequest
org/springframework/cloud/client/loadbalancer/DefaultRequest.java
public class DefaultRequest<T> implements Request<T> {
    private T context;

    public DefaultRequest() {
        new DefaultRequestContext();
    }

    public DefaultRequest(T context) {
        this.context = context;
    }

    @Override
    public T getContext() {
        return context;
    }

    public void setContext(T context) {
        this.context = context;
    }

    @Override
    public String toString() {
        ToStringCreator to = new ToStringCreator(this);
        to.append("context", context);
        return to.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DefaultRequest<?> that)) {
            return false;
        }
        return Objects.equals(context, that.context);
    }

    @Override
    public int hashCode() {
        return Objects.hash(context);
    }
}
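DefaultRequest implements Request; its type parameter is the type of the context. Note that in the code as quoted, the no-argument constructor creates a DefaultRequestContext without assigning it, so context stays null until it is set explicitly.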
KubernetesServicesListSupplier
spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/loadbalancer/KubernetesServicesListSupplier.java
public abstract class KubernetesServicesListSupplier implements ServiceInstanceListSupplier {
    protected final Environment environment;

    protected final KubernetesDiscoveryProperties discoveryProperties;

    protected final KubernetesServiceInstanceMapper mapper;

    public KubernetesServicesListSupplier(Environment environment, KubernetesServiceInstanceMapper mapper,
            KubernetesDiscoveryProperties discoveryProperties) {
        this.environment = environment;
        this.discoveryProperties = discoveryProperties;
        this.mapper = mapper;
    }

    @Override
    public String getServiceId() {
        return environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
    }

    @Override
    public abstract Flux<List<ServiceInstance>> get();
}
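KubernetesServicesListSupplier is an abstract class that implements ServiceInstanceListSupplier. Its getServiceId reads the service id from the Environment via LoadBalancerClientFactory.PROPERTY_NAME, and get() is left abstract. It does not override get(Request), so the inherited default applies and the Request is never passed down to subclasses.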
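The effect of not overriding get(Request) can be shown with a simplified, library-free sketch. The names InstanceSupplier and FixedSupplier are hypothetical stand-ins, List<String> replaces Flux<List<ServiceInstance>>, and Object replaces Request; these are assumptions made so the example runs without Spring or Reactor on the classpath.

```java
import java.util.List;
import java.util.function.Supplier;

class RequestDelegationSketch {

    // Hypothetical stand-in for ServiceInstanceListSupplier:
    // get(request) defaults to get(), dropping the request.
    interface InstanceSupplier extends Supplier<List<String>> {
        default List<String> get(Object request) {
            return get();
        }
    }

    // Hypothetical stand-in for a supplier that only implements get(),
    // as KubernetesServicesListSupplier subclasses do.
    static class FixedSupplier implements InstanceSupplier {
        @Override
        public List<String> get() {
            return List.of("service-a.default.svc.cluster.local:8080");
        }
    }

    public static void main(String[] args) {
        InstanceSupplier supplier = new FixedSupplier();
        // Whatever request is passed in, the result is the same as get().
        System.out.println(supplier.get("request-1"));
        System.out.println(supplier.get("request-2"));
        System.out.println(supplier.get());
    }
}
```

All three calls print the same list, because the request argument is discarded by the default method before the concrete supplier is reached.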
KubernetesClientServicesListSupplier
spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientServicesListSupplier.java
public class KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier {
    private static final Log LOG = LogFactory.getLog(KubernetesClientServicesListSupplier.class);

    private CoreV1Api coreV1Api;

    private KubernetesClientProperties kubernetesClientProperties;

    private KubernetesNamespaceProvider kubernetesNamespaceProvider;

    public KubernetesClientServicesListSupplier(Environment environment, KubernetesServiceInstanceMapper mapper,
            KubernetesDiscoveryProperties discoveryProperties, CoreV1Api coreV1Api,
            KubernetesNamespaceProvider kubernetesNamespaceProvider) {
        super(environment, mapper, discoveryProperties);
        this.coreV1Api = coreV1Api;
        this.kubernetesNamespaceProvider = kubernetesNamespaceProvider;
    }

    private String getNamespace() {
        return kubernetesNamespaceProvider != null ? kubernetesNamespaceProvider.getNamespace()
                : kubernetesClientProperties.namespace();
    }

    @Override
    public Flux<List<ServiceInstance>> get() {
        LOG.info("Getting services with id " + this.getServiceId());
        List<ServiceInstance> result = new ArrayList<>();
        List<V1Service> services;
        try {
            if (discoveryProperties.allNamespaces()) {
                services = coreV1Api.listServiceForAllNamespaces(null, null, "metadata.name=" + this.getServiceId(),
                        null, null, null, null, null, null, null, null).getItems();
            }
            else {
                services = coreV1Api.listNamespacedService(getNamespace(), null, null, null,
                        "metadata.name=" + this.getServiceId(), null, null, null, null, null, null, null).getItems();
            }
            services.forEach(service -> result.add(mapper.map(service)));
        }
        catch (ApiException e) {
            LOG.warn("Error retrieving service with name " + this.getServiceId(), e);
        }
        LOG.info("Returning services: " + result);
        return Flux.defer(() -> Flux.just(result));
    }
}
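KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier. Its constructor takes an Environment, KubernetesServiceInstanceMapper, KubernetesDiscoveryProperties, CoreV1Api, and KubernetesNamespaceProvider. Depending on discoveryProperties.allNamespaces(), its get method calls coreV1Api.listServiceForAllNamespaces or coreV1Api.listNamespacedService with the field selector metadata.name=<serviceId> to fetch the matching V1Service objects, then converts each one to a ServiceInstance through the mapper. An ApiException is only logged as a warning, in which case the returned list is empty.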
V1Service
io/kubernetes/client/openapi/models/V1Service.java
public class V1Service implements io.kubernetes.client.common.KubernetesObject {
    public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
    @SerializedName(SERIALIZED_NAME_API_VERSION)
    private String apiVersion;

    public static final String SERIALIZED_NAME_KIND = "kind";
    @SerializedName(SERIALIZED_NAME_KIND)
    private String kind;

    public static final String SERIALIZED_NAME_METADATA = "metadata";
    @SerializedName(SERIALIZED_NAME_METADATA)
    private V1ObjectMeta metadata;

    public static final String SERIALIZED_NAME_SPEC = "spec";
    @SerializedName(SERIALIZED_NAME_SPEC)
    private V1ServiceSpec spec;

    public static final String SERIALIZED_NAME_STATUS = "status";
    @SerializedName(SERIALIZED_NAME_STATUS)
    private V1ServiceStatus status;
//......
}
V1Service defines the apiVersion, kind, metadata, spec, and status properties.
KubernetesServiceInstanceMapper
spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/loadbalancer/KubernetesServiceInstanceMapper.java
public interface KubernetesServiceInstanceMapper<T> {
    KubernetesServiceInstance map(T service);

    static String createHost(String serviceName, String namespace, String clusterDomain) {
        return String.format("%s.%s.svc.%s", serviceName, StringUtils.hasText(namespace) ? namespace : "default",
                clusterDomain);
    }

    static boolean isSecure(Map<String, String> labels, Map<String, String> annotations, String servicePortName,
            Integer servicePort) {
        if (labels != null) {
            final String securedLabelValue = labels.getOrDefault("secured", "false");
            if (securedLabelValue.equals("true")) {
                return true;
            }
        }

        if (annotations != null) {
            final String securedAnnotationValue = annotations.getOrDefault("secured", "false");
            if (securedAnnotationValue.equals("true")) {
                return true;
            }
        }
        return (servicePortName != null && servicePortName.endsWith("https")) || servicePort.toString().endsWith("443");
    }

    static Map<String, String> getMapWithPrefixedKeys(Map<String, String> map, String prefix) {
        if (map == null) {
            return new HashMap<>();
        }
        if (!StringUtils.hasText(prefix)) {
            return map;
        }
        final Map<String, String> result = new HashMap<>();
        map.forEach((k, v) -> result.put(prefix + k, v));
        return result;
    }
}
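The KubernetesServiceInstanceMapper interface declares the map method and provides three static helpers: createHost builds the service host name, falling back to the default namespace when none is given; isSecure treats a service as secure if it carries a secured=true label or annotation, if the port name ends with https, or if the port number ends with 443; getMapWithPrefixedKeys returns a copy of a map with a prefix added to every key.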
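To see what these helpers return for concrete inputs, here is a standalone sketch. The method bodies follow the static methods quoted above; the class name MapperHelpersSketch and the local hasText helper, which replaces Spring's StringUtils.hasText, are assumptions made so the example runs on plain Java 11+.

```java
import java.util.HashMap;
import java.util.Map;

class MapperHelpersSketch {

    // Assumed plain-Java replacement for StringUtils.hasText.
    static boolean hasText(String s) {
        return s != null && !s.isBlank();
    }

    static String createHost(String serviceName, String namespace, String clusterDomain) {
        return String.format("%s.%s.svc.%s", serviceName,
                hasText(namespace) ? namespace : "default", clusterDomain);
    }

    static boolean isSecure(Map<String, String> labels, Map<String, String> annotations,
            String servicePortName, Integer servicePort) {
        if (labels != null && "true".equals(labels.getOrDefault("secured", "false"))) {
            return true;
        }
        if (annotations != null && "true".equals(annotations.getOrDefault("secured", "false"))) {
            return true;
        }
        // Falls back to naming conventions: port name ending in "https" or port ending in 443.
        return (servicePortName != null && servicePortName.endsWith("https"))
                || servicePort.toString().endsWith("443");
    }

    static Map<String, String> getMapWithPrefixedKeys(Map<String, String> map, String prefix) {
        if (map == null) {
            return new HashMap<>();
        }
        if (!hasText(prefix)) {
            return map;
        }
        Map<String, String> result = new HashMap<>();
        map.forEach((k, v) -> result.put(prefix + k, v));
        return result;
    }

    public static void main(String[] args) {
        // Missing namespace falls back to "default".
        System.out.println(createHost("service-a", null, "cluster.local"));
        // prints service-a.default.svc.cluster.local
        System.out.println(createHost("service-a", "prod", "cluster.local"));
        // prints service-a.prod.svc.cluster.local

        System.out.println(isSecure(null, null, "http", 8443)); // prints true (port ends with 443)
        System.out.println(isSecure(null, null, "http", 8080)); // prints false
        System.out.println(isSecure(Map.of("secured", "true"), null, "http", 8080)); // prints true

        System.out.println(getMapWithPrefixedKeys(Map.of("app", "demo"), "label."));
        // prints {label.app=demo}
    }
}
```

One consequence worth noting is that the port-number check is a string suffix match, so a plain-HTTP service listening on a port such as 8443 or 1443 is also reported as secure.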
KubernetesClientServiceInstanceMapper
spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientServiceInstanceMapper.java
public class KubernetesClientServiceInstanceMapper implements KubernetesServiceInstanceMapper<V1Service> {
    private KubernetesLoadBalancerProperties properties;

    private KubernetesDiscoveryProperties discoveryProperties;

    public KubernetesClientServiceInstanceMapper(KubernetesLoadBalancerProperties properties,
            KubernetesDiscoveryProperties discoveryProperties) {
        this.properties = properties;
        this.discoveryProperties = discoveryProperties;
    }

    @Override
    public KubernetesServiceInstance map(V1Service service) {
        final V1ObjectMeta meta = service.getMetadata();

        final List<V1ServicePort> ports = service.getSpec().getPorts();
        V1ServicePort port = null;
        if (ports.size() == 1) {
            port = ports.get(0);
        }
        else if (ports.size() > 1 && StringUtils.hasText(this.properties.getPortName())) {
            Optional<V1ServicePort> optPort = ports.stream()
                    .filter(it -> properties.getPortName().endsWith(it.getName())).findAny();
            if (optPort.isPresent()) {
                port = optPort.get();
            }
        }
        if (port == null) {
            return null;
        }
        final String host = KubernetesServiceInstanceMapper.createHost(service.getMetadata().getName(),
                service.getMetadata().getNamespace(), properties.getClusterDomain());
        final boolean secure = KubernetesServiceInstanceMapper.isSecure(service.getMetadata().getLabels(),
                service.getMetadata().getAnnotations(), port.getName(), port.getPort());
        return new DefaultKubernetesServiceInstance(meta.getUid(), meta.getName(), host, port.getPort(),
                getServiceMetadata(service), secure);
    }

    private Map<String, String> getServiceMetadata(V1Service service) {
        final Map<String, String> serviceMetadata = new HashMap<>();
        KubernetesDiscoveryProperties.Metadata metadataProps = this.discoveryProperties.metadata();
        if (metadataProps.addLabels()) {
            Map<String, String> labelMetadata = KubernetesServiceInstanceMapper
                    .getMapWithPrefixedKeys(service.getMetadata().getLabels(), metadataProps.labelsPrefix());
            serviceMetadata.putAll(labelMetadata);
        }
        if (metadataProps.addAnnotations()) {
            Map<String, String> annotationMetadata = KubernetesServiceInstanceMapper
                    .getMapWithPrefixedKeys(service.getMetadata().getAnnotations(),
                            metadataProps.annotationsPrefix());
            serviceMetadata.putAll(annotationMetadata);
        }

        return serviceMetadata;
    }
}
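KubernetesClientServiceInstanceMapper implements KubernetesServiceInstanceMapper with V1Service as its type argument. Its map method first reads the ports from service.getSpec().getPorts(): a single port is used directly, and when there are several it picks one whose name is a suffix of the configured properties.getPortName(); if no port can be chosen, map returns null. It then calls createHost to build the service's cluster DNS name in the form <servicename>.<namespace>.svc.<clusterdomain>, for example service-a.default.svc.cluster.local, and finally creates a DefaultKubernetesServiceInstance.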
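The port-selection step of map can be isolated in a small sketch. Port is a hypothetical stand-in for V1ServicePort, selectPort is an illustrative name, and the sketch assumes every port has a non-null name; the branching follows the quoted map method.

```java
import java.util.List;
import java.util.Optional;

class PortSelectionSketch {

    // Hypothetical stand-in for V1ServicePort with just the fields used here.
    static final class Port {
        final String name;
        final int port;

        Port(String name, int port) {
            this.name = name;
            this.port = port;
        }
    }

    // Mirrors the branching in map(): one port wins outright; with several ports,
    // pick one whose name is a suffix of the configured port name; otherwise none.
    static Optional<Port> selectPort(List<Port> ports, String configuredPortName) {
        if (ports.size() == 1) {
            return Optional.of(ports.get(0));
        }
        if (ports.size() > 1 && configuredPortName != null && !configuredPortName.isBlank()) {
            return ports.stream()
                    .filter(p -> configuredPortName.endsWith(p.name))
                    .findAny();
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Port> ports = List.of(new Port("http", 8080), new Port("https", 8443));

        // Configured name "https" matches only the port named "https".
        System.out.println(selectPort(ports, "https").map(p -> p.port));
        // Several ports and no configured name: nothing is selected.
        System.out.println(selectPort(ports, null).isPresent());
        // A single port is chosen regardless of the configured name.
        System.out.println(selectPort(List.of(new Port("grpc", 9090)), "http").map(p -> p.port));
    }
}
```

Because the comparison is configuredPortName.endsWith(portName) rather than an equality check, a configured name such as my-http would also match a port named http. For a multi-port Service with no usable port name configured, the empty result corresponds to map returning null.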
KubernetesClientLoadBalancerClientConfiguration
spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientLoadBalancerClientConfiguration.java
public class KubernetesClientLoadBalancerClientConfiguration {
    @Bean
    @ConditionalOnProperty(name = "spring.cloud.kubernetes.loadbalancer.mode", havingValue = "SERVICE")
    ServiceInstanceListSupplier kubernetesServicesListSupplier(Environment environment, CoreV1Api coreV1Api,
            KubernetesClientServiceInstanceMapper mapper, KubernetesDiscoveryProperties discoveryProperties,
            KubernetesNamespaceProvider kubernetesNamespaceProvider, ConfigurableApplicationContext context) {
        return ServiceInstanceListSupplier.builder()
                .withBase(new KubernetesClientServicesListSupplier(environment, mapper, discoveryProperties,
                        coreV1Api, kubernetesNamespaceProvider))
                .withCaching().build(context);
    }
}
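When spring.cloud.kubernetes.loadbalancer.mode is set to SERVICE, KubernetesClientLoadBalancerClientConfiguration creates the kubernetesServicesListSupplier bean, which wraps a KubernetesClientServicesListSupplier and adds caching through withCaching().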
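Summary
spring-cloud-loadbalancer defines ServiceInstanceListSupplier, which extends Supplier with the type argument Flux<List<ServiceInstance>>, declares getServiceId and get(Request), and provides a static builder method. KubernetesServicesListSupplier is an abstract class that implements ServiceInstanceListSupplier and leaves get() abstract. KubernetesClientServicesListSupplier extends it; its constructor takes KubernetesServiceInstanceMapper, KubernetesDiscoveryProperties, CoreV1Api, and KubernetesNamespaceProvider, and its get method uses coreV1Api.listServiceForAllNamespaces or coreV1Api.listNamespacedService to fetch the V1Service objects for the given serviceId, then converts them to ServiceInstance through the mapper.
Overall, in the code examined here, spring-cloud-kubernetes-client-loadbalancer only contributes a supplier when spring.cloud.kubernetes.loadbalancer.mode is SERVICE. In that mode each service resolves to a single instance whose host is the Service DNS name, so requests still rely on Kubernetes Service name resolution and the Service's own load balancing, and finer-grained, customized load balancing across individual instances is not possible.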