https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

  1. kubebuilder实战之一:准备工作
  2. kubebuilder实战之二:初次体验kubebuilder
  3. kubebuilder实战之三:基础知识速览
  4. kubebuilder实战之四:operator需求说明和设计
  5. kubebuilder实战之五:operator编码
  6. kubebuilder实战之六:构建部署运行
  7. kubebuilder实战之七:webhook
  8. kubebuilder实战之八:知识点小记
  • 本篇是《kubebuilder实战》系列的第五篇,前面的一切努力(环境准备、知识储备、需求分析、数据结构和业务逻辑设计),都是为了将之前的设计用编码实现;
  • 既然已经充分准备,如今无需太多言语,咱们开始动手吧!
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,kubebuilder相关的应用在kubebuilder文件夹下,如下图红框所示:

在这里插入图片描述

  • kubebuilder文件夹下有多个子文件夹,本篇对应的源码在elasticweb目录下,如下图红框所示:

在这里插入图片描述

  • 新建名为elasticweb的文件夹,在里面执行以下命令即可创建名为elasticweb的项目,domain为com.bolingcavalry
  1. go mod init elasticweb
  2. kubebuilder init --domain com.bolingcavalry
  • 然后是CRD,执行以下命令即可创建相关资源:
  1. kubebuilder create api \
  2. --group elasticweb \
  3. --version v1 \
  4. --kind ElasticWeb
  • 然后用IDE打开整个工程,我这里是goland:

在这里插入图片描述

  • 打开文件api/v1/elasticweb_types.go,做以下几步改动:
  1. 修改数据结构ElasticWebSpec,增加前文设计的四个字段;
  2. 修改数据结构ElasticWebStatus,增加前文设计的一个字段;
  3. 增加String方法,这样打印日志时方便我们查看,注意RealQPS字段是指针,因此可能为空,需要判空;
  • 完整的elasticweb_types.go如下所示:
  1. package v1
  2. import (
  3. "fmt"
  4. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  5. "strconv"
  6. )
  7. // 期望状态
  8. type ElasticWebSpec struct {
  9. // 业务服务对应的镜像,包括名称:tag
  10. Image string `json:"image"`
  11. // service占用的宿主机端口,外部请求通过此端口访问pod的服务
  12. Port *int32 `json:"port"`
  13. // 单个pod的QPS上限
  14. SinglePodQPS *int32 `json:"singlePodQPS"`
  15. // 当前整个业务的总QPS
  16. TotalQPS *int32 `json:"totalQPS"`
  17. }
  18. // 实际状态,该数据结构中的值都是业务代码计算出来的
  19. type ElasticWebStatus struct {
  20. // 当前kubernetes中实际支持的总QPS
  21. RealQPS *int32 `json:"realQPS"`
  22. }
  23. // +kubebuilder:object:root=true
  24. // ElasticWeb is the Schema for the elasticwebs API
  25. type ElasticWeb struct {
  26. metav1.TypeMeta `json:",inline"`
  27. metav1.ObjectMeta `json:"metadata,omitempty"`
  28. Spec ElasticWebSpec `json:"spec,omitempty"`
  29. Status ElasticWebStatus `json:"status,omitempty"`
  30. }
  31. func (in *ElasticWeb) String() string {
  32. var realQPS string
  33. if nil == in.Status.RealQPS {
  34. realQPS = "nil"
  35. } else {
  36. realQPS = strconv.Itoa(int(*(in.Status.RealQPS)))
  37. }
  38. return fmt.Sprintf("Image [%s], Port [%d], SinglePodQPS [%d], TotalQPS [%d], RealQPS [%s]",
  39. in.Spec.Image,
  40. *(in.Spec.Port),
  41. *(in.Spec.SinglePodQPS),
  42. *(in.Spec.TotalQPS),
  43. realQPS)
  44. }
  45. // +kubebuilder:object:root=true
  46. // ElasticWebList contains a list of ElasticWeb
  47. type ElasticWebList struct {
  48. metav1.TypeMeta `json:",inline"`
  49. metav1.ListMeta `json:"metadata,omitempty"`
  50. Items []ElasticWeb `json:"items"`
  51. }
  52. func init() {
  53. SchemeBuilder.Register(&ElasticWeb{}, &ElasticWebList{})
  54. }
  • 在elasticweb目录下执行make install即可部署CRD到kubernetes:
  1. zhaoqin@zhaoqindeMBP-2 elasticweb % make install
  2. /Users/zhaoqin/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
  3. kustomize build config/crd | kubectl apply -f -
  4. Warning: apiextensions.k8s.io/v1beta1 CustomResourceDefinition is deprecated in v1.16+, unavailable in v1.22+; use apiextensions.k8s.io/v1 CustomResourceDefinition
  5. customresourcedefinition.apiextensions.k8s.io/elasticwebs.elasticweb.com.bolingcavalry created
  • 部署成功后,用api-versions命令可以查到该GV:

在这里插入图片描述

  • 核心数据结构设计编码完毕,接下来该编写业务逻辑代码了,大家还记得前文设计的业务流程吧,简单回顾一下,如下图:
    在这里插入图片描述

  • 打开文件elasticweb_controller.go,接下来咱们逐渐添加内容;

  • 咱们的elasticweb会对service、deployment这两种资源做查询、新增、修改等操作,因此需要这些资源的操作权限,增加下图红框中的两行注释,这样代码生成工具就会在RBAC配置中增加对应的权限:

在这里插入图片描述

  • 先把常量准备好,可见每个pod使用的CPU和内存都是在此固定的,您也可以改成在Spec中定义,这样就可以从外部传入了,另外这里为每个pod只分配了0.1个CPU,主要是因为我穷买不起好的CPU,您可以酌情调整该值:
  1. const (
  2. // deployment中的APP标签名
  3. APP_NAME = "elastic-app"
  4. // tomcat容器的端口号
  5. CONTAINER_PORT = 8080
  6. // 单个POD的CPU资源申请
  7. CPU_REQUEST = "100m"
  8. // 单个POD的CPU资源上限
  9. CPU_LIMIT = "100m"
  10. // 单个POD的内存资源申请
  11. MEM_REQUEST = "512Mi"
  12. // 单个POD的内存资源上限
  13. MEM_LIMIT = "512Mi"
  14. )
  • 有个很重要的逻辑:根据单个pod的QPS和总QPS,计算需要多少个pod,咱们将这个逻辑封装到一个方法中以便使用:
  1. / 根据单个QPS和总QPS计算pod数量
  2. func getExpectReplicas(elasticWeb *elasticwebv1.ElasticWeb) int32 {
  3. // 单个pod的QPS
  4. singlePodQPS := *(elasticWeb.Spec.SinglePodQPS)
  5. // 期望的总QPS
  6. totalQPS := *(elasticWeb.Spec.TotalQPS)
  7. // Replicas就是要创建的副本数
  8. replicas := totalQPS / singlePodQPS
  9. if totalQPS%singlePodQPS > 0 {
  10. replicas++
  11. }
  12. return replicas
  13. }
  • 将创建service的操作封装到一个方法中,是的主干代码的逻辑更清晰,可读性更强;
  • 创建service的时候,有几处要注意:
  1. 先查看service是否存在,不存在才创建;
  2. 将service和CRD实例elasticWeb建立关联(controllerutil.SetControllerReference方法),这样当elasticWeb被删除的时候,service会被自动删除而无需我们干预;
  3. 创建service的时候用到了client-go工具,推荐您阅读《client-go实战系列》,工具越熟练,编码越尽兴;
  • 创建service的完整方法如下:
  1. // 新建service
  2. func createServiceIfNotExists(ctx context.Context, r *ElasticWebReconciler, elasticWeb *elasticwebv1.ElasticWeb, req ctrl.Request) error {
  3. log := r.Log.WithValues("func", "createService")
  4. service := &corev1.Service{}
  5. err := r.Get(ctx, req.NamespacedName, service)
  6. // 如果查询结果没有错误,证明service正常,就不做任何操作
  7. if err == nil {
  8. log.Info("service exists")
  9. return nil
  10. }
  11. // 如果错误不是NotFound,就返回错误
  12. if !errors.IsNotFound(err) {
  13. log.Error(err, "query service error")
  14. return err
  15. }
  16. // 实例化一个数据结构
  17. service = &corev1.Service{
  18. ObjectMeta: metav1.ObjectMeta{
  19. Namespace: elasticWeb.Namespace,
  20. Name: elasticWeb.Name,
  21. },
  22. Spec: corev1.ServiceSpec{
  23. Ports: []corev1.ServicePort{{
  24. Name: "http",
  25. Port: 8080,
  26. NodePort: *elasticWeb.Spec.Port,
  27. },
  28. },
  29. Selector: map[string]string{
  30. "app": APP_NAME,
  31. },
  32. Type: corev1.ServiceTypeNodePort,
  33. },
  34. }
  35. // 这一步非常关键!
  36. // 建立关联后,删除elasticweb资源时就会将deployment也删除掉
  37. log.Info("set reference")
  38. if err := controllerutil.SetControllerReference(elasticWeb, service, r.Scheme); err != nil {
  39. log.Error(err, "SetControllerReference error")
  40. return err
  41. }
  42. // 创建service
  43. log.Info("start create service")
  44. if err := r.Create(ctx, service); err != nil {
  45. log.Error(err, "create service error")
  46. return err
  47. }
  48. log.Info("create service success")
  49. return nil
  50. }
  • 将创建deployment的操作封装在一个方法中,同样是为了将主干逻辑保持简洁;
  • 创建deployment的方法也有几处要注意:
  1. 调用getExpectReplicas方法得到要创建的pod的数量,该数量是创建deployment时的一个重要参数;
  2. 每个pod所需的CPU和内存资源也是deployment的参数;
  3. 将deployment和elasticweb建立关联,这样删除elasticweb的时候deplyment就会被自动删除了;
  4. 同样是使用client-go客户端工具创建deployment资源;
  1. // 新建deployment
  2. func createDeployment(ctx context.Context, r *ElasticWebReconciler, elasticWeb *elasticwebv1.ElasticWeb) error {
  3. log := r.Log.WithValues("func", "createDeployment")
  4. // 计算期望的pod数量
  5. expectReplicas := getExpectReplicas(elasticWeb)
  6. log.Info(fmt.Sprintf("expectReplicas [%d]", expectReplicas))
  7. // 实例化一个数据结构
  8. deployment := &appsv1.Deployment{
  9. ObjectMeta: metav1.ObjectMeta{
  10. Namespace: elasticWeb.Namespace,
  11. Name: elasticWeb.Name,
  12. },
  13. Spec: appsv1.DeploymentSpec{
  14. // 副本数是计算出来的
  15. Replicas: pointer.Int32Ptr(expectReplicas),
  16. Selector: &metav1.LabelSelector{
  17. MatchLabels: map[string]string{
  18. "app": APP_NAME,
  19. },
  20. },
  21. Template: corev1.PodTemplateSpec{
  22. ObjectMeta: metav1.ObjectMeta{
  23. Labels: map[string]string{
  24. "app": APP_NAME,
  25. },
  26. },
  27. Spec: corev1.PodSpec{
  28. Containers: []corev1.Container{
  29. {
  30. Name: APP_NAME,
  31. // 用指定的镜像
  32. Image: elasticWeb.Spec.Image,
  33. ImagePullPolicy: "IfNotPresent",
  34. Ports: []corev1.ContainerPort{
  35. {
  36. Name: "http",
  37. Protocol: corev1.ProtocolSCTP,
  38. ContainerPort: CONTAINER_PORT,
  39. },
  40. },
  41. Resources: corev1.ResourceRequirements{
  42. Requests: corev1.ResourceList{
  43. "cpu": resource.MustParse(CPU_REQUEST),
  44. "memory": resource.MustParse(MEM_REQUEST),
  45. },
  46. Limits: corev1.ResourceList{
  47. "cpu": resource.MustParse(CPU_LIMIT),
  48. "memory": resource.MustParse(MEM_LIMIT),
  49. },
  50. },
  51. },
  52. },
  53. },
  54. },
  55. },
  56. }
  57. // 这一步非常关键!
  58. // 建立关联后,删除elasticweb资源时就会将deployment也删除掉
  59. log.Info("set reference")
  60. if err := controllerutil.SetControllerReference(elasticWeb, deployment, r.Scheme); err != nil {
  61. log.Error(err, "SetControllerReference error")
  62. return err
  63. }
  64. // 创建deployment
  65. log.Info("start create deployment")
  66. if err := r.Create(ctx, deployment); err != nil {
  67. log.Error(err, "create deployment error")
  68. return err
  69. }
  70. log.Info("create deployment success")
  71. return nil
  72. }
  • 不论是创建deployment资源对象,还是对已有的deployment的pod数量做调整,这些操作完成后都要去修改Status,既实际的状态,这样外部才能随时随地知道当前elasticweb支持多大的QPS,因此需要将修改Status的操作封装到一个方法中,给多个场景使用,Status的计算逻辑很简单:pod数量乘以每个pod的QPS就是总QPS了,代码如下:
  1. // 完成了pod的处理后,更新最新状态
  2. func updateStatus(ctx context.Context, r *ElasticWebReconciler, elasticWeb *elasticwebv1.ElasticWeb) error {
  3. log := r.Log.WithValues("func", "updateStatus")
  4. // 单个pod的QPS
  5. singlePodQPS := *(elasticWeb.Spec.SinglePodQPS)
  6. // pod总数
  7. replicas := getExpectReplicas(elasticWeb)
  8. // 当pod创建完毕后,当前系统实际的QPS:单个pod的QPS * pod总数
  9. // 如果该字段还没有初始化,就先做初始化
  10. if nil == elasticWeb.Status.RealQPS {
  11. elasticWeb.Status.RealQPS = new(int32)
  12. }
  13. *(elasticWeb.Status.RealQPS) = singlePodQPS * replicas
  14. log.Info(fmt.Sprintf("singlePodQPS [%d], replicas [%d], realQPS[%d]", singlePodQPS, replicas, *(elasticWeb.Status.RealQPS)))
  15. if err := r.Update(ctx, elasticWeb); err != nil {
  16. log.Error(err, "update instance error")
  17. return err
  18. }
  19. return nil
  20. }
  • 前面细枝末节都处理完毕,可以开始主流程了,有了前面的流程图的赋值,主流程的代码很容就写出来了,如下所示,已经添加了足够的注释,就不再赘述了:
  1. func (r *ElasticWebReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
  2. // 会用到context
  3. ctx := context.Background()
  4. log := r.Log.WithValues("elasticweb", req.NamespacedName)
  5. // your logic here
  6. log.Info("1. start reconcile logic")
  7. // 实例化数据结构
  8. instance := &elasticwebv1.ElasticWeb{}
  9. // 通过客户端工具查询,查询条件是
  10. err := r.Get(ctx, req.NamespacedName, instance)
  11. if err != nil {
  12. // 如果没有实例,就返回空结果,这样外部就不再立即调用Reconcile方法了
  13. if errors.IsNotFound(err) {
  14. log.Info("2.1. instance not found, maybe removed")
  15. return reconcile.Result{}, nil
  16. }
  17. log.Error(err, "2.2 error")
  18. // 返回错误信息给外部
  19. return ctrl.Result{}, err
  20. }
  21. log.Info("3. instance : " + instance.String())
  22. // 查找deployment
  23. deployment := &appsv1.Deployment{}
  24. // 用客户端工具查询
  25. err = r.Get(ctx, req.NamespacedName, deployment)
  26. // 查找时发生异常,以及查出来没有结果的处理逻辑
  27. if err != nil {
  28. // 如果没有实例就要创建了
  29. if errors.IsNotFound(err) {
  30. log.Info("4. deployment not exists")
  31. // 如果对QPS没有需求,此时又没有deployment,就啥事都不做了
  32. if *(instance.Spec.TotalQPS) < 1 {
  33. log.Info("5.1 not need deployment")
  34. // 返回
  35. return ctrl.Result{}, nil
  36. }
  37. // 先要创建service
  38. if err = createServiceIfNotExists(ctx, r, instance, req); err != nil {
  39. log.Error(err, "5.2 error")
  40. // 返回错误信息给外部
  41. return ctrl.Result{}, err
  42. }
  43. // 立即创建deployment
  44. if err = createDeployment(ctx, r, instance); err != nil {
  45. log.Error(err, "5.3 error")
  46. // 返回错误信息给外部
  47. return ctrl.Result{}, err
  48. }
  49. // 如果创建成功就更新状态
  50. if err = updateStatus(ctx, r, instance); err != nil {
  51. log.Error(err, "5.4. error")
  52. // 返回错误信息给外部
  53. return ctrl.Result{}, err
  54. }
  55. // 创建成功就可以返回了
  56. return ctrl.Result{}, nil
  57. } else {
  58. log.Error(err, "7. error")
  59. // 返回错误信息给外部
  60. return ctrl.Result{}, err
  61. }
  62. }
  63. // 如果查到了deployment,并且没有返回错误,就走下面的逻辑
  64. // 根据单QPS和总QPS计算期望的副本数
  65. expectReplicas := getExpectReplicas(instance)
  66. // 当前deployment的期望副本数
  67. realReplicas := *deployment.Spec.Replicas
  68. log.Info(fmt.Sprintf("9. expectReplicas [%d], realReplicas [%d]", expectReplicas, realReplicas))
  69. // 如果相等,就直接返回了
  70. if expectReplicas == realReplicas {
  71. log.Info("10. return now")
  72. return ctrl.Result{}, nil
  73. }
  74. // 如果不等,就要调整
  75. *(deployment.Spec.Replicas) = expectReplicas
  76. log.Info("11. update deployment's Replicas")
  77. // 通过客户端更新deployment
  78. if err = r.Update(ctx, deployment); err != nil {
  79. log.Error(err, "12. update deployment replicas error")
  80. // 返回错误信息给外部
  81. return ctrl.Result{}, err
  82. }
  83. log.Info("13. update status")
  84. // 如果更新deployment的Replicas成功,就更新状态
  85. if err = updateStatus(ctx, r, instance); err != nil {
  86. log.Error(err, "14. update status error")
  87. // 返回错误信息给外部
  88. return ctrl.Result{}, err
  89. }
  90. return ctrl.Result{}, nil
  91. }
  • 至此,整个elasticweb operator编码就完成了,限于篇幅,咱们把部署、运行、镜像制作等操作放在下一篇文章吧;
  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界…
https://github.com/zq2599/blog_demos

版权声明:本文为bolingcavalry原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/bolingcavalry/p/15204333.html