Operator 是指一类基于 Kubernetes 自定义资源对象(CRD)和控制器(Controller)的云原生拓展服务,其中 CRD 定义了每个 operator 所创建和管理的自定义资源对象,Controller 则包含了管理这些对象所相关的运维逻辑代码。

其实operator和控制器是差不多,只不过operator是针对特定应用程序的控制器,比如数据库etcd,需要结合很多etcd的专业部署运维知识做逻辑处理。

Operator Pattern

Operator是由CoreOS公司开发的,用来扩展 Kubernetes API,特定的应用程序控制器,它用来创建、配置和管理复杂的有状态应用,如数据库、缓存和监控系统。Operator基于 Kubernetes 的资源和控制器概念之上构建,但同时又包含了应用程序特定的一些专业知识,比如创建一个数据库的Operator,则必须对创建的数据库的各种运维方式非常了解,创建Operator的关键是CRD(自定义资源)的设计。

CRD是对 Kubernetes API 的扩展,Kubernetes 中的每个资源都是一个 API 对象的集合,例如我们在YAML文件里定义的那些spec都是对 Kubernetes 中的资源对象的定义,所有的自定义资源可以跟 Kubernetes 中内建的资源一样使用 kubectl 操作。

Operator是将运维人员对软件操作的知识给代码化,同时利用 Kubernetes 强大的抽象来管理大规模的软件应用。目前CoreOS官方提供了几种Operator的实现,其中就包括我们今天的主角:Prometheus Operator,Operator的核心实现就是基于 Kubernetes 的以下两个概念:

  • 资源:对象的状态定义
  • 控制器:观测、分析和行动,以调节资源的分布

目前已经有很多已经实现的operator,可以直接去社区查看Kubernetes 社区推荐的一些 Operator 范例。

实例分析

以mysql-operator这个operator为例,我们具体分析一下一个Kubernetes Operator具体是如何实现的。

入口函数main

主要启动run

...
  if err := app.Run(opts); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
...

run函数

/ Run starts the mysql-operator controllers. This should never exit.
func Run(s *operatoropts.MySQLOperatorOpts) error {
// 构造kubeconfig以便连接kubernetes的APIServer
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.KubeConfig)
if err != nil {
    return err
}

...

// 构造kubeClient、 mysqlopClient, 以便操作Kubernetes里的一些资源
kubeClient := kubernetes.NewForConfigOrDie(kubeconfig)
mysqlopClient := clientset.NewForConfigOrDie(kubeconfig)
 // 构造一些共享的informer,以便监听自定义对象及kubernetes里的一些核心资源
// Shared informers (non namespace specific).
operatorInformerFactory := informers.NewFilteredSharedInformerFactory(mysqlopClient, resyncPeriod(s)(), s.Namespace, nil)
kubeInformerFactory := kubeinformers.NewFilteredSharedInformerFactory(kubeClient, resyncPeriod(s)(), s.Namespace, nil)
 var wg sync.WaitGroup
 // 构造自定义类型mysqlcluster的控制器
clusterController := cluster.NewController(
    *s,
    mysqlopClient,
    kubeClient,
    operatorInformerFactory.MySQL().V1alpha1().Clusters(),
    kubeInformerFactory.Apps().V1beta1().StatefulSets(),
    kubeInformerFactory.Core().V1().Pods(),
    kubeInformerFactory.Core().V1().Services(),
    30*time.Second,
    s.Namespace,
)
wg.Add(1)
go func() {
    defer wg.Done()
    clusterController.Run(ctx, 5)
}()

// 下面分别为每个自定义类型构造了相应的控制器
...

控制器

Kubernetes Operator的核心逻辑就在自定义类型的控制器里面

// NewController creates a new MySQLController.
func NewController(
    ...
) *MySQLController {
  // 构造MySQLController
  m := MySQLController{
        ...
    }
  // 监控自定义类型mysqlcluster的变化(增加、更新、删除),这里看一看m.enqueueCluster函数可以发现都只是把发生变化的自定义对象的名称放入工作队列中
    clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: m.enqueueCluster,
        UpdateFunc: func(old, new interface{}) {
            m.enqueueCluster(new)
        },
        DeleteFunc: func(obj interface{}) {
            cluster, ok := obj.(*v1alpha1.Cluster)
            if ok {
                m.onClusterDeleted(cluster.Name)
            }
        },
    })


// Run函数里会启动工作协程处理上述放入工作队列的自定义对象的名称
func (m *MySQLController) Run(ctx context.Context, threadiness int) {
  ...
  // Launch two workers to process Foo resources
    for i := 0; i < threadiness; i++ {
        go wait.Until(m.runWorker, time.Second, ctx.Done())
    }
    ...
}

从runWorker函数一步步跟踪过程,发现真正干活的是syncHandler函数

func (m *MySQLController) syncHandler(key string) error {
    ...
    nsName := types.NamespacedName{Namespace: namespace, Name: name}
    // Get the Cluster resource with this namespace/name.
    cluster, err := m.clusterLister.Clusters(namespace).Get(name)
    if err != nil {
      // 如果自定义资源对象已不存在,则不用处理
        // The Cluster resource may no longer exist, in which case we stop processing.
        if apierrors.IsNotFound(err) {
            utilruntime.HandleError(fmt.Errorf("mysqlcluster '%s' in work queue no longer exists", key))
            return nil
        }
        return err
    }
    cluster.EnsureDefaults()
    // 校验自定义资源对象
    if err = cluster.Validate(); err != nil {
        return errors.Wrap(err, "validating Cluster")
    }
  // 给自定义资源对象设置一些默认属性
    if cluster.Spec.Repository == "" {
        cluster.Spec.Repository = m.opConfig.Images.DefaultMySQLServerImage
    }
    ...
    svc, err := m.serviceLister.Services(cluster.Namespace).Get(cluster.Name)
    // If the resource doesn't exist, we'll create it
    // 如果该自定义资源对象存在,则应该要创建相应的Serivce,如Serivce不存在,则创建
    if apierrors.IsNotFound(err) {
        glog.V(2).Infof("Creating a new Service for cluster %q", nsName)
        svc = services.NewForCluster(cluster)
        err = m.serviceControl.CreateService(svc)
    }
    // If an error occurs during Get/Create, we'll requeue the item so we can
    // attempt processing again later. This could have been caused by a
    // temporary network failure, or any other transient reason.
    if err != nil {
        return err
    }
    // If the Service is not controlled by this Cluster resource, we should
    // log a warning to the event recorder and return.
    if !metav1.IsControlledBy(svc, cluster) {
        msg := fmt.Sprintf(MessageResourceExists, "Service", svc.Namespace, svc.Name)
        m.recorder.Event(cluster, corev1.EventTypeWarning, ErrResourceExists, msg)
        return errors.New(msg)
    }
    ss, err := m.statefulSetLister.StatefulSets(cluster.Namespace).Get(cluster.Name)
    // If the resource doesn't exist, we'll create it
    // 如果该自定义资源对象存在,则应该要创建相应的StatefulSet,如StatefulSet不存在,则创建
    if apierrors.IsNotFound(err) {
        glog.V(2).Infof("Creating a new StatefulSet for cluster %q", nsName)
        ss = statefulsets.NewForCluster(cluster, m.opConfig.Images, svc.Name)
        err = m.statefulSetControl.CreateStatefulSet(ss)
    }
    // If an error occurs during Get/Create, we'll requeue the item so we can
    // attempt processing again later. This could have been caused by a
    // temporary network failure, or any other transient reason.
    if err != nil {
        return err
    }
    // If the StatefulSet is not controlled by this Cluster resource, we
    // should log a warning to the event recorder and return.
    if !metav1.IsControlledBy(ss, cluster) {
        msg := fmt.Sprintf(MessageResourceExists, "StatefulSet", ss.Namespace, ss.Name)
        m.recorder.Event(cluster, corev1.EventTypeWarning, ErrResourceExists, msg)
        return fmt.Errorf(msg)
    }
    // Upgrade the required component resources the current MySQLOperator version.
    // 确保StatefulSet上的BuildVersion与自定义资源对象上的一致,如不一致,则修改得一致
    if err := m.ensureMySQLOperatorVersion(cluster, ss, buildversion.GetBuildVersion()); err != nil {
        return errors.Wrap(err, "ensuring MySQL Operator version")
    }
    // Upgrade the MySQL server version if required.
    if err := m.ensureMySQLVersion(cluster, ss); err != nil {
        return errors.Wrap(err, "ensuring MySQL version")
    }
    // If this number of the members on the Cluster does not equal the
    // current desired replicas on the StatefulSet, we should update the
    // StatefulSet resource.
    // 如果StatefulSet的Replicas值与自定义资源对象上配置不一致,则更新StatefulSet
    if cluster.Spec.Members != *ss.Spec.Replicas {
        glog.V(4).Infof("Updating %q: clusterMembers=%d statefulSetReplicas=%d",
            nsName, cluster.Spec.Members, ss.Spec.Replicas)
        old := ss.DeepCopy()
        ss = statefulsets.NewForCluster(cluster, m.opConfig.Images, svc.Name)
        if err := m.statefulSetControl.Patch(old, ss); err != nil {
            // Requeue the item so we can attempt processing again later.
            // This could have been caused by a temporary network failure etc.
            return err
        }
    }
    // Finally, we update the status block of the Cluster resource to
    // reflect the current state of the world.
    // 最后更新自定义资源对象的状态
    err = m.updateClusterStatus(cluster, ss)
    if err != nil {
        return err
    }
    m.recorder.Event(cluster, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
    return nil
}

整个Operator大概就是这样了。

Operator 其实就是一段代码,这段代码 Watch 了 etcd 里一个描述分布式应用集群的API 对象,然后这段代码通过实现 Kubernetes 的控制器模式,来保证这个集群始终跟用户的定义完全相同。而在这个过程中,Operator 也有能力利用 Kubernetes 的存储、网络插件等外部资源,协同的为应用状态的保持提供帮助。

​所以说,Operator 本身在实现上,其实是在 Kubernetes 声明式 API 基础上的一种“微创新”。它合理的利用了 Kubernetes API 可以添加自定义 API 类型的能力,然后又巧妙的通过 Kubernetes 原生的“控制器模式”,完成了一个面向分布式应用终态的调谐过程。

自己开发一个operator

上面的代码是在官方推出的工具之前来写的,大部分代码的逻辑其实就是控制器的逻辑,很多都是一样的,所以官方推出了一个工具operator-sdk来生成相关模块代码,让我们专注于控制器逻辑的开发处理。

还有很多类似的工具

  • KUDO (Kubernetes 通用声明式 Operator)
  • kubebuilder,kubernetes SIG 在维护的一个项目,用于写控制器的,所以也是可以写operator
  • Metacontroller,可与 Webhook 结合使用,以实现自己的功能。

安装operator sdk

export RELEASE_VERSION=v0.13.0
curl -LO https://github.com/operator-framework/operator-sdk/releases/download/${RELEASE_VERSION}/operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu
chmod +x operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu && sudo mkdir -p /usr/local/bin/ && sudo cp operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu /usr/local/bin/operator-sdk && rm operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu

创建项目

用operator sdk 创建项目模板,这里用官方提供的一个sample-controller的模板:

operator-sdk new <controller-name> --repo github.com/kubernetes/sample-controller

创建项目

项目结构目录创建完成,如下:

$ operator-sdk new test-controller --repo github.com/kubernetes/sample-controller
$ tree
.
├── build
│   ├── bin
│   │   ├── entrypoint
│   │   └── user_setup
│   └── Dockerfile
├── cmd
│   └── manager
│       └── main.go
├── deploy
│   ├── operator.yaml
│   ├── role_binding.yaml
│   ├── role.yaml
│   └── service_account.yaml
├── go.mod
├── go.sum
├── pkg
│   ├── apis
│   │   └── apis.go
│   └── controller
│       └── controller.go
├── tools.go
└── version
    └── version.go

简单做一个说明

  • cmd - 包含 main.go 文件,使用 operator-sdk API 初始化和启动当前 Operator 的入口。
  • deploy - 包含一组用于在 Kubernetes 集群上进行部署的通用的 Kubernetes 资源清单文件。
  • pkg/apis - 包含定义的 API 和自定义资源(CRD)的目录树,这些文件允许 sdk 为 CRD 生成代码并注册对应的类型,以便正确解码自定义资源对象。
  • pkg/controller - 用于编写所有的操作业务逻辑的地方
  • vendor - golang vendor 文件夹,其中包含满足当前项目的所有外部依赖包,通过 go dep 管理该目录。

创建CRD

创建CRD:

operator-sdk add api --api-version=<api的版本> --kind=<类型名称>

创建CRD后,多出来了文件夹:

$ operator-sdk add api --api-version=test.k8s.realibox.com/v1 --kind=Realibox

INFO[0000] Generating api version test.k8s.realibox.com/v1 for kind Realibox.
INFO[0000] Created pkg/apis/test/group.go
INFO[0002] Created pkg/apis/test/v1/realibox_types.go
INFO[0002] Created pkg/apis/addtoscheme_test_v1.go
INFO[0002] Created pkg/apis/test/v1/register.go
INFO[0002] Created pkg/apis/test/v1/doc.go
INFO[0002] Created deploy/crds/test.k8s.realibox.com_v1_realibox_cr.yaml
INFO[0004] Created deploy/crds/test.k8s.realibox.com_realiboxes_crd.yaml
INFO[0004] Running deepcopy code-generation for Custom Resource group versions: [test:[v1], ]
INFO[0014] Code-generation complete.
INFO[0014] Running CRD generation for Custom Resource group versions: [test:[v1], ]
INFO[0014] Created deploy/crds/test.k8s.realibox.com_realiboxes_crd.yaml
INFO[0014] CRD generation complete.
INFO[0014] API generation complete.

$ tree
...
├── pkg
│   ├── apis
│   │   ├── addtoscheme_test_v1.go
│   │   ├── apis.go
│   │   └── test
│   │       ├── group.go
│   │       └── v1
│   │           ├── doc.go
│   │           ├── realibox_types.go
│   │           ├── register.go
│   │           └── zz_generated.deepcopy.go
│   └── controller
│       └── controller.go
...

test 文件夹下面放得就是 CRD,我们通过pkg/apis/test/v1/*_types.go文件定义我们的CRD结构,主要是Spec和Status:

vim pkg/apis/test/v1/realibox_types.go
...
// RealiboxSpec defines the desired state of Realibox
type RealiboxSpec struct {
        // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
        // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
        // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
}

type RealiboxStatus struct {
    // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
    // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
    // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
}
...

这里我们只改Spec字段,将RealiboxSpec结构体改为:

type RealiboxSpec struct {
    Domain string `json:"domain,omitempty"`
    OSS string `json:"oss,omitempty"`
    Size    string `json:"size,omitempty"`
}

更新CRD文件:

operator-sdk generate k8s
operator-sdk generate crds

CRD本质是一种k8s的资源,因此要使用crd,需要在K8s集群上创建CRD:

kubectl apply -f deploy/crds/test.k8s.realibox.com_realiboxes_crd.yaml

查看集群CRD:

$ kubectl get crd
NAME                                      CREATED AT
clusterauthtokens.cluster.cattle.io       2020-08-29T06:41:42Z
clusteruserattributes.cluster.cattle.io   2020-08-29T06:41:42Z
realiboxes.test.k8s.realibox.com          2020-08-29T07:57:44Z

编写controller

创建好 CRD 后,我们可以编写 controller 了,先创建一个 controller 监听和核对新创建的realibox资源类型:

命令行说明:

operator-sdk add controller --api-version=<api的版本> --kind=<类型名称>

运行结果:

$ operator-sdk add controller --api-version=test.k8s.realibox.com/v1 --kind=Realibox

$ tree
...
├── pkg
│   ├── apis
│   │   ├── addtoscheme_test_v1.go
│   │   ├── apis.go
│   │   └── test
│   │       ├── group.go
│   │       └── v1
│   │           ├── doc.go
│   │           ├── realibox_types.go
│   │           ├── register.go
│   │           └── zz_generated.deepcopy.go
│   └── controller
│       ├── add_realibox.go
│       ├── controller.go
│       └── realibox
│           └── realibox_controller.go
...

在pkg/controller目录下生成了controller代码,在pkg/controller/realibox/realibox_controller.go编写代码逻辑即可,在这里,我将CR信息在创建pod之前打印到日志里:

...
func (r *ReconcileRealibox) Reconcile(request reconcile.Request) (reconcile.Result, error) {
  ...
    reqLogger.Info(fmt.Sprintf("Domain: %v created, oss info:%v, size: %v",instance.Spec.Domain,instance.Spec.OSS, instance.Spec.Size))
    // Define a new Pod object
    pod := newPodForCR(instance)

    ...
}
...

下面就可以运行 controller 了。

注:如果希望对集群进行更多地复杂操作,可以使用client-go来操作 Kubernetes 的资源,client-go是一个对 Kubernetes API 进行封装的库,由 Kubernetes 官方提供,还是十分好用的。

运行 controller

运行controller有两种方法,可以在本地直接运行controller,也可以打包到k8s运行。

实例

etcd-operator

发展

官方是希望通过Operator封装大部分基础服务软件的运维操作的,但目前很多Operator并不完善。比如虽然形式上给Operator划分了5个成熟度等级,但实际上大部分Operator仅只能完成安装部署而已。

还有很多Operator明确说明目前只是alpha状态,目前不建议投入生产。