kubernetes endpointslice_controller_test 源码

  • 2022-09-18
  • 浏览 (154)

kubernetes endpointslice_controller_test 代码

文件路径:/pkg/controller/endpointslice/endpointslice_controller_test.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endpointslice

import (
	"context"
	"fmt"
	"reflect"
	"strconv"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	v1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	k8stesting "k8s.io/client-go/testing"
	"k8s.io/client-go/tools/cache"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/endpointslice/topologycache"
	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
	endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice"
	"k8s.io/kubernetes/pkg/features"
	utilpointer "k8s.io/utils/pointer"
)

// Most of the tests related to EndpointSlice allocation can be found in reconciler_test.go
// Tests here primarily focus on unique controller functionality before the reconciler begins

var alwaysReady = func() bool { return true }

type endpointSliceController struct {
	*Controller
	endpointSliceStore cache.Store
	nodeStore          cache.Store
	podStore           cache.Store
	serviceStore       cache.Store
}

func newController(nodeNames []string, batchPeriod time.Duration) (*fake.Clientset, *endpointSliceController) {
	client := fake.NewSimpleClientset()

	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
	nodeInformer := informerFactory.Core().V1().Nodes()
	indexer := nodeInformer.Informer().GetIndexer()
	for _, nodeName := range nodeNames {
		indexer.Add(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
	}

	esInformer := informerFactory.Discovery().V1().EndpointSlices()
	esIndexer := esInformer.Informer().GetIndexer()

	// These reactors are required to mock functionality that would be covered
	// automatically if we weren't using the fake client.
	client.PrependReactor("create", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
		endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice)

		if endpointSlice.ObjectMeta.GenerateName != "" {
			endpointSlice.ObjectMeta.Name = fmt.Sprintf("%s-%s", endpointSlice.ObjectMeta.GenerateName, rand.String(8))
			endpointSlice.ObjectMeta.GenerateName = ""
		}
		endpointSlice.Generation = 1
		esIndexer.Add(endpointSlice)

		return false, endpointSlice, nil
	}))
	client.PrependReactor("update", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
		endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice)
		endpointSlice.Generation++
		esIndexer.Update(endpointSlice)

		return false, endpointSlice, nil
	}))

	esController := NewController(
		informerFactory.Core().V1().Pods(),
		informerFactory.Core().V1().Services(),
		nodeInformer,
		esInformer,
		int32(100),
		client,
		batchPeriod)

	esController.nodesSynced = alwaysReady
	esController.podsSynced = alwaysReady
	esController.servicesSynced = alwaysReady
	esController.endpointSlicesSynced = alwaysReady

	return client, &endpointSliceController{
		esController,
		informerFactory.Discovery().V1().EndpointSlices().Informer().GetStore(),
		informerFactory.Core().V1().Nodes().Informer().GetStore(),
		informerFactory.Core().V1().Pods().Informer().GetStore(),
		informerFactory.Core().V1().Services().Informer().GetStore(),
	}
}

// Ensure SyncService for service with no selector results in no action
func TestSyncServiceNoSelector(t *testing.T) {
	ns := metav1.NamespaceDefault
	serviceName := "testing-1"
	client, esController := newController([]string{"node-1"}, time.Duration(0))
	esController.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns},
		Spec: v1.ServiceSpec{
			Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
		},
	})

	err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
	assert.NoError(t, err)
	assert.Len(t, client.Actions(), 0)
}

// Ensure SyncService for service with pending deletion results in no action
func TestSyncServicePendingDeletion(t *testing.T) {
	ns := metav1.NamespaceDefault
	serviceName := "testing-1"
	deletionTimestamp := metav1.Now()
	client, esController := newController([]string{"node-1"}, time.Duration(0))
	esController.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns, DeletionTimestamp: &deletionTimestamp},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
		},
	})

	err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
	assert.NoError(t, err)
	assert.Len(t, client.Actions(), 0)
}

// Ensure SyncService for service with selector but no pods results in placeholder EndpointSlice
func TestSyncServiceWithSelector(t *testing.T) {
	ns := metav1.NamespaceDefault
	serviceName := "testing-1"
	client, esController := newController([]string{"node-1"}, time.Duration(0))
	standardSyncService(t, esController, ns, serviceName)
	expectActions(t, client.Actions(), 1, "create", "endpointslices")

	sliceList, err := client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
	assert.Nil(t, err, "Expected no error fetching endpoint slices")
	assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices")
	slice := sliceList.Items[0]
	assert.Regexp(t, "^"+serviceName, slice.Name)
	assert.Equal(t, serviceName, slice.Labels[discovery.LabelServiceName])
	assert.EqualValues(t, []discovery.EndpointPort{}, slice.Ports)
	assert.EqualValues(t, []discovery.Endpoint{}, slice.Endpoints)
	assert.NotEmpty(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"])
}

// Ensure SyncService gracefully handles a missing service. This test also
// populates another existing service to ensure a clean up process doesn't
// remove too much.
func TestSyncServiceMissing(t *testing.T) {
	namespace := metav1.NamespaceDefault
	client, esController := newController([]string{"node-1"}, time.Duration(0))

	// Build up existing service
	existingServiceName := "stillthere"
	existingServiceKey := endpointutil.ServiceKey{Name: existingServiceName, Namespace: namespace}
	esController.triggerTimeTracker.ServiceStates[existingServiceKey] = endpointutil.ServiceState{}
	esController.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: existingServiceName, Namespace: namespace},
		Spec: v1.ServiceSpec{
			Ports:    []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
			Selector: map[string]string{"foo": "bar"},
		},
	})

	// Add missing service to triggerTimeTracker to ensure the reference is cleaned up
	missingServiceName := "notthere"
	missingServiceKey := endpointutil.ServiceKey{Name: missingServiceName, Namespace: namespace}
	esController.triggerTimeTracker.ServiceStates[missingServiceKey] = endpointutil.ServiceState{}

	err := esController.syncService(fmt.Sprintf("%s/%s", namespace, missingServiceName))

	// nil should be returned when the service doesn't exist
	assert.Nil(t, err, "Expected no error syncing service")

	// That should mean no client actions were performed
	assert.Len(t, client.Actions(), 0)

	// TriggerTimeTracker should have removed the reference to the missing service
	assert.NotContains(t, esController.triggerTimeTracker.ServiceStates, missingServiceKey)

	// TriggerTimeTracker should have left the reference to the missing service
	assert.Contains(t, esController.triggerTimeTracker.ServiceStates, existingServiceKey)
}

// Ensure SyncService correctly selects Pods.
func TestSyncServicePodSelection(t *testing.T) {
	client, esController := newController([]string{"node-1"}, time.Duration(0))
	ns := metav1.NamespaceDefault

	pod1 := newPod(1, ns, true, 0, false)
	esController.podStore.Add(pod1)

	// ensure this pod will not match the selector
	pod2 := newPod(2, ns, true, 0, false)
	pod2.Labels["foo"] = "boo"
	esController.podStore.Add(pod2)

	standardSyncService(t, esController, ns, "testing-1")
	expectActions(t, client.Actions(), 1, "create", "endpointslices")

	// an endpoint slice should be created, it should only reference pod1 (not pod2)
	slices, err := client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
	assert.Nil(t, err, "Expected no error fetching endpoint slices")
	assert.Len(t, slices.Items, 1, "Expected 1 endpoint slices")
	slice := slices.Items[0]
	assert.Len(t, slice.Endpoints, 1, "Expected 1 endpoint in first slice")
	assert.NotEmpty(t, slice.Annotations[v1.EndpointsLastChangeTriggerTime])
	endpoint := slice.Endpoints[0]
	assert.EqualValues(t, endpoint.TargetRef, &v1.ObjectReference{Kind: "Pod", Namespace: ns, Name: pod1.Name})
}

func TestSyncServiceEndpointSlicePendingDeletion(t *testing.T) {
	client, esController := newController([]string{"node-1"}, time.Duration(0))
	ns := metav1.NamespaceDefault
	serviceName := "testing-1"
	service := createService(t, esController, ns, serviceName)
	err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
	assert.Nil(t, err, "Expected no error syncing service")

	gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
	ownerRef := metav1.NewControllerRef(service, gvk)

	deletedTs := metav1.Now()
	endpointSlice := &discovery.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "epSlice-1",
			Namespace:       ns,
			OwnerReferences: []metav1.OwnerReference{*ownerRef},
			Labels: map[string]string{
				discovery.LabelServiceName: serviceName,
				discovery.LabelManagedBy:   controllerName,
			},
			DeletionTimestamp: &deletedTs,
		},
		AddressType: discovery.AddressTypeIPv4,
	}
	err = esController.endpointSliceStore.Add(endpointSlice)
	if err != nil {
		t.Fatalf("Expected no error adding EndpointSlice: %v", err)
	}
	_, err = client.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
	if err != nil {
		t.Fatalf("Expected no error creating EndpointSlice: %v", err)
	}

	numActionsBefore := len(client.Actions())
	err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
	assert.Nil(t, err, "Expected no error syncing service")

	// The EndpointSlice marked for deletion should be ignored by the controller, and thus
	// should not result in any action.
	if len(client.Actions()) != numActionsBefore {
		t.Errorf("Expected 0 more actions, got %d", len(client.Actions())-numActionsBefore)
	}
}

// Ensure SyncService correctly selects and labels EndpointSlices.
func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
	client, esController := newController([]string{"node-1"}, time.Duration(0))
	ns := metav1.NamespaceDefault
	serviceName := "testing-1"
	service := createService(t, esController, ns, serviceName)

	gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
	ownerRef := metav1.NewControllerRef(service, gvk)

	// 5 slices, 3 with matching labels for our service
	endpointSlices := []*discovery.EndpointSlice{{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "matching-1",
			Namespace:       ns,
			OwnerReferences: []metav1.OwnerReference{*ownerRef},
			Labels: map[string]string{
				discovery.LabelServiceName: serviceName,
				discovery.LabelManagedBy:   controllerName,
			},
		},
		AddressType: discovery.AddressTypeIPv4,
	}, {
		ObjectMeta: metav1.ObjectMeta{
			Name:            "matching-2",
			Namespace:       ns,
			OwnerReferences: []metav1.OwnerReference{*ownerRef},
			Labels: map[string]string{
				discovery.LabelServiceName: serviceName,
				discovery.LabelManagedBy:   controllerName,
			},
		},
		AddressType: discovery.AddressTypeIPv4,
	}, {
		ObjectMeta: metav1.ObjectMeta{
			Name:      "partially-matching-1",
			Namespace: ns,
			Labels: map[string]string{
				discovery.LabelServiceName: serviceName,
			},
		},
		AddressType: discovery.AddressTypeIPv4,
	}, {
		ObjectMeta: metav1.ObjectMeta{
			Name:      "not-matching-1",
			Namespace: ns,
			Labels: map[string]string{
				discovery.LabelServiceName: "something-else",
				discovery.LabelManagedBy:   controllerName,
			},
		},
		AddressType: discovery.AddressTypeIPv4,
	}, {
		ObjectMeta: metav1.ObjectMeta{
			Name:      "not-matching-2",
			Namespace: ns,
			Labels: map[string]string{
				discovery.LabelServiceName: serviceName,
				discovery.LabelManagedBy:   "something-else",
			},
		},
		AddressType: discovery.AddressTypeIPv4,
	}}

	cmc := newCacheMutationCheck(endpointSlices)

	// need to add them to both store and fake clientset
	for _, endpointSlice := range endpointSlices {
		err := esController.endpointSliceStore.Add(endpointSlice)
		if err != nil {
			t.Fatalf("Expected no error adding EndpointSlice: %v", err)
		}
		_, err = client.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
		if err != nil {
			t.Fatalf("Expected no error creating EndpointSlice: %v", err)
		}
	}

	numActionsBefore := len(client.Actions())
	err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
	assert.Nil(t, err, "Expected no error syncing service")

	if len(client.Actions()) != numActionsBefore+2 {
		t.Errorf("Expected 2 more actions, got %d", len(client.Actions())-numActionsBefore)
	}

	// only 2 slices should match, 2 should be deleted, 1 should be updated as a placeholder
	expectAction(t, client.Actions(), numActionsBefore, "update", "endpointslices")
	expectAction(t, client.Actions(), numActionsBefore+1, "delete", "endpointslices")

	// ensure cache mutation has not occurred
	cmc.Check(t)
}

func TestOnEndpointSliceUpdate(t *testing.T) {
	_, esController := newController([]string{"node-1"}, time.Duration(0))
	ns := metav1.NamespaceDefault
	serviceName := "testing-1"
	epSlice1 := &discovery.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "matching-1",
			Namespace: ns,
			Labels: map[string]string{
				discovery.LabelServiceName: serviceName,
				discovery.LabelManagedBy:   controllerName,
			},
		},
		AddressType: discovery.AddressTypeIPv4,
	}

	epSlice2 := epSlice1.DeepCopy()
	epSlice2.Labels[discovery.LabelManagedBy] = "something else"

	assert.Equal(t, 0, esController.queue.Len())
	esController.onEndpointSliceUpdate(epSlice1, epSlice2)
	err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) {
		if esController.queue.Len() > 0 {
			return true, nil
		}
		return false, nil
	})
	if err != nil {
		t.Fatalf("unexpected error waiting for add to queue")
	}
	assert.Equal(t, 1, esController.queue.Len())
}

func TestSyncService(t *testing.T) {
	creationTimestamp := metav1.Now()
	deletionTimestamp := metav1.Now()

	testcases := []struct {
		name                   string
		service                *v1.Service
		pods                   []*v1.Pod
		expectedEndpointPorts  []discovery.EndpointPort
		expectedEndpoints      []discovery.Endpoint
		terminatingGateEnabled bool
	}{
		{
			name: "pods with multiple IPs and Service with ipFamilies=ipv4",
			service: &v1.Service{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "foobar",
					Namespace:         "default",
					CreationTimestamp: creationTimestamp,
				},
				Spec: v1.ServiceSpec{
					Ports: []v1.ServicePort{
						{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
						{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
						{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
					},
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
				},
			},
			pods: []*v1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod0",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: nil,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.1",
						PodIPs: []v1.PodIP{{
							IP: "10.0.0.1",
						}},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionTrue,
							},
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod1",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: nil,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.2",
						PodIPs: []v1.PodIP{
							{
								IP: "10.0.0.2",
							},
							{
								IP: "fd08::5678:0000:0000:9abc:def0",
							},
						},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionTrue,
							},
						},
					},
				},
			},
			expectedEndpointPorts: []discovery.EndpointPort{
				{
					Name:     utilpointer.StringPtr("sctp-example"),
					Protocol: protoPtr(v1.ProtocolSCTP),
					Port:     utilpointer.Int32Ptr(int32(3456)),
				},
				{
					Name:     utilpointer.StringPtr("udp-example"),
					Protocol: protoPtr(v1.ProtocolUDP),
					Port:     utilpointer.Int32Ptr(int32(161)),
				},
				{
					Name:     utilpointer.StringPtr("tcp-example"),
					Protocol: protoPtr(v1.ProtocolTCP),
					Port:     utilpointer.Int32Ptr(int32(80)),
				},
			},
			expectedEndpoints: []discovery.Endpoint{
				{
					Conditions: discovery.EndpointConditions{
						Ready: utilpointer.BoolPtr(true),
					},
					Addresses: []string{"10.0.0.1"},
					TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
					NodeName:  utilpointer.StringPtr("node-1"),
				},
				{
					Conditions: discovery.EndpointConditions{
						Ready: utilpointer.BoolPtr(true),
					},
					Addresses: []string{"10.0.0.2"},
					TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
					NodeName:  utilpointer.StringPtr("node-1"),
				},
			},
		},
		{
			name: "pods with multiple IPs and Service with ipFamilies=ipv6",
			service: &v1.Service{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "foobar",
					Namespace:         "default",
					CreationTimestamp: creationTimestamp,
				},
				Spec: v1.ServiceSpec{
					Ports: []v1.ServicePort{
						{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
						{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
						{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
					},
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv6Protocol},
				},
			},
			pods: []*v1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod0",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: nil,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.1",
						PodIPs: []v1.PodIP{{
							IP: "10.0.0.1",
						}},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionTrue,
							},
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod1",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: nil,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.2",
						PodIPs: []v1.PodIP{
							{
								IP: "10.0.0.2",
							},
							{
								IP: "fd08::5678:0000:0000:9abc:def0",
							},
						},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionTrue,
							},
						},
					},
				},
			},
			expectedEndpointPorts: []discovery.EndpointPort{
				{
					Name:     utilpointer.StringPtr("sctp-example"),
					Protocol: protoPtr(v1.ProtocolSCTP),
					Port:     utilpointer.Int32Ptr(int32(3456)),
				},
				{
					Name:     utilpointer.StringPtr("udp-example"),
					Protocol: protoPtr(v1.ProtocolUDP),
					Port:     utilpointer.Int32Ptr(int32(161)),
				},
				{
					Name:     utilpointer.StringPtr("tcp-example"),
					Protocol: protoPtr(v1.ProtocolTCP),
					Port:     utilpointer.Int32Ptr(int32(80)),
				},
			},
			expectedEndpoints: []discovery.Endpoint{
				{
					Conditions: discovery.EndpointConditions{
						Ready: utilpointer.BoolPtr(true),
					},
					Addresses: []string{"fd08::5678:0000:0000:9abc:def0"},
					TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
					NodeName:  utilpointer.StringPtr("node-1"),
				},
			},
		},
		{
			name: "Terminating pods with EndpointSliceTerminatingCondition enabled",
			service: &v1.Service{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "foobar",
					Namespace:         "default",
					CreationTimestamp: creationTimestamp,
				},
				Spec: v1.ServiceSpec{
					Ports: []v1.ServicePort{
						{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
						{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
						{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
					},
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
				},
			},
			pods: []*v1.Pod{
				{
					// one ready pod for comparison
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod0",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: nil,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.1",
						PodIPs: []v1.PodIP{{
							IP: "10.0.0.1",
						}},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionTrue,
							},
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod1",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: &deletionTimestamp,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.2",
						PodIPs: []v1.PodIP{
							{
								IP: "10.0.0.2",
							},
						},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionTrue,
							},
						},
					},
				},
			},
			expectedEndpointPorts: []discovery.EndpointPort{
				{
					Name:     utilpointer.StringPtr("sctp-example"),
					Protocol: protoPtr(v1.ProtocolSCTP),
					Port:     utilpointer.Int32Ptr(int32(3456)),
				},
				{
					Name:     utilpointer.StringPtr("udp-example"),
					Protocol: protoPtr(v1.ProtocolUDP),
					Port:     utilpointer.Int32Ptr(int32(161)),
				},
				{
					Name:     utilpointer.StringPtr("tcp-example"),
					Protocol: protoPtr(v1.ProtocolTCP),
					Port:     utilpointer.Int32Ptr(int32(80)),
				},
			},
			expectedEndpoints: []discovery.Endpoint{
				{
					Conditions: discovery.EndpointConditions{
						Ready:       utilpointer.BoolPtr(true),
						Serving:     utilpointer.BoolPtr(true),
						Terminating: utilpointer.BoolPtr(false),
					},
					Addresses: []string{"10.0.0.1"},
					TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
					NodeName:  utilpointer.StringPtr("node-1"),
				},
				{
					Conditions: discovery.EndpointConditions{
						Ready:       utilpointer.BoolPtr(false),
						Serving:     utilpointer.BoolPtr(true),
						Terminating: utilpointer.BoolPtr(true),
					},
					Addresses: []string{"10.0.0.2"},
					TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
					NodeName:  utilpointer.StringPtr("node-1"),
				},
			},
			terminatingGateEnabled: true,
		},
		{
			name: "Terminating pods with EndpointSliceTerminatingCondition disabled",
			service: &v1.Service{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "foobar",
					Namespace:         "default",
					CreationTimestamp: creationTimestamp,
				},
				Spec: v1.ServiceSpec{
					Ports: []v1.ServicePort{
						{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
						{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
						{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
					},
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
				},
			},
			pods: []*v1.Pod{
				{
					// one ready pod for comparison
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod0",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: nil,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.1",
						PodIPs: []v1.PodIP{{
							IP: "10.0.0.1",
						}},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionTrue,
							},
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod1",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: &deletionTimestamp,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.2",
						PodIPs: []v1.PodIP{
							{
								IP: "10.0.0.2",
							},
						},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionTrue,
							},
						},
					},
				},
			},
			expectedEndpointPorts: []discovery.EndpointPort{
				{
					Name:     utilpointer.StringPtr("sctp-example"),
					Protocol: protoPtr(v1.ProtocolSCTP),
					Port:     utilpointer.Int32Ptr(int32(3456)),
				},
				{
					Name:     utilpointer.StringPtr("udp-example"),
					Protocol: protoPtr(v1.ProtocolUDP),
					Port:     utilpointer.Int32Ptr(int32(161)),
				},
				{
					Name:     utilpointer.StringPtr("tcp-example"),
					Protocol: protoPtr(v1.ProtocolTCP),
					Port:     utilpointer.Int32Ptr(int32(80)),
				},
			},
			expectedEndpoints: []discovery.Endpoint{
				{
					Conditions: discovery.EndpointConditions{
						Ready: utilpointer.BoolPtr(true),
					},
					Addresses: []string{"10.0.0.1"},
					TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
					NodeName:  utilpointer.StringPtr("node-1"),
				},
			},
			terminatingGateEnabled: false,
		},
		{
			name: "Not ready terminating pods with EndpointSliceTerminatingCondition enabled",
			service: &v1.Service{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "foobar",
					Namespace:         "default",
					CreationTimestamp: creationTimestamp,
				},
				Spec: v1.ServiceSpec{
					Ports: []v1.ServicePort{
						{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
						{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
						{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
					},
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
				},
			},
			pods: []*v1.Pod{
				{
					// one ready pod for comparison
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod0",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: nil,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.1",
						PodIPs: []v1.PodIP{{
							IP: "10.0.0.1",
						}},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionTrue,
							},
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod1",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: &deletionTimestamp,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.2",
						PodIPs: []v1.PodIP{
							{
								IP: "10.0.0.2",
							},
						},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionFalse,
							},
						},
					},
				},
			},
			expectedEndpointPorts: []discovery.EndpointPort{
				{
					Name:     utilpointer.StringPtr("sctp-example"),
					Protocol: protoPtr(v1.ProtocolSCTP),
					Port:     utilpointer.Int32Ptr(int32(3456)),
				},
				{
					Name:     utilpointer.StringPtr("udp-example"),
					Protocol: protoPtr(v1.ProtocolUDP),
					Port:     utilpointer.Int32Ptr(int32(161)),
				},
				{
					Name:     utilpointer.StringPtr("tcp-example"),
					Protocol: protoPtr(v1.ProtocolTCP),
					Port:     utilpointer.Int32Ptr(int32(80)),
				},
			},
			expectedEndpoints: []discovery.Endpoint{
				{
					Conditions: discovery.EndpointConditions{
						Ready:       utilpointer.BoolPtr(true),
						Serving:     utilpointer.BoolPtr(true),
						Terminating: utilpointer.BoolPtr(false),
					},
					Addresses: []string{"10.0.0.1"},
					TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
					NodeName:  utilpointer.StringPtr("node-1"),
				},
				{
					Conditions: discovery.EndpointConditions{
						Ready:       utilpointer.BoolPtr(false),
						Serving:     utilpointer.BoolPtr(false),
						Terminating: utilpointer.BoolPtr(true),
					},
					Addresses: []string{"10.0.0.2"},
					TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
					NodeName:  utilpointer.StringPtr("node-1"),
				},
			},
			terminatingGateEnabled: true,
		},
		{
			name: "Not ready terminating pods with EndpointSliceTerminatingCondition disabled",
			service: &v1.Service{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "foobar",
					Namespace:         "default",
					CreationTimestamp: creationTimestamp,
				},
				Spec: v1.ServiceSpec{
					Ports: []v1.ServicePort{
						{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
						{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
						{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
					},
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
				},
			},
			pods: []*v1.Pod{
				{
					// one ready pod for comparison
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod0",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: nil,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.1",
						PodIPs: []v1.PodIP{{
							IP: "10.0.0.1",
						}},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionTrue,
							},
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Namespace:         "default",
						Name:              "pod1",
						Labels:            map[string]string{"foo": "bar"},
						DeletionTimestamp: &deletionTimestamp,
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{{
							Name: "container-1",
						}},
						NodeName: "node-1",
					},
					Status: v1.PodStatus{
						PodIP: "10.0.0.2",
						PodIPs: []v1.PodIP{
							{
								IP: "10.0.0.2",
							},
						},
						Conditions: []v1.PodCondition{
							{
								Type:   v1.PodReady,
								Status: v1.ConditionFalse,
							},
						},
					},
				},
			},
			expectedEndpointPorts: []discovery.EndpointPort{
				{
					Name:     utilpointer.StringPtr("sctp-example"),
					Protocol: protoPtr(v1.ProtocolSCTP),
					Port:     utilpointer.Int32Ptr(int32(3456)),
				},
				{
					Name:     utilpointer.StringPtr("udp-example"),
					Protocol: protoPtr(v1.ProtocolUDP),
					Port:     utilpointer.Int32Ptr(int32(161)),
				},
				{
					Name:     utilpointer.StringPtr("tcp-example"),
					Protocol: protoPtr(v1.ProtocolTCP),
					Port:     utilpointer.Int32Ptr(int32(80)),
				},
			},
			expectedEndpoints: []discovery.Endpoint{
				{
					Conditions: discovery.EndpointConditions{
						Ready: utilpointer.BoolPtr(true),
					},
					Addresses: []string{"10.0.0.1"},
					TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
					NodeName:  utilpointer.StringPtr("node-1"),
				},
			},
			terminatingGateEnabled: false,
		},
	}

	for _, testcase := range testcases {
		t.Run(testcase.name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceTerminatingCondition, testcase.terminatingGateEnabled)()

			client, esController := newController([]string{"node-1"}, time.Duration(0))

			for _, pod := range testcase.pods {
				esController.podStore.Add(pod)
			}
			esController.serviceStore.Add(testcase.service)

			_, err := esController.client.CoreV1().Services(testcase.service.Namespace).Create(context.TODO(), testcase.service, metav1.CreateOptions{})
			assert.Nil(t, err, "Expected no error creating service")

			err = esController.syncService(fmt.Sprintf("%s/%s", testcase.service.Namespace, testcase.service.Name))
			assert.Nil(t, err)

			// last action should be to create endpoint slice
			expectActions(t, client.Actions(), 1, "create", "endpointslices")
			sliceList, err := client.DiscoveryV1().EndpointSlices(testcase.service.Namespace).List(context.TODO(), metav1.ListOptions{})
			assert.Nil(t, err, "Expected no error fetching endpoint slices")
			assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices")

			// ensure all attributes of endpoint slice match expected state
			slice := sliceList.Items[0]
			assert.Equal(t, slice.Annotations[v1.EndpointsLastChangeTriggerTime], creationTimestamp.UTC().Format(time.RFC3339Nano))
			assert.ElementsMatch(t, testcase.expectedEndpointPorts, slice.Ports)
			assert.ElementsMatch(t, testcase.expectedEndpoints, slice.Endpoints)
		})
	}
}

// TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodAddsBatching(t *testing.T) {
	t.Parallel()

	type podAdd struct {
		delay time.Duration
	}

	tests := []struct {
		name             string
		batchPeriod      time.Duration
		adds             []podAdd
		finalDelay       time.Duration
		wantRequestCount int
	}{
		{
			name:        "three adds with no batching",
			batchPeriod: 0 * time.Second,
			adds: []podAdd{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay: 200 * time.Millisecond,
				},
				{
					delay: 100 * time.Millisecond,
				},
				{
					delay: 100 * time.Millisecond,
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 3,
		},
		{
			name:        "three adds in one batch",
			batchPeriod: 1 * time.Second,
			adds: []podAdd{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay: 200 * time.Millisecond,
				},
				{
					delay: 100 * time.Millisecond,
				},
				{
					delay: 100 * time.Millisecond,
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 1,
		},
		{
			name:        "three adds in two batches",
			batchPeriod: 1 * time.Second,
			adds: []podAdd{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay: 200 * time.Millisecond,
				},
				{
					delay: 100 * time.Millisecond,
				},
				{
					delay: 1 * time.Second,
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 2,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ns := metav1.NamespaceDefault
			client, esController := newController([]string{"node-1"}, tc.batchPeriod)
			stopCh := make(chan struct{})
			defer close(stopCh)

			go esController.Run(1, stopCh)

			esController.serviceStore.Add(&v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
					Ports:      []v1.ServicePort{{Port: 80}},
				},
			})

			for i, add := range tc.adds {
				time.Sleep(add.delay)

				p := newPod(i, ns, true, 0, false)
				esController.podStore.Add(p)
				esController.addPod(p)
			}

			time.Sleep(tc.finalDelay)
			assert.Len(t, client.Actions(), tc.wantRequestCount)
			// In case of error, make debugging easier.
			for _, action := range client.Actions() {
				t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
			}
		})
	}
}

// TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodUpdatesBatching(t *testing.T) {
	t.Parallel()

	resourceVersion := 1
	type podUpdate struct {
		delay   time.Duration
		podName string
		podIP   string
	}

	tests := []struct {
		name             string
		batchPeriod      time.Duration
		podsCount        int
		updates          []podUpdate
		finalDelay       time.Duration
		wantRequestCount int
	}{
		{
			name:        "three updates with no batching",
			batchPeriod: 0 * time.Second,
			podsCount:   10,
			updates: []podUpdate{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
					podIP:   "10.0.0.0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
					podIP:   "10.0.0.1",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod2",
					podIP:   "10.0.0.2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 3,
		},
		{
			name:        "three updates in one batch",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			updates: []podUpdate{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
					podIP:   "10.0.0.0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
					podIP:   "10.0.0.1",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod2",
					podIP:   "10.0.0.2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 1,
		},
		{
			name:        "three updates in two batches",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			updates: []podUpdate{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
					podIP:   "10.0.0.0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
					podIP:   "10.0.0.1",
				},
				{
					delay:   1 * time.Second,
					podName: "pod2",
					podIP:   "10.0.0.2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 2,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ns := metav1.NamespaceDefault
			client, esController := newController([]string{"node-1"}, tc.batchPeriod)
			stopCh := make(chan struct{})
			defer close(stopCh)

			go esController.Run(1, stopCh)

			addPods(t, esController, ns, tc.podsCount)

			esController.serviceStore.Add(&v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
					Ports:      []v1.ServicePort{{Port: 80}},
				},
			})

			for _, update := range tc.updates {
				time.Sleep(update.delay)

				old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
				if err != nil {
					t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
				}
				if !exists {
					t.Fatalf("Pod %q doesn't exist", update.podName)
				}
				oldPod := old.(*v1.Pod)
				newPod := oldPod.DeepCopy()
				newPod.Status.PodIPs[0].IP = update.podIP
				newPod.ResourceVersion = strconv.Itoa(resourceVersion)
				resourceVersion++

				esController.podStore.Update(newPod)
				esController.updatePod(oldPod, newPod)
			}

			time.Sleep(tc.finalDelay)
			assert.Len(t, client.Actions(), tc.wantRequestCount)
			// In case of error, make debugging easier.
			for _, action := range client.Actions() {
				t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
			}
		})
	}
}

// TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodDeleteBatching(t *testing.T) {
	t.Parallel()

	type podDelete struct {
		delay   time.Duration
		podName string
	}

	tests := []struct {
		name             string
		batchPeriod      time.Duration
		podsCount        int
		deletes          []podDelete
		finalDelay       time.Duration
		wantRequestCount int
	}{
		{
			name:        "three deletes with no batching",
			batchPeriod: 0 * time.Second,
			podsCount:   10,
			deletes: []podDelete{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 3,
		},
		{
			name:        "three deletes in one batch",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			deletes: []podDelete{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 1,
		},
		{
			name:        "three deletes in two batches",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			deletes: []podDelete{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
				},
				{
					delay:   1 * time.Second,
					podName: "pod2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 2,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ns := metav1.NamespaceDefault
			client, esController := newController([]string{"node-1"}, tc.batchPeriod)
			stopCh := make(chan struct{})
			defer close(stopCh)

			go esController.Run(1, stopCh)

			addPods(t, esController, ns, tc.podsCount)

			esController.serviceStore.Add(&v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
					Ports:      []v1.ServicePort{{Port: 80}},
				},
			})

			for _, update := range tc.deletes {
				time.Sleep(update.delay)

				old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
				assert.Nil(t, err, "error while retrieving old value of %q: %v", update.podName, err)
				assert.Equal(t, true, exists, "pod should exist")
				esController.podStore.Delete(old)
				esController.deletePod(old)
			}

			time.Sleep(tc.finalDelay)
			assert.Len(t, client.Actions(), tc.wantRequestCount)
			// In case of error, make debugging easier.
			for _, action := range client.Actions() {
				t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
			}
		})
	}
}

func TestSyncServiceStaleInformer(t *testing.T) {
	testcases := []struct {
		name                     string
		informerGenerationNumber int64
		trackerGenerationNumber  int64
		expectError              bool
	}{
		{
			name:                     "informer cache outdated",
			informerGenerationNumber: 10,
			trackerGenerationNumber:  12,
			expectError:              true,
		},
		{
			name:                     "cache and tracker synced",
			informerGenerationNumber: 10,
			trackerGenerationNumber:  10,
			expectError:              false,
		},
		{
			name:                     "tracker outdated",
			informerGenerationNumber: 10,
			trackerGenerationNumber:  1,
			expectError:              false,
		},
	}

	for _, testcase := range testcases {
		t.Run(testcase.name, func(t *testing.T) {
			_, esController := newController([]string{"node-1"}, time.Duration(0))
			ns := metav1.NamespaceDefault
			serviceName := "testing-1"

			// Store Service in the cache
			esController.serviceStore.Add(&v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector: map[string]string{"foo": "bar"},
					Ports:    []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
				},
			})

			// Create EndpointSlice in the informer cache with informerGenerationNumber
			epSlice1 := &discovery.EndpointSlice{
				ObjectMeta: metav1.ObjectMeta{
					Name:       "matching-1",
					Namespace:  ns,
					Generation: testcase.informerGenerationNumber,
					Labels: map[string]string{
						discovery.LabelServiceName: serviceName,
						discovery.LabelManagedBy:   controllerName,
					},
				},
				AddressType: discovery.AddressTypeIPv4,
			}
			err := esController.endpointSliceStore.Add(epSlice1)
			if err != nil {
				t.Fatalf("Expected no error adding EndpointSlice: %v", err)
			}

			// Create EndpointSlice in the tracker with trackerGenerationNumber
			epSlice2 := epSlice1.DeepCopy()
			epSlice2.Generation = testcase.trackerGenerationNumber
			esController.endpointSliceTracker.Update(epSlice2)

			err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
			// Check if we got a StaleInformerCache error
			if endpointsliceutil.IsStaleInformerCacheErr(err) != testcase.expectError {
				t.Fatalf("Expected error because informer cache is outdated")
			}

		})
	}
}

func Test_checkNodeTopologyDistribution(t *testing.T) {
	zoneA := "zone-a"
	zoneB := "zone-b"
	zoneC := "zone-c"

	readyTrue := true
	readyFalse := false

	cpu100 := resource.MustParse("100m")
	cpu1000 := resource.MustParse("1000m")
	cpu2000 := resource.MustParse("2000m")

	type nodeInfo struct {
		zoneLabel *string
		ready     *bool
		cpu       *resource.Quantity
	}

	testCases := []struct {
		name                 string
		nodes                []nodeInfo
		topologyCacheEnabled bool
		endpointZoneInfo     map[string]topologycache.EndpointZoneInfo
		expectedQueueLen     int
	}{{
		name:                 "empty",
		nodes:                []nodeInfo{},
		topologyCacheEnabled: false,
		endpointZoneInfo:     map[string]topologycache.EndpointZoneInfo{},
		expectedQueueLen:     0,
	}, {
		name: "lopsided, queue required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu100},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3},
		},
		expectedQueueLen: 1,
	}, {
		name: "lopsided but 1 unready, queue required because unready node means 0 CPU in one zone",
		nodes: []nodeInfo{
			{zoneLabel: &zoneA, ready: &readyFalse, cpu: &cpu100},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3},
		},
		expectedQueueLen: 1,
	}, {
		name: "even zones, uneven endpoint distribution but within threshold, no sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneB: 5, zoneC: 4},
		},
		expectedQueueLen: 0,
	}, {
		name: "even zones but node missing zone, sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneB: 5, zoneC: 4},
		},
		expectedQueueLen: 1,
	}, {
		name: "even zones but node missing cpu, sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneB, ready: &readyTrue},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneB: 5, zoneC: 4},
		},
		expectedQueueLen: 1,
	}, {
		name: "even zones, uneven endpoint distribution beyond threshold, no sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneB: 6, zoneC: 4},
		},
		expectedQueueLen: 1,
	}, {
		name: "3 uneven zones, matching endpoint distribution, no sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 1},
		},
		expectedQueueLen: 0,
	}, {
		name: "3 uneven zones, endpoint distribution within threshold but below 1, sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 0},
		},
		expectedQueueLen: 1,
	}}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			_, esController := newController([]string{}, time.Duration(0))

			for i, nodeInfo := range tc.nodes {
				node := &v1.Node{
					ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("node-%d", i)},
					Status:     v1.NodeStatus{},
				}
				if nodeInfo.zoneLabel != nil {
					node.Labels = map[string]string{v1.LabelTopologyZone: *nodeInfo.zoneLabel}
				}
				if nodeInfo.ready != nil {
					status := v1.ConditionFalse
					if *nodeInfo.ready {
						status = v1.ConditionTrue
					}
					node.Status.Conditions = []v1.NodeCondition{{
						Type:   v1.NodeReady,
						Status: status,
					}}
				}
				if nodeInfo.cpu != nil {
					node.Status.Allocatable = v1.ResourceList{
						v1.ResourceCPU: *nodeInfo.cpu,
					}
				}
				esController.nodeStore.Add(node)
				if tc.topologyCacheEnabled {
					esController.topologyCache = topologycache.NewTopologyCache()
					for serviceKey, endpointZoneInfo := range tc.endpointZoneInfo {
						esController.topologyCache.SetHints(serviceKey, discovery.AddressTypeIPv4, endpointZoneInfo)
					}
				}
			}

			esController.checkNodeTopologyDistribution()

			if esController.queue.Len() != tc.expectedQueueLen {
				t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.queue.Len())
			}
		})
	}
}

// Test helpers
func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) {
	t.Helper()
	for i := 0; i < podsCount; i++ {
		pod := newPod(i, namespace, true, 0, false)
		esController.podStore.Add(pod)
	}
}

func standardSyncService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) {
	t.Helper()
	createService(t, esController, namespace, serviceName)

	err := esController.syncService(fmt.Sprintf("%s/%s", namespace, serviceName))
	assert.Nil(t, err, "Expected no error syncing service")
}

func createService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) *v1.Service {
	t.Helper()
	service := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:              serviceName,
			Namespace:         namespace,
			CreationTimestamp: metav1.NewTime(time.Now()),
			UID:               types.UID(namespace + "-" + serviceName),
		},
		Spec: v1.ServiceSpec{
			Ports:      []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
			Selector:   map[string]string{"foo": "bar"},
			IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
		},
	}
	esController.serviceStore.Add(service)
	_, err := esController.client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
	assert.Nil(t, err, "Expected no error creating service")
	return service
}

func expectAction(t *testing.T, actions []k8stesting.Action, index int, verb, resource string) {
	t.Helper()
	if len(actions) <= index {
		t.Fatalf("Expected at least %d actions, got %d", index+1, len(actions))
	}

	action := actions[index]
	if action.GetVerb() != verb {
		t.Errorf("Expected action %d verb to be %s, got %s", index, verb, action.GetVerb())
	}

	if action.GetResource().Resource != resource {
		t.Errorf("Expected action %d resource to be %s, got %s", index, resource, action.GetResource().Resource)
	}
}

// protoPtr takes a Protocol and returns a pointer to it.
func protoPtr(proto v1.Protocol) *v1.Protocol {
	return &proto
}

// cacheMutationCheck helps ensure that cached objects have not been changed
// in any way throughout a test run.
type cacheMutationCheck struct {
	objects []cacheObject
}

// cacheObject stores a reference to an original object as well as a deep copy
// of that object to track any mutations in the original object.
type cacheObject struct {
	original runtime.Object
	deepCopy runtime.Object
}

// newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices.
func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck {
	cmc := cacheMutationCheck{}
	for _, endpointSlice := range endpointSlices {
		cmc.Add(endpointSlice)
	}
	return cmc
}

// Add appends a runtime.Object and a deep copy of that object into the
// cacheMutationCheck.
func (cmc *cacheMutationCheck) Add(o runtime.Object) {
	cmc.objects = append(cmc.objects, cacheObject{
		original: o,
		deepCopy: o.DeepCopyObject(),
	})
}

// Check verifies that no objects in the cacheMutationCheck have been mutated.
func (cmc *cacheMutationCheck) Check(t *testing.T) {
	for _, o := range cmc.objects {
		if !reflect.DeepEqual(o.original, o.deepCopy) {
			// Cached objects can't be safely mutated and instead should be deep
			// copied before changed in any way.
			t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
		}
	}
}

func Test_dropEndpointSlicesPendingDeletion(t *testing.T) {
	now := metav1.Now()
	endpointSlices := []*discovery.EndpointSlice{
		{
			ObjectMeta: metav1.ObjectMeta{
				Name:              "epSlice1",
				DeletionTimestamp: &now,
			},
		},
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: "epSlice2",
			},
			AddressType: discovery.AddressTypeIPv4,
			Endpoints: []discovery.Endpoint{
				{
					Addresses: []string{"172.18.0.2"},
				},
			},
		},
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: "epSlice3",
			},
			AddressType: discovery.AddressTypeIPv6,
			Endpoints: []discovery.Endpoint{
				{
					Addresses: []string{"3001:0da8:75a3:0000:0000:8a2e:0370:7334"},
				},
			},
		},
	}

	epSlice2 := endpointSlices[1]
	epSlice3 := endpointSlices[2]

	result := dropEndpointSlicesPendingDeletion(endpointSlices)

	assert.Len(t, result, 2)
	for _, epSlice := range result {
		if epSlice.Name == "epSlice1" {
			t.Errorf("Expected EndpointSlice marked for deletion to be dropped.")
		}
	}

	// We don't use endpointSlices and instead check manually for equality, because
	// `dropEndpointSlicesPendingDeletion` mutates the slice it receives, so it's easy
	// to break this test later. This way, we can be absolutely sure that the result
	// has exactly what we expect it to.
	if !reflect.DeepEqual(epSlice2, result[0]) {
		t.Errorf("EndpointSlice was unexpectedly mutated. Expected: %+v, Mutated: %+v", epSlice2, result[0])
	}
	if !reflect.DeepEqual(epSlice3, result[1]) {
		t.Errorf("EndpointSlice was unexpectedly mutated. Expected: %+v, Mutated: %+v", epSlice3, result[1])
	}
}

相关信息

kubernetes 源码目录

相关文章

kubernetes endpointslice_controller 源码

kubernetes reconciler 源码

kubernetes reconciler_test 源码

kubernetes utils 源码

kubernetes utils_test 源码

0  赞