package controllers

import (
	"context"
	"encoding/json"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/klog"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// the following is adapted from tidb-operator/pkg/controller/generic_control.go

const (
	// LastAppliedPodTemplate is annotation key of the last applied pod template
	LastAppliedPodTemplate = "seaweedfs.com/last-applied-podtemplate"

	// LastAppliedConfigAnnotation is annotation key of last applied configuration
	LastAppliedConfigAnnotation = "seaweedfs.com/last-applied-configuration"
)

// MergeFn is to resolve conflicts
type MergeFn func(existing, desired runtime.Object) error

// CreateOrUpdate create an object to the Kubernetes cluster for controller, if the object to create is existed,
// call mergeFn to merge the change in new object to the existing object, then update the existing object.
// The object will also be adopted by the given controller.
func (r *SeaweedReconciler) CreateOrUpdate(obj runtime.Object, mergeFn MergeFn) (runtime.Object, error) {

	// controller-runtime/client will mutate the object pointer in-place,
	// to be consistent with other methods in our controller, we copy the object
	// to avoid the in-place mutation here and hereafter.
	desired := obj.DeepCopyObject()

	// 1. try to create and see if there is any conflicts
	err := r.Create(context.TODO(), desired)
	if errors.IsAlreadyExists(err) {

		// 2. object has already existed, merge our desired changes to it
		existing, err := EmptyClone(obj)
		if err != nil {
			return nil, err
		}
		key, err := client.ObjectKeyFromObject(existing)
		if err != nil {
			return nil, err
		}
		err = r.Get(context.TODO(), key, existing)
		if err != nil {
			return nil, err
		}

		mutated := existing.DeepCopyObject()
		// 4. invoke mergeFn to mutate a copy of the existing object
		if err := mergeFn(mutated, desired); err != nil {
			return nil, err
		}

		// 5. check if the copy is actually mutated
		if !apiequality.Semantic.DeepEqual(existing, mutated) {
			err := r.Update(context.TODO(), mutated)
			return mutated, err
		}

		return mutated, nil
	}

	return desired, err
}

func (r *SeaweedReconciler) addSpecToAnnotation(d *appsv1.Deployment) error {
	b, err := json.Marshal(d.Spec.Template.Spec)
	if err != nil {
		return err
	}
	if d.Annotations == nil {
		d.Annotations = map[string]string{}
	}
	d.Annotations[LastAppliedPodTemplate] = string(b)
	return nil
}

func (r *SeaweedReconciler) CreateOrUpdateDeployment(deploy *appsv1.Deployment) (*appsv1.Deployment, error) {
	r.addSpecToAnnotation(deploy)
	result, err := r.CreateOrUpdate(deploy, func(existing, desired runtime.Object) error {
		existingDep := existing.(*appsv1.Deployment)
		desiredDep := desired.(*appsv1.Deployment)

		existingDep.Spec.Replicas = desiredDep.Spec.Replicas
		existingDep.Labels = desiredDep.Labels

		if existingDep.Annotations == nil {
			existingDep.Annotations = map[string]string{}
		}
		for k, v := range desiredDep.Annotations {
			existingDep.Annotations[k] = v
		}
		// only override the default strategy if it is explicitly set in the desiredDep
		if string(desiredDep.Spec.Strategy.Type) != "" {
			existingDep.Spec.Strategy.Type = desiredDep.Spec.Strategy.Type
			if existingDep.Spec.Strategy.RollingUpdate != nil {
				existingDep.Spec.Strategy.RollingUpdate = desiredDep.Spec.Strategy.RollingUpdate
			}
		}
		// pod selector of deployment is immutable, so we don't mutate the labels of pod
		for k, v := range desiredDep.Spec.Template.Annotations {
			existingDep.Spec.Template.Annotations[k] = v
		}
		// podSpec of deployment is hard to merge, use an annotation to assist
		if DeploymentPodSpecChanged(desiredDep, existingDep) {
			// Record last applied spec in favor of future equality check
			b, err := json.Marshal(desiredDep.Spec.Template.Spec)
			if err != nil {
				return err
			}
			existingDep.Annotations[LastAppliedConfigAnnotation] = string(b)
			existingDep.Spec.Template.Spec = desiredDep.Spec.Template.Spec
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return result.(*appsv1.Deployment), err
}

func (r *SeaweedReconciler) CreateOrUpdateService(svc *corev1.Service) (*corev1.Service, error) {
	result, err := r.CreateOrUpdate(svc, func(existing, desired runtime.Object) error {
		existingSvc := existing.(*corev1.Service)
		desiredSvc := desired.(*corev1.Service)

		if existingSvc.Annotations == nil {
			existingSvc.Annotations = map[string]string{}
		}
		for k, v := range desiredSvc.Annotations {
			existingSvc.Annotations[k] = v
		}
		existingSvc.Labels = desiredSvc.Labels
		equal, err := ServiceEqual(desiredSvc, existingSvc)
		if err != nil {
			return err
		}
		if !equal {
			// record desiredSvc Spec in annotations in favor of future equality checks
			b, err := json.Marshal(desiredSvc.Spec)
			if err != nil {
				return err
			}
			existingSvc.Annotations[LastAppliedConfigAnnotation] = string(b)
			clusterIp := existingSvc.Spec.ClusterIP
			ports := existingSvc.Spec.Ports
			serviceType := existingSvc.Spec.Type

			existingSvc.Spec = desiredSvc.Spec
			existingSvc.Spec.ClusterIP = clusterIp

			// If the existed service and the desired service is NodePort or LoadBalancerType, we should keep the nodePort unchanged.
			if (serviceType == corev1.ServiceTypeNodePort || serviceType == corev1.ServiceTypeLoadBalancer) &&
				(desiredSvc.Spec.Type == corev1.ServiceTypeNodePort || desiredSvc.Spec.Type == corev1.ServiceTypeLoadBalancer) {
				for i, dport := range existingSvc.Spec.Ports {
					for _, eport := range ports {
						// Because the portName could be edited,
						// we use Port number to link the desired Service Port and the existed Service Port in the nested loop
						if dport.Port == eport.Port && dport.Protocol == eport.Protocol {
							dport.NodePort = eport.NodePort
							existingSvc.Spec.Ports[i] = dport
							break
						}
					}
				}
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return result.(*corev1.Service), nil
}

func (r *SeaweedReconciler) CreateOrUpdateIngress(ingress *extensionsv1beta1.Ingress) (*extensionsv1beta1.Ingress, error) {
	result, err := r.CreateOrUpdate(ingress, func(existing, desired runtime.Object) error {
		existingIngress := existing.(*extensionsv1beta1.Ingress)
		desiredIngress := desired.(*extensionsv1beta1.Ingress)

		if existingIngress.Annotations == nil {
			existingIngress.Annotations = map[string]string{}
		}
		for k, v := range desiredIngress.Annotations {
			existingIngress.Annotations[k] = v
		}
		existingIngress.Labels = desiredIngress.Labels
		equal, err := IngressEqual(desiredIngress, existingIngress)
		if err != nil {
			return err
		}
		if !equal {
			// record desiredIngress Spec in annotations in favor of future equality checks
			b, err := json.Marshal(desiredIngress.Spec)
			if err != nil {
				return err
			}
			existingIngress.Annotations[LastAppliedConfigAnnotation] = string(b)
			existingIngress.Spec = desiredIngress.Spec
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return result.(*extensionsv1beta1.Ingress), nil
}

func (r *SeaweedReconciler) CreateOrUpdateConfigMap(configMap *corev1.ConfigMap) (*corev1.ConfigMap, error) {
	result, err := r.CreateOrUpdate(configMap, func(existing, desired runtime.Object) error {
		existingConfigMap := existing.(*corev1.ConfigMap)
		desiredConfigMap := desired.(*corev1.ConfigMap)

		if existingConfigMap.Annotations == nil {
			existingConfigMap.Annotations = map[string]string{}
		}
		for k, v := range desiredConfigMap.Annotations {
			existingConfigMap.Annotations[k] = v
		}
		existingConfigMap.Labels = desiredConfigMap.Labels
		existingConfigMap.Data = desiredConfigMap.Data
		return nil
	})
	if err != nil {
		return nil, err
	}
	return result.(*corev1.ConfigMap), nil
}

// EmptyClone create an clone of the resource with the same name and namespace (if namespace-scoped), with other fields unset
func EmptyClone(obj runtime.Object) (runtime.Object, error) {
	meta, ok := obj.(metav1.Object)
	if !ok {
		return nil, fmt.Errorf("Obj %v is not a metav1.Object, cannot call EmptyClone", obj)
	}
	gvk, err := InferObjectKind(obj)
	if err != nil {
		return nil, err
	}
	inst, err := scheme.Scheme.New(gvk)
	if err != nil {
		return nil, err
	}
	instMeta, ok := inst.(metav1.Object)
	if !ok {
		return nil, fmt.Errorf("New instatnce %v created from scheme is not a metav1.Object, EmptyClone failed", inst)
	}
	instMeta.SetName(meta.GetName())
	instMeta.SetNamespace(meta.GetNamespace())
	return inst, nil
}

// InferObjectKind infers the object kind
func InferObjectKind(obj runtime.Object) (schema.GroupVersionKind, error) {
	gvks, _, err := scheme.Scheme.ObjectKinds(obj)
	if err != nil {
		return schema.GroupVersionKind{}, err
	}
	if len(gvks) != 1 {
		return schema.GroupVersionKind{}, fmt.Errorf("Object %v has ambigious GVK", obj)
	}
	return gvks[0], nil
}

// GetDeploymentLastAppliedPodTemplate set last applied pod template from Deployment's annotation
func GetDeploymentLastAppliedPodTemplate(dep *appsv1.Deployment) (*corev1.PodSpec, error) {
	applied, ok := dep.Annotations[LastAppliedPodTemplate]
	if !ok {
		return nil, fmt.Errorf("deployment:[%s/%s] not found spec's apply config", dep.GetNamespace(), dep.GetName())
	}
	podSpec := &corev1.PodSpec{}
	err := json.Unmarshal([]byte(applied), podSpec)
	if err != nil {
		return nil, err
	}
	return podSpec, nil
}

// DeploymentPodSpecChanged checks whether the new deployment differs with the old one's last-applied-config
func DeploymentPodSpecChanged(newDep *appsv1.Deployment, oldDep *appsv1.Deployment) bool {
	lastAppliedPodTemplate, err := GetDeploymentLastAppliedPodTemplate(oldDep)
	if err != nil {
		klog.Warningf("error get last-applied-config of deployment %s/%s: %v", oldDep.Namespace, oldDep.Name, err)
		return true
	}
	return !apiequality.Semantic.DeepEqual(newDep.Spec.Template.Spec, lastAppliedPodTemplate)
}

// ServiceEqual compares the new Service's spec with old Service's last applied config
func ServiceEqual(newSvc, oldSvc *corev1.Service) (bool, error) {
	oldSpec := corev1.ServiceSpec{}
	if lastAppliedConfig, ok := oldSvc.Annotations[LastAppliedConfigAnnotation]; ok {
		err := json.Unmarshal([]byte(lastAppliedConfig), &oldSpec)
		if err != nil {
			klog.Errorf("unmarshal ServiceSpec: [%s/%s]'s applied config failed,error: %v", oldSvc.GetNamespace(), oldSvc.GetName(), err)
			return false, err
		}
		return apiequality.Semantic.DeepEqual(oldSpec, newSvc.Spec), nil
	}
	return false, nil
}

func IngressEqual(newIngress, oldIngres *extensionsv1beta1.Ingress) (bool, error) {
	oldIngressSpec := extensionsv1beta1.IngressSpec{}
	if lastAppliedConfig, ok := oldIngres.Annotations[LastAppliedConfigAnnotation]; ok {
		err := json.Unmarshal([]byte(lastAppliedConfig), &oldIngressSpec)
		if err != nil {
			klog.Errorf("unmarshal IngressSpec: [%s/%s]'s applied config failed,error: %v", oldIngres.GetNamespace(), oldIngres.GetName(), err)
			return false, err
		}
		return apiequality.Semantic.DeepEqual(oldIngressSpec, newIngress.Spec), nil
	}
	return false, nil
}