A look at spring-cloud-kubernetes-client-loadbalancer

This article examines spring-cloud-kubernetes-client-loadbalancer.

ServiceInstanceListSupplier

org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplier.java

public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {
String getServiceId();

default Flux<List<ServiceInstance>> get(Request request) {
	return get();
}

static ServiceInstanceListSupplierBuilder builder() {
	return new ServiceInstanceListSupplierBuilder();
}

}

spring-cloud-loadbalancer defines ServiceInstanceListSupplier, which extends Supplier<Flux<List<ServiceInstance>>>. It declares getServiceId, adds a default get(Request) method that simply delegates to get(), and provides a static builder method.

Request

org/springframework/cloud/client/loadbalancer/Request.java

public interface Request<C> {

// Avoid breaking backward compatibility
default C getContext() {
	return null;
}

// TODO: define contents

}

Request declares a getContext method that returns null by default.

DefaultRequest

org/springframework/cloud/client/loadbalancer/DefaultRequest.java

public class DefaultRequest<T> implements Request<T> {

private T context;

public DefaultRequest() {
	new DefaultRequestContext();
}

public DefaultRequest(T context) {
	this.context = context;
}

@Override
public T getContext() {
	return context;
}

public void setContext(T context) {
	this.context = context;
}

@Override
public String toString() {
	ToStringCreator to = new ToStringCreator(this);
	to.append("context", context);
	return to.toString();
}

@Override
public boolean equals(Object o) {
	if (this == o) {
		return true;
	}
	if (!(o instanceof DefaultRequest<?> that)) {
		return false;
	}
	return Objects.equals(context, that.context);
}

@Override
public int hashCode() {
	return Objects.hash(context);
}

}

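DefaultRequest implements Request; its type parameter is the type of the context it carries. Note that in the code as quoted, the no-argument constructor creates a DefaultRequestContext but does not assign it, so context stays null unless it is set later.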

KubernetesServicesListSupplier

spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/loadbalancer/KubernetesServicesListSupplier.java

public abstract class KubernetesServicesListSupplier implements ServiceInstanceListSupplier {

protected final Environment environment;

protected final KubernetesDiscoveryProperties discoveryProperties;

protected final KubernetesServiceInstanceMapper mapper;

public KubernetesServicesListSupplier(Environment environment, KubernetesServiceInstanceMapper mapper,
		KubernetesDiscoveryProperties discoveryProperties) {
	this.environment = environment;
	this.discoveryProperties = discoveryProperties;
	this.mapper = mapper;
}

@Override
public String getServiceId() {
	return environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
}

@Override
public abstract Flux<List<ServiceInstance>> get();

}

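KubernetesServicesListSupplier is an abstract class that implements ServiceInstanceListSupplier. It declares get() as abstract and does not override get(Request), so the Request is never passed down to subclasses.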
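To make the point about the ignored Request concrete, here is a minimal stdlib-only sketch. InstanceSupplier and FixedSupplier are hypothetical stand-ins, not the Spring Cloud types: they use a plain List<String> instead of Flux<List<ServiceInstance>> and an Object instead of Request. The sketch only shows that an implementation which overrides get() alone returns the same result regardless of the request.

```java
import java.util.List;
import java.util.function.Supplier;

class RequestIgnoredSketch {

	// Hypothetical stand-in for ServiceInstanceListSupplier:
	// the default get(request) delegates to get() and drops the request.
	interface InstanceSupplier extends Supplier<List<String>> {
		default List<String> get(Object request) {
			return get();
		}
	}

	// Hypothetical implementation that, like the abstract class above,
	// only implements get() and never sees the request.
	static class FixedSupplier implements InstanceSupplier {
		@Override
		public List<String> get() {
			return List.of("service-a.default.svc.cluster.local");
		}
	}

	public static void main(String[] args) {
		InstanceSupplier supplier = new FixedSupplier();
		// Both calls print the same list: the request has no influence.
		System.out.println(supplier.get("request-1"));
		System.out.println(supplier.get("request-2"));
	}
}
```

A supplier that wanted to react to request data (for example a hint carried in the context) would have to override get(Request) itself.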

KubernetesClientServicesListSupplier

spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientServicesListSupplier.java

public class KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier {

private static final Log LOG = LogFactory.getLog(KubernetesClientServicesListSupplier.class);

private CoreV1Api coreV1Api;

private KubernetesClientProperties kubernetesClientProperties;

private KubernetesNamespaceProvider kubernetesNamespaceProvider;

public KubernetesClientServicesListSupplier(Environment environment, KubernetesServiceInstanceMapper mapper,
		KubernetesDiscoveryProperties discoveryProperties, CoreV1Api coreV1Api,
		KubernetesNamespaceProvider kubernetesNamespaceProvider) {
	super(environment, mapper, discoveryProperties);
	this.coreV1Api = coreV1Api;
	this.kubernetesNamespaceProvider = kubernetesNamespaceProvider;
}

private String getNamespace() {
	return kubernetesNamespaceProvider != null ? kubernetesNamespaceProvider.getNamespace()
			: kubernetesClientProperties.namespace();
}

@Override
public Flux<List<ServiceInstance>> get() {
	LOG.info("Getting services with id " + this.getServiceId());
	List<ServiceInstance> result = new ArrayList<>();
	List<V1Service> services;
	try {
		if (discoveryProperties.allNamespaces()) {
			services = coreV1Api.listServiceForAllNamespaces(null, null, "metadata.name=" + this.getServiceId(),
					null, null, null, null, null, null, null, null).getItems();
		}
		else {
			services = coreV1Api.listNamespacedService(getNamespace(), null, null, null,
					"metadata.name=" + this.getServiceId(), null, null, null, null, null, null, null).getItems();
		}
		services.forEach(service -> result.add(mapper.map(service)));
	}
	catch (ApiException e) {
		LOG.warn("Error retrieving service with name " + this.getServiceId(), e);
	}
	LOG.info("Returning services: " + result);
	return Flux.defer(() -> Flux.just(result));
}

}

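KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier. Its constructor takes an Environment, KubernetesServiceInstanceMapper, KubernetesDiscoveryProperties, CoreV1Api and KubernetesNamespaceProvider. Its get method calls coreV1Api.listServiceForAllNamespaces or coreV1Api.listNamespacedService with the field selector metadata.name=<serviceId> to fetch the matching V1Service objects, then converts them to ServiceInstance through the mapper. If the API call throws an ApiException, it logs a warning and returns an empty list.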
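The lookup-and-fallback flow of get() can be sketched without the Kubernetes client. In the sketch below, ServiceApi is a hypothetical stand-in for the CoreV1Api list calls and service names are plain strings; only the field selector format and the "empty list on error" behaviour are taken from the code quoted above.

```java
import java.util.ArrayList;
import java.util.List;

class LookupSketch {

	// Hypothetical stand-in for the CoreV1Api list call; receives a field selector.
	interface ServiceApi {
		List<String> list(String fieldSelector) throws Exception;
	}

	// Same selector format as in the quoted get() method.
	static String fieldSelector(String serviceId) {
		return "metadata.name=" + serviceId;
	}

	// Mirrors the structure of get(): on failure the (empty) result is still returned.
	static List<String> lookup(ServiceApi api, String serviceId) {
		List<String> result = new ArrayList<>();
		try {
			result.addAll(api.list(fieldSelector(serviceId)));
		}
		catch (Exception e) {
			// The quoted code logs a warning here and carries on.
			System.err.println("Error retrieving service with name " + serviceId);
		}
		return result;
	}

	public static void main(String[] args) {
		// A fake API that echoes the selector it was called with.
		System.out.println(lookup(selector -> List.of(selector), "service-a"));
		// A fake API that always fails: the caller sees an empty list, not an exception.
		System.out.println(lookup(selector -> { throw new Exception("api down"); }, "service-a"));
	}
}
```

One consequence of this design is that a caller cannot distinguish "no such service" from "API server unreachable"; both surface as an empty instance list.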

V1Service

io/kubernetes/client/openapi/models/V1Service.java

public class V1Service implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
private String apiVersion;

public static final String SERIALIZED_NAME_KIND = "kind";
@SerializedName(SERIALIZED_NAME_KIND)
private String kind;

public static final String SERIALIZED_NAME_METADATA = "metadata";
@SerializedName(SERIALIZED_NAME_METADATA)
private V1ObjectMeta metadata;

public static final String SERIALIZED_NAME_SPEC = "spec";
@SerializedName(SERIALIZED_NAME_SPEC)
private V1ServiceSpec spec;

public static final String SERIALIZED_NAME_STATUS = "status";
@SerializedName(SERIALIZED_NAME_STATUS)
private V1ServiceStatus status;

//......
}

V1Service defines the apiVersion, kind, metadata, spec and status properties.

KubernetesServiceInstanceMapper

spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/loadbalancer/KubernetesServiceInstanceMapper.java

public interface KubernetesServiceInstanceMapper<T> {

KubernetesServiceInstance map(T service);

static String createHost(String serviceName, String namespace, String clusterDomain) {
	return String.format("%s.%s.svc.%s", serviceName, StringUtils.hasText(namespace) ? namespace : "default",
			clusterDomain);
}

static boolean isSecure(Map<String, String> labels, Map<String, String> annotations, String servicePortName,
		Integer servicePort) {
	if (labels != null) {
		final String securedLabelValue = labels.getOrDefault("secured", "false");
		if (securedLabelValue.equals("true")) {
			return true;
		}
	}

	if (annotations != null) {
		final String securedAnnotationValue = annotations.getOrDefault("secured", "false");
		if (securedAnnotationValue.equals("true")) {
			return true;
		}
	}
	return (servicePortName != null && servicePortName.endsWith("https")) || servicePort.toString().endsWith("443");
}

static Map<String, String> getMapWithPrefixedKeys(Map<String, String> map, String prefix) {
	if (map == null) {
		return new HashMap<>();
	}
	if (!StringUtils.hasText(prefix)) {
		return map;
	}
	final Map<String, String> result = new HashMap<>();
	map.forEach((k, v) -> result.put(prefix + k, v));
	return result;
}

}

The KubernetesServiceInstanceMapper interface declares the map method and provides the static helper methods createHost, isSecure and getMapWithPrefixedKeys.
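To see what the three static helpers produce, the sketch below re-implements them in a standalone class using only the JDK. This is an illustration, not the library code: MapperHelpersSketch is a hypothetical name, and String.isBlank() is used as an approximation of Spring's StringUtils.hasText.

```java
import java.util.HashMap;
import java.util.Map;

class MapperHelpersSketch {

	// Approximates StringUtils.hasText with a null check plus isBlank().
	static boolean hasText(String s) {
		return s != null && !s.isBlank();
	}

	// Falls back to the "default" namespace when none is given.
	static String createHost(String serviceName, String namespace, String clusterDomain) {
		return String.format("%s.%s.svc.%s", serviceName, hasText(namespace) ? namespace : "default",
				clusterDomain);
	}

	// Secure if a "secured" label/annotation is "true", the port name ends
	// with "https", or the port number ends with 443.
	static boolean isSecure(Map<String, String> labels, Map<String, String> annotations, String portName,
			Integer port) {
		if (labels != null && "true".equals(labels.getOrDefault("secured", "false"))) {
			return true;
		}
		if (annotations != null && "true".equals(annotations.getOrDefault("secured", "false"))) {
			return true;
		}
		return (portName != null && portName.endsWith("https")) || port.toString().endsWith("443");
	}

	// Copies the map with every key prefixed; null becomes an empty map.
	static Map<String, String> getMapWithPrefixedKeys(Map<String, String> map, String prefix) {
		if (map == null) {
			return new HashMap<>();
		}
		if (!hasText(prefix)) {
			return map;
		}
		Map<String, String> result = new HashMap<>();
		map.forEach((k, v) -> result.put(prefix + k, v));
		return result;
	}

	public static void main(String[] args) {
		System.out.println(createHost("service-a", null, "cluster.local")); // service-a.default.svc.cluster.local
		System.out.println(isSecure(null, null, "http", 8443)); // true, because "8443" ends with "443"
		System.out.println(isSecure(null, null, "http", 8080)); // false
		System.out.println(getMapWithPrefixedKeys(Map.of("app", "demo"), "label.")); // {label.app=demo}
	}
}
```

Note the port heuristic: any port number whose decimal form ends in 443 (443, 8443, 10443, ...) is treated as secure, even without a label or annotation.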

KubernetesClientServiceInstanceMapper

spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientServiceInstanceMapper.java

public class KubernetesClientServiceInstanceMapper implements KubernetesServiceInstanceMapper<V1Service> {

private KubernetesLoadBalancerProperties properties;

private KubernetesDiscoveryProperties discoveryProperties;

public KubernetesClientServiceInstanceMapper(KubernetesLoadBalancerProperties properties,
		KubernetesDiscoveryProperties discoveryProperties) {
	this.properties = properties;
	this.discoveryProperties = discoveryProperties;
}

@Override
public KubernetesServiceInstance map(V1Service service) {
	final V1ObjectMeta meta = service.getMetadata();

	final List<V1ServicePort> ports = service.getSpec().getPorts();
	V1ServicePort port = null;
	if (ports.size() == 1) {
		port = ports.get(0);
	}
	else if (ports.size() > 1 && StringUtils.hasText(this.properties.getPortName())) {
		Optional<V1ServicePort> optPort = ports.stream()
				.filter(it -> properties.getPortName().endsWith(it.getName())).findAny();
		if (optPort.isPresent()) {
			port = optPort.get();
		}
	}
	if (port == null) {
		return null;
	}
	final String host = KubernetesServiceInstanceMapper.createHost(service.getMetadata().getName(),
			service.getMetadata().getNamespace(), properties.getClusterDomain());
	final boolean secure = KubernetesServiceInstanceMapper.isSecure(service.getMetadata().getLabels(),
			service.getMetadata().getAnnotations(), port.getName(), port.getPort());
	return new DefaultKubernetesServiceInstance(meta.getUid(), meta.getName(), host, port.getPort(),
			getServiceMetadata(service), secure);
}

private Map<String, String> getServiceMetadata(V1Service service) {
	final Map<String, String> serviceMetadata = new HashMap<>();
	KubernetesDiscoveryProperties.Metadata metadataProps = this.discoveryProperties.metadata();
	if (metadataProps.addLabels()) {
		Map<String, String> labelMetadata = KubernetesServiceInstanceMapper
				.getMapWithPrefixedKeys(service.getMetadata().getLabels(), metadataProps.labelsPrefix());
		serviceMetadata.putAll(labelMetadata);
	}
	if (metadataProps.addAnnotations()) {
		Map<String, String> annotationMetadata = KubernetesServiceInstanceMapper
				.getMapWithPrefixedKeys(service.getMetadata().getAnnotations(), metadataProps.annotationsPrefix());
		serviceMetadata.putAll(annotationMetadata);
	}

	return serviceMetadata;
}

}

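KubernetesClientServiceInstanceMapper implements KubernetesServiceInstanceMapper with V1Service as its type argument. Its map method first picks a port from service.getSpec().getPorts(): the only port if there is exactly one, otherwise the one matched against the configured port name; if no port is selected it returns null. It then uses createHost to build the service's DNS name in the form <servicename>.<namespace>.svc.<clusterdomain>, for example service-a.default.svc.cluster.local, and finally creates a DefaultKubernetesServiceInstance.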
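The port selection in map() is easy to misread, so here is a small stdlib-only sketch of it. Port is a hypothetical stand-in for V1ServicePort and configuredPortName stands in for properties.getPortName(); the branching and the endsWith direction (configured name ends with the port's name) follow the quoted code.

```java
import java.util.List;

class PortSelectionSketch {

	// Hypothetical stand-in for V1ServicePort.
	static final class Port {
		final String name;
		final int port;

		Port(String name, int port) {
			this.name = name;
			this.port = port;
		}
	}

	// Returns the chosen port, or null when none can be chosen
	// (in which case the quoted map() method returns null as well).
	static Port selectPort(List<Port> ports, String configuredPortName) {
		if (ports.size() == 1) {
			// A single port is used as-is; the configured name is not consulted.
			return ports.get(0);
		}
		if (ports.size() > 1 && configuredPortName != null && !configuredPortName.isBlank()) {
			return ports.stream()
					.filter(p -> configuredPortName.endsWith(p.name))
					.findAny()
					.orElse(null);
		}
		return null;
	}

	public static void main(String[] args) {
		List<Port> ports = List.of(new Port("http", 8080), new Port("metrics", 9090));
		System.out.println(selectPort(ports, "http").port);    // 8080
		System.out.println(selectPort(ports, null));           // null: several ports, no name configured
	}
}
```

Because a null result from map() is added to the result list by the supplier shown earlier, a multi-port service without a matching port name ends up as a null entry rather than being skipped.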

KubernetesClientLoadBalancerClientConfiguration

spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientLoadBalancerClientConfiguration.java

public class KubernetesClientLoadBalancerClientConfiguration {

@Bean
@ConditionalOnProperty(name = "spring.cloud.kubernetes.loadbalancer.mode", havingValue = "SERVICE")
ServiceInstanceListSupplier kubernetesServicesListSupplier(Environment environment, CoreV1Api coreV1Api,
		KubernetesClientServiceInstanceMapper mapper, KubernetesDiscoveryProperties discoveryProperties,
		KubernetesNamespaceProvider kubernetesNamespaceProvider, ConfigurableApplicationContext context) {
	return ServiceInstanceListSupplier.builder().withBase(new KubernetesClientServicesListSupplier(environment,
			mapper, discoveryProperties, coreV1Api, kubernetesNamespaceProvider)).withCaching().build(context);
}

}

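KubernetesClientLoadBalancerClientConfiguration creates the kubernetesServicesListSupplier bean when spring.cloud.kubernetes.loadbalancer.mode is set to SERVICE; the KubernetesClientServicesListSupplier is wrapped with caching through withCaching().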

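Summary

spring-cloud-loadbalancer defines ServiceInstanceListSupplier, which extends Supplier<Flux<List<ServiceInstance>>>, declares getServiceId and get(Request), and provides a static builder method. KubernetesServicesListSupplier is an abstract class that implements ServiceInstanceListSupplier and declares get() as abstract. KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier; its constructor takes KubernetesServiceInstanceMapper, KubernetesDiscoveryProperties, CoreV1Api and KubernetesNamespaceProvider, and its get method uses coreV1Api.listServiceForAllNamespaces or coreV1Api.listNamespacedService to fetch the V1Service objects for the given serviceId, then converts them to ServiceInstance through the mapper.

Overall, judging from the code above, spring-cloud-kubernetes-client-loadbalancer only registers its own supplier when spring.cloud.kubernetes.loadbalancer.mode is SERVICE. In that mode each service is represented by a single instance whose host is the service's DNS name, so requests still go through Kubernetes Service DNS resolution and load balancing, and fine-grained, customized client-side load balancing is not possible.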