Prometheus服务发现机制之Kubernetes
概述
分析过云原生监控接入方案,下面开始看下云原生服务发现机制。Prometheus本身就是作为云原生监控出现的,所以对云原生服务发现支持具有天然优势。Kubernetes
服务发现协议允许使用Kubernetes Rest API
检索出Prometheus
需要监控的targets
,并且跟着集群状态进行同步变更。
kubernetes_sd_configs
表示基于Kubernetes
进行服务发现,服务发现目标类型使用role
表示,比如:role=service
,表示针对Kubernetes
中的service
资源对象,进行具体的服务发现操作。kubernetes_sd_configs
支持的role
包括:node、service、pod、endpoints、ingress
。
原理
基于Kubernetes
进行服务发现,主要针对Kubernetes
中的service、pod、node
等资源对象进行服务发现,Prometheus
使用client-go
对role
中指定的资源对象进行监听。一般Prometheus
部署在Kubernetes
集群中的话,Prometheus
可以直接利用指定的Service Account
对Kubernetes API
进行访问。若Prometheus
在Kubernetes
集群之外,则kubernetes_sd_configs
还需指定监控集群的API Server
的URL
以及相关的认证信息,从而能够创建对应集群的Client
。
“client-go是kubernetes官方提供的go语言的客户端库,go应用使用该库可以访问kubernetes的API Server,这样我们就能通过编程来对kubernetes资源进行增删改查操作。
配置示例:
- job_name: kubernetes-pod
metrics_path: /metrics
kubernetes_sd_configs:
- role: pod
namespaces:
names:
- 'test01'
api_server: https://apiserver.simon:6443
bearer_token_file: d:/token.k8s
tls_config:
insecure_skip_verify: true
bearer_token_file: d:/token.k8s
tls_config:
insecure_skip_verify: true
协议分析
Kubernetes
服务发现大致原理如下图:
1、通过clientset
访问API Server
,根据role
配置获取不同的集群资源对象;
2、通过List & Watch
机制,注册监听事件:
p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
podAddCount.Inc()
p.enqueue(o)
},
DeleteFunc: func(o interface{}) {
podDeleteCount.Inc()
p.enqueue(o)
},
UpdateFunc: func(_, o interface{}) {
podUpdateCount.Inc()
p.enqueue(o)
},
})
通过informer.AddEventHandler
函数可以为集群资源添加资源事件回调方法,支持3种资源事件回调方法:AddFunc、DeleteFunc、UpdateFunc
,分别对应新增资源、修改资源和删除资源时事件触发。
3、资源变更注册回调方法中,将目标资源对象转成key
放入到队列queue
中,如下pod
资源:
func (p *Pod) enqueue(obj interface{}) {
//obj是pod资源对象,通过DeletionHandlingMetaNamespaceKeyFunc将其转换成key
//比如key=test01/nginx-deployment-5ffc5bf56c-n2pl8,即namespace/pod_name格式
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
}
p.queue.Add(key)
}
4、后台goroutines
无限循环执行process
逻辑,process
逻辑中就是不停从queue
中提取数据进行处理,比如pod.go
对应逻辑如下:
func (p *Pod) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { keyObj, quit := p.queue.Get() if quit { return false } defer p.queue.Done(keyObj) key := keyObj.(string)
//与 MetaNamespaceKeyFunc() 功能相反的是 SplitMetaNamespaceKey() 函数,它将传入的 Key 分解,返回对象所在的命名空间和对象名称。
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return true
}
//根据key获取资源对象obj
o, exists, err := p.store.GetByKey(key)
if err != nil {
return true
}
if !exists {
//pod被删除时,exists=false
// 然后发送targets为空的tg,即移除
send(ctx, ch, &targetgroup.Group{Source: podSourceFromNamespaceAndName(namespace, name)})
return true
}
pod, err := convertToPod(o)
if err != nil {
level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err)
return true
}
//p.buildPod(pod):将资源对象信息转成target groups
send(ctx, ch, p.buildPod(pod))
return true
}
大致逻辑:
a、根据从queue
中提取的key
,使用p.store.GetByKey(key)
获取对应的资源对象,比如pod、service
等对象;
b、如果对象不存在,则表示资源对象被删除,则创建一个targets
集合为空的target groups
,这样Scrape Manager
就会移除targets
;
c、使用buildXXX(obj)
将资源对象解析成target groups
,如buildNode()、buildPod()
等;
d、最后使用send()
方法将解析的target groups
通过通道channel
传递出去,最终传递给Scrape Manager
,这样target groups
中targets
将被Prometheus
抓取监控数据。
pod
资源的target groups
结构如下示例,每个pod对象都会被解析成target groups
,其中包含targets
集合、labels
标签集合:
Discovery创建
1、假如我们定义如下抓取作业:
- job_name: kubernetes-nodes-cadvisor
metrics_path: /metrics
scheme: https
kubernetes_sd_configs:
- role: node
api_server: https://apiserver.simon:6443
bearer_token_file: d:/token.k8s
tls_config:
insecure_skip_verify: true
bearer_token_file: d:/token.k8s
tls_config:
insecure_skip_verify: true
relabel_configs:
# 将标签(.*)作为新标签名,原有值不变
- action: labelmap
regex: __meta_kubernetes_node_label_(.*)
# 修改NodeIP:10250为APIServerIP:6443
- action: replace
regex: (.*)
source_labels: ["__address__"]
target_label: __address__
replacement: 192.168.52.151:6443
- action: replace
source_labels: [__meta_kubernetes_node_name]
target_label: __metrics_path__
regex: (.*)
replacement: /api/v1/nodes/${1}/proxy/metrics/cadvisor
会被解析成kubernetes.SDConfig
如下:
kubernetes.SDConfig
定义如下:
type SDConfig struct {
APIServer config.URL yaml:"api_server,omitempty"
Role Role yaml:"role"
HTTPClientConfig config.HTTPClientConfig yaml:",inline"
NamespaceDiscovery NamespaceDiscovery yaml:"namespaces,omitempty"
Selectors []SelectorConfig yaml:"selectors,omitempty"
}
2、Discovery
创建
//创建Clientset,可看成操作Kubernetes API的客户端
c, err := kubernetes.NewForConfig(kcfg)
if err != nil {
return nil, err
}
return &Discovery{
client: c,
logger: l,
role: conf.Role,
namespaceDiscovery: &conf.NamespaceDiscovery,
discoverers: make([]discovery.Discoverer, 0),
selectors: mapSelector(conf.Selectors),
}, nil
3、Discovery
创建完成,最后会调用Discovery.Run()
启动服务发现:
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
d.Lock()
namespaces := d.getNamespaces()switch d.role {
case RoleEndpointSlice:
role=endpointslice逻辑
case RoleEndpoint:
role=endpoints逻辑
case RolePod:
role=pod逻辑
case RoleService:
role=service逻辑
case RoleIngress:
role=ingress逻辑
case RoleNode:
role=node逻辑
default:
level.Error(d.logger).Log("msg", "unknown Kubernetes discovery kind", "role", d.role)
}var wg sync.WaitGroup
for _, dd := range d.discoverers {
wg.Add(1)
go func(d discovery.Discoverer) {
defer wg.Done()
d.Run(ctx, ch)
}(dd)
}d.Unlock()
wg.Wait()
<-ctx.Done()
}
4、注册集群资源对象监听事件回调逻辑:
for _, namespace := range namespaces {
p := d.client.CoreV1().Pods(namespace)
plw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = d.selectors.pod.field
options.LabelSelector = d.selectors.pod.label
return p.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = d.selectors.pod.field
options.LabelSelector = d.selectors.pod.label
return p.Watch(ctx, options)
},
}
pod := NewPod(
log.With(d.logger, "role", "pod"),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
d.discoverers = append(d.discoverers, pod)
go pod.informer.Run(ctx.Done())
}