kubernetes nestedpendingoperations_test 源码
kubernetes nestedpendingoperations_test 代码
文件路径:/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nestedpendingoperations
import (
"fmt"
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
const (
// testTimeout is a timeout of goroutines to finish. This _should_ be just a
// "context switch" and it should take several ms, however, Clayton says "We
// have had flakes due to tests that assumed that 15s is long enough to sleep")
testTimeout time.Duration = 1 * time.Minute
// initialOperationWaitTimeShort is the initial amount of time the test will
// wait for an operation to complete (each successive failure results in
// exponential backoff).
initialOperationWaitTimeShort time.Duration = 20 * time.Millisecond
// initialOperationWaitTimeLong is the initial amount of time the test will
// wait for an operation to complete (each successive failure results in
// exponential backoff).
initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond
)
func Test_NestedPendingOperations_Positive_SingleOp(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
// Act
err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
// Assert
if err != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
}
}
func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volume1Name := v1.UniqueVolumeName("volume1-name")
volume2Name := v1.UniqueVolumeName("volume2-name")
// Act
err1 := grm.Run(volume1Name, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
err2 := grm.Run(volume2Name, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
// Assert
if err1 != nil {
t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1)
}
if err2 != nil {
t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2)
}
}
func Test_NestedPendingOperations_Positive_TwoSubOps(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1PodName := volumetypes.UniquePodName("operation1-podname")
operation2PodName := volumetypes.UniquePodName("operation2-podname")
// Act
err1 := grm.Run(volumeName, operation1PodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
err2 := grm.Run(volumeName, operation2PodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
// Assert
if err1 != nil {
t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1)
}
if err2 != nil {
t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2)
}
}
func Test_NestedPendingOperations_Positive_SingleOpWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
// Act
err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
// Assert
if err != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
}
}
func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{})
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
<-operation1DoneCh // Force operation1 to complete
// Act
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
if err != nil {
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
}
}
func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{})
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
<-operation1DoneCh // Force operation1 to complete
// Act
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
if err != nil {
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
}
}
func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1 := panicFunc
err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
// Act
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
if err != nil {
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
}
}
func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1 := panicFunc
err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
// Act
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
func() (bool, error) {
err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
if err != nil {
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
}
}
func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{})
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
// Act
err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
}
}
func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
op1Name := "mount_volume"
operation1 := errorFunc
err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
// Shorter than exponential backoff period, so as to trigger exponential backoff error on second
// operation.
operation2 := errorFunc
err2 := retryWithExponentialBackOff(
initialOperationWaitTimeShort,
func() (bool, error) {
err := grm.Run(volumeName,
EmptyUniquePodName,
EmptyNodeName,
volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name})
if exponentialbackoff.IsExponentialBackoff(err) {
return true, nil
}
return false, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("Expected NestedPendingOperations to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name)
}
operation3 := noopFunc
op3Name := "unmount_volume"
// Act
err3 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name})
if err3 != nil {
t.Fatalf("NestedPendingOperations failed. Expected <no error> Actual: <%v>", err3)
}
}
func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operationPodName := volumetypes.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{})
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
// Act
err2 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
}
}
func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operationPodName := volumetypes.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{})
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
// Act
err2 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
}
}
func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{})
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
// Act
err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
}
}
func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{})
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
operation3 := noopFunc
// Act
err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
}
// Act
operation1DoneCh <- true // Force operation1 to complete
err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3})
if err != nil {
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err3 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
}
}
func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{})
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
operation3 := noopFunc
// Act
err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
}
// Act
operation1DoneCh <- true // Force operation1 to complete
err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3})
if err != nil {
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err3 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
}
}
func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) {
// Test than Wait() on empty GoRoutineMap always succeeds without blocking
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
// Act
waitDoneCh := make(chan interface{}, 1)
go func() {
grm.Wait()
waitDoneCh <- true
}()
// Assert
err := waitChannelWithTimeout(waitDoneCh, testTimeout)
if err != nil {
t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
}
}
func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
// Test than Wait() on empty GoRoutineMap always succeeds without blocking
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
// Act
waitDoneCh := make(chan interface{}, 1)
go func() {
grm.Wait()
waitDoneCh <- true
}()
// Assert
err := waitChannelWithTimeout(waitDoneCh, testTimeout)
if err != nil {
t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
}
}
func Test_NestedPendingOperations_Positive_Wait(t *testing.T) {
// Test that Wait() really blocks until the last operation succeeds
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{})
operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
}
// Act
waitDoneCh := make(chan interface{}, 1)
go func() {
grm.Wait()
waitDoneCh <- true
}()
// Finish the operation
operation1DoneCh <- true
// Assert
err = waitChannelWithTimeout(waitDoneCh, testTimeout)
if err != nil {
t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
}
}
func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) {
// Test that Wait() really blocks until the last operation succeeds
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{})
operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
}
// Act
waitDoneCh := make(chan interface{}, 1)
go func() {
grm.Wait()
waitDoneCh <- true
}()
// Finish the operation
operation1DoneCh <- true
// Assert
err = waitChannelWithTimeout(waitDoneCh, testTimeout)
if err != nil {
t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
}
}
/* Concurrent operations tests */
// "None" means volume, pod, and node names are all empty
// "Volume" means volume name is set, but pod name and node name are empty
// "Volume Pod" means volume and pod names are set, but the node name is empty
// "Volume Node" means volume and node names are set, but the pod name is empty
// The same volume, pod, and node names are used (where they are not empty).
// Covered cases:
// FIRST OP | SECOND OP | RESULT
// None | None | Positive
// None | Volume | Positive
// None | Volume Pod | Positive
// None | Volume Node | Positive
// Volume | None | Positive
// Volume | Volume | Negative (covered in Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes above)
// Volume | Volume Pod | Negative
// Volume | Volume Node | Negative
// Volume Pod | None | Positive
// Volume Pod | Volume | Negative
// Volume Pod | Volume Pod | Negative (covered in Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes above)
// Volume Node | None | Positive
// Volume Node | Volume | Negative
// Volume Node | Volume Node | Negative
// These cases are not covered because they will never occur within the same
// binary, so either result works.
// Volume Pod | Volume Node
// Volume Node | Volume Pod
func Test_NestedPendingOperations_SecondOpBeforeFirstCompletes(t *testing.T) {
const (
keyNone = iota
keyVolume
keyVolumePod
keyVolumeNode
)
type testCase struct {
testID int
keyTypes []int // only 2 elements are supported
expectPass bool
}
tests := []testCase{
{testID: 1, keyTypes: []int{keyNone, keyNone}, expectPass: true},
{testID: 2, keyTypes: []int{keyNone, keyVolume}, expectPass: true},
{testID: 3, keyTypes: []int{keyNone, keyVolumePod}, expectPass: true},
{testID: 4, keyTypes: []int{keyNone, keyVolumeNode}, expectPass: true},
{testID: 5, keyTypes: []int{keyVolume, keyNone}, expectPass: true},
{testID: 6, keyTypes: []int{keyVolume, keyVolumePod}, expectPass: false},
{testID: 7, keyTypes: []int{keyVolume, keyVolumeNode}, expectPass: false},
{testID: 8, keyTypes: []int{keyVolumePod, keyNone}, expectPass: true},
{testID: 9, keyTypes: []int{keyVolumePod, keyVolume}, expectPass: false},
{testID: 10, keyTypes: []int{keyVolumeNode, keyNone}, expectPass: true},
{testID: 11, keyTypes: []int{keyVolumeNode, keyVolume}, expectPass: false},
{testID: 12, keyTypes: []int{keyVolumeNode, keyVolumeNode}, expectPass: false},
}
for _, test := range tests {
var (
volumeNames []v1.UniqueVolumeName
podNames []volumetypes.UniquePodName
nodeNames []types.NodeName
)
for _, keyType := range test.keyTypes {
var (
v v1.UniqueVolumeName
p volumetypes.UniquePodName
n types.NodeName
)
switch keyType {
case keyNone:
v = EmptyUniqueVolumeName
p = EmptyUniquePodName
n = EmptyNodeName
case keyVolume:
v = v1.UniqueVolumeName("volume-name")
p = EmptyUniquePodName
n = EmptyNodeName
case keyVolumePod:
v = v1.UniqueVolumeName("volume-name")
p = volumetypes.UniquePodName("operation-podname")
n = EmptyNodeName
case keyVolumeNode:
v = v1.UniqueVolumeName("volume-name")
p = EmptyUniquePodName
n = types.NodeName("operation-nodename")
}
volumeNames = append(volumeNames, v)
podNames = append(podNames, p)
nodeNames = append(nodeNames, n)
}
t.Run(fmt.Sprintf("Test %d", test.testID), func(t *testing.T) {
if test.expectPass {
testConcurrentOperationsPositive(t,
volumeNames[0], podNames[0], nodeNames[0],
volumeNames[1], podNames[1], nodeNames[1],
)
} else {
testConcurrentOperationsNegative(t,
volumeNames[0], podNames[0], nodeNames[0],
volumeNames[1], podNames[1], nodeNames[1],
)
}
})
}
}
func Test_NestedPendingOperations_Positive_Issue_88355(t *testing.T) {
// This test reproduces the scenario that is likely to have caused
// kubernetes/kubernetes issue #88355.
// Please refer to the issue for more context:
// https://github.com/kubernetes/kubernetes/issues/88355
// Below, vx is a volume name, and nx is a node name.
// Operation sequence:
// opZ(v0) starts (operates on a different volume from all other operations)
// op1(v1, n1) starts
// op2(v1, n2) starts
// opZ(v0) ends with success
// op2(v1, n2) ends with an error (exponential backoff should be triggered)
// op1(v1, n1) ends with success
// op3(v1, n2) starts (continuously retried on exponential backoff error)
// op3(v1, n2) ends with success
// op4(v1, n2) starts
// op4(v1, n2) ends with success
const (
mainVolumeName = "main-volume"
opZVolumeName = "other-volume"
node1 = "node1"
node2 = "node2"
// delay after an operation is signaled to finish to ensure it actually
// finishes before running the next operation.
delay = 50 * time.Millisecond
// Replicates the default AttachDetachController reconcile period
reconcilerPeriod = 100 * time.Millisecond
)
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
opZContinueCh := make(chan interface{})
op1ContinueCh := make(chan interface{})
op2ContinueCh := make(chan interface{})
operationZ := generateWaitFunc(opZContinueCh)
operation1 := generateWaitFunc(op1ContinueCh)
operation2 := generateWaitWithErrorFunc(op2ContinueCh)
operation3 := noopFunc
operation4 := noopFunc
errZ := grm.Run(opZVolumeName, "" /* podName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operationZ})
if errZ != nil {
t.Fatalf("NestedPendingOperations failed for operationZ. Expected: <no error> Actual: <%v>", errZ)
}
err1 := grm.Run(mainVolumeName, "" /* podName */, node1, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed for operation1. Expected: <no error> Actual: <%v>", err1)
}
err2 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation2})
if err2 != nil {
t.Fatalf("NestedPendingOperations failed for operation2. Expected: <no error> Actual: <%v>", err2)
}
opZContinueCh <- true
time.Sleep(delay)
op2ContinueCh <- true
time.Sleep(delay)
op1ContinueCh <- true
time.Sleep(delay)
for {
err3 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation3})
if err3 == nil {
break
} else if !exponentialbackoff.IsExponentialBackoff(err3) {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
}
time.Sleep(reconcilerPeriod)
}
time.Sleep(delay)
err4 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation4})
if err4 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err4)
}
}
// testConcurrentOperationsPositive passes if the two operations keyed by the
// provided parameters are executed in parallel, and fails otherwise.
func testConcurrentOperationsPositive(
t *testing.T,
volumeName1 v1.UniqueVolumeName,
podName1 volumetypes.UniquePodName,
nodeName1 types.NodeName,
volumeName2 v1.UniqueVolumeName,
podName2 volumetypes.UniquePodName,
nodeName2 types.NodeName) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
operation1DoneCh := make(chan interface{})
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
// Act
err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
}
}
// testConcurrentOperationsNegative passes if the creation of the second
// operation returns an alreadyExists error, and fails otherwise.
func testConcurrentOperationsNegative(
t *testing.T,
volumeName1 v1.UniqueVolumeName,
podName1 volumetypes.UniquePodName,
nodeName1 types.NodeName,
volumeName2 v1.UniqueVolumeName,
podName2 volumetypes.UniquePodName,
nodeName2 types.NodeName) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
operation1DoneCh := make(chan interface{})
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
// Act
err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist")
}
if !IsAlreadyExists(err2) {
t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
}
}
/* END concurrent operations tests */
func generateCallbackFunc(done chan<- interface{}) func() volumetypes.OperationContext {
return func() volumetypes.OperationContext {
done <- true
return volumetypes.NewOperationContext(nil, nil, false)
}
}
func generateWaitFunc(done <-chan interface{}) func() volumetypes.OperationContext {
return func() volumetypes.OperationContext {
<-done
return volumetypes.NewOperationContext(nil, nil, false)
}
}
func panicFunc() volumetypes.OperationContext {
panic("testing panic")
}
func errorFunc() volumetypes.OperationContext {
return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false)
}
func generateWaitWithErrorFunc(done <-chan interface{}) func() volumetypes.OperationContext {
return func() volumetypes.OperationContext {
<-done
return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false)
}
}
func noopFunc() volumetypes.OperationContext {
return volumetypes.NewOperationContext(nil, nil, false)
}
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
backoff := wait.Backoff{
Duration: initialDuration,
Factor: 3,
Jitter: 0,
Steps: 4,
}
return wait.ExponentialBackoff(backoff, fn)
}
func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-ch:
// Success!
return nil
case <-timer.C:
return fmt.Errorf("timeout after %v", timeout)
}
}
func Test_NestedPendingOperations_OperationExists_PendingFirst(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("test-volume")
podName1 := volumetypes.UniquePodName("pod1")
podName2 := volumetypes.UniquePodName("pod2")
podName3 := volumetypes.UniquePodName("pod3")
podName4 := EmptyUniquePodName
nodeName := EmptyNodeName
// delay after an operation is signaled to finish to ensure it actually
// finishes before running the next operation.
delay := 50 * time.Millisecond
// fake operation1 for pod1 failed
operation1DoneCh := make(chan interface{})
operation1 := generateWaitWithErrorFunc(operation1DoneCh)
err1 := grm.Run(volumeName, podName1, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"})
if err1 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
// fake operation2 for pod2 fails
operation2DoneCh := make(chan interface{})
operation2 := generateWaitWithErrorFunc(operation2DoneCh)
err2 := grm.Run(volumeName, podName2, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"})
if err2 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
}
// fake operation3 for pod3 pending
operation3DoneCh := make(chan interface{})
operation3 := generateWaitFunc(operation3DoneCh)
defer func() {
close(operation3DoneCh)
}()
err3 := grm.Run(volumeName, podName3, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"})
if err3 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
}
operation1DoneCh <- true
operation2DoneCh <- true
time.Sleep(delay)
// fake operation4 for EmptyUniquePodName should be rejected as operation3 is still pending
operation4DoneCh := make(chan interface{})
operation4 := generateWaitFunc(operation4DoneCh)
defer func() {
close(operation4DoneCh)
}()
err4 := grm.Run(volumeName, podName4, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"})
// Assert
if err4 == nil {
t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist")
}
if !IsAlreadyExists(err4) {
t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err4)
}
}
func Test_NestedPendingOperations_OperationExists_ExactMatchFirstNoPending(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("test-volume")
podName1 := volumetypes.UniquePodName("pod1")
podName2 := volumetypes.UniquePodName("pod2")
podName3 := volumetypes.UniquePodName("pod3")
podName4 := EmptyUniquePodName
nodeName := EmptyNodeName
// delay after an operation is signaled to finish to ensure it actually
// finishes before running the next operation.
delay := 50 * time.Millisecond
backoffDelay := 500 * time.Millisecond
// fake operation1 for pod1 fails
operation1DoneCh := make(chan interface{})
operation1 := generateWaitWithErrorFunc(operation1DoneCh)
err1 := grm.Run(volumeName, podName1, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"})
if err1 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
// fake operation2 for pod2 fails
operation2DoneCh := make(chan interface{})
operation2 := generateWaitWithErrorFunc(operation2DoneCh)
err2 := grm.Run(volumeName, podName2, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"})
if err2 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
}
// fake operation3 for pod3 fails
operation3DoneCh := make(chan interface{})
operation3 := generateWaitWithErrorFunc(operation3DoneCh)
err3 := grm.Run(volumeName, podName3, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"})
if err3 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
}
operation1DoneCh <- true
operation2DoneCh <- true
operation3DoneCh <- true
time.Sleep(delay)
// fake operation4 with EmptyUniquePodName fails
operation4DoneCh := make(chan interface{})
operation4 := generateWaitWithErrorFunc(operation4DoneCh)
err4 := grm.Run(volumeName, podName4, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"})
if err4 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err4)
}
operation4DoneCh <- true
// operation for pod2 retry
time.Sleep(backoffDelay)
operation5 := noopFunc
err5 := grm.Run(volumeName, podName2, nodeName, volumetypes.GeneratedOperations{OperationFunc: operation5, OperationName: "umount"})
if err5 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err5)
}
time.Sleep(delay)
// Assert
// Operation5 will override operation2, since we successfully finished unmount operation on pod2, it should be removed from operations array
grm.(*nestedPendingOperations).lock.Lock()
defer grm.(*nestedPendingOperations).lock.Unlock()
for _, op := range grm.(*nestedPendingOperations).operations {
if op.key.podName == podName2 {
t.Errorf("NestedPendingOperations failed. Operation for pod2 should be removed")
}
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦