From 49b0d8706a848ef013a1fe0452becd754de73fca Mon Sep 17 00:00:00 2001 From: Denis Khachyan Date: Thu, 1 Dec 2022 15:39:02 +0200 Subject: [PATCH 1/2] merge vp and cvp reconcile funcs --- api/v1alpha1/clustervectorpipeline.go | 4 + api/v1alpha1/clustervectorpipeline_types.go | 2 +- api/v1alpha1/vectorpipeline.go | 4 + ...ity.kaasops.io_clustervectorpipelines.yaml | 2 +- config/rbac/role.yaml | 52 ------- ...bility_v1alpha1_clustervectorpipeline.yaml | 12 +- .../clustervectorpipeline_controller.go | 143 ------------------ controllers/factory/config/utils.go | 5 +- controllers/factory/pipeline/pipeline.go | 1 + ...e_controller.go => pipeline_controller.go} | 83 +++++----- main.go | 10 +- 11 files changed, 67 insertions(+), 251 deletions(-) delete mode 100644 controllers/clustervectorpipeline_controller.go rename controllers/{vectorpipeline_controller.go => pipeline_controller.go} (51%) diff --git a/api/v1alpha1/clustervectorpipeline.go b/api/v1alpha1/clustervectorpipeline.go index b27b437a..aba26cc5 100644 --- a/api/v1alpha1/clustervectorpipeline.go +++ b/api/v1alpha1/clustervectorpipeline.go @@ -42,6 +42,10 @@ func (vp *ClusterVectorPipeline) SetConfigCheck(value bool) { vp.Status.ConfigCheckResult = &value } +func (vp *ClusterVectorPipeline) GetConfigCheckResult() *bool { + return vp.Status.ConfigCheckResult +} + func (vp *ClusterVectorPipeline) SetReason(reason *string) { vp.Status.Reason = reason } diff --git a/api/v1alpha1/clustervectorpipeline_types.go b/api/v1alpha1/clustervectorpipeline_types.go index 41fe3883..898aaf52 100644 --- a/api/v1alpha1/clustervectorpipeline_types.go +++ b/api/v1alpha1/clustervectorpipeline_types.go @@ -25,7 +25,7 @@ import ( //+kubebuilder:object:root=true //+kubebuilder:subresource:status -//+kubebuilder:resource:shortName=cvp +//+kubebuilder:resource:scope=Cluster,shortName=cvp //+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" //+kubebuilder:printcolumn:name="Valid",type="boolean",JSONPath=".status.configCheckResult" diff --git a/api/v1alpha1/vectorpipeline.go b/api/v1alpha1/vectorpipeline.go index c99d3a5c..ebcde2f4 100644 --- a/api/v1alpha1/vectorpipeline.go +++ b/api/v1alpha1/vectorpipeline.go @@ -42,6 +42,10 @@ func (vp *VectorPipeline) SetConfigCheck(value bool) { vp.Status.ConfigCheckResult = &value } +func (vp *VectorPipeline) GetConfigCheckResult() *bool { + return vp.Status.ConfigCheckResult +} + func (vp *VectorPipeline) SetReason(reason *string) { vp.Status.Reason = reason } diff --git a/config/crd/bases/observability.kaasops.io_clustervectorpipelines.yaml b/config/crd/bases/observability.kaasops.io_clustervectorpipelines.yaml index 715b8145..760cca22 100644 --- a/config/crd/bases/observability.kaasops.io_clustervectorpipelines.yaml +++ b/config/crd/bases/observability.kaasops.io_clustervectorpipelines.yaml @@ -15,7 +15,7 @@ spec: shortNames: - cvp singular: clustervectorpipeline - scope: Namespaced + scope: Cluster versions: - additionalPrinterColumns: - jsonPath: .metadata.creationTimestamp diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 276b6e63..187fbbb9 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -5,58 +5,6 @@ metadata: creationTimestamp: null name: manager-role rules: -- apiGroups: - - observability.kaasops.io - resources: - - clustervectorpipelines - verbs: - - create - - delete - - get - - list - - patch - - update - - watch -- apiGroups: - - observability.kaasops.io - resources: - - clustervectorpipelines/finalizers - verbs: - - update -- apiGroups: - - observability.kaasops.io - resources: - - clustervectorpipelines/status - verbs: - - get - - patch - - update -- apiGroups: - - observability.kaasops.io - resources: - - vectorpipelines - verbs: - - create - - delete - - get - - list - - patch - - update - - watch -- apiGroups: - - observability.kaasops.io - resources: - - vectorpipelines/finalizers - verbs: - - update -- apiGroups: - - observability.kaasops.io - resources: - - vectorpipelines/status - verbs: - - get - - patch - - update - apiGroups: - observability.kaasops.io resources: diff --git a/config/samples/observability_v1alpha1_clustervectorpipeline.yaml b/config/samples/observability_v1alpha1_clustervectorpipeline.yaml index 4a453702..98b57654 100644 --- a/config/samples/observability_v1alpha1_clustervectorpipeline.yaml +++ b/config/samples/observability_v1alpha1_clustervectorpipeline.yaml @@ -3,4 +3,14 @@ kind: ClusterVectorPipeline metadata: name: clustervectorpipeline-sample spec: - # TODO(user): Add fields here + sources: + test1: + type: "kubernetes_logs" + extra_label_selector: "app!=testdeployment3" + sinks: + test2: + type: "console" + encoding: + codec: "json" + inputs: + - test1 \ No newline at end of file diff --git a/controllers/clustervectorpipeline_controller.go b/controllers/clustervectorpipeline_controller.go deleted file mode 100644 index 3ea09462..00000000 --- a/controllers/clustervectorpipeline_controller.go +++ /dev/null @@ -1,143 +0,0 @@ -/* -Copyright 2022. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "context" - - api_errors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/log" - - vectorv1alpha1 "github.com/kaasops/vector-operator/api/v1alpha1" - "github.com/kaasops/vector-operator/controllers/factory/config" - "github.com/kaasops/vector-operator/controllers/factory/pipeline" - "github.com/kaasops/vector-operator/controllers/factory/vector/vectoragent" -) - -// ClusterVectorPipelineReconciler reconciles a ClusterVectorPipeline object -type ClusterVectorPipelineReconciler struct { - client.Client - Scheme *runtime.Scheme - - // Temp. Wait this issue - https://github.com/kubernetes-sigs/controller-runtime/issues/452 - Clientset *kubernetes.Clientset -} - -//+kubebuilder:rbac:groups=observability.kaasops.io,resources=clustervectorpipelines,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=observability.kaasops.io,resources=clustervectorpipelines/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=observability.kaasops.io,resources=clustervectorpipelines/finalizers,verbs=update - -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster closer to the desired state. -// TODO(user): Modify the Reconcile function to compare the state specified by -// the ClusterVectorPipeline object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by -// the user. -// -// For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.2/pkg/reconcile -func (r *ClusterVectorPipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := log.FromContext(ctx).WithValues("ClusterVectorPipeline", req.Name) - - log.Info("start Reconcile VectorPipeline") - - // Get CR VectorPipeline - vectorPipelineCR, err := r.findClusterVectorPipelineCustomResourceInstance(ctx, req) - if err != nil { - log.Error(err, "Failed to get Vector Pipeline") - return ctrl.Result{}, err - } - - vectorInstances, err := listVectorCustomResourceInstances(ctx, r.Client) - - if err != nil { - log.Error(err, "Failed to get Vector Instances") - return ctrl.Result{}, nil - } - - if len(vectorInstances) == 0 { - log.Info("Vertors not found") - return ctrl.Result{}, nil - } - - if vectorPipelineCR == nil || vectorPipelineCR.DeletionTimestamp != nil { - log.Info("ClusterVectorPIpeline CR not found. Ignoring since object must be deleted") - for _, vector := range vectorInstances { - VectorAgentReconciliationSourceChannel <- event.GenericEvent{Object: vector} - return ctrl.Result{}, nil - } - } - - // Check Pipeline hash - checkResult, err := pipeline.CheckHash(vectorPipelineCR) - if err != nil { - return ctrl.Result{}, err - } - if checkResult { - log.Info("ClusterVectorPipeline has no changes. Finish Reconcile VectorPipeline") - return ctrl.Result{}, nil - } - - for _, vector := range vectorInstances { - if vector.DeletionTimestamp != nil { - continue - } - - // Init Controller for Vector Agent - vaCtrl := vectoragent.NewController(vector, r.Client, r.Clientset) - if vaCtrl.Vector.Spec.Agent.DataDir == "" { - vaCtrl.Vector.Spec.Agent.DataDir = "/vector-data-dir" - } - - if err := config.ReconcileConfig(ctx, r.Client, vectorPipelineCR, vaCtrl); err != nil { - return ctrl.Result{}, err - } - - // Start vector reconcilation - if *vectorPipelineCR.Status.ConfigCheckResult { - VectorAgentReconciliationSourceChannel <- event.GenericEvent{Object: vector} - } - } - - log.Info("finish Reconcile ClusterVectorPipeline") - return ctrl.Result{}, nil -} - -func (r *ClusterVectorPipelineReconciler) findClusterVectorPipelineCustomResourceInstance(ctx context.Context, req ctrl.Request) (*vectorv1alpha1.ClusterVectorPipeline, error) { - // fetch the master instance - cvp := &vectorv1alpha1.ClusterVectorPipeline{} - err := r.Get(ctx, req.NamespacedName, cvp) - if err != nil { - if api_errors.IsNotFound(err) { - return nil, nil - } - return nil, err - } - return cvp, nil -} - -// SetupWithManager sets up the controller with the Manager. -func (r *ClusterVectorPipelineReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&vectorv1alpha1.ClusterVectorPipeline{}). - Complete(r) -} diff --git a/controllers/factory/config/utils.go b/controllers/factory/config/utils.go index fc37bf91..4d8ab0f4 100644 --- a/controllers/factory/config/utils.go +++ b/controllers/factory/config/utils.go @@ -24,5 +24,8 @@ func addPrefix(Namespace, Name, componentName string) string { } func generateName(Namespace, Name string) string { - return Namespace + "-" + Name + if Namespace != "" { + return Namespace + "-" + Name + } + return Name } diff --git a/controllers/factory/pipeline/pipeline.go b/controllers/factory/pipeline/pipeline.go index 699dc894..59c7297d 100644 --- a/controllers/factory/pipeline/pipeline.go +++ b/controllers/factory/pipeline/pipeline.go @@ -32,6 +32,7 @@ type Pipeline interface { SetReason(*string) GetLastAppliedPipeline() *uint32 SetLastAppliedPipeline(*uint32) + GetConfigCheckResult() *bool IsValid() bool IsDeleted() bool UpdateStatus(context.Context, client.Client) error diff --git a/controllers/vectorpipeline_controller.go b/controllers/pipeline_controller.go similarity index 51% rename from controllers/vectorpipeline_controller.go rename to controllers/pipeline_controller.go index db96f682..b02b26a1 100644 --- a/controllers/vectorpipeline_controller.go +++ b/controllers/pipeline_controller.go @@ -25,7 +25,9 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/source" vectorv1alpha1 "github.com/kaasops/vector-operator/api/v1alpha1" "github.com/kaasops/vector-operator/controllers/factory/config" @@ -33,8 +35,7 @@ import ( "github.com/kaasops/vector-operator/controllers/factory/vector/vectoragent" ) -// VectorPipelineReconciler reconciles a VectorPipeline object -type VectorPipelineReconciler struct { +type PipelineReconciler struct { client.Client Scheme *runtime.Scheme @@ -42,35 +43,19 @@ type VectorPipelineReconciler struct { Clientset *kubernetes.Clientset } -//+kubebuilder:rbac:groups=observability.kaasops.io,resources=vectorpipelines,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=observability.kaasops.io,resources=vectorpipelines/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=observability.kaasops.io,resources=vectorpipelines/finalizers,verbs=update - -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster closer to the desired state. -// TODO(user): Modify the Reconcile function to compare the state specified by -// the VectorPipeline object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by -// the user. -// -// For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.2/pkg/reconcile -func (r *VectorPipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := log.FromContext(ctx).WithValues("VectorPipeline", req.Name) - - log.Info("start Reconcile VectorPipeline") - - // Get CR VectorPipeline - vectorPipelineCR, err := r.findVectorPipelineCustomResourceInstance(ctx, req) +func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := log.FromContext(ctx).WithValues("Pipeline", req.Name) + + log.Info("start Reconcile Pipeline") + pipelineCR, err := r.findPipelineCustomResourceInstance(ctx, req) if err != nil { - log.Error(err, "Failed to get Vector Pipeline") + log.Error(err, "Failed to get Pipeline") return ctrl.Result{}, err } - vectorInstances, err := listVectorCustomResourceInstances(ctx, r.Client) if err != nil { - log.Error(err, "Failed to get Vector Instances") + log.Error(err, "Failed to get Instances") return ctrl.Result{}, nil } @@ -79,9 +64,8 @@ func (r *VectorPipelineReconciler) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, nil } - if vectorPipelineCR == nil || vectorPipelineCR.DeletionTimestamp != nil { - log.Info("VectorPIpeline CR not found. Ignoring since object must be deleted") - // Start vector reconcilation + if pipelineCR == nil { + log.Info("Pipeline CR not found. Ignoring since object must be deleted") for _, vector := range vectorInstances { VectorAgentReconciliationSourceChannel <- event.GenericEvent{Object: vector} return ctrl.Result{}, nil @@ -89,12 +73,12 @@ func (r *VectorPipelineReconciler) Reconcile(ctx context.Context, req ctrl.Reque } // Check Pipeline hash - checkResult, err := pipeline.CheckHash(vectorPipelineCR) + checkResult, err := pipeline.CheckHash(pipelineCR) if err != nil { return ctrl.Result{}, err } if checkResult { - log.Info("VectorPipeline has no changes. Finish Reconcile VectorPipeline") + log.Info("Pipeline has no changes. Finish Reconcile Pipeline") return ctrl.Result{}, nil } @@ -109,35 +93,48 @@ func (r *VectorPipelineReconciler) Reconcile(ctx context.Context, req ctrl.Reque vaCtrl.Vector.Spec.Agent.DataDir = "/vector-data-dir" } - if err := config.ReconcileConfig(ctx, r.Client, vectorPipelineCR, vaCtrl); err != nil { + if err := config.ReconcileConfig(ctx, r.Client, pipelineCR, vaCtrl); err != nil { return ctrl.Result{}, err } + // Start vector reconcilation - if *vectorPipelineCR.Status.ConfigCheckResult { + if *pipelineCR.GetConfigCheckResult() { VectorAgentReconciliationSourceChannel <- event.GenericEvent{Object: vector} } } - log.Info("finish Reconcile VectorPipeline") + log.Info("finish Reconcile Pipeline") return ctrl.Result{}, nil } -func (r *VectorPipelineReconciler) findVectorPipelineCustomResourceInstance(ctx context.Context, req ctrl.Request) (*vectorv1alpha1.VectorPipeline, error) { - // fetch the master instance - vectorPipelineCR := &vectorv1alpha1.VectorPipeline{} - err := r.Get(ctx, req.NamespacedName, vectorPipelineCR) - if err != nil { - if api_errors.IsNotFound(err) { - return nil, nil +func (r *PipelineReconciler) findPipelineCustomResourceInstance(ctx context.Context, req ctrl.Request) (pipeline pipeline.Pipeline, err error) { + if req.Namespace != "" { + vp := &vectorv1alpha1.VectorPipeline{} + err := r.Get(ctx, req.NamespacedName, vp) + if err != nil { + if api_errors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + return vp, nil + } else { + cvp := &vectorv1alpha1.ClusterVectorPipeline{} + err := r.Get(ctx, req.NamespacedName, cvp) + if err != nil { + if api_errors.IsNotFound(err) { + return nil, nil + } + return nil, err } - return nil, err + return cvp, nil } - return vectorPipelineCR, nil } // SetupWithManager sets up the controller with the Manager. -func (r *VectorPipelineReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&vectorv1alpha1.VectorPipeline{}). + Watches(&source.Kind{Type: &vectorv1alpha1.ClusterVectorPipeline{}}, &handler.EnqueueRequestForObject{}). Complete(r) } diff --git a/main.go b/main.go index be7ddce3..aa6255ff 100644 --- a/main.go +++ b/main.go @@ -113,7 +113,7 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Vector") os.Exit(1) } - if err = (&controllers.VectorPipelineReconciler{ + if err = (&controllers.PipelineReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Clientset: clientset, @@ -121,14 +121,6 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "VectorPipeline") os.Exit(1) } - if err = (&controllers.ClusterVectorPipelineReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Clientset: clientset, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "ClusterVectorPipeline") - os.Exit(1) - } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { From 2fa76e96d48a9a93eddfa3a996e82c95f7427e00 Mon Sep 17 00:00:00 2001 From: Denis Khachyan Date: Thu, 1 Dec 2022 15:58:41 +0200 Subject: [PATCH 2/2] update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35a61b2d..5a142700 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ ### Added +- [[65]](https://github.com/kaasops/vector-operator/pull/65) **Refactor**: merge vp and cvp reconcile funcs +- [[64]](https://github.com/kaasops/vector-operator/pull/64) **Fix** Do not reconсile vector if vp check fail +- [[63]](https://github.com/kaasops/vector-operator/pull/63) **Fix** Fix configcheck gc ### v0.0.7 - [[61]](https://github.com/kaasops/vector-operator/pull/61) **Feature** Filter cache and disable time reconcile