From d6e3cf7be9551bc5c3ea372f9d1d4e4f2f2d7c56 Mon Sep 17 00:00:00 2001 From: Howard Lau Date: Wed, 28 Oct 2020 06:57:38 +0000 Subject: [PATCH] use accessor to merge config and extract magic numbers Signed-off-by: Howard Lau --- api/v1/component_accessor.go | 216 +++++++++++++++++++ api/v1/seaweed_types.go | 87 ++++++-- api/v1/zz_generated.deepcopy.go | 93 ++++++++ controllers/controller_filer.go | 7 +- controllers/controller_filer_service.go | 48 +++-- controllers/controller_filer_statefulset.go | 167 +++++++------- controllers/controller_master.go | 4 - controllers/controller_master_service.go | 8 +- controllers/controller_master_statefulset.go | 135 +++++------- controllers/controller_volume_service.go | 8 +- controllers/controller_volume_statefulset.go | 141 +++++------- controllers/helper.go | 41 ++++ 12 files changed, 664 insertions(+), 291 deletions(-) create mode 100644 api/v1/component_accessor.go diff --git a/api/v1/component_accessor.go b/api/v1/component_accessor.go new file mode 100644 index 0000000..45910a0 --- /dev/null +++ b/api/v1/component_accessor.go @@ -0,0 +1,216 @@ +package v1 + +import ( + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" +) + +// ComponentAccessor is the interface to access component details, which respects the cluster-level properties +// and component-level overrides +// +kubebuilder:object:root=false +// +kubebuilder:object:generate=false +type ComponentAccessor interface { + ImagePullPolicy() corev1.PullPolicy + ImagePullSecrets() []corev1.LocalObjectReference + HostNetwork() bool + Affinity() *corev1.Affinity + PriorityClassName() *string + NodeSelector() map[string]string + Annotations() map[string]string + Tolerations() []corev1.Toleration + SchedulerName() string + DNSPolicy() corev1.DNSPolicy + BuildPodSpec() corev1.PodSpec + Env() []corev1.EnvVar + AdditionalContainers() []corev1.Container + AdditionalVolumes() []corev1.Volume + TerminationGracePeriodSeconds() *int64 + StatefulSetUpdateStrategy() appsv1.StatefulSetUpdateStrategyType +} + +type componentAccessorImpl struct { + imagePullPolicy corev1.PullPolicy + imagePullSecrets []corev1.LocalObjectReference + hostNetwork *bool + affinity *corev1.Affinity + priorityClassName *string + schedulerName string + clusterNodeSelector map[string]string + clusterAnnotations map[string]string + tolerations []corev1.Toleration + statefulSetUpdateStrategy appsv1.StatefulSetUpdateStrategyType + + // ComponentSpec is the Component Spec + ComponentSpec *ComponentSpec +} + +func (a *componentAccessorImpl) StatefulSetUpdateStrategy() appsv1.StatefulSetUpdateStrategyType { + strategy := a.ComponentSpec.StatefulSetUpdateStrategy + if len(strategy) != 0 { + return strategy + } + + strategy = a.statefulSetUpdateStrategy + if len(strategy) != 0 { + return strategy + } + + return appsv1.RollingUpdateStatefulSetStrategyType +} + +func (a *componentAccessorImpl) ImagePullPolicy() corev1.PullPolicy { + pp := a.ComponentSpec.ImagePullPolicy + if pp == nil { + return a.imagePullPolicy + } + return *pp +} + +func (a *componentAccessorImpl) ImagePullSecrets() []corev1.LocalObjectReference { + ips := a.ComponentSpec.ImagePullSecrets + if ips == nil { + return a.imagePullSecrets + } + return ips +} + +func (a *componentAccessorImpl) HostNetwork() bool { + hostNetwork := a.ComponentSpec.HostNetwork + if hostNetwork == nil { + hostNetwork = a.hostNetwork + } + if hostNetwork == nil { + return false + } + return *hostNetwork +} + +func (a *componentAccessorImpl) Affinity() *corev1.Affinity { + affi := a.ComponentSpec.Affinity + if affi == nil { + affi = a.affinity + } + return affi +} + +func (a *componentAccessorImpl) PriorityClassName() *string { + pcn := a.ComponentSpec.PriorityClassName + if pcn == nil { + pcn = a.priorityClassName + } + return pcn +} + +func (a *componentAccessorImpl) SchedulerName() string { + pcn := a.ComponentSpec.SchedulerName + if pcn == nil { + pcn = &a.schedulerName + } + return *pcn +} + +func (a *componentAccessorImpl) NodeSelector() map[string]string { + sel := map[string]string{} + for k, v := range a.clusterNodeSelector { + sel[k] = v + } + for k, v := range a.ComponentSpec.NodeSelector { + sel[k] = v + } + return sel +} + +func (a *componentAccessorImpl) Annotations() map[string]string { + anno := map[string]string{} + for k, v := range a.clusterAnnotations { + anno[k] = v + } + for k, v := range a.ComponentSpec.Annotations { + anno[k] = v + } + return anno +} + +func (a *componentAccessorImpl) Tolerations() []corev1.Toleration { + tols := a.ComponentSpec.Tolerations + if len(tols) == 0 { + tols = a.tolerations + } + return tols +} + +func (a *componentAccessorImpl) DNSPolicy() corev1.DNSPolicy { + dnsPolicy := corev1.DNSClusterFirst // same as kubernetes default + if a.HostNetwork() { + dnsPolicy = corev1.DNSClusterFirstWithHostNet + } + return dnsPolicy +} + +func (a *componentAccessorImpl) BuildPodSpec() corev1.PodSpec { + spec := corev1.PodSpec{ + SchedulerName: a.SchedulerName(), + Affinity: a.Affinity(), + NodeSelector: a.NodeSelector(), + HostNetwork: a.HostNetwork(), + RestartPolicy: corev1.RestartPolicyAlways, + Tolerations: a.Tolerations(), + } + if a.PriorityClassName() != nil { + spec.PriorityClassName = *a.PriorityClassName() + } + if a.ImagePullSecrets() != nil { + spec.ImagePullSecrets = a.ImagePullSecrets() + } + if a.TerminationGracePeriodSeconds() != nil { + spec.TerminationGracePeriodSeconds = a.TerminationGracePeriodSeconds() + } + return spec +} + +func (a *componentAccessorImpl) Env() []corev1.EnvVar { + return a.ComponentSpec.Env +} + +func (a *componentAccessorImpl) AdditionalContainers() []corev1.Container { + return a.ComponentSpec.AdditionalContainers +} + +func (a *componentAccessorImpl) AdditionalVolumes() []corev1.Volume { + return a.ComponentSpec.AdditionalVolumes +} + +func (a *componentAccessorImpl) TerminationGracePeriodSeconds() *int64 { + return a.ComponentSpec.TerminationGracePeriodSeconds +} + +func buildSeaweedComponentAccessor(spec *SeaweedSpec, componentSpec *ComponentSpec) ComponentAccessor { + return &componentAccessorImpl{ + imagePullPolicy: spec.ImagePullPolicy, + imagePullSecrets: spec.ImagePullSecrets, + hostNetwork: spec.HostNetwork, + affinity: spec.Affinity, + schedulerName: spec.SchedulerName, + clusterNodeSelector: spec.NodeSelector, + clusterAnnotations: spec.Annotations, + tolerations: spec.Tolerations, + statefulSetUpdateStrategy: spec.StatefulSetUpdateStrategy, + + ComponentSpec: componentSpec, + } +} + +// BaseMasterSpec provides merged spec of masters +func (s *Seaweed) BaseMasterSpec() ComponentAccessor { + return buildSeaweedComponentAccessor(&s.Spec, &s.Spec.Master.ComponentSpec) +} + +// BaseFilerSpec provides merged spec of filers +func (s *Seaweed) BaseFilerSpec() ComponentAccessor { + return buildSeaweedComponentAccessor(&s.Spec, &s.Spec.Filer.ComponentSpec) +} + +// BaseVolumeSpec provides merged spec of volumes +func (s *Seaweed) BaseVolumeSpec() ComponentAccessor { + return buildSeaweedComponentAccessor(&s.Spec, &s.Spec.Volume.ComponentSpec) +} diff --git a/api/v1/seaweed_types.go b/api/v1/seaweed_types.go index 3966665..3a6fa36 100644 --- a/api/v1/seaweed_types.go +++ b/api/v1/seaweed_types.go @@ -25,6 +25,20 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. +// Constants +const ( + GRPCPortDelta = 10000 + + MasterHTTPPort = 9333 + VolumeHTTPPort = 8444 + FilerHTTPPort = 8888 + FilerS3Port = 8333 + + MasterGRPCPort = MasterHTTPPort + GRPCPortDelta + VolumeGRPCPort = VolumeHTTPPort + GRPCPortDelta + FilerGRPCPort = FilerHTTPPort + GRPCPortDelta +) + // SeaweedSpec defines the desired state of Seaweed type SeaweedSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster @@ -41,14 +55,51 @@ type SeaweedSpec struct { // Master Master *MasterSpec `json:"master,omitempty"` + // Volume Volume *VolumeSpec `json:"volume,omitempty"` + // Filer Filer *FilerSpec `json:"filer,omitempty"` + // SchedulerName of pods + SchedulerName string `json:"schedulerName,omitempty"` + + // Persistent volume reclaim policy + PVReclaimPolicy *corev1.PersistentVolumeReclaimPolicy `json:"pvReclaimPolicy,omitempty"` + + // ImagePullPolicy of pods + ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"` + + // ImagePullSecrets is an optional list of references to secrets in the same namespace to use for pulling any of the images. + ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` + + // Whether enable PVC reclaim for orphan PVC left by statefulset scale-in + EnablePVReclaim *bool `json:"enablePVReclaim,omitempty"` + + // Whether Hostnetwork is enabled for pods + HostNetwork *bool `json:"hostNetwork,omitempty"` + + // Affinity of pods + Affinity *corev1.Affinity `json:"affinity,omitempty"` + + // Base node selectors of Pods, components may add or override selectors upon this respectively + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + + // Base annotations of Pods, components may add or override selectors upon this respectively + Annotations map[string]string `json:"annotations,omitempty"` + + // Base tolerations of Pods, components may add more tolerations upon this respectively + Tolerations []corev1.Toleration `json:"tolerations,omitempty"` + + // StatefulSetUpdateStrategy indicates the StatefulSetUpdateStrategy that will be + // employed to update Pods in the StatefulSet when a revision is made to + // Template. + StatefulSetUpdateStrategy appsv1.StatefulSetUpdateStrategyType `json:"statefulSetUpdateStrategy,omitempty"` + VolumeServerDiskCount int32 `json:"volumeServerDiskCount,omitempty"` - // ingress + // Ingresses Hosts []string `json:"hosts"` } @@ -65,7 +116,8 @@ type MasterSpec struct { // The desired ready replicas // +kubebuilder:validation:Minimum=1 - Replicas int32 `json:"replicas"` + Replicas int32 `json:"replicas"` + Service *ServiceSpec `json:"service,omitempty"` } // VolumeSpec is the spec for volume servers @@ -75,7 +127,8 @@ type VolumeSpec struct { // The desired ready replicas // +kubebuilder:validation:Minimum=1 - Replicas int32 `json:"replicas"` + Replicas int32 `json:"replicas"` + Service *ServiceSpec `json:"service,omitempty"` } // FilerSpec is the spec for filers @@ -85,7 +138,8 @@ type FilerSpec struct { // The desired ready replicas // +kubebuilder:validation:Minimum=1 - Replicas int32 `json:"replicas"` + Replicas int32 `json:"replicas"` + Service *ServiceSpec `json:"service,omitempty"` } // ComponentSpec is the base spec of each component, the fields should always accessed by the BasicSpec() method to respect the cluster-level properties @@ -122,16 +176,9 @@ type ComponentSpec struct { // List of environment variables to set in the container, like // v1.Container.Env. - // Note that following env names cannot be used and may be overrided by - // tidb-operator built envs. + // Note that following env names cannot be used and may be overrided by operators // - NAMESPACE - // - TZ - // - SERVICE_NAME - // - PEER_SERVICE_NAME - // - HEADLESS_SERVICE_NAME - // - SET_NAME - // - HOSTNAME - // - CLUSTER_NAME + // - POD_IP // - POD_NAME Env []corev1.EnvVar `json:"env,omitempty"` @@ -157,6 +204,20 @@ type ComponentSpec struct { StatefulSetUpdateStrategy appsv1.StatefulSetUpdateStrategyType `json:"statefulSetUpdateStrategy,omitempty"` } +type ServiceSpec struct { + // Type of the real kubernetes service + Type corev1.ServiceType `json:"type,omitempty"` + + // Additional annotations of the kubernetes service object + Annotations map[string]string `json:"annotations,omitempty"` + + // LoadBalancerIP is the loadBalancerIP of service + LoadBalancerIP *string `json:"loadBalancerIP,omitempty"` + + // ClusterIP is the clusterIP of service + ClusterIP *string `json:"clusterIP,omitempty"` +} + // +kubebuilder:object:root=true // +kubebuilder:subresource:status diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 892a1b8..692e03a 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -127,6 +127,11 @@ func (in *FilerSpec) DeepCopyInto(out *FilerSpec) { *out = *in in.ComponentSpec.DeepCopyInto(&out.ComponentSpec) in.ResourceRequirements.DeepCopyInto(&out.ResourceRequirements) + if in.Service != nil { + in, out := &in.Service, &out.Service + *out = new(ServiceSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FilerSpec. @@ -144,6 +149,11 @@ func (in *MasterSpec) DeepCopyInto(out *MasterSpec) { *out = *in in.ComponentSpec.DeepCopyInto(&out.ComponentSpec) in.ResourceRequirements.DeepCopyInto(&out.ResourceRequirements) + if in.Service != nil { + in, out := &in.Service, &out.Service + *out = new(ServiceSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MasterSpec. @@ -233,6 +243,52 @@ func (in *SeaweedSpec) DeepCopyInto(out *SeaweedSpec) { *out = new(FilerSpec) (*in).DeepCopyInto(*out) } + if in.PVReclaimPolicy != nil { + in, out := &in.PVReclaimPolicy, &out.PVReclaimPolicy + *out = new(corev1.PersistentVolumeReclaimPolicy) + **out = **in + } + if in.ImagePullSecrets != nil { + in, out := &in.ImagePullSecrets, &out.ImagePullSecrets + *out = make([]corev1.LocalObjectReference, len(*in)) + copy(*out, *in) + } + if in.EnablePVReclaim != nil { + in, out := &in.EnablePVReclaim, &out.EnablePVReclaim + *out = new(bool) + **out = **in + } + if in.HostNetwork != nil { + in, out := &in.HostNetwork, &out.HostNetwork + *out = new(bool) + **out = **in + } + if in.Affinity != nil { + in, out := &in.Affinity, &out.Affinity + *out = new(corev1.Affinity) + (*in).DeepCopyInto(*out) + } + if in.NodeSelector != nil { + in, out := &in.NodeSelector, &out.NodeSelector + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Annotations != nil { + in, out := &in.Annotations, &out.Annotations + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Tolerations != nil { + in, out := &in.Tolerations, &out.Tolerations + *out = make([]corev1.Toleration, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.Hosts != nil { in, out := &in.Hosts, &out.Hosts *out = make([]string, len(*in)) @@ -265,11 +321,48 @@ func (in *SeaweedStatus) DeepCopy() *SeaweedStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ServiceSpec) DeepCopyInto(out *ServiceSpec) { + *out = *in + if in.Annotations != nil { + in, out := &in.Annotations, &out.Annotations + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.LoadBalancerIP != nil { + in, out := &in.LoadBalancerIP, &out.LoadBalancerIP + *out = new(string) + **out = **in + } + if in.ClusterIP != nil { + in, out := &in.ClusterIP, &out.ClusterIP + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ServiceSpec. +func (in *ServiceSpec) DeepCopy() *ServiceSpec { + if in == nil { + return nil + } + out := new(ServiceSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VolumeSpec) DeepCopyInto(out *VolumeSpec) { *out = *in in.ComponentSpec.DeepCopyInto(&out.ComponentSpec) in.ResourceRequirements.DeepCopyInto(&out.ResourceRequirements) + if in.Service != nil { + in, out := &in.Service, &out.Service + *out = new(ServiceSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VolumeSpec. diff --git a/controllers/controller_filer.go b/controllers/controller_filer.go index a4b56f6..2d42f1a 100644 --- a/controllers/controller_filer.go +++ b/controllers/controller_filer.go @@ -2,6 +2,7 @@ package controllers import ( "context" + "k8s.io/apimachinery/pkg/runtime" appsv1 "k8s.io/api/apps/v1" @@ -18,7 +19,7 @@ func (r *SeaweedReconciler) ensureFilerServers(seaweedCR *seaweedv1.Seaweed) (do return } - if done, result, err = r.ensureFilerNodePortService(seaweedCR); done { + if done, result, err = r.ensureFilerService(seaweedCR); done { return } @@ -57,11 +58,11 @@ func (r *SeaweedReconciler) ensureFilerHeadlessService(seaweedCR *seaweedv1.Seaw return ReconcileResult(err) } -func (r *SeaweedReconciler) ensureFilerNodePortService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { +func (r *SeaweedReconciler) ensureFilerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { log := r.Log.WithValues("sw-filer-service", seaweedCR.Name) - filerService := r.createFilerNodePortService(seaweedCR) + filerService := r.createFilerService(seaweedCR) _, err := r.CreateOrUpdateService(filerService) log.Info("ensure filer service " + filerService.Name) diff --git a/controllers/controller_filer_service.go b/controllers/controller_filer_service.go index 1a27bb2..6bef126 100644 --- a/controllers/controller_filer_service.go +++ b/controllers/controller_filer_service.go @@ -27,20 +27,20 @@ func (r *SeaweedReconciler) createFilerHeadlessService(m *seaweedv1.Seaweed) *co { Name: "swfs-filer", Protocol: corev1.Protocol("TCP"), - Port: 8888, - TargetPort: intstr.FromInt(8888), + Port: seaweedv1.FilerHTTPPort, + TargetPort: intstr.FromInt(seaweedv1.FilerHTTPPort), }, { Name: "swfs-filer-grpc", Protocol: corev1.Protocol("TCP"), - Port: 18888, - TargetPort: intstr.FromInt(18888), + Port: seaweedv1.FilerGRPCPort, + TargetPort: intstr.FromInt(seaweedv1.FilerGRPCPort), }, { Name: "swfs-s3", Protocol: corev1.Protocol("TCP"), - Port: 8333, - TargetPort: intstr.FromInt(8333), + Port: seaweedv1.FilerS3Port, + TargetPort: intstr.FromInt(seaweedv1.FilerS3Port), }, }, Selector: labels, @@ -49,7 +49,7 @@ func (r *SeaweedReconciler) createFilerHeadlessService(m *seaweedv1.Seaweed) *co return dep } -func (r *SeaweedReconciler) createFilerNodePortService(m *seaweedv1.Seaweed) *corev1.Service { +func (r *SeaweedReconciler) createFilerService(m *seaweedv1.Seaweed) *corev1.Service { labels := labelsForFiler(m.Name) dep := &corev1.Service{ @@ -62,33 +62,47 @@ func (r *SeaweedReconciler) createFilerNodePortService(m *seaweedv1.Seaweed) *co }, }, Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeNodePort, + Type: corev1.ServiceTypeClusterIP, PublishNotReadyAddresses: true, Ports: []corev1.ServicePort{ { Name: "swfs-filer", Protocol: corev1.Protocol("TCP"), - Port: 8888, - TargetPort: intstr.FromInt(8888), - NodePort: 30888, + Port: seaweedv1.FilerHTTPPort, + TargetPort: intstr.FromInt(seaweedv1.FilerHTTPPort), }, { Name: "swfs-filer-grpc", Protocol: corev1.Protocol("TCP"), - Port: 18888, - TargetPort: intstr.FromInt(18888), - NodePort: 31888, + Port: seaweedv1.FilerGRPCPort, + TargetPort: intstr.FromInt(seaweedv1.FilerGRPCPort), }, { Name: "swfs-s3", Protocol: corev1.Protocol("TCP"), - Port: 8333, - TargetPort: intstr.FromInt(8333), - NodePort: 30833, + Port: seaweedv1.FilerS3Port, + TargetPort: intstr.FromInt(seaweedv1.FilerS3Port), }, }, Selector: labels, }, } + + if m.Spec.Filer.Service != nil { + svcSpec := m.Spec.Filer.Service + dep.Annotations = copyAnnotations(svcSpec.Annotations) + + if svcSpec.Type != "" { + dep.Spec.Type = svcSpec.Type + } + + if svcSpec.ClusterIP != nil { + dep.Spec.ClusterIP = *svcSpec.ClusterIP + } + + if svcSpec.LoadBalancerIP != nil { + dep.Spec.LoadBalancerIP = *svcSpec.LoadBalancerIP + } + } return dep } diff --git a/controllers/controller_filer_statefulset.go b/controllers/controller_filer_statefulset.go index fdefa53..e0cc7c7 100644 --- a/controllers/controller_filer_statefulset.go +++ b/controllers/controller_filer_statefulset.go @@ -17,6 +17,89 @@ func (r *SeaweedReconciler) createFilerStatefulSet(m *seaweedv1.Seaweed) *appsv1 rollingUpdatePartition := int32(0) enableServiceLinks := false + filerPodSpec := m.BaseFilerSpec().BuildPodSpec() + filerPodSpec.EnableServiceLinks = &enableServiceLinks + filerPodSpec.Containers = []corev1.Container{{ + Name: "seaweedfs", + Image: m.Spec.Image, + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{ + { + Name: "POD_IP", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "status.podIP", + }, + }, + }, + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + { + Name: "NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + }, + Command: []string{ + "/bin/sh", + "-ec", + fmt.Sprintf("weed filer -port=8888 %s %s -s3", + fmt.Sprintf("-ip=$(POD_NAME).%s-filer", m.Name), + fmt.Sprintf("-master=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)), + ), + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: seaweedv1.FilerHTTPPort, + Name: "swfs-filer", + }, + { + ContainerPort: seaweedv1.FilerGRPCPort, + }, + { + ContainerPort: seaweedv1.FilerS3Port, + Name: "swfs-s3", + }, + }, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt(seaweedv1.FilerHTTPPort), + Scheme: corev1.URISchemeHTTP, + }, + }, + InitialDelaySeconds: 10, + TimeoutSeconds: 3, + PeriodSeconds: 15, + SuccessThreshold: 1, + FailureThreshold: 100, + }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt(seaweedv1.FilerHTTPPort), + Scheme: corev1.URISchemeHTTP, + }, + }, + InitialDelaySeconds: 20, + TimeoutSeconds: 3, + PeriodSeconds: 30, + SuccessThreshold: 1, + FailureThreshold: 6, + }, + }} + dep := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: m.Name + "-filer", @@ -39,89 +122,7 @@ func (r *SeaweedReconciler) createFilerStatefulSet(m *seaweedv1.Seaweed) *appsv1 ObjectMeta: metav1.ObjectMeta{ Labels: labels, }, - Spec: corev1.PodSpec{ - EnableServiceLinks: &enableServiceLinks, - Containers: []corev1.Container{{ - Name: "seaweedfs", - Image: m.Spec.Image, - ImagePullPolicy: corev1.PullIfNotPresent, - Env: []corev1.EnvVar{ - { - Name: "POD_IP", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "status.podIP", - }, - }, - }, - { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, - }, - { - Name: "NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, - }, - Command: []string{ - "/bin/sh", - "-ec", - fmt.Sprintf("weed filer -port=8888 %s %s -s3", - fmt.Sprintf("-ip=$(POD_NAME).%s-filer", m.Name), - fmt.Sprintf("-master=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)), - ), - }, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 8888, - Name: "swfs-filer", - }, - { - ContainerPort: 18888, - }, - { - ContainerPort: 8333, - Name: "swfs-s3", - }, - }, - ReadinessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/", - Port: intstr.FromInt(8888), - Scheme: corev1.URISchemeHTTP, - }, - }, - InitialDelaySeconds: 10, - TimeoutSeconds: 3, - PeriodSeconds: 15, - SuccessThreshold: 1, - FailureThreshold: 100, - }, - LivenessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/", - Port: intstr.FromInt(8888), - Scheme: corev1.URISchemeHTTP, - }, - }, - InitialDelaySeconds: 20, - TimeoutSeconds: 3, - PeriodSeconds: 30, - SuccessThreshold: 1, - FailureThreshold: 6, - }, - }}, - }, + Spec: filerPodSpec, }, }, } diff --git a/controllers/controller_master.go b/controllers/controller_master.go index 8452737..4b4f7a5 100644 --- a/controllers/controller_master.go +++ b/controllers/controller_master.go @@ -12,10 +12,6 @@ import ( seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" ) -const ( - MasterClusterSize = 3 -) - func (r *SeaweedReconciler) ensureMaster(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) { _ = context.Background() _ = r.Log.WithValues("seaweed", seaweedCR.Name) diff --git a/controllers/controller_master_service.go b/controllers/controller_master_service.go index 6cf1561..fb4cabb 100644 --- a/controllers/controller_master_service.go +++ b/controllers/controller_master_service.go @@ -27,14 +27,14 @@ func (r *SeaweedReconciler) createMasterService(m *seaweedv1.Seaweed) *corev1.Se { Name: "swfs-master", Protocol: corev1.Protocol("TCP"), - Port: 9333, - TargetPort: intstr.FromInt(9333), + Port: seaweedv1.MasterHTTPPort, + TargetPort: intstr.FromInt(seaweedv1.MasterHTTPPort), }, { Name: "swfs-master-grpc", Protocol: corev1.Protocol("TCP"), - Port: 19333, - TargetPort: intstr.FromInt(19333), + Port: seaweedv1.MasterGRPCPort, + TargetPort: intstr.FromInt(seaweedv1.MasterGRPCPort), }, }, Selector: labels, diff --git a/controllers/controller_master_statefulset.go b/controllers/controller_master_statefulset.go index c155d36..80ff4cb 100644 --- a/controllers/controller_master_statefulset.go +++ b/controllers/controller_master_statefulset.go @@ -17,6 +17,60 @@ func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv rollingUpdatePartition := int32(0) enableServiceLinks := false + masterPodSpec := m.BaseMasterSpec().BuildPodSpec() + masterPodSpec.EnableServiceLinks = &enableServiceLinks + masterPodSpec.Containers = []corev1.Container{{ + Name: "seaweedfs", + Image: m.Spec.Image, + ImagePullPolicy: corev1.PullIfNotPresent, + Env: append(m.BaseMasterSpec().Env(), kubernetesEnvVars...), + Command: []string{ + "/bin/sh", + "-ec", + fmt.Sprintf("sleep 60; weed master -volumePreallocate -volumeSizeLimitMB=1000 %s %s", + fmt.Sprintf("-ip=$(POD_NAME).%s-master", m.Name), + fmt.Sprintf("-peers=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)), + ), + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: seaweedv1.MasterHTTPPort, + Name: "swfs-master", + }, + { + ContainerPort: seaweedv1.MasterGRPCPort, + }, + }, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/cluster/status", + Port: intstr.FromInt(seaweedv1.MasterHTTPPort), + Scheme: corev1.URISchemeHTTP, + }, + }, + InitialDelaySeconds: 5, + TimeoutSeconds: 15, + PeriodSeconds: 15, + SuccessThreshold: 2, + FailureThreshold: 100, + }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/cluster/status", + Port: intstr.FromInt(seaweedv1.MasterHTTPPort), + Scheme: corev1.URISchemeHTTP, + }, + }, + InitialDelaySeconds: 15, + TimeoutSeconds: 15, + PeriodSeconds: 15, + SuccessThreshold: 1, + FailureThreshold: 6, + }, + }} + dep := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: m.Name + "-master", @@ -39,86 +93,7 @@ func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv ObjectMeta: metav1.ObjectMeta{ Labels: labels, }, - Spec: corev1.PodSpec{ - Affinity: m.Spec.Master.Affinity, - EnableServiceLinks: &enableServiceLinks, - Containers: []corev1.Container{{ - Name: "seaweedfs", - Image: m.Spec.Image, - ImagePullPolicy: corev1.PullIfNotPresent, - Env: []corev1.EnvVar{ - { - Name: "POD_IP", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "status.podIP", - }, - }, - }, - { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, - }, - { - Name: "NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, - }, - Command: []string{ - "/bin/sh", - "-ec", - fmt.Sprintf("sleep 60; weed master -volumePreallocate -volumeSizeLimitMB=1000 %s %s", - fmt.Sprintf("-ip=$(POD_NAME).%s-master", m.Name), - fmt.Sprintf("-peers=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)), - ), - }, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 9333, - Name: "swfs-master", - }, - { - ContainerPort: 19333, - }, - }, - ReadinessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/cluster/status", - Port: intstr.FromInt(9333), - Scheme: corev1.URISchemeHTTP, - }, - }, - InitialDelaySeconds: 5, - TimeoutSeconds: 15, - PeriodSeconds: 15, - SuccessThreshold: 2, - FailureThreshold: 100, - }, - LivenessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/cluster/status", - Port: intstr.FromInt(9333), - Scheme: corev1.URISchemeHTTP, - }, - }, - InitialDelaySeconds: 15, - TimeoutSeconds: 15, - PeriodSeconds: 15, - SuccessThreshold: 1, - FailureThreshold: 6, - }, - }}, - }, + Spec: masterPodSpec, }, }, } diff --git a/controllers/controller_volume_service.go b/controllers/controller_volume_service.go index c78ac6f..dcf14cd 100644 --- a/controllers/controller_volume_service.go +++ b/controllers/controller_volume_service.go @@ -27,14 +27,14 @@ func (r *SeaweedReconciler) createVolumeServerService(m *seaweedv1.Seaweed) *cor { Name: "swfs-volume", Protocol: corev1.Protocol("TCP"), - Port: 8444, - TargetPort: intstr.FromInt(8444), + Port: seaweedv1.VolumeHTTPPort, + TargetPort: intstr.FromInt(seaweedv1.VolumeHTTPPort), }, { Name: "swfs-volume-grpc", Protocol: corev1.Protocol("TCP"), - Port: 18444, - TargetPort: intstr.FromInt(18444), + Port: seaweedv1.VolumeGRPCPort, + TargetPort: intstr.FromInt(seaweedv1.VolumeGRPCPort), }, }, Selector: labels, diff --git a/controllers/controller_volume_statefulset.go b/controllers/controller_volume_statefulset.go index c6fa709..a75cf24 100644 --- a/controllers/controller_volume_statefulset.go +++ b/controllers/controller_volume_statefulset.go @@ -59,6 +59,63 @@ func (r *SeaweedReconciler) createVolumeServerStatefulSet(m *seaweedv1.Seaweed) dirs = append(dirs, fmt.Sprintf("/data%d", i)) } + volumePodSpec := m.BaseVolumeSpec().BuildPodSpec() + volumePodSpec.EnableServiceLinks = &enableServiceLinks + volumePodSpec.Containers = []corev1.Container{{ + Name: "seaweedfs", + Image: m.Spec.Image, + ImagePullPolicy: m.BaseVolumeSpec().ImagePullPolicy(), + Env: append(m.BaseVolumeSpec().Env(), kubernetesEnvVars...), + Command: []string{ + "/bin/sh", + "-ec", + fmt.Sprintf("weed volume -port=8444 -max=0 %s %s %s", + fmt.Sprintf("-ip=$(POD_NAME).%s-volume", m.Name), + fmt.Sprintf("-dir=%s", strings.Join(dirs, ",")), + fmt.Sprintf("-mserver=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)), + ), + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8444, + Name: "swfs-volume", + }, + { + ContainerPort: 18444, + }, + }, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/status", + Port: intstr.FromInt(8444), + Scheme: corev1.URISchemeHTTP, + }, + }, + InitialDelaySeconds: 15, + TimeoutSeconds: 5, + PeriodSeconds: 90, + SuccessThreshold: 1, + FailureThreshold: 100, + }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/status", + Port: intstr.FromInt(8444), + Scheme: corev1.URISchemeHTTP, + }, + }, + InitialDelaySeconds: 20, + TimeoutSeconds: 5, + PeriodSeconds: 90, + SuccessThreshold: 1, + FailureThreshold: 6, + }, + VolumeMounts: volumeMounts, + }} + volumePodSpec.Volumes = volumes + dep := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: m.Name + "-volume", @@ -81,89 +138,7 @@ func (r *SeaweedReconciler) createVolumeServerStatefulSet(m *seaweedv1.Seaweed) ObjectMeta: metav1.ObjectMeta{ Labels: labels, }, - Spec: corev1.PodSpec{ - EnableServiceLinks: &enableServiceLinks, - Containers: []corev1.Container{{ - Name: "seaweedfs", - Image: m.Spec.Image, - ImagePullPolicy: corev1.PullIfNotPresent, - Env: []corev1.EnvVar{ - { - Name: "POD_IP", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "status.podIP", - }, - }, - }, - { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, - }, - { - Name: "NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, - }, - Command: []string{ - "/bin/sh", - "-ec", - fmt.Sprintf("weed volume -port=8444 -max=0 %s %s %s", - fmt.Sprintf("-ip=$(POD_NAME).%s-volume", m.Name), - fmt.Sprintf("-dir=%s", strings.Join(dirs, ",")), - fmt.Sprintf("-mserver=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)), - ), - }, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 8444, - Name: "swfs-volume", - }, - { - ContainerPort: 18444, - }, - }, - ReadinessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/status", - Port: intstr.FromInt(8444), - Scheme: corev1.URISchemeHTTP, - }, - }, - InitialDelaySeconds: 15, - TimeoutSeconds: 5, - PeriodSeconds: 90, - SuccessThreshold: 1, - FailureThreshold: 100, - }, - LivenessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/status", - Port: intstr.FromInt(8444), - Scheme: corev1.URISchemeHTTP, - }, - }, - InitialDelaySeconds: 20, - TimeoutSeconds: 5, - PeriodSeconds: 90, - SuccessThreshold: 1, - FailureThreshold: 6, - }, - VolumeMounts: volumeMounts, - }}, - - Volumes: volumes, - }, + Spec: volumePodSpec, }, VolumeClaimTemplates: persistentVolumeClaims, }, diff --git a/controllers/helper.go b/controllers/helper.go index 8e7de1a..8eaeba8 100644 --- a/controllers/helper.go +++ b/controllers/helper.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" ) @@ -11,6 +12,35 @@ const ( masterPeerAddressPattern = "%s-master-%d.%s-master:9333" ) +var ( + kubernetesEnvVars = []corev1.EnvVar{ + { + Name: "POD_IP", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "status.podIP", + }, + }, + }, + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + { + Name: "NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + } +) + func ReconcileResult(err error) (bool, ctrl.Result, error) { if err != nil { return true, ctrl.Result{}, err @@ -29,3 +59,14 @@ func getMasterAddresses(name string, replicas int32) []string { func getMasterPeersString(name string, replicas int32) string { return strings.Join(getMasterAddresses(name, replicas), ",") } + +func copyAnnotations(src map[string]string) map[string]string { + if src == nil { + return nil + } + dst := map[string]string{} + for k, v := range src { + dst[k] = v + } + return dst +}