diff --git a/changelogs/unreleased/8590-Lyndon-Li b/changelogs/unreleased/8590-Lyndon-Li new file mode 100644 index 000000000..ee46f3c5c --- /dev/null +++ b/changelogs/unreleased/8590-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #8579, set event burst to block event broadcaster from filtering events \ No newline at end of file diff --git a/pkg/util/kube/event.go b/pkg/util/kube/event.go index de91d3533..8b5fc9c5b 100644 --- a/pkg/util/kube/event.go +++ b/pkg/util/kube/event.go @@ -16,6 +16,7 @@ limitations under the License. package kube import ( + "math" "sync" "time" @@ -60,6 +61,9 @@ func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, e } res.broadcaster = record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{ + // Bypass the built-in EventCorrelator's rate filtering, otherwise, the event will be abandoned if the rate exceeds. + // The callers (i.e., data mover pods) have controlled the rate and total number outside. E.g., the progress is designed to be updated every 10 seconds and is changeable. + BurstSize: math.MaxInt32, MaxEvents: 1, MessageFunc: func(event *v1.Event) string { return event.Message diff --git a/pkg/util/kube/event_test.go b/pkg/util/kube/event_test.go index 080d95546..2f24b2515 100644 --- a/pkg/util/kube/event_test.go +++ b/pkg/util/kube/event_test.go @@ -43,9 +43,12 @@ func TestEvent(t *testing.T) { } cases := []struct { - name string - events []testEvent - expected int + name string + events []testEvent + generateDiff int + generateSame int + generateEnding bool + expected int }{ { name: "update events, different message", @@ -116,6 +119,18 @@ func TestEvent(t *testing.T) { }, expected: -1, }, + { + name: "auto generate 200", + generateDiff: 200, + generateEnding: true, + expected: 201, + }, + { + name: "auto generate 200, update", + generateSame: 200, + generateEnding: true, + expected: 2, + }, } shutdownTimeout = time.Second * 5 @@ -143,6 +158,28 @@ func TestEvent(t *testing.T) { _, err = client.CoreV1().Pods("fake-ns").Create(context.Background(), pod, metav1.CreateOptions{}) require.NoError(t, err) + for i := 0; i < tc.generateDiff; i++ { + tc.events = append(tc.events, testEvent{ + reason: fmt.Sprintf("fake-reason-%v", i), + message: fmt.Sprintf("fake-message-%v", i), + }) + } + + for i := 0; i < tc.generateSame; i++ { + tc.events = append(tc.events, testEvent{ + reason: "fake-reason", + message: fmt.Sprintf("fake-message-%v", i), + }) + } + + if tc.generateEnding { + tc.events = append(tc.events, testEvent{ + reason: "fake-ending-reason", + message: "fake-ending-message", + ending: true, + }) + } + for _, e := range tc.events { if e.ending { recorder.EndingEvent(pod, e.warning, e.reason, e.message)