博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kubernetes源码阅读笔记——Scheduler(之一)
阅读量:5299 次
发布时间:2019-06-14

本文共 13721 字,大约阅读时间需要 45 分钟。

Scheduler是集群中Master节点的重要组件,其功能是根据集群中各Pod的资源需求、亲和性等指标,将Pod合理调度到Kubernetes集群中的各个节点上。

一、入口函数

入口函数与Controller Manager的入口函数结构相同,同样是应用了cobra包,在命令行中注册了kube-scheduler命令。

cmd/kube-scheduler/scheduler.go func main() {	rand.Seed(time.Now().UnixNano())	command := app.NewSchedulerCommand()	pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)		logs.InitLogs()	defer logs.FlushLogs()	if err := command.Execute(); err != nil {		fmt.Fprintf(os.Stderr, "%v\n", err)		os.Exit(1)	}}

这里核心的方法仍然是NewSchedulerCommand。该方法位于app/server.go中,结构与Controller Manager几乎一样,因此不贴上来了。核心的部分仍然是在cobra.Command结构体的Run字段中调用runCommand方法。

runCommand方法为Scheduler配置Config,最终返回的是Run方法,将Scheduler运行起来。

runCommand方法中间有一行值得注意:

func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {    ...    algorithmprovider.ApplyFeatureGates()    ...
return Run(cc, stopCh)
}

这一行的作用是调用ApplyFeatureGates方法,并根据Feature Gate的配置,注册或者删除相应的预选策略。

进入ApplyFeatureGates方法,发现方法就一行,而整个包就这一个方法:

pkg/scheduler/algorithmprovider/plugin.gopackage algorithmproviderimport  "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"// ApplyFeatureGates applies algorithm by feature gates.func ApplyFeatureGates() {    defaults.ApplyFeatureGates()}

事实上,在pkg/scheduler/algorirhmprovider/defaults/defaults.go中,有一个init方法:

pkg/scheduler/algorirhmprovider/defaults/defaults.go func init() {    registerAlgorithmProvider(defaultPredicates(), defaultPriorities())}

因此,在导入defaults包时,就已经执行了registerAlgorithmProvider方法,对一些预选与优选方法进行了注册。再配合ApplyFeatureGates方法,根据k8s中一些feature的开启情况,增加或删除一些预选和优选方法。这些feature的位置在pkg/features/kube_features.go中。

详细的预选和优选方法的定义位于pkg/scheduler/algorithm和pkg/scheduler/algorithmprovider中,这里不详细展开。

二、Run

看一下Run方法:

func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {    // Create the scheduler.    sched, err := scheduler.New(cc.Client,        cc.InformerFactory.Core().V1().Nodes(),        cc.PodInformer,        cc.InformerFactory.Core().V1().PersistentVolumes(),        cc.InformerFactory.Core().V1().PersistentVolumeClaims(),        cc.InformerFactory.Core().V1().ReplicationControllers(),        cc.InformerFactory.Apps().V1().ReplicaSets(),        cc.InformerFactory.Apps().V1().StatefulSets(),        cc.InformerFactory.Core().V1().Services(),        cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),        cc.InformerFactory.Storage().V1().StorageClasses(),        cc.Recorder,        cc.ComponentConfig.AlgorithmSource,        stopCh,        scheduler.WithName(cc.ComponentConfig.SchedulerName),        scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),        scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))    if err != nil {        return err    }    // Prepare the event broadcaster.    ...    // Setup healthz checks.    ...// Start all informers.    go cc.PodInformer.Informer().Run(stopCh)    cc.InformerFactory.Start(stopCh)    // Wait for all caches to sync before scheduling.    cc.InformerFactory.WaitForCacheSync(stopCh)    controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)    // Prepare a reusable runCommand function.    run := func(ctx context.Context) {        sched.Run()        <-ctx.Done()    }    ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here    defer cancel()    go func() {        select {        case <-stopCh:            cancel()        case <-ctx.Done():        }    }()    // If leader election is enabled, runCommand via LeaderElector until done and exit.    ...    // Leader election is disabled, so runCommand inline until done.    run(ctx)    return fmt.Errorf("finished without leader elect")}

Run方法主要包含下面几件事:

(1)创建Scheduler。

Run方法的前几行代码调用了New方法,创建了一个Scheduler对象。这个New方法位于pkg/scheduler/scheduler.go中:

pkg/scheduler/scheduler.gofunc New(client clientset.Interface,    nodeInformer coreinformers.NodeInformer,    podInformer coreinformers.PodInformer,    pvInformer coreinformers.PersistentVolumeInformer,    pvcInformer coreinformers.PersistentVolumeClaimInformer,    replicationControllerInformer coreinformers.ReplicationControllerInformer,    replicaSetInformer appsinformers.ReplicaSetInformer,    statefulSetInformer appsinformers.StatefulSetInformer,    serviceInformer coreinformers.ServiceInformer,    pdbInformer policyinformers.PodDisruptionBudgetInformer,    storageClassInformer storageinformers.StorageClassInformer,    recorder record.EventRecorder,    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,    stopCh <-chan struct{},    opts ...func(o *schedulerOptions)) (*Scheduler, error) {    options := defaultSchedulerOptions    for _, opt := range opts {        opt(&options)    }    // Set up the configurator which can create schedulers from configs.    configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{        SchedulerName:                  options.schedulerName,        Client:                         client,        NodeInformer:                   nodeInformer,        PodInformer:                    podInformer,        PvInformer:                     pvInformer,        PvcInformer:                    pvcInformer,        ReplicationControllerInformer:  replicationControllerInformer,        ReplicaSetInformer:             replicaSetInformer,        StatefulSetInformer:            statefulSetInformer,        ServiceInformer:                serviceInformer,        PdbInformer:                    pdbInformer,        StorageClassInformer:           storageClassInformer,        HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,        DisablePreemption:              options.disablePreemption,        PercentageOfNodesToScore:       options.percentageOfNodesToScore,        BindTimeoutSeconds:             options.bindTimeoutSeconds,    })    var config *factory.Config    source := schedulerAlgorithmSource    switch {    case source.Provider != nil:        // Create the config from a named algorithm provider.        ...    case source.Policy != nil:        // Create the config from a user specified policy source.        ...    default:        return nil, fmt.Errorf("unsupported algorithm source: %v", source)    }    // Additional tweaks to the config produced by the configurator.    config.Recorder = recorder    config.DisablePreemption = options.disablePreemption    config.StopEverything = stopCh    // Create the scheduler.    sched := NewFromConfig(config)    return sched, nil}

New方法逻辑相对清晰,其本质就是根据传入的Informer、算法等参数,实例化一个Config,然后调用NewFromConfig方法,通过这个Config创建一个scheduler实例并返回。可以看到,scheduler中也用到了包括nodeInformer、podInformer等在内的大量Informer,因为scheduler也需要及时掌握资源的变化,从而调整调度的策略。

中间switch一段代码会判断config的调度算法源是用户自定义的还是给定的provider。如果使用默认的provider,则会将前面注册过的预选、优选方法加载进来。

创建config的NewConfigFactory方法位于pkg/scheduler/factory/factory.go中,进入方法:

pkg/scheduler/factory/factory.go// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only// return the interface.func NewConfigFactory(args *ConfigFactoryArgs) Configurator {   stopEverything := args.StopCh   if stopEverything == nil {      stopEverything = wait.NeverStop   }   schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)   // storageClassInformer is only enabled through VolumeScheduling feature gate   var storageClassLister storagelisters.StorageClassLister   if args.StorageClassInformer != nil {      storageClassLister = args.StorageClassInformer.Lister()   }   c := &configFactory{      client:                         args.Client,      podLister:                      schedulerCache,      podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),      nodeLister:                     args.NodeInformer.Lister(),      pVLister:                       args.PvInformer.Lister(),      pVCLister:                      args.PvcInformer.Lister(),      serviceLister:                  args.ServiceInformer.Lister(),      controllerLister:               args.ReplicationControllerInformer.Lister(),      replicaSetLister:               args.ReplicaSetInformer.Lister(),      statefulSetLister:              args.StatefulSetInformer.Lister(),      pdbLister:                      args.PdbInformer.Lister(),      storageClassLister:             storageClassLister,      schedulerCache:                 schedulerCache,      StopEverything:                 stopEverything,      schedulerName:                  args.SchedulerName,      hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,      disablePreemption:              args.DisablePreemption,      percentageOfNodesToScore:       args.PercentageOfNodesToScore,   }   c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced   // scheduled pod cache   args.PodInformer.Informer().AddEventHandler(      cache.FilteringResourceEventHandler{         FilterFunc: func(obj interface{}) bool {            switch t := obj.(type) {            case *v1.Pod:               return assignedPod(t)            case cache.DeletedFinalStateUnknown:               if pod, ok := t.Obj.(*v1.Pod); ok {                  return assignedPod(pod)               }               runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))               return false            default:               runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))               return false            }         },         Handler: cache.ResourceEventHandlerFuncs{            AddFunc:    c.addPodToCache,            UpdateFunc: c.updatePodInCache,            DeleteFunc: c.deletePodFromCache,         },      },   )   // unscheduled pod queue   args.PodInformer.Informer().AddEventHandler(      cache.FilteringResourceEventHandler{         FilterFunc: func(obj interface{}) bool {            switch t := obj.(type) {            case *v1.Pod:               return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)            case cache.DeletedFinalStateUnknown:               if pod, ok := t.Obj.(*v1.Pod); ok {                  return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)               }               runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))               return false            default:               runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))               return false            }         },         Handler: cache.ResourceEventHandlerFuncs{            AddFunc:    c.addPodToSchedulingQueue,            UpdateFunc: c.updatePodInSchedulingQueue,            DeleteFunc: c.deletePodFromSchedulingQueue,         },      },   )   // ScheduledPodLister is something we provide to plug-in functions that   // they may need to call.   c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}   args.NodeInformer.Informer().AddEventHandler(      cache.ResourceEventHandlerFuncs{         AddFunc:    c.addNodeToCache,         UpdateFunc: c.updateNodeInCache,         DeleteFunc: c.deleteNodeFromCache,      },   )   args.PvInformer.Informer().AddEventHandler(      cache.ResourceEventHandlerFuncs{         // MaxPDVolumeCountPredicate: since it relies on the counts of PV.         AddFunc:    c.onPvAdd,         UpdateFunc: c.onPvUpdate,      },   )   // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.   args.PvcInformer.Informer().AddEventHandler(      cache.ResourceEventHandlerFuncs{         AddFunc:    c.onPvcAdd,         UpdateFunc: c.onPvcUpdate,      },   )   // This is for ServiceAffinity: affected by the selector of the service is updated.   args.ServiceInformer.Informer().AddEventHandler(      cache.ResourceEventHandlerFuncs{         AddFunc:    c.onServiceAdd,         UpdateFunc: c.onServiceUpdate,         DeleteFunc: c.onServiceDelete,      },   )   // Setup volume binder   c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)   args.StorageClassInformer.Informer().AddEventHandler(      cache.ResourceEventHandlerFuncs{         AddFunc: c.onStorageClassAdd,      },   )   // Setup cache debugger   ...   go func() {      <-c.StopEverything      c.podQueue.Close()   }()   return c}

该方法为一系列Informer初始化了回调函数。其中最重要的是PodInformer的两个回调函数。

可以看到,方法调用了两次AddEventHandler方法,都经过了过滤。第一次只处理已调度的Pod,第二次只处理未调度的Pod,并定义了对两种Pod的增、改、删方法,分别在缓存和队列中对这两种Pod进行更新。这样,就将已调度和未调度的Pod区分开。

后面为其他informer添加的回调函数,除了NodeInformer的回调函数会在缓存中更新node信息,其他回调函数最终都会调用MoveAllToActiveQueue方法,将待调度的Pod添加进队列。

此外,可以看到,在ConfigFactory中,有一个podQueue字段,维护了一个队列,用于存放待调度的Pod。

(2)运行广播和健康检查。

中间有几行是为Scheduler配置广播和健康检查相关内容,与Controller Manager类似,不提。

(3)Informer启动。

值得注意的是,Scheduler将PodInformer从其他的Informer中独立出来,因为对Pod的调度才是Scheduler的核心。

(4)运行Scheduler。

这是整个方法的核心。通过调用Scheduler的Run方法,将Scheduler运行起来。

进入Run方法,我们发现方法非常简洁,就做了2件事:

pkg/scheduler/scheduler.gofunc (sched *Scheduler) Run() {	if !sched.config.WaitForCacheSync() {		return	}	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)}

第一件事是等待各informer的缓存同步,第二件事是调用scheduleOne方法,执行Pod的调度操作。wait.Until的作用是每隔一段时间执行一次sched.scheduleOne方法,除非sched.config.StopEverything被关闭。这里时间段被设置为0,所以scheduleOne方法会一个接一个不停地被调用。

scheduleOne方法的具体逻辑我们下一篇文章再继续分析。

三、总结

总结Scheduler的逻辑,大体上是通过cobra注册一个kube-scheduler命令并运行。命令运行时,首先应用给定的调度算法,然后基于ConfigFactory,创建一个Scheduler的实例,启动相关的Informer,然后开始执行调度。

转载于:https://www.cnblogs.com/00986014w/p/10305425.html

你可能感兴趣的文章
记录magento通过csv文件与zip(图片压缩)上传产品到数据库的过程
查看>>
BZOJ_3039_玉蟾宫_(动态规划+悬线法)
查看>>
Struts2 OGNL 自动转换Date类型的一些注意事项
查看>>
vue-cli + webpack自动生成项目
查看>>
定义Bash提示符中显示IP
查看>>
两个div如何并列 (转)
查看>>
SSH2框架下数据库语句的编写格式(一)
查看>>
返回结果数据帮助类
查看>>
SVN部署和使用
查看>>
Build Tools
查看>>
Mysql的基础使用之MariaDB安装
查看>>
单链表操作B 分类: 链表 2015-06-0...
查看>>
周赛-Heros and Swords 分类: 比赛 ...
查看>>
Error:No suitable device found: no device found for connection "System eth0"
查看>>
Go beego框架使用笔记(一)
查看>>
jQuery各种效果举例
查看>>
Day47:HTML(简介及常用标签)
查看>>
Redis.md
查看>>
软件工程课堂小测01
查看>>
大道至简阅读笔记02
查看>>