create ingress, simplify ensure filer statefulset
This commit is contained in:
parent
9a4df148b8
commit
8ad7ee2d50
|
@ -39,6 +39,9 @@ type SeaweedSpec struct {
|
|||
|
||||
// FilerCount is the number of filers, default to 1
|
||||
FilerCount int32 `json:"filerCount,omitempty"`
|
||||
|
||||
// ingress
|
||||
Hosts []string `json:"hosts"`
|
||||
}
|
||||
|
||||
// SeaweedStatus defines the observed state of Seaweed
|
||||
|
|
|
@ -29,7 +29,7 @@ func (in *Seaweed) DeepCopyInto(out *Seaweed) {
|
|||
*out = *in
|
||||
out.TypeMeta = in.TypeMeta
|
||||
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
|
||||
out.Spec = in.Spec
|
||||
in.Spec.DeepCopyInto(&out.Spec)
|
||||
out.Status = in.Status
|
||||
}
|
||||
|
||||
|
@ -86,6 +86,11 @@ func (in *SeaweedList) DeepCopyObject() runtime.Object {
|
|||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *SeaweedSpec) DeepCopyInto(out *SeaweedSpec) {
|
||||
*out = *in
|
||||
if in.Hosts != nil {
|
||||
in, out := &in.Hosts, &out.Hosts
|
||||
*out = make([]string, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SeaweedSpec.
|
||||
|
|
|
@ -40,6 +40,11 @@ spec:
|
|||
description: FilerCount is the number of filers, default to 1
|
||||
format: int32
|
||||
type: integer
|
||||
hosts:
|
||||
description: ingress
|
||||
items:
|
||||
type: string
|
||||
type: array
|
||||
image:
|
||||
description: Image
|
||||
type: string
|
||||
|
@ -51,6 +56,8 @@ spec:
|
|||
to 1
|
||||
format: int32
|
||||
type: integer
|
||||
required:
|
||||
- hosts
|
||||
type: object
|
||||
status:
|
||||
description: SeaweedStatus defines the observed state of Seaweed
|
||||
|
|
|
@ -7,3 +7,6 @@ spec:
|
|||
image: chrislusf/seaweedfs:2.05
|
||||
volumeServerCount: 4
|
||||
filerCount: 2
|
||||
hosts:
|
||||
- "*"
|
||||
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
package controllers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
||||
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
// svcName is the backend service name
|
||||
func createIngress(seaweedCR *seaweedv1.Seaweed, svcName string, port int) *extensionsv1beta1.Ingress {
|
||||
ingressLabel := map[string]string{"app": "seaweedfs", "role": "ingress", "name": svcName}
|
||||
|
||||
ingress := &extensionsv1beta1.Ingress{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: svcName + "-ingress",
|
||||
Namespace: seaweedCR.Namespace,
|
||||
Labels: ingressLabel,
|
||||
},
|
||||
Spec: extensionsv1beta1.IngressSpec{
|
||||
Rules: []extensionsv1beta1.IngressRule{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, host := range seaweedCR.Spec.Hosts {
|
||||
rule := extensionsv1beta1.IngressRule{
|
||||
Host: host,
|
||||
IngressRuleValue: extensionsv1beta1.IngressRuleValue{
|
||||
HTTP: &extensionsv1beta1.HTTPIngressRuleValue{
|
||||
Paths: []extensionsv1beta1.HTTPIngressPath{
|
||||
{
|
||||
Path: "/",
|
||||
Backend: extensionsv1beta1.IngressBackend{
|
||||
ServiceName: svcName,
|
||||
ServicePort: intstr.FromInt(port),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
ingress.Spec.Rules = append(ingress.Spec.Rules, rule)
|
||||
}
|
||||
return ingress
|
||||
}
|
||||
|
||||
// the following is adapted from tidb-operator/pkg/controller/generic_control.go
|
||||
|
||||
type MergeFn func(existing, desired runtime.Object) error
|
||||
|
||||
// CreateOrUpdate create an object to the Kubernetes cluster for controller, if the object to create is existed,
|
||||
// call mergeFn to merge the change in new object to the existing object, then update the existing object.
|
||||
// The object will also be adopted by the given controller.
|
||||
func (r *SeaweedReconciler) CreateOrUpdate(controller, obj runtime.Object, mergeFn MergeFn) (runtime.Object, error) {
|
||||
|
||||
// controller-runtime/client will mutate the object pointer in-place,
|
||||
// to be consistent with other methods in our controller, we copy the object
|
||||
// to avoid the in-place mutation here and hereafter.
|
||||
desired := obj.DeepCopyObject()
|
||||
|
||||
// 1. try to create and see if there is any conflicts
|
||||
err := r.Create(context.TODO(), desired)
|
||||
if errors.IsAlreadyExists(err) {
|
||||
|
||||
// 2. object has already existed, merge our desired changes to it
|
||||
existing, err := EmptyClone(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key, err := client.ObjectKeyFromObject(existing)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = r.Get(context.TODO(), key, existing)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mutated := existing.DeepCopyObject()
|
||||
// 4. invoke mergeFn to mutate a copy of the existing object
|
||||
if err := mergeFn(mutated, desired); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 5. check if the copy is actually mutated
|
||||
if !apiequality.Semantic.DeepEqual(existing, mutated) {
|
||||
err := r.Update(context.TODO(), mutated)
|
||||
return mutated, err
|
||||
}
|
||||
|
||||
return mutated, nil
|
||||
}
|
||||
|
||||
return desired, err
|
||||
}
|
||||
|
||||
// EmptyClone create an clone of the resource with the same name and namespace (if namespace-scoped), with other fields unset
|
||||
func EmptyClone(obj runtime.Object) (runtime.Object, error) {
|
||||
meta, ok := obj.(metav1.Object)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Obj %v is not a metav1.Object, cannot call EmptyClone", obj)
|
||||
}
|
||||
gvk, err := InferObjectKind(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
inst, err := scheme.Scheme.New(gvk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
instMeta, ok := inst.(metav1.Object)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("New instatnce %v created from scheme is not a metav1.Object, EmptyClone failed", inst)
|
||||
}
|
||||
instMeta.SetName(meta.GetName())
|
||||
instMeta.SetNamespace(meta.GetNamespace())
|
||||
return inst, nil
|
||||
}
|
||||
|
||||
// InferObjectKind infers the object kind
|
||||
func InferObjectKind(obj runtime.Object) (schema.GroupVersionKind, error) {
|
||||
gvks, _, err := scheme.Scheme.ObjectKinds(obj)
|
||||
if err != nil {
|
||||
return schema.GroupVersionKind{}, err
|
||||
}
|
||||
if len(gvks) != 1 {
|
||||
return schema.GroupVersionKind{}, fmt.Errorf("Object %v has ambigious GVK", obj)
|
||||
}
|
||||
return gvks[0], nil
|
||||
}
|
|
@ -2,6 +2,7 @@ package controllers
|
|||
|
||||
import (
|
||||
"context"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
|
@ -32,40 +33,15 @@ func (r *SeaweedReconciler) ensureFilerServers(seaweedCR *seaweedv1.Seaweed) (do
|
|||
}
|
||||
|
||||
func (r *SeaweedReconciler) ensureFilerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||
ctx := context.Background()
|
||||
log := r.Log.WithValues("sw-filer-statefulset", seaweedCR.Name)
|
||||
filerStatefulSet := r.createFilerStatefulSet(seaweedCR)
|
||||
_, err := r.CreateOrUpdate(seaweedCR, filerStatefulSet, func(existing, desired runtime.Object) error {
|
||||
existingStatefulSet := existing.(*appsv1.StatefulSet)
|
||||
desiredStatefulSet := desired.(*appsv1.StatefulSet)
|
||||
|
||||
filerStatefulSet := &appsv1.StatefulSet{}
|
||||
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-filer", Namespace: seaweedCR.Namespace}, filerStatefulSet)
|
||||
if err != nil && errors.IsNotFound(err) {
|
||||
// Define a new deployment
|
||||
dep := r.createFilerStatefulSet(seaweedCR)
|
||||
log.Info("Creating a new filer statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
|
||||
err = r.Create(ctx, dep)
|
||||
if err != nil {
|
||||
log.Error(err, "Failed to create new filer statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
// Deployment created successfully - return and requeue
|
||||
return ReconcileResult(err)
|
||||
} else if err != nil {
|
||||
log.Error(err, "Failed to get filer statefulset")
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
|
||||
if *filerStatefulSet.Spec.Replicas != seaweedCR.Spec.FilerCount ||
|
||||
filerStatefulSet.Spec.Template.Spec.Containers[0].Image != seaweedCR.Spec.Image {
|
||||
filerStatefulSet.Spec.Replicas = &seaweedCR.Spec.FilerCount
|
||||
filerStatefulSet.Spec.Template.Spec.Containers[0].Image = seaweedCR.Spec.Image
|
||||
if err = r.Update(ctx, filerStatefulSet); err != nil {
|
||||
log.Error(err, "Failed to update filer statefulset", "Namespace", filerStatefulSet.Namespace, "Name", filerStatefulSet.Name)
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
// Deployment created successfully - return and requeue
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
|
||||
log.Info("Get filer stateful set " + filerStatefulSet.Name)
|
||||
existingStatefulSet.Spec.Replicas = desiredStatefulSet.Spec.Replicas
|
||||
existingStatefulSet.Spec.Template.Spec.Containers[0].Image = desiredStatefulSet.Spec.Template.Spec.Containers[0].Image
|
||||
return nil
|
||||
})
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue