// Kubernetes endpointslice_controller_test source code.
// Kubernetes endpointslice_controller_test code.
// File path: /pkg/controller/endpointslice/endpointslice_controller_test.go
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpointslice
import (
"context"
"fmt"
"reflect"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/endpointslice/topologycache"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice"
"k8s.io/kubernetes/pkg/features"
utilpointer "k8s.io/utils/pointer"
)
// Most of the tests related to EndpointSlice allocation can be found in reconciler_test.go
// Tests here primarily focus on unique controller functionality before the reconciler begins
// alwaysReady is a stand-in for an informer "has synced" check that
// unconditionally reports true.
var alwaysReady = func() bool {
	return true
}
// endpointSliceController bundles the Controller under test with the informer
// stores that tests populate directly. Field order matters: newController
// fills this struct in declaration order.
type endpointSliceController struct {
	*Controller

	// endpointSliceStore is the store behind the EndpointSlice informer.
	endpointSliceStore cache.Store
	// nodeStore is the store behind the Node informer.
	nodeStore cache.Store
	// podStore is the store behind the Pod informer.
	podStore cache.Store
	// serviceStore is the store behind the Service informer.
	serviceStore cache.Store
}
// newController builds an EndpointSlice controller wired to a fake clientset
// for use in tests.
//
// Each name in nodeNames is added to the node informer's indexer as a bare
// Node object, and batchPeriod is passed through to NewController. Reactors are
// prepended on the fake client so that EndpointSlice creates and updates are
// mirrored into the EndpointSlice informer's indexer. Every informer "synced"
// check on the controller is replaced with alwaysReady.
//
// It returns the fake clientset together with the controller wrapped alongside
// its EndpointSlice, Node, Pod and Service informer stores.
func newController(nodeNames []string, batchPeriod time.Duration) (*fake.Clientset, *endpointSliceController) {
	client := fake.NewSimpleClientset()
	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())

	// Grab each informer handle once and reuse it below.
	nodeInformer := informerFactory.Core().V1().Nodes()
	podInformer := informerFactory.Core().V1().Pods()
	serviceInformer := informerFactory.Core().V1().Services()
	esInformer := informerFactory.Discovery().V1().EndpointSlices()

	nodeIndexer := nodeInformer.Informer().GetIndexer()
	for _, nodeName := range nodeNames {
		nodeIndexer.Add(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
	}

	esIndexer := esInformer.Informer().GetIndexer()

	// These reactors are required to mock functionality that would be covered
	// automatically if we weren't using the fake client.
	client.PrependReactor("create", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
		endpointSlice := action.(k8stesting.CreateAction).GetObject().(*discovery.EndpointSlice)
		// Resolve GenerateName into a concrete Name with a random suffix.
		if endpointSlice.ObjectMeta.GenerateName != "" {
			endpointSlice.ObjectMeta.Name = fmt.Sprintf("%s-%s", endpointSlice.ObjectMeta.GenerateName, rand.String(8))
			endpointSlice.ObjectMeta.GenerateName = ""
		}
		endpointSlice.Generation = 1
		esIndexer.Add(endpointSlice)
		// handled=false lets the remaining reactors in the chain run as well.
		return false, endpointSlice, nil
	}))
	client.PrependReactor("update", "endpointslices", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
		// This reactor only sees "update" actions, so assert the matching
		// action interface rather than CreateAction.
		endpointSlice := action.(k8stesting.UpdateAction).GetObject().(*discovery.EndpointSlice)
		endpointSlice.Generation++
		esIndexer.Update(endpointSlice)
		return false, endpointSlice, nil
	}))

	esController := NewController(
		podInformer,
		serviceInformer,
		nodeInformer,
		esInformer,
		int32(100),
		client,
		batchPeriod)

	// The informers are never started in these tests, so report them as synced.
	esController.nodesSynced = alwaysReady
	esController.podsSynced = alwaysReady
	esController.servicesSynced = alwaysReady
	esController.endpointSlicesSynced = alwaysReady

	return client, &endpointSliceController{
		Controller:         esController,
		endpointSliceStore: esInformer.Informer().GetStore(),
		nodeStore:          nodeInformer.Informer().GetStore(),
		podStore:           podInformer.Informer().GetStore(),
		serviceStore:       serviceInformer.Informer().GetStore(),
	}
}
// TestSyncServiceNoSelector checks that syncing a Service that has no selector
// succeeds and issues no client actions.
func TestSyncServiceNoSelector(t *testing.T) {
	namespace, name := metav1.NamespaceDefault, "testing-1"
	client, esController := newController([]string{"node-1"}, time.Duration(0))

	// The Service deliberately carries ports but no Selector.
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		Spec: v1.ServiceSpec{
			Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
		},
	}
	esController.serviceStore.Add(svc)

	key := fmt.Sprintf("%s/%s", namespace, name)
	assert.NoError(t, esController.syncService(key))
	assert.Len(t, client.Actions(), 0)
}
// TestSyncServicePendingDeletion checks that syncing a Service whose
// DeletionTimestamp is set succeeds and issues no client actions.
func TestSyncServicePendingDeletion(t *testing.T) {
	namespace, name := metav1.NamespaceDefault, "testing-1"
	deletedAt := metav1.Now()
	client, esController := newController([]string{"node-1"}, time.Duration(0))

	// The Service has a selector and ports, but is marked for deletion.
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:              name,
			Namespace:         namespace,
			DeletionTimestamp: &deletedAt,
		},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"foo": "bar"},
			Ports:    []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
		},
	}
	esController.serviceStore.Add(svc)

	key := fmt.Sprintf("%s/%s", namespace, name)
	assert.NoError(t, esController.syncService(key))
	assert.Len(t, client.Actions(), 0)
}
// TestSyncServiceWithSelector checks that syncing a Service with a selector but
// no matching Pods creates a single placeholder EndpointSlice: named after the
// Service, labelled with the Service name, with empty ports and endpoints, and
// carrying a last-change-trigger-time annotation.
func TestSyncServiceWithSelector(t *testing.T) {
	ns := metav1.NamespaceDefault
	serviceName := "testing-1"
	client, esController := newController([]string{"node-1"}, time.Duration(0))

	standardSyncService(t, esController, ns, serviceName)
	expectActions(t, client.Actions(), 1, "create", "endpointslices")

	sliceList, err := client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
	if !assert.Nil(t, err, "Expected no error fetching endpoint slices") {
		return
	}
	// Stop here on a length mismatch; indexing Items[0] below would otherwise
	// panic and hide the real assertion failure.
	if !assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices") {
		return
	}

	slice := sliceList.Items[0]
	assert.Regexp(t, "^"+serviceName, slice.Name)
	assert.Equal(t, serviceName, slice.Labels[discovery.LabelServiceName])
	assert.EqualValues(t, []discovery.EndpointPort{}, slice.Ports)
	assert.EqualValues(t, []discovery.Endpoint{}, slice.Endpoints)
	// Use the shared constant for the annotation key, as the other tests in
	// this file do.
	assert.NotEmpty(t, slice.Annotations[v1.EndpointsLastChangeTriggerTime])
}
// TestSyncServiceMissing checks that syncing a key for a Service that is not in
// the store returns no error, performs no client actions, and drops that
// Service from the trigger time tracker. A second, existing Service is tracked
// as well to confirm the clean-up does not remove more than it should.
func TestSyncServiceMissing(t *testing.T) {
	ns := metav1.NamespaceDefault
	client, esController := newController([]string{"node-1"}, time.Duration(0))

	// Service that exists in the store and is tracked.
	const presentName = "stillthere"
	presentKey := endpointutil.ServiceKey{Name: presentName, Namespace: ns}
	esController.triggerTimeTracker.ServiceStates[presentKey] = endpointutil.ServiceState{}
	esController.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: presentName, Namespace: ns},
		Spec: v1.ServiceSpec{
			Ports:    []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
			Selector: map[string]string{"foo": "bar"},
		},
	})

	// Service that is tracked but never added to the store.
	const absentName = "notthere"
	absentKey := endpointutil.ServiceKey{Name: absentName, Namespace: ns}
	esController.triggerTimeTracker.ServiceStates[absentKey] = endpointutil.ServiceState{}

	syncErr := esController.syncService(fmt.Sprintf("%s/%s", ns, absentName))

	// A missing Service is not an error ...
	assert.Nil(t, syncErr, "Expected no error syncing service")
	// ... and must not trigger any client calls.
	assert.Len(t, client.Actions(), 0)

	// The tracker entry for the missing Service should be gone.
	assert.NotContains(t, esController.triggerTimeTracker.ServiceStates, absentKey)
	// The tracker entry for the existing Service should be untouched.
	assert.Contains(t, esController.triggerTimeTracker.ServiceStates, presentKey)
}
// TestSyncServicePodSelection checks that syncService only picks up Pods whose
// labels match the Service selector: of two Pods in the store, the one whose
// "foo" label is changed to "boo" must not appear in the resulting
// EndpointSlice.
func TestSyncServicePodSelection(t *testing.T) {
	client, esController := newController([]string{"node-1"}, time.Duration(0))
	ns := metav1.NamespaceDefault

	pod1 := newPod(1, ns, true, 0, false)
	esController.podStore.Add(pod1)

	// ensure this pod will not match the selector
	pod2 := newPod(2, ns, true, 0, false)
	pod2.Labels["foo"] = "boo"
	esController.podStore.Add(pod2)

	standardSyncService(t, esController, ns, "testing-1")
	expectActions(t, client.Actions(), 1, "create", "endpointslices")

	// an endpoint slice should be created, it should only reference pod1 (not pod2)
	slices, err := client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
	if !assert.Nil(t, err, "Expected no error fetching endpoint slices") {
		return
	}
	// Bail out on a length mismatch so the index expressions below cannot
	// panic and mask the assertion failure.
	if !assert.Len(t, slices.Items, 1, "Expected 1 endpoint slices") {
		return
	}
	slice := slices.Items[0]
	assert.NotEmpty(t, slice.Annotations[v1.EndpointsLastChangeTriggerTime])
	if !assert.Len(t, slice.Endpoints, 1, "Expected 1 endpoint in first slice") {
		return
	}

	endpoint := slice.Endpoints[0]
	// Expected value first, actual second, so failure output is labelled correctly.
	assert.EqualValues(t, &v1.ObjectReference{Kind: "Pod", Namespace: ns, Name: pod1.Name}, endpoint.TargetRef)
}
// TestSyncServiceEndpointSlicePendingDeletion checks that an EndpointSlice
// owned by the Service but already carrying a DeletionTimestamp does not cause
// any further client actions when the Service is synced again.
func TestSyncServiceEndpointSlicePendingDeletion(t *testing.T) {
	client, esController := newController([]string{"node-1"}, time.Duration(0))
	ns, serviceName := metav1.NamespaceDefault, "testing-1"
	serviceKey := fmt.Sprintf("%s/%s", ns, serviceName)

	service := createService(t, esController, ns, serviceName)
	assert.Nil(t, esController.syncService(serviceKey), "Expected no error syncing service")

	// Build a slice that is owned by the Service and marked for deletion.
	ownerRef := metav1.NewControllerRef(service, schema.GroupVersionKind{Version: "v1", Kind: "Service"})
	deletedTs := metav1.Now()
	endpointSlice := &discovery.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "epSlice-1",
			Namespace:       ns,
			OwnerReferences: []metav1.OwnerReference{*ownerRef},
			Labels: map[string]string{
				discovery.LabelServiceName: serviceName,
				discovery.LabelManagedBy:   controllerName,
			},
			DeletionTimestamp: &deletedTs,
		},
		AddressType: discovery.AddressTypeIPv4,
	}

	// The slice has to be present in both the informer store and the fake clientset.
	if err := esController.endpointSliceStore.Add(endpointSlice); err != nil {
		t.Fatalf("Expected no error adding EndpointSlice: %v", err)
	}
	if _, err := client.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), endpointSlice, metav1.CreateOptions{}); err != nil {
		t.Fatalf("Expected no error creating EndpointSlice: %v", err)
	}

	numActionsBefore := len(client.Actions())
	assert.Nil(t, esController.syncService(serviceKey), "Expected no error syncing service")

	// The EndpointSlice marked for deletion should be ignored by the controller, and thus
	// should not result in any action.
	if extra := len(client.Actions()) - numActionsBefore; extra != 0 {
		t.Errorf("Expected 0 more actions, got %d", extra)
	}
}
// TestSyncServiceEndpointSliceLabelSelection seeds five EndpointSlices with
// differing service-name / managed-by labels and owner references, syncs the
// Service, and expects exactly two further client actions: an update followed
// by a delete. It also verifies that the seeded objects were not mutated.
func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
	client, esController := newController([]string{"node-1"}, time.Duration(0))
	ns, serviceName := metav1.NamespaceDefault, "testing-1"

	service := createService(t, esController, ns, serviceName)
	ownerRef := metav1.NewControllerRef(service, schema.GroupVersionKind{Version: "v1", Kind: "Service"})

	// makeSlice builds an IPv4 EndpointSlice in ns with the given name, labels
	// and (optionally) owner references.
	makeSlice := func(name string, labels map[string]string, owners ...metav1.OwnerReference) *discovery.EndpointSlice {
		return &discovery.EndpointSlice{
			ObjectMeta: metav1.ObjectMeta{
				Name:            name,
				Namespace:       ns,
				OwnerReferences: owners,
				Labels:          labels,
			},
			AddressType: discovery.AddressTypeIPv4,
		}
	}

	endpointSlices := []*discovery.EndpointSlice{
		// Both labels match and the Service is the owner.
		makeSlice("matching-1", map[string]string{
			discovery.LabelServiceName: serviceName,
			discovery.LabelManagedBy:   controllerName,
		}, *ownerRef),
		makeSlice("matching-2", map[string]string{
			discovery.LabelServiceName: serviceName,
			discovery.LabelManagedBy:   controllerName,
		}, *ownerRef),
		// Service name label only; no managed-by label.
		makeSlice("partially-matching-1", map[string]string{
			discovery.LabelServiceName: serviceName,
		}),
		// Managed by this controller but for a different Service.
		makeSlice("not-matching-1", map[string]string{
			discovery.LabelServiceName: "something-else",
			discovery.LabelManagedBy:   controllerName,
		}),
		// Right Service but managed by something else.
		makeSlice("not-matching-2", map[string]string{
			discovery.LabelServiceName: serviceName,
			discovery.LabelManagedBy:   "something-else",
		}),
	}

	cmc := newCacheMutationCheck(endpointSlices)

	// need to add them to both store and fake clientset
	for _, es := range endpointSlices {
		if err := esController.endpointSliceStore.Add(es); err != nil {
			t.Fatalf("Expected no error adding EndpointSlice: %v", err)
		}
		if _, err := client.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), es, metav1.CreateOptions{}); err != nil {
			t.Fatalf("Expected no error creating EndpointSlice: %v", err)
		}
	}

	numActionsBefore := len(client.Actions())
	syncErr := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
	assert.Nil(t, syncErr, "Expected no error syncing service")

	if extra := len(client.Actions()) - numActionsBefore; extra != 2 {
		t.Errorf("Expected 2 more actions, got %d", extra)
	}

	// The two new actions must be an update and then a delete.
	expectAction(t, client.Actions(), numActionsBefore, "update", "endpointslices")
	expectAction(t, client.Actions(), numActionsBefore+1, "delete", "endpointslices")

	// ensure cache mutation has not occurred
	cmc.Check(t)
}
// TestOnEndpointSliceUpdate checks that onEndpointSliceUpdate, given an old and
// a new EndpointSlice that differ only in their managed-by label, results in
// exactly one item on the controller's work queue.
func TestOnEndpointSliceUpdate(t *testing.T) {
	_, esController := newController([]string{"node-1"}, time.Duration(0))

	before := &discovery.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "matching-1",
			Namespace: metav1.NamespaceDefault,
			Labels: map[string]string{
				discovery.LabelServiceName: "testing-1",
				discovery.LabelManagedBy:   controllerName,
			},
		},
		AddressType: discovery.AddressTypeIPv4,
	}
	after := before.DeepCopy()
	after.Labels[discovery.LabelManagedBy] = "something else"

	// The queue starts out empty.
	assert.Equal(t, 0, esController.queue.Len())

	esController.onEndpointSliceUpdate(before, after)

	// Poll until something shows up on the queue (or give up after 3s).
	queueNonEmpty := func() (bool, error) {
		return esController.queue.Len() > 0, nil
	}
	if err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, queueNonEmpty); err != nil {
		t.Fatalf("unexpected error waiting for add to queue")
	}

	assert.Equal(t, 1, esController.queue.Len())
}
// TestSyncService runs syncService against a table of Service/Pod combinations
// and checks the single EndpointSlice that results.
//
// Every case uses the same Service shape (three named ports, selector
// foo=bar) and differs only in the Service IP family, the Pods' IPs, readiness
// and deletion state, and whether the EndpointSliceTerminatingCondition feature
// gate is enabled. For each case the test asserts the slice's
// last-change-trigger-time annotation, its ports, and its endpoints.
func TestSyncService(t *testing.T) {
	creationTimestamp := metav1.Now()
	deletionTimestamp := metav1.Now()

	// newService returns a fresh copy of the Service used by every case; only
	// the IP family varies.
	newService := func(family v1.IPFamily) *v1.Service {
		return &v1.Service{
			ObjectMeta: metav1.ObjectMeta{
				Name:              "foobar",
				Namespace:         "default",
				CreationTimestamp: creationTimestamp,
			},
			Spec: v1.ServiceSpec{
				Ports: []v1.ServicePort{
					{Name: "tcp-example", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP},
					{Name: "udp-example", TargetPort: intstr.FromInt(161), Protocol: v1.ProtocolUDP},
					{Name: "sctp-example", TargetPort: intstr.FromInt(3456), Protocol: v1.ProtocolSCTP},
				},
				Selector:   map[string]string{"foo": "bar"},
				IPFamilies: []v1.IPFamily{family},
			},
		}
	}

	// newTestPod returns a Pod on node-1 that matches the Service selector.
	// primaryIP becomes Status.PodIP and the first entry of Status.PodIPs;
	// extraIPs are appended to Status.PodIPs. ready is the status of the
	// PodReady condition and deletedAt, when non-nil, marks the Pod as
	// terminating.
	newTestPod := func(name string, ready v1.ConditionStatus, deletedAt *metav1.Time, primaryIP string, extraIPs ...string) *v1.Pod {
		podIPs := make([]v1.PodIP, 0, 1+len(extraIPs))
		podIPs = append(podIPs, v1.PodIP{IP: primaryIP})
		for _, ip := range extraIPs {
			podIPs = append(podIPs, v1.PodIP{IP: ip})
		}
		return &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Namespace:         "default",
				Name:              name,
				Labels:            map[string]string{"foo": "bar"},
				DeletionTimestamp: deletedAt,
			},
			Spec: v1.PodSpec{
				Containers: []v1.Container{{
					Name: "container-1",
				}},
				NodeName: "node-1",
			},
			Status: v1.PodStatus{
				PodIP:  primaryIP,
				PodIPs: podIPs,
				Conditions: []v1.PodCondition{
					{
						Type:   v1.PodReady,
						Status: ready,
					},
				},
			},
		}
	}

	// Pod fixtures shared between cases. Each call yields a new object.
	readyPod0 := func() *v1.Pod {
		return newTestPod("pod0", v1.ConditionTrue, nil, "10.0.0.1")
	}
	dualStackPod1 := func() *v1.Pod {
		return newTestPod("pod1", v1.ConditionTrue, nil, "10.0.0.2", "fd08::5678:0000:0000:9abc:def0")
	}
	terminatingPod1 := func(ready v1.ConditionStatus) *v1.Pod {
		return newTestPod("pod1", ready, &deletionTimestamp, "10.0.0.2")
	}

	// expectedPorts is the EndpointPort set every case expects.
	expectedPorts := func() []discovery.EndpointPort {
		return []discovery.EndpointPort{
			{
				Name:     utilpointer.StringPtr("sctp-example"),
				Protocol: protoPtr(v1.ProtocolSCTP),
				Port:     utilpointer.Int32Ptr(int32(3456)),
			},
			{
				Name:     utilpointer.StringPtr("udp-example"),
				Protocol: protoPtr(v1.ProtocolUDP),
				Port:     utilpointer.Int32Ptr(int32(161)),
			},
			{
				Name:     utilpointer.StringPtr("tcp-example"),
				Protocol: protoPtr(v1.ProtocolTCP),
				Port:     utilpointer.Int32Ptr(int32(80)),
			},
		}
	}

	// readyOnly is the condition set expected when only Ready is populated.
	readyOnly := func() discovery.EndpointConditions {
		return discovery.EndpointConditions{
			Ready: utilpointer.BoolPtr(true),
		}
	}
	// fullConditions is the condition set expected when Ready, Serving and
	// Terminating are all populated.
	fullConditions := func(ready, serving, terminating bool) discovery.EndpointConditions {
		return discovery.EndpointConditions{
			Ready:       utilpointer.BoolPtr(ready),
			Serving:     utilpointer.BoolPtr(serving),
			Terminating: utilpointer.BoolPtr(terminating),
		}
	}
	// endpointFor builds the expected Endpoint for a Pod on node-1.
	endpointFor := func(podName, address string, conditions discovery.EndpointConditions) discovery.Endpoint {
		return discovery.Endpoint{
			Conditions: conditions,
			Addresses:  []string{address},
			TargetRef:  &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: podName},
			NodeName:   utilpointer.StringPtr("node-1"),
		}
	}

	testcases := []struct {
		name                   string
		service                *v1.Service
		pods                   []*v1.Pod
		expectedEndpointPorts  []discovery.EndpointPort
		expectedEndpoints      []discovery.Endpoint
		terminatingGateEnabled bool
	}{
		{
			name:                  "pods with multiple IPs and Service with ipFamilies=ipv4",
			service:               newService(v1.IPv4Protocol),
			pods:                  []*v1.Pod{readyPod0(), dualStackPod1()},
			expectedEndpointPorts: expectedPorts(),
			expectedEndpoints: []discovery.Endpoint{
				endpointFor("pod0", "10.0.0.1", readyOnly()),
				endpointFor("pod1", "10.0.0.2", readyOnly()),
			},
		},
		{
			name:                  "pods with multiple IPs and Service with ipFamilies=ipv6",
			service:               newService(v1.IPv6Protocol),
			pods:                  []*v1.Pod{readyPod0(), dualStackPod1()},
			expectedEndpointPorts: expectedPorts(),
			expectedEndpoints: []discovery.Endpoint{
				endpointFor("pod1", "fd08::5678:0000:0000:9abc:def0", readyOnly()),
			},
		},
		{
			name:    "Terminating pods with EndpointSliceTerminatingCondition enabled",
			service: newService(v1.IPv4Protocol),
			// pod0 is one ready pod for comparison
			pods:                  []*v1.Pod{readyPod0(), terminatingPod1(v1.ConditionTrue)},
			expectedEndpointPorts: expectedPorts(),
			expectedEndpoints: []discovery.Endpoint{
				endpointFor("pod0", "10.0.0.1", fullConditions(true, true, false)),
				endpointFor("pod1", "10.0.0.2", fullConditions(false, true, true)),
			},
			terminatingGateEnabled: true,
		},
		{
			name:    "Terminating pods with EndpointSliceTerminatingCondition disabled",
			service: newService(v1.IPv4Protocol),
			// pod0 is one ready pod for comparison
			pods:                  []*v1.Pod{readyPod0(), terminatingPod1(v1.ConditionTrue)},
			expectedEndpointPorts: expectedPorts(),
			expectedEndpoints: []discovery.Endpoint{
				endpointFor("pod0", "10.0.0.1", readyOnly()),
			},
			terminatingGateEnabled: false,
		},
		{
			name:    "Not ready terminating pods with EndpointSliceTerminatingCondition enabled",
			service: newService(v1.IPv4Protocol),
			// pod0 is one ready pod for comparison
			pods:                  []*v1.Pod{readyPod0(), terminatingPod1(v1.ConditionFalse)},
			expectedEndpointPorts: expectedPorts(),
			expectedEndpoints: []discovery.Endpoint{
				endpointFor("pod0", "10.0.0.1", fullConditions(true, true, false)),
				endpointFor("pod1", "10.0.0.2", fullConditions(false, false, true)),
			},
			terminatingGateEnabled: true,
		},
		{
			name:    "Not ready terminating pods with EndpointSliceTerminatingCondition disabled",
			service: newService(v1.IPv4Protocol),
			// pod0 is one ready pod for comparison
			pods:                  []*v1.Pod{readyPod0(), terminatingPod1(v1.ConditionFalse)},
			expectedEndpointPorts: expectedPorts(),
			expectedEndpoints: []discovery.Endpoint{
				endpointFor("pod0", "10.0.0.1", readyOnly()),
			},
			terminatingGateEnabled: false,
		},
	}

	for _, testcase := range testcases {
		t.Run(testcase.name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceTerminatingCondition, testcase.terminatingGateEnabled)()

			client, esController := newController([]string{"node-1"}, time.Duration(0))

			for _, pod := range testcase.pods {
				esController.podStore.Add(pod)
			}
			esController.serviceStore.Add(testcase.service)

			_, err := esController.client.CoreV1().Services(testcase.service.Namespace).Create(context.TODO(), testcase.service, metav1.CreateOptions{})
			assert.Nil(t, err, "Expected no error creating service")

			err = esController.syncService(fmt.Sprintf("%s/%s", testcase.service.Namespace, testcase.service.Name))
			assert.Nil(t, err)

			// last action should be to create endpoint slice
			expectActions(t, client.Actions(), 1, "create", "endpointslices")

			sliceList, err := client.DiscoveryV1().EndpointSlices(testcase.service.Namespace).List(context.TODO(), metav1.ListOptions{})
			if !assert.Nil(t, err, "Expected no error fetching endpoint slices") {
				return
			}
			// Stop on a length mismatch; indexing Items[0] below would
			// otherwise panic and hide the assertion failure.
			if !assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices") {
				return
			}

			// ensure all attributes of endpoint slice match expected state
			slice := sliceList.Items[0]
			// Expected value first, actual second, so failure output is
			// labelled correctly.
			assert.Equal(t, creationTimestamp.UTC().Format(time.RFC3339Nano), slice.Annotations[v1.EndpointsLastChangeTriggerTime])
			assert.ElementsMatch(t, testcase.expectedEndpointPorts, slice.Ports)
			assert.ElementsMatch(t, testcase.expectedEndpoints, slice.Endpoints)
		})
	}
}
// TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
// Each case starts the controller with a given batch period, adds pods with a
// real delay before each add, waits finalDelay, and then compares the number of
// recorded client actions with wantRequestCount.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodAddsBatching(t *testing.T) {
	t.Parallel()

	// podAdd describes one pod addition and how long to wait before it.
	type podAdd struct {
		delay time.Duration
	}

	tests := []struct {
		name             string
		batchPeriod      time.Duration
		adds             []podAdd
		finalDelay       time.Duration
		wantRequestCount int
	}{
		{
			name:        "three adds with no batching",
			batchPeriod: 0 * time.Second,
			adds: []podAdd{
				// endpoints.Run needs ~100 ms to start processing updates.
				{delay: 200 * time.Millisecond},
				{delay: 100 * time.Millisecond},
				{delay: 100 * time.Millisecond},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 3,
		},
		{
			name:        "three adds in one batch",
			batchPeriod: 1 * time.Second,
			adds: []podAdd{
				// endpoints.Run needs ~100 ms to start processing updates.
				{delay: 200 * time.Millisecond},
				{delay: 100 * time.Millisecond},
				{delay: 100 * time.Millisecond},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 1,
		},
		{
			name:        "three adds in two batches",
			batchPeriod: 1 * time.Second,
			adds: []podAdd{
				// endpoints.Run needs ~100 ms to start processing updates.
				{delay: 200 * time.Millisecond},
				{delay: 100 * time.Millisecond},
				{delay: 1 * time.Second},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 2,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			namespace := metav1.NamespaceDefault
			client, esController := newController([]string{"node-1"}, tc.batchPeriod)

			// Run the controller with a single worker until the subtest ends.
			stopCh := make(chan struct{})
			defer close(stopCh)
			go esController.Run(1, stopCh)

			svc := &v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: namespace},
				Spec: v1.ServiceSpec{
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
					Ports:      []v1.ServicePort{{Port: 80}},
				},
			}
			esController.serviceStore.Add(svc)

			for idx := range tc.adds {
				time.Sleep(tc.adds[idx].delay)

				pod := newPod(idx, namespace, true, 0, false)
				esController.podStore.Add(pod)
				esController.addPod(pod)
			}

			// Give the controller time to drain its queue before counting.
			time.Sleep(tc.finalDelay)
			assert.Len(t, client.Actions(), tc.wantRequestCount)

			// In case of error, make debugging easier.
			for _, recorded := range client.Actions() {
				t.Logf("action: %v %v", recorded.GetVerb(), recorded.GetResource())
			}
		})
	}
}
// TestPodUpdatesBatching checks that EndpointSlice writes caused by pod
// updates are coalesced according to the controller's batch period.
// Real time.Sleep calls are used because the endpoints controller currently
// offers no easy way to mock time.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodUpdatesBatching(t *testing.T) {
	t.Parallel()

	// resourceVersion is shared by all subtests so every updated pod gets a
	// distinct, increasing resource version.
	resourceVersion := 1

	type podUpdate struct {
		delay   time.Duration
		podName string
		podIP   string
	}
	type batchingCase struct {
		name             string
		batchPeriod      time.Duration
		podsCount        int
		updates          []podUpdate
		finalDelay       time.Duration
		wantRequestCount int
	}

	const (
		// endpoints.Run needs ~100 ms to start processing updates, so the
		// first update of every case waits 200 ms.
		startupDelay = 200 * time.Millisecond
		quickDelay   = 100 * time.Millisecond
		settleDelay  = 3 * time.Second
	)

	cases := []batchingCase{
		{
			name:        "three updates with no batching",
			batchPeriod: 0 * time.Second,
			podsCount:   10,
			updates: []podUpdate{
				{delay: startupDelay, podName: "pod0", podIP: "10.0.0.0"},
				{delay: quickDelay, podName: "pod1", podIP: "10.0.0.1"},
				{delay: quickDelay, podName: "pod2", podIP: "10.0.0.2"},
			},
			finalDelay:       settleDelay,
			wantRequestCount: 3,
		},
		{
			name:        "three updates in one batch",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			updates: []podUpdate{
				{delay: startupDelay, podName: "pod0", podIP: "10.0.0.0"},
				{delay: quickDelay, podName: "pod1", podIP: "10.0.0.1"},
				{delay: quickDelay, podName: "pod2", podIP: "10.0.0.2"},
			},
			finalDelay:       settleDelay,
			wantRequestCount: 1,
		},
		{
			name:        "three updates in two batches",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			updates: []podUpdate{
				{delay: startupDelay, podName: "pod0", podIP: "10.0.0.0"},
				{delay: quickDelay, podName: "pod1", podIP: "10.0.0.1"},
				{delay: 1 * time.Second, podName: "pod2", podIP: "10.0.0.2"},
			},
			finalDelay:       settleDelay,
			wantRequestCount: 2,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			namespace := metav1.NamespaceDefault
			client, esController := newController([]string{"node-1"}, tc.batchPeriod)

			// The controller runs in the background until the subtest returns.
			stopCh := make(chan struct{})
			defer close(stopCh)
			go esController.Run(1, stopCh)

			addPods(t, esController, namespace, tc.podsCount)

			svc := &v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: namespace},
				Spec: v1.ServiceSpec{
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
					Ports:      []v1.ServicePort{{Port: 80}},
				},
			}
			esController.serviceStore.Add(svc)

			for _, update := range tc.updates {
				time.Sleep(update.delay)

				podKey := fmt.Sprintf("%s/%s", namespace, update.podName)
				cached, found, err := esController.podStore.GetByKey(podKey)
				if err != nil {
					t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
				}
				if !found {
					t.Fatalf("Pod %q doesn't exist", update.podName)
				}

				// Work on a copy so the previously cached pod stays intact
				// and can be passed as the "old" object.
				previous := cached.(*v1.Pod)
				updated := previous.DeepCopy()
				updated.Status.PodIPs[0].IP = update.podIP
				updated.ResourceVersion = strconv.Itoa(resourceVersion)
				resourceVersion++

				esController.podStore.Update(updated)
				esController.updatePod(previous, updated)
			}

			// Give the controller time to flush any pending batch.
			time.Sleep(tc.finalDelay)
			assert.Len(t, client.Actions(), tc.wantRequestCount)

			// In case of error, make debugging easier.
			for _, recorded := range client.Actions() {
				t.Logf("action: %v %v", recorded.GetVerb(), recorded.GetResource())
			}
		})
	}
}
// TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together.
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
// TODO(mborsz): Migrate this test to mock clock when possible.
func TestPodDeleteBatching(t *testing.T) {
	t.Parallel()

	type podDelete struct {
		delay   time.Duration
		podName string
	}

	tests := []struct {
		name             string
		batchPeriod      time.Duration
		podsCount        int
		deletes          []podDelete
		finalDelay       time.Duration
		wantRequestCount int
	}{
		{
			name:        "three deletes with no batching",
			batchPeriod: 0 * time.Second,
			podsCount:   10,
			deletes: []podDelete{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 3,
		},
		{
			name:        "three deletes in one batch",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			deletes: []podDelete{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 1,
		},
		{
			name:        "three deletes in two batches",
			batchPeriod: 1 * time.Second,
			podsCount:   10,
			deletes: []podDelete{
				{
					// endpoints.Run needs ~100 ms to start processing updates.
					delay:   200 * time.Millisecond,
					podName: "pod0",
				},
				{
					delay:   100 * time.Millisecond,
					podName: "pod1",
				},
				{
					delay:   1 * time.Second,
					podName: "pod2",
				},
			},
			finalDelay:       3 * time.Second,
			wantRequestCount: 2,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ns := metav1.NamespaceDefault
			client, esController := newController([]string{"node-1"}, tc.batchPeriod)

			// The controller runs in the background until the subtest returns.
			stopCh := make(chan struct{})
			defer close(stopCh)
			go esController.Run(1, stopCh)

			addPods(t, esController, ns, tc.podsCount)

			esController.serviceStore.Add(&v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector:   map[string]string{"foo": "bar"},
					IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
					Ports:      []v1.ServicePort{{Port: 80}},
				},
			})

			for _, del := range tc.deletes {
				time.Sleep(del.delay)

				old, exists, err := esController.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, del.podName))
				// Abort immediately on a failed lookup, matching
				// TestPodUpdatesBatching: continuing would feed an unusable
				// object to the store and the delete handler and make the
				// request-count assertion below meaningless.
				if err != nil {
					t.Fatalf("Error while retrieving old value of %q: %v", del.podName, err)
				}
				if !exists {
					t.Fatalf("Pod %q doesn't exist", del.podName)
				}

				esController.podStore.Delete(old)
				esController.deletePod(old)
			}

			// Give the controller time to flush any pending batch.
			time.Sleep(tc.finalDelay)
			assert.Len(t, client.Actions(), tc.wantRequestCount)

			// In case of error, make debugging easier.
			for _, action := range client.Actions() {
				t.Logf("action: %v %v", action.GetVerb(), action.GetResource())
			}
		})
	}
}
// TestSyncServiceStaleInformer verifies that syncService reports a stale
// informer cache error only when the EndpointSlice generation recorded in the
// tracker is newer than the generation held in the informer cache.
func TestSyncServiceStaleInformer(t *testing.T) {
	testcases := []struct {
		name                     string
		informerGenerationNumber int64
		trackerGenerationNumber  int64
		expectError              bool
	}{
		{
			name:                     "informer cache outdated",
			informerGenerationNumber: 10,
			trackerGenerationNumber:  12,
			expectError:              true,
		},
		{
			name:                     "cache and tracker synced",
			informerGenerationNumber: 10,
			trackerGenerationNumber:  10,
			expectError:              false,
		},
		{
			name:                     "tracker outdated",
			informerGenerationNumber: 10,
			trackerGenerationNumber:  1,
			expectError:              false,
		},
	}

	for _, testcase := range testcases {
		t.Run(testcase.name, func(t *testing.T) {
			_, esController := newController([]string{"node-1"}, time.Duration(0))
			ns := metav1.NamespaceDefault
			serviceName := "testing-1"

			// Store Service in the cache
			esController.serviceStore.Add(&v1.Service{
				ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns},
				Spec: v1.ServiceSpec{
					Selector: map[string]string{"foo": "bar"},
					Ports:    []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
				},
			})

			// Create EndpointSlice in the informer cache with informerGenerationNumber
			epSlice1 := &discovery.EndpointSlice{
				ObjectMeta: metav1.ObjectMeta{
					Name:       "matching-1",
					Namespace:  ns,
					Generation: testcase.informerGenerationNumber,
					Labels: map[string]string{
						discovery.LabelServiceName: serviceName,
						discovery.LabelManagedBy:   controllerName,
					},
				},
				AddressType: discovery.AddressTypeIPv4,
			}
			err := esController.endpointSliceStore.Add(epSlice1)
			if err != nil {
				t.Fatalf("Expected no error adding EndpointSlice: %v", err)
			}

			// Create EndpointSlice in the tracker with trackerGenerationNumber
			epSlice2 := epSlice1.DeepCopy()
			epSlice2.Generation = testcase.trackerGenerationNumber
			esController.endpointSliceTracker.Update(epSlice2)

			err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))

			// Check if we got a StaleInformerCache error. The message reports
			// both directions of mismatch, since some cases expect the error
			// and others expect its absence.
			if gotStale := endpointsliceutil.IsStaleInformerCacheErr(err); gotStale != testcase.expectError {
				t.Fatalf("Expected stale informer cache error: %t, got: %t (err: %v)", testcase.expectError, gotStale, err)
			}
		})
	}
}
// Test_checkNodeTopologyDistribution builds a set of nodes (zone label, Ready
// condition and allocatable CPU are each optional), optionally seeds a
// topology cache with per-service endpoint zone hints, runs
// checkNodeTopologyDistribution and asserts how many services were queued.
func Test_checkNodeTopologyDistribution(t *testing.T) {
	zoneA := "zone-a"
	zoneB := "zone-b"
	zoneC := "zone-c"

	readyTrue := true
	readyFalse := false

	cpu100 := resource.MustParse("100m")
	cpu1000 := resource.MustParse("1000m")
	cpu2000 := resource.MustParse("2000m")

	// nodeInfo describes one node; a nil field leaves the corresponding
	// attribute unset on the generated Node object.
	type nodeInfo struct {
		zoneLabel *string
		ready     *bool
		cpu       *resource.Quantity
	}

	testCases := []struct {
		name                 string
		nodes                []nodeInfo
		topologyCacheEnabled bool
		endpointZoneInfo     map[string]topologycache.EndpointZoneInfo
		expectedQueueLen     int
	}{{
		name:                 "empty",
		nodes:                []nodeInfo{},
		topologyCacheEnabled: false,
		endpointZoneInfo:     map[string]topologycache.EndpointZoneInfo{},
		expectedQueueLen:     0,
	}, {
		name: "lopsided, queue required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu100},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3},
		},
		expectedQueueLen: 1,
	}, {
		name: "lopsided but 1 unready, queue required because unready node means 0 CPU in one zone",
		nodes: []nodeInfo{
			{zoneLabel: &zoneA, ready: &readyFalse, cpu: &cpu100},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3},
		},
		expectedQueueLen: 1,
	}, {
		name: "even zones, uneven endpoint distribution but within threshold, no sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneB: 5, zoneC: 4},
		},
		expectedQueueLen: 0,
	}, {
		name: "even zones but node missing zone, sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneB: 5, zoneC: 4},
		},
		expectedQueueLen: 1,
	}, {
		name: "even zones but node missing cpu, sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneB, ready: &readyTrue},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneB: 5, zoneC: 4},
		},
		expectedQueueLen: 1,
	}, {
		// This case expects the service to be queued, so it is named
		// "sync required" to match expectedQueueLen.
		name: "even zones, uneven endpoint distribution beyond threshold, sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneB: 6, zoneC: 4},
		},
		expectedQueueLen: 1,
	}, {
		name: "3 uneven zones, matching endpoint distribution, no sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 1},
		},
		expectedQueueLen: 0,
	}, {
		name: "3 uneven zones, endpoint distribution within threshold but below 1, sync required",
		nodes: []nodeInfo{
			{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000},
			{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
			{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100},
		},
		topologyCacheEnabled: true,
		endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
			"ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 0},
		},
		expectedQueueLen: 1,
	}}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			_, esController := newController([]string{}, time.Duration(0))

			for i, info := range tc.nodes {
				node := &v1.Node{
					ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("node-%d", i)},
					Status:     v1.NodeStatus{},
				}
				if info.zoneLabel != nil {
					node.Labels = map[string]string{v1.LabelTopologyZone: *info.zoneLabel}
				}
				if info.ready != nil {
					status := v1.ConditionFalse
					if *info.ready {
						status = v1.ConditionTrue
					}
					node.Status.Conditions = []v1.NodeCondition{{
						Type:   v1.NodeReady,
						Status: status,
					}}
				}
				if info.cpu != nil {
					node.Status.Allocatable = v1.ResourceList{
						v1.ResourceCPU: *info.cpu,
					}
				}
				if err := esController.nodeStore.Add(node); err != nil {
					t.Fatalf("Expected no error adding node %q: %v", node.Name, err)
				}
			}

			// The topology cache does not depend on the individual nodes, so
			// it is built once here instead of once per node.
			if tc.topologyCacheEnabled {
				esController.topologyCache = topologycache.NewTopologyCache()
				for serviceKey, endpointZoneInfo := range tc.endpointZoneInfo {
					esController.topologyCache.SetHints(serviceKey, discovery.AddressTypeIPv4, endpointZoneInfo)
				}
			}

			esController.checkNodeTopologyDistribution()

			if esController.queue.Len() != tc.expectedQueueLen {
				t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.queue.Len())
			}
		})
	}
}
// Test helpers
// addPods inserts podsCount pods, built by newPod with indices
// 0..podsCount-1 in the given namespace, directly into the controller's pod
// store.
func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) {
	t.Helper()

	for idx := 0; idx < podsCount; idx++ {
		esController.podStore.Add(newPod(idx, namespace, true, 0, false))
	}
}
// standardSyncService creates a service through createService and then runs
// syncService for its "namespace/name" key, asserting that the sync returns
// no error.
func standardSyncService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) {
	t.Helper()

	createService(t, esController, namespace, serviceName)

	serviceKey := fmt.Sprintf("%s/%s", namespace, serviceName)
	syncErr := esController.syncService(serviceKey)
	assert.Nil(t, syncErr, "Expected no error syncing service")
}
// createService builds a Service selecting "foo=bar" with a single port
// targeting 80 and the IPv4 family, adds it to the controller's service store
// and creates it through the controller's client. The created object is
// returned to the caller.
func createService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) *v1.Service {
	t.Helper()

	meta := metav1.ObjectMeta{
		Name:              serviceName,
		Namespace:         namespace,
		CreationTimestamp: metav1.NewTime(time.Now()),
		UID:               types.UID(namespace + "-" + serviceName),
	}
	spec := v1.ServiceSpec{
		Ports:      []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
		Selector:   map[string]string{"foo": "bar"},
		IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
	}
	svc := &v1.Service{ObjectMeta: meta, Spec: spec}

	esController.serviceStore.Add(svc)
	_, createErr := esController.client.CoreV1().Services(namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
	assert.Nil(t, createErr, "Expected no error creating service")

	return svc
}
// expectAction checks that the action at the given index has the expected
// verb and resource. It fails the test immediately when fewer than index+1
// actions were recorded; verb and resource mismatches are reported as
// non-fatal errors.
func expectAction(t *testing.T, actions []k8stesting.Action, index int, verb, resource string) {
	t.Helper()

	if index >= len(actions) {
		t.Fatalf("Expected at least %d actions, got %d", index+1, len(actions))
	}

	recorded := actions[index]
	if gotVerb := recorded.GetVerb(); gotVerb != verb {
		t.Errorf("Expected action %d verb to be %s, got %s", index, verb, gotVerb)
	}
	if gotResource := recorded.GetResource().Resource; gotResource != resource {
		t.Errorf("Expected action %d resource to be %s, got %s", index, resource, gotResource)
	}
}
// protoPtr returns a pointer to a copy of the given Protocol value.
func protoPtr(proto v1.Protocol) *v1.Protocol {
	ptr := new(v1.Protocol)
	*ptr = proto
	return ptr
}
// cacheMutationCheck helps ensure that cached objects have not been changed
// in any way throughout a test run. Objects are registered with Add and
// compared against their recorded deep copies by Check.
type cacheMutationCheck struct {
// objects holds one original/deep-copy pair per registered object.
objects []cacheObject
}
// cacheObject stores a reference to an original object as well as a deep copy
// of that object to track any mutations in the original object.
type cacheObject struct {
// original is the object as supplied to cacheMutationCheck.Add; it is the
// value that later gets compared for mutations.
original runtime.Object
// deepCopy is the DeepCopyObject snapshot taken when the object was added.
deepCopy runtime.Object
}
// newCacheMutationCheck returns a cacheMutationCheck that tracks every
// EndpointSlice in the given list.
func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck {
	var check cacheMutationCheck
	for i := range endpointSlices {
		check.Add(endpointSlices[i])
	}
	return check
}
// Add registers a runtime.Object with the cacheMutationCheck, recording both
// the object itself and a deep copy taken at the time of the call.
func (cmc *cacheMutationCheck) Add(o runtime.Object) {
	entry := cacheObject{
		original: o,
		deepCopy: o.DeepCopyObject(),
	}
	cmc.objects = append(cmc.objects, entry)
}
// Check verifies that no objects in the cacheMutationCheck have been mutated.
// Each tracked object is compared with the deep copy recorded by Add, and a
// test error is reported for every object that differs.
func (cmc *cacheMutationCheck) Check(t *testing.T) {
	// Mark this method as a helper, like the other helpers in this file, so
	// failures are attributed to the calling test line.
	t.Helper()

	for _, o := range cmc.objects {
		if !reflect.DeepEqual(o.original, o.deepCopy) {
			// Cached objects can't be safely mutated and instead should be deep
			// copied before changed in any way.
			t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
		}
	}
}
// Test_dropEndpointSlicesPendingDeletion verifies that
// dropEndpointSlicesPendingDeletion removes EndpointSlices that carry a
// DeletionTimestamp and returns the remaining slices unmodified and in order.
func Test_dropEndpointSlicesPendingDeletion(t *testing.T) {
	now := metav1.Now()
	endpointSlices := []*discovery.EndpointSlice{
		{
			ObjectMeta: metav1.ObjectMeta{
				Name:              "epSlice1",
				DeletionTimestamp: &now,
			},
		},
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: "epSlice2",
			},
			AddressType: discovery.AddressTypeIPv4,
			Endpoints: []discovery.Endpoint{
				{
					Addresses: []string{"172.18.0.2"},
				},
			},
		},
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: "epSlice3",
			},
			AddressType: discovery.AddressTypeIPv6,
			Endpoints: []discovery.Endpoint{
				{
					Addresses: []string{"3001:0da8:75a3:0000:0000:8a2e:0370:7334"},
				},
			},
		},
	}

	// Capture the expected survivors before the call, because the input slice
	// may be modified by the function under test.
	epSlice2 := endpointSlices[1]
	epSlice3 := endpointSlices[2]

	result := dropEndpointSlicesPendingDeletion(endpointSlices)

	// The length check must be fatal: result[0] and result[1] are indexed
	// below and would panic if fewer than two slices came back.
	if len(result) != 2 {
		t.Fatalf("Expected 2 EndpointSlices after dropping those pending deletion, got %d", len(result))
	}
	for _, epSlice := range result {
		if epSlice.Name == "epSlice1" {
			t.Errorf("Expected EndpointSlice marked for deletion to be dropped.")
		}
	}

	// We don't use endpointSlices and instead check manually for equality, because
	// `dropEndpointSlicesPendingDeletion` mutates the slice it receives, so it's easy
	// to break this test later. This way, we can be absolutely sure that the result
	// has exactly what we expect it to.
	if !reflect.DeepEqual(epSlice2, result[0]) {
		t.Errorf("EndpointSlice was unexpectedly mutated. Expected: %+v, Mutated: %+v", epSlice2, result[0])
	}
	if !reflect.DeepEqual(epSlice3, result[1]) {
		t.Errorf("EndpointSlice was unexpectedly mutated. Expected: %+v, Mutated: %+v", epSlice3, result[1])
	}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
6、 golang
-
8、 openharmony