Understanding the SuperEdge Topology Algorithm in One Article

Author: Du Yanghao, Senior Engineer at Tencent Cloud, passionate about open source, containers, and Kubernetes. He currently works on image registries, Kubernetes cluster high availability & backup/restore, and edge-computing related R&D.

Preface: An Introduction to SuperEdge

SuperEdge is an edge container management system built on native Kubernetes. It extends cloud-native capabilities to the edge and lets the cloud manage and control edge nodes effectively. SuperEdge also provides its own service group mechanism for service access control in edge computing scenarios, greatly simplifying the process of deploying applications from the cloud to the edge.

The SuperEdge service group topology awareness feature

SuperEdge service group uses application-grid-wrapper to implement topology awareness, achieving closed-loop service access within a single nodeunit.

Before diving into application-grid-wrapper, let's briefly review the topology awareness feature natively supported by community Kubernetes[1].

Kubernetes service topology awareness was released as an alpha feature in v1.17 to support topology-aware routing and nearest-neighbor access. Users add a topologyKeys field to a service to indicate the type of topology key; only endpoints within the same topology domain are accessed. Currently there are three topologyKeys to choose from:

  • "kubernetes.io/hostname": access endpoints on the same node (same kubernetes.io/hostname label value); if none exist, access to the service fails.
  • "topology.kubernetes.io/zone": access endpoints within the same zone (same topology.kubernetes.io/zone label value); if none exist, access to the service fails.
  • "topology.kubernetes.io/region": access endpoints within the same region (same topology.kubernetes.io/region label value); if none exist, access to the service fails.

Besides specifying one of the topology keys above on its own, you can also combine them into a list, for example: ["kubernetes.io/hostname", "topology.kubernetes.io/zone", "topology.kubernetes.io/region"]. This means: prefer endpoints on the local node; if there are none, access endpoints within the same zone; if those don't exist either, access endpoints within the same region; if none of them exist, the access fails.

In addition, "*" can be appended at the end of the list (it can only be the last item), meaning: if all preceding topology domains fail, access any valid endpoint, i.e. with no topology restriction. An example:

# A Service that prefers node local, zonal, then regional endpoints
# but falls back to cluster wide endpoints.
apiVersion: v1
kind: Service
metadata:
  name: my-service
spec:
  selector:
    app: my-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 9376
  topologyKeys:
    - "kubernetes.io/hostname"
    - "topology.kubernetes.io/zone"
    - "topology.kubernetes.io/region"
    - "*"

Compared with the community implementation, the topology awareness implemented by service group differs as follows:

  • The service group topology key can be customized (it is the gridUniqKey), which makes it more flexible, whereas the community implementation currently offers only three choices: "kubernetes.io/hostname", "topology.kubernetes.io/zone" and "topology.kubernetes.io/region".
  • A service group can only specify a single topology key, i.e. it can only access valid endpoints within the local topology domain and cannot reach endpoints in other topology domains, whereas the community implementation can fall back to other topology domains via the topologyKeys list and "*".

With service group topology awareness, the service is configured as follows:

# A Service that only routes to endpoints within the same zone1 topology domain.
apiVersion: v1
kind: Service
metadata:
  annotations:
    topologyKeys: '["zone1"]'
  labels:
    superedge.io/grid-selector: servicegrid-demo
  name: servicegrid-demo-svc
spec:
  ports:
    - port: 80
      protocol: TCP
      targetPort: 8080
  selector:
    appGrid: echo

Having introduced service group topology awareness, let's dive into the source code for the implementation details. As before, the analysis starts from a usage example:

# step1: labels edge nodes
$ kubectl get nodes
NAME    STATUS   ROLES    AGE   VERSION
node0   Ready    <none>   16d   v1.16.7
node1   Ready    <none>   16d   v1.16.7
node2   Ready    <none>   16d   v1.16.7
# nodeunit1(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node0 zone1=nodeunit1
# nodeunit2(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node1 zone1=nodeunit2
$ kubectl --kubeconfig config label nodes node2 zone1=nodeunit2
...
# step3: deploy echo ServiceGrid
$ cat <<EOF | kubectl --kubeconfig config apply -f -
apiVersion: superedge.io/v1
kind: ServiceGrid
metadata:
  name: servicegrid-demo
  namespace: default
spec:
  gridUniqKey: zone1
  template:
    selector:
      appGrid: echo
    ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
EOF
servicegrid.superedge.io/servicegrid-demo created

# note that there is only one relevant service generated
$ kubectl get svc
NAME                   TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)   AGE
kubernetes             ClusterIP   192.168.0.1     <none>        443/TCP   16d
servicegrid-demo-svc   ClusterIP   192.168.6.139   <none>        80/TCP    10m

# step4: access servicegrid-demo-svc(service topology and closed-looped)
# execute on node0
$ curl 192.168.6.139 | grep "node name"
node name: node0
# execute on node1 and node2
$ curl 192.168.6.139 | grep "node name"
node name: node2
$ curl 192.168.6.139 | grep "node name"
node name: node1

After the ServiceGrid CR is created, the ServiceGrid Controller generates the corresponding service from it (including the topologyKeys annotation built from serviceGrid.Spec.GridUniqKey); application-grid-wrapper then implements topology awareness based on that service. The two parts are analyzed in turn below.

ServiceGrid Controller analysis

The ServiceGrid Controller logic is largely the same as that of the DeploymentGrid Controller:

  • 1. Create and maintain the CRDs required by service group (including ServiceGrid).
  • 2. Watch ServiceGrid events and enqueue ServiceGrids into a work queue; in a loop, pop ServiceGrids from the queue, parse them, and create and maintain the corresponding service.
  • 3. Watch service events and enqueue the related ServiceGrid into the work queue for the processing above, helping the logic above achieve overall reconciliation.

Note the differences from the DeploymentGrid Controller:

  • One ServiceGrid object produces only one service.
  • Only service events need to be watched in addition; there is no need to watch node events, because node CRUD operations are irrelevant to ServiceGrid.
  • The service generated for a ServiceGrid is named {ServiceGrid}-svc.

func (sgc *ServiceGridController) syncServiceGrid(key string) error {
    startTime := time.Now()
    klog.V(4).Infof("Started syncing service grid %q (%v)", key, startTime)
    defer func() {
        klog.V(4).Infof("Finished syncing service grid %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    sg, err := sgc.svcGridLister.ServiceGrids(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(2).Infof("service grid %v has been deleted", key)
        return nil
    }
    if err != nil {
        return err
    }

    if sg.Spec.GridUniqKey == "" {
        sgc.eventRecorder.Eventf(sg, corev1.EventTypeWarning, "Empty", "This service grid has an empty grid key")
        return nil
    }

    // get service workload list of this grid
    svcList, err := sgc.getServiceForGrid(sg)
    if err != nil {
        return err
    }

    if sg.DeletionTimestamp != nil {
        return nil
    }

    // sync service grid relevant services workload
    return sgc.reconcile(sg, svcList)
}

func (sgc *ServiceGridController) getServiceForGrid(sg *crdv1.ServiceGrid) ([]*corev1.Service, error) {
    svcList, err := sgc.svcLister.Services(sg.Namespace).List(labels.Everything())
    if err != nil {
        return nil, err
    }

    labelSelector, err := common.GetDefaultSelector(sg.Name)
    if err != nil {
        return nil, err
    }
    canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
        fresh, err := sgc.crdClient.SuperedgeV1().ServiceGrids(sg.Namespace).Get(context.TODO(), sg.Name, metav1.GetOptions{})
        if err != nil {
            return nil, err
        }
        if fresh.UID != sg.UID {
            return nil, fmt.Errorf("orignal service grid %v/%v is gone: got uid %v, wanted %v", sg.Namespace, sg.Name, fresh.UID, sg.UID)
        }
        return fresh, nil
    })

    cm := controller.NewServiceControllerRefManager(sgc.svcClient, sg, labelSelector, util.ControllerKind, canAdoptFunc)
    return cm.ClaimService(svcList)
}

func (sgc *ServiceGridController) reconcile(g *crdv1.ServiceGrid, svcList []*corev1.Service) error {
    var (
        adds    []*corev1.Service
        updates []*corev1.Service
        deletes []*corev1.Service
    )

    sgTargetSvcName := util.GetServiceName(g)
    isExistingSvc := false
    for _, svc := range svcList {
        if svc.Name == sgTargetSvcName {
            isExistingSvc = true
            template := util.KeepConsistence(g, svc)
            if !apiequality.Semantic.DeepEqual(template, svc) {
                updates = append(updates, template)
            }
        } else {
            deletes = append(deletes, svc)
        }
    }

    if !isExistingSvc {
        adds = append(adds, util.CreateService(g))
    }

    return sgc.syncService(adds, updates, deletes)
}

func CreateService(sg *crdv1.ServiceGrid) *corev1.Service {
    svc := &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      GetServiceName(sg),
            Namespace: sg.Namespace,
            // Append existed ServiceGrid labels to service to be created
            Labels: func() map[string]string {
                if sg.Labels != nil {
                    newLabels := sg.Labels
                    newLabels[common.GridSelectorName] = sg.Name
                    newLabels[common.GridSelectorUniqKeyName] = sg.Spec.GridUniqKey
                    return newLabels
                } else {
                    return map[string]string{
                        common.GridSelectorName:        sg.Name,
                        common.GridSelectorUniqKeyName: sg.Spec.GridUniqKey,
                    }
                }
            }(),
            Annotations: make(map[string]string),
        },
        Spec: sg.Spec.Template,
    }

    keys := make([]string, 1)
    keys[0] = sg.Spec.GridUniqKey
    keyData, _ := json.Marshal(keys)
    svc.Annotations[common.TopologyAnnotationsKey] = string(keyData)

    return svc
}

Since the logic is similar to DeploymentGrid, we won't go into the details here and instead focus on the application-grid-wrapper part.

application-grid-wrapper analysis

After the ServiceGrid Controller has created the service, application-grid-wrapper comes into play:

apiVersion: v1
kind: Service
metadata:
  annotations:
    topologyKeys: '["zone1"]'
  creationTimestamp: "2021-03-03T07:33:30Z"
  labels:
    superedge.io/grid-selector: servicegrid-demo
  name: servicegrid-demo-svc
  namespace: default
  ownerReferences:
  - apiVersion: superedge.io/v1
    blockOwnerDeletion: true
    controller: true
    kind: ServiceGrid
    name: servicegrid-demo
    uid: 78c74d3c-72ac-4e68-8c79-f1396af5a581
  resourceVersion: "127987090"
  selfLink: /api/v1/namespaces/default/services/servicegrid-demo-svc
  uid: 8130ba7b-c27e-4c3a-8ceb-4f6dd0178dfc
spec:
  clusterIP: 192.168.161.1
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8080
  selector:
    appGrid: echo
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}

To keep Kubernetes completely non-intrusive, a wrapper layer is added between kube-proxy and the apiserver. The architecture is as follows:

[Figure: application-grid-wrapper architecture]

The call chain is as follows:

kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
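
The wrapper is transparent to kube-proxy: on each edge node, kube-proxy's kubeconfig simply points at the wrapper's listen address instead of at the apiserver. Below is a minimal sketch of such a kubeconfig; the address and port (127.0.0.1:51006) and the use of plain HTTP are illustrative assumptions, the actual values depend on how the wrapper is deployed.

apiVersion: v1
kind: Config
clusters:
- cluster:
    # kube-proxy talks to the local application-grid-wrapper, which serves the
    # filtered service/endpoints views and proxies everything else onwards
    server: http://127.0.0.1:51006
  name: default
contexts:
- context:
    cluster: default
    user: kube-proxy
  name: default
current-context: default
users:
- name: kube-proxy
  user: {}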

application-grid-wrapper therefore runs a server that accepts requests from kube-proxy, as follows:

func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {
    ...
    klog.Infof("Start to run interceptor server")
    /* filter */
    server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}

    if insecure {
        return server.ListenAndServe()
    }
    ...
    server.TLSConfig = tlsConfig
    return server.ListenAndServeTLS("", "")
}

func (s *interceptorServer) buildFilterChains(debug bool) http.Handler {
    handler := http.Handler(http.NewServeMux())

    handler = s.interceptEndpointsRequest(handler)
    handler = s.interceptServiceRequest(handler)
    handler = s.interceptEventRequest(handler)
    handler = s.interceptNodeRequest(handler)
    handler = s.logger(handler)

    if debug {
        handler = s.debugger(handler)
    }

    return handler
}

The interceptorServer is created first, and then the handler functions are registered, from outermost to innermost:

  • debug: accepts debug requests and returns the wrapper's pprof runtime information.
  • logger: logs requests.
  • node: accepts kube-proxy node GET (/api/v1/nodes/{node}) requests and returns node information.
  • event: accepts kube-proxy events POST (/events) requests and forwards them to lite-apiserver.

func (s *interceptorServer) interceptEventRequest(handler http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/events") {
            handler.ServeHTTP(w, r)
            return
        }

        targetURL, _ := url.Parse(s.restConfig.Host)
        reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)
        reverseProxy.Transport, _ = rest.TransportFor(s.restConfig)
        reverseProxy.ServeHTTP(w, r)
    })
}

  • service: accepts kube-proxy service List&Watch (/api/v1/services) requests and answers them from the storageCache contents (GetServices).
  • endpoint: accepts kube-proxy endpoint List&Watch (/api/v1/endpoints) requests and answers them from the storageCache contents (GetEndpoints).

Below we first focus on the cache logic, and then come back to the List&Watch handling logic of the specific http handlers.

To implement topology awareness, the wrapper maintains its own cache covering node, service and endpoint resources. You can see that setupInformers registers handler functions for these three resource types:

type storageCache struct {
    // hostName is the nodeName of node which application-grid-wrapper deploys on
    hostName         string
    wrapperInCluster bool

    // mu lock protect the following map structure
    mu           sync.RWMutex
    servicesMap  map[types.NamespacedName]*serviceContainer
    endpointsMap map[types.NamespacedName]*endpointsContainer
    nodesMap     map[types.NamespacedName]*nodeContainer

    // service watch channel
    serviceChan chan<- watch.Event
    // endpoints watch channel
    endpointsChan chan<- watch.Event
}
...
func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {
    msc := &storageCache{
        hostName:         hostName,
        wrapperInCluster: wrapperInCluster,
        servicesMap:      make(map[types.NamespacedName]*serviceContainer),
        endpointsMap:     make(map[types.NamespacedName]*endpointsContainer),
        nodesMap:         make(map[types.NamespacedName]*nodeContainer),
        serviceChan:      serviceNotifier,
        endpointsChan:    endpointsNotifier,
    }

    return msc
}
...
func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {
    ...
    if err := s.setupInformers(ctx.Done()); err != nil {
        return err
    }

    klog.Infof("Start to run interceptor server")
    /* filter */
    server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}
    ...
    return server.ListenAndServeTLS("", "")
}

func (s *interceptorServer) setupInformers(stop <-chan struct{}) error {
    klog.Infof("Start to run service and endpoints informers")
    noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
    if err != nil {
        klog.Errorf("can't parse proxy label, %v", err)
        return err
    }

    noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
    if err != nil {
        klog.Errorf("can't parse headless label, %v", err)
        return err
    }

    labelSelector := labels.NewSelector()
    labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)

    resyncPeriod := time.Minute * 5
    client := kubernetes.NewForConfigOrDie(s.restConfig)
    nodeInformerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
    informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
            options.LabelSelector = labelSelector.String()
        }))

    nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()
    serviceInformer := informerFactory.Core().V1().Services().Informer()
    endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()

    /* */
    nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(), resyncPeriod)
    serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(), resyncPeriod)
    endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(), resyncPeriod)

    go nodeInformer.Run(stop)
    go serviceInformer.Run(stop)
    go endpointsInformer.Run(stop)

    if !cache.WaitForNamedCacheSync("node", stop,
        nodeInformer.HasSynced,
        serviceInformer.HasSynced,
        endpointsInformer.HasSynced) {
        return fmt.Errorf("can't sync informers")
    }

    return nil
}

func (sc *storageCache) NodeEventHandler() cache.ResourceEventHandler {
    return &nodeHandler{cache: sc}
}

func (sc *storageCache) ServiceEventHandler() cache.ResourceEventHandler {
    return &serviceHandler{cache: sc}
}

func (sc *storageCache) EndpointsEventHandler() cache.ResourceEventHandler {
    return &endpointsHandler{cache: sc}
}
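
For reference, the three container types stored in these maps hold the raw objects plus the derived data the wrapper needs. The sketch below is inferred from the handler code quoted in this article; the authoritative definitions live in the SuperEdge repository.

package storage // hypothetical package name, for this sketch only

import v1 "k8s.io/api/core/v1"

// serviceContainer caches a service together with its parsed topologyKeys annotation.
type serviceContainer struct {
    svc  *v1.Service
    keys []string // topologyKeys, e.g. ["zone1"]
}

// nodeContainer caches a node together with its labels, which are matched against topologyKeys.
type nodeContainer struct {
    node   *v1.Node
    labels map[string]string
}

// endpointsContainer keeps both the original endpoints and the topology-pruned version returned to kube-proxy.
type endpointsContainer struct {
    endpoints *v1.Endpoints // as received from the apiserver
    modified  *v1.Endpoints // after pruneEndpoints
}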

We now analyze NodeEventHandler, ServiceEventHandler and EndpointsEventHandler in turn:

1. NodeEventHandler

NodeEventHandler watches node-related events and adds each node and its labels to storageCache.nodesMap (keyed by node name, with the node object and its labels as the value).

func (nh *nodeHandler) add(node *v1.Node) {
    sc := nh.cache

    sc.mu.Lock()

    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
    klog.Infof("Adding node %v", nodeKey)
    sc.nodesMap[nodeKey] = &nodeContainer{
        node:   node,
        labels: node.Labels,
    }
    // update endpoints
    changedEps := sc.rebuildEndpointsMap()

    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}

func (nh *nodeHandler) update(node *v1.Node) {
    sc := nh.cache

    sc.mu.Lock()

    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
    klog.Infof("Updating node %v", nodeKey)
    nodeContainer, found := sc.nodesMap[nodeKey]
    if !found {
        sc.mu.Unlock()
        klog.Errorf("Updating non-existed node %v", nodeKey)
        return
    }

    nodeContainer.node = node
    // return directly when labels of node stay unchanged
    if reflect.DeepEqual(node.Labels, nodeContainer.labels) {
        sc.mu.Unlock()
        return
    }
    nodeContainer.labels = node.Labels

    // update endpoints
    changedEps := sc.rebuildEndpointsMap()

    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}
...

Since a change to a node can affect endpoints, rebuildEndpointsMap is called to refresh storageCache.endpointsMap.

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {
    evts := make([]watch.Event, 0)
    for name, endpointsContainer := range sc.endpointsMap {
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {
            continue
        }
        sc.endpointsMap[name].modified = newEps
        evts = append(evts, watch.Event{
            Type:   watch.Modified,
            Object: newEps,
        })
    }
    return evts
}

rebuildEndpointsMap is the core function of the cache and also where the topology awareness algorithm is implemented:

// pruneEndpoints filters endpoints using serviceTopology rules combined by services topologyKeys and node labels
func pruneEndpoints(hostName string,
    nodes map[types.NamespacedName]*nodeContainer,
    services map[types.NamespacedName]*serviceContainer,
    eps *v1.Endpoints, wrapperInCluster bool) *v1.Endpoints {

    epsKey := types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}

    if wrapperInCluster {
        eps = genLocalEndpoints(eps)
    }

    // dangling endpoints
    svc, ok := services[epsKey]
    if !ok {
        klog.V(4).Infof("Dangling endpoints %s, % #v", eps.Name, eps.Subsets)
        return eps
    }

    // normal service
    if len(svc.keys) == 0 {
        klog.V(4).Infof("Normal endpoints %s, % #v", eps.Name, eps.Subsets)
        return eps
    }

    // topology endpoints
    newEps := eps.DeepCopy()
    for si := range newEps.Subsets {
        subnet := &newEps.Subsets[si]
        subnet.Addresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.Addresses)
        subnet.NotReadyAddresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.NotReadyAddresses)
    }
    klog.V(4).Infof("Topology endpoints %s: subnets from % #v to % #v", eps.Name, eps.Subsets, newEps.Subsets)

    return newEps
}

// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string, nodes map[types.NamespacedName]*nodeContainer,
    addresses []v1.EndpointAddress) []v1.EndpointAddress {
    hostNode, found := nodes[types.NamespacedName{Name: hostName}]
    if !found {
        return nil
    }

    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)
    for i := range addresses {
        addr := addresses[i]
        if nodeName := addr.NodeName; nodeName != nil {
            epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]
            if !found {
                continue
            }
            if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {
                filteredEndpointAddresses = append(filteredEndpointAddresses, addr)
            }
        }
    }

    return filteredEndpointAddresses
}

func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {
    if n1 == nil || n2 == nil {
        return false
    }

    for _, key := range keys {
        val1, v1found := n1[key]
        val2, v2found := n2[key]

        if v1found && v2found && val1 == val2 {
            return true
        }
    }

    return false
}

The algorithm logic is as follows:

  • Check whether the endpoints object belongs to the default kubernetes service; if so, convert it to the address (127.0.0.1) and port (51003) of the lite-apiserver on the edge node where the wrapper runs.

apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    superedge.io/local-endpoint: 127.0.0.1
    superedge.io/local-port: "51003"
  name: kubernetes
  namespace: default
subsets:
- addresses:
  - ip: 172.31.0.60
  ports:
  - name: https
    port: xxx
    protocol: TCP

func genLocalEndpoints(eps *v1.Endpoints) *v1.Endpoints {
    if eps.Namespace != metav1.NamespaceDefault || eps.Name != MasterEndpointName {
        return eps
    }

    klog.V(4).Infof("begin to gen local ep %v", eps)
    ipAddress, e := eps.Annotations[EdgeLocalEndpoint]
    if !e {
        return eps
    }

    portStr, e := eps.Annotations[EdgeLocalPort]
    if !e {
        return eps
    }

    klog.V(4).Infof("get local endpoint %s:%s", ipAddress, portStr)
    port, err := strconv.ParseInt(portStr, 10, 32)
    if err != nil {
        klog.Errorf("parse int %s err %v", portStr, err)
        return eps
    }

    ip := net.ParseIP(ipAddress)
    if ip == nil {
        klog.Warningf("parse ip %s nil", ipAddress)
        return eps
    }

    nep := eps.DeepCopy()
    nep.Subsets = []v1.EndpointSubset{
        {
            Addresses: []v1.EndpointAddress{
                {
                    IP: ipAddress,
                },
            },
            Ports: []v1.EndpointPort{
                {
                    Protocol: v1.ProtocolTCP,
                    Port:     int32(port),
                    Name:     "https",
                },
            },
        },
    }

    klog.V(4).Infof("gen new endpoint complete %v", nep)
    return nep
}

The purpose is that services on the edge node which access the apiserver the in-cluster (InCluster) way end up talking to the local lite-apiserver rather than the cloud apiserver.
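
To see why this matters, consider a typical in-cluster client running in a pod on the edge node: it targets the default/kubernetes service ClusterIP via InClusterConfig, and kube-proxy (fed by the wrapper) forwards that ClusterIP to the rewritten endpoint, i.e. the node-local lite-apiserver. A minimal client-go sketch (the namespace and listing call are only for illustration):

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    // InClusterConfig targets the default/kubernetes service ClusterIP
    // (KUBERNETES_SERVICE_HOST / KUBERNETES_SERVICE_PORT).
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // On a SuperEdge edge node, the default/kubernetes endpoints seen by kube-proxy
    // have been rewritten by genLocalEndpoints to 127.0.0.1:51003, so this request
    // is served by the node-local lite-apiserver instead of the cloud apiserver.
    pods, err := client.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Printf("listed %d pods via the local lite-apiserver\n", len(pods.Items))
}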

  • Retrieve the corresponding service from the storageCache.servicesMap cache by the endpoints name (namespace/name); if the service has no topologyKeys, no topology transformation is needed (it is not a service group service).

func getTopologyKeys(objectMeta *metav1.ObjectMeta) []string {
    if !hasTopologyKey(objectMeta) {
        return nil
    }

    var keys []string
    keyData := objectMeta.Annotations[TopologyAnnotationsKey]
    if err := json.Unmarshal([]byte(keyData), &keys); err != nil {
        klog.Errorf("can't parse topology keys %s, %v", keyData, err)
        return nil
    }

    return keys
}

  • Call filterConcernedAddresses to filter endpoint.Subsets Addresses and NotReadyAddresses, keeping only the endpoints within the same service topologyKeys domain.

// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string, nodes map[types.NamespacedName]*nodeContainer,
    addresses []v1.EndpointAddress) []v1.EndpointAddress {
    hostNode, found := nodes[types.NamespacedName{Name: hostName}]
    if !found {
        return nil
    }

    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)
    for i := range addresses {
        addr := addresses[i]
        if nodeName := addr.NodeName; nodeName != nil {
            epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]
            if !found {
                continue
            }
            if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {
                filteredEndpointAddresses = append(filteredEndpointAddresses, addr)
            }
        }
    }

    return filteredEndpointAddresses
}

func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {
    if n1 == nil || n2 == nil {
        return false
    }

    for _, key := range keys {
        val1, v1found := n1[key]
        val2, v2found := n2[key]

        if v1found && v2found && val1 == val2 {
            return true
        }
    }

    return false
}

Note: if the edge node where the wrapper runs does not carry the service topologyKeys label, the service cannot be accessed from that node either.
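
To make this concrete, here is a small self-contained walk-through of hasIntersectionLabel (copied from the snippet above) using node labels modeled on the earlier demo; an endpoint address is kept only when the wrapper's node and the endpoint's node share the same value for the topology key:

package main

import "fmt"

// hasIntersectionLabel is the same helper quoted above: two nodes "intersect"
// if they share the same value for at least one of the topology keys.
func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {
    if n1 == nil || n2 == nil {
        return false
    }
    for _, key := range keys {
        val1, v1found := n1[key]
        val2, v2found := n2[key]
        if v1found && v2found && val1 == val2 {
            return true
        }
    }
    return false
}

func main() {
    keys := []string{"zone1"} // topologyKeys of servicegrid-demo-svc

    hostWithLabel := map[string]string{"zone1": "nodeunit1"} // wrapper node labeled zone1=nodeunit1
    sameUnit := map[string]string{"zone1": "nodeunit1"}      // endpoint node in the same node unit
    otherUnit := map[string]string{"zone1": "nodeunit2"}     // endpoint node in another node unit
    hostNoLabel := map[string]string{}                       // wrapper node without the zone1 label

    fmt.Println(hasIntersectionLabel(keys, hostWithLabel, sameUnit))  // true  -> address kept
    fmt.Println(hasIntersectionLabel(keys, hostWithLabel, otherUnit)) // false -> address filtered out
    fmt.Println(hasIntersectionLabel(keys, hostNoLabel, sameUnit))    // false -> every address filtered, service unreachable
}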

Back in rebuildEndpointsMap: after pruneEndpoints has refreshed the endpoints within the same topology domain, the modified endpoints are assigned to storageCache.endpointsMap[endpoint].modified (this field records the endpoints as modified by topology awareness).

func (nh *nodeHandler) add(node *v1.Node) {
    sc := nh.cache

    sc.mu.Lock()

    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
    klog.Infof("Adding node %v", nodeKey)
    sc.nodesMap[nodeKey] = &nodeContainer{
        node:   node,
        labels: node.Labels,
    }
    // update endpoints
    changedEps := sc.rebuildEndpointsMap()

    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {
    evts := make([]watch.Event, 0)
    for name, endpointsContainer := range sc.endpointsMap {
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {
            continue
        }
        sc.endpointsMap[name].modified = newEps
        evts = append(evts, watch.Event{
            Type:   watch.Modified,
            Object: newEps,
        })
    }
    return evts
}

In addition, if the (topology-modified) endpoints change, a watch event is constructed and passed to the endpoints handler (interceptEndpointsRequest) for processing.

2. ServiceEventHandler

The storageCache.servicesMap map is keyed by service name (namespace/name), and its value is a serviceContainer containing the following data:

  • svc: the service object
  • keys: the service topologyKeys

For changes to service resources, the Update event is used as an example:

func (sh *serviceHandler) update(service *v1.Service) {
    sc := sh.cache

    sc.mu.Lock()
    serviceKey := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
    klog.Infof("Updating service %v", serviceKey)

    newTopologyKeys := getTopologyKeys(&service.ObjectMeta)
    serviceContainer, found := sc.servicesMap[serviceKey]
    if !found {
        sc.mu.Unlock()
        klog.Errorf("update non-existed service, %v", serviceKey)
        return
    }

    sc.serviceChan <- watch.Event{
        Type:   watch.Modified,
        Object: service,
    }

    serviceContainer.svc = service
    // return directly when topologyKeys of service stay unchanged
    if reflect.DeepEqual(serviceContainer.keys, newTopologyKeys) {
        sc.mu.Unlock()
        return
    }

    serviceContainer.keys = newTopologyKeys

    // update endpoints
    changedEps := sc.rebuildEndpointsMap()
    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}

The logic is as follows:

  • Get the service topologyKeys.
  • Build a service watch.Modified event.
  • Compare the new service topologyKeys with the existing ones.
  • If they differ, update the topologyKeys and call rebuildEndpointsMap to refresh the endpoints of this service; if the endpoints change, construct endpoints watch events and pass them to the endpoints handler (interceptEndpointsRequest) for processing.

3. EndpointsEventHandler

The storageCache.endpointsMap map is keyed by endpoints name (namespace/name), and its value is an endpointsContainer containing the following data:

  • endpoints: the endpoints before topology modification
  • modified: the endpoints after topology modification

For changes to endpoints resources, the Update event is used as an example:

func (eh *endpointsHandler) update(endpoints *v1.Endpoints) {
    sc := eh.cache

    sc.mu.Lock()
    endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
    klog.Infof("Updating endpoints %v", endpointsKey)

    endpointsContainer, found := sc.endpointsMap[endpointsKey]
    if !found {
        sc.mu.Unlock()
        klog.Errorf("Updating non-existed endpoints %v", endpointsKey)
        return
    }
    endpointsContainer.endpoints = endpoints
    newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)
    changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps)
    if changed {
        endpointsContainer.modified = newEps
    }
    sc.mu.Unlock()

    if changed {
        sc.endpointsChan <- watch.Event{
            Type:   watch.Modified,
            Object: newEps,
        }
    }
}

The logic is as follows:

  • Update endpointsContainer.endpoints to the new endpoints object.
  • Call pruneEndpoints to get the topology-refreshed endpoints.
  • Compare endpointsContainer.modified with the newly refreshed endpoints.
  • If they differ, update endpointsContainer.modified and construct an endpoints watch event, which is passed to the endpoints handler (interceptEndpointsRequest) for processing.

Having analyzed NodeEventHandler, ServiceEventHandler and EndpointsEventHandler, we now return to the List&Watch handling logic of the http handlers, taking endpoints as an example:

func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodGet || !strings.HasPrefix(r.URL.Path, "/api/v1/endpoints") {
            handler.ServeHTTP(w, r)
            return
        }

        queries := r.URL.Query()
        acceptType := r.Header.Get("Accept")
        info, found := s.parseAccept(acceptType, s.mediaSerializer)
        if !found {
            klog.Errorf("can't find %s serializer", acceptType)
            w.WriteHeader(http.StatusBadRequest)
            return
        }

        encoder := scheme.Codecs.EncoderForVersion(info.Serializer, v1.SchemeGroupVersion)
        // list request
        if queries.Get("watch") == "" {
            w.Header().Set("Content-Type", info.MediaType)
            allEndpoints := s.cache.GetEndpoints()
            epsItems := make([]v1.Endpoints, 0, len(allEndpoints))
            for _, eps := range allEndpoints {
                epsItems = append(epsItems, *eps)
            }

            epsList := &v1.EndpointsList{
                Items: epsItems,
            }

            err := encoder.Encode(epsList, w)
            if err != nil {
                klog.Errorf("can't marshal endpoints list, %v", err)
                w.WriteHeader(http.StatusInternalServerError)
                return
            }
            return
        }

        // watch request
        timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds")
        timeout := time.Minute
        if timeoutSecondsStr != "" {
            timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr))
        }

        timer := time.NewTimer(timeout)
        defer timer.Stop()

        flusher, ok := w.(http.Flusher)
        if !ok {
            klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
            w.WriteHeader(http.StatusMethodNotAllowed)
            return
        }

        e := restclientwatch.NewEncoder(
            streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),
                scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)),
            encoder)
        if info.MediaType == runtime.ContentTypeProtobuf {
            w.Header().Set("Content-Type", runtime.ContentTypeProtobuf+";stream=watch")
        } else {
            w.Header().Set("Content-Type", runtime.ContentTypeJSON)
        }
        w.Header().Set("Transfer-Encoding", "chunked")
        w.WriteHeader(http.StatusOK)
        flusher.Flush()
        for {
            select {
            case <-r.Context().Done():
                return
            case <-timer.C:
                return
            case evt := <-s.endpointsWatchCh:
                klog.V(4).Infof("Send endpoint watch event: % #v", evt)
                err := e.Encode(&evt)
                if err != nil {
                    klog.Errorf("can't encode watch event, %v", err)
                    return
                }

                if len(s.endpointsWatchCh) == 0 {
                    flusher.Flush()
                }
            }
        }
    })
}

The logic is as follows:

  • For a List request, call GetEndpoints to get the list of topology-modified endpoints and return it.

func (sc *storageCache) GetEndpoints() []*v1.Endpoints {
    sc.mu.RLock()
    defer sc.mu.RUnlock()

    epList := make([]*v1.Endpoints, 0, len(sc.endpointsMap))
    for _, v := range sc.endpointsMap {
        epList = append(epList, v.modified)
    }
    return epList
}

  • For a Watch request, continuously receive watch events from the storageCache.endpointsWatchCh channel and return them. The interceptServiceRequest logic is the same as interceptEndpointsRequest and is not repeated here; the sketch below shows how the two request types can be exercised directly.
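
As an illustration, one could query the wrapper directly from the edge node to observe the pruned results. The listen address and the use of insecure HTTP below are assumptions for this sketch; the watch query parameter is what switches the handler from the List branch to the Watch branch.

# List: returns the EndpointsList built from storageCache (topology-pruned endpoints)
curl 'http://127.0.0.1:51006/api/v1/endpoints'

# Watch: streams Modified events taken from endpointsWatchCh for at most 30 seconds
curl 'http://127.0.0.1:51006/api/v1/endpoints?watch=true&timeoutSeconds=30'
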
Summary
  • SuperEdge service group uses application-grid-wrapper to implement topology awareness, achieving closed-loop service access within a single nodeunit.
  • Compared with the native Kubernetes community implementation, the topology awareness implemented by service group differs as follows:
    • The service group topology key can be customized, namely the gridUniqKey, which makes it more flexible, whereas the community implementation currently offers only three choices: "kubernetes.io/hostname", "topology.kubernetes.io/zone" and "topology.kubernetes.io/region".
    • A service group can only specify a single topology key, i.e. it can only access valid endpoints within the local topology domain and cannot reach endpoints in other topology domains, whereas the community implementation can fall back to other topology domains via the topologyKeys list and "*".
  • The ServiceGrid Controller generates the corresponding service for each ServiceGrid (including the topologyKeys annotation built from serviceGrid.Spec.GridUniqKey). Its logic is largely the same as the DeploymentGrid Controller:
    • Create and maintain the CRDs required by service group (including ServiceGrid).
    • Watch ServiceGrid events and enqueue ServiceGrids into a work queue; in a loop, pop ServiceGrids from the queue, parse them, and create and maintain the corresponding service.
    • Watch service events and enqueue the related ServiceGrid into the work queue for the processing above, helping to achieve the overall reconcile logic.
  • To keep Kubernetes non-intrusive, a wrapper layer is added between kube-proxy and the apiserver; the call chain is: kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver.
  • application-grid-wrapper is an http server that accepts requests from kube-proxy and maintains a resource cache. Its handler functions, from outermost to innermost, are:
    • debug: accepts debug requests and returns the wrapper's pprof runtime information.
    • logger: logs requests.
    • node: accepts kube-proxy node GET (/api/v1/nodes/{node}) requests and returns node information.
    • event: accepts kube-proxy events POST (/events) requests and forwards them to lite-apiserver.
    • service: accepts kube-proxy service List&Watch (/api/v1/services) requests and answers them from the storageCache contents (GetServices).
    • endpoint: accepts kube-proxy endpoint List&Watch (/api/v1/endpoints) requests and answers them from the storageCache contents (GetEndpoints).
  • To implement topology awareness, the wrapper maintains a resource cache covering node, service and endpoint, and registers the corresponding event handlers. The core topology algorithm is: call filterConcernedAddresses to filter endpoint.Subsets Addresses and NotReadyAddresses, keeping only the endpoints within the same service topologyKeys domain. Also, if the edge node where the wrapper runs does not carry the service topologyKeys label, the service cannot be accessed from that node either.
  • The wrapper accepts List&Watch requests for endpoints and services from kube-proxy. Taking endpoints as an example: for a List request it calls GetEndpoints to get the list of topology-modified endpoints and returns it; for a Watch request it continuously receives watch events from the storageCache.endpointsWatchCh channel and returns them. The service logic is the same as the endpoints logic.

Outlook

The topology algorithm currently implemented by SuperEdge service group is more flexible and convenient. How it should relate to the Kubernetes community's service topology awareness feature is worth exploring; we suggest contributing the SuperEdge topology algorithm upstream to the community.

References

[1] Service Topology: https://kubernetes.io/docs/concepts/services-networking/service-topology/

[2] duyanghao kubernetes-reading-notes: https://github.com/duyanghao/kubernetes-reading-notes/blob/master/superedge/service-group/README.md


