-
Notifications
You must be signed in to change notification settings - Fork 0
Automated Test: dual-storage-enhanced #334
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,8 +2,9 @@ package rest | |||||||||||||
|
|
||||||||||||||
| import ( | ||||||||||||||
| "context" | ||||||||||||||
| "errors" | ||||||||||||||
| "time" | ||||||||||||||
|
|
||||||||||||||
| apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||||||||||||
| metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" | ||||||||||||||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||||||||||
| "k8s.io/apimachinery/pkg/runtime" | ||||||||||||||
|
|
@@ -21,114 +22,151 @@ type DualWriterMode3 struct { | |||||||||||||
| // newDualWriterMode3 returns a new DualWriter in mode 3. | ||||||||||||||
| // Mode 3 represents writing to LegacyStorage and Storage and reading from Storage. | ||||||||||||||
| func newDualWriterMode3(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics) *DualWriterMode3 { | ||||||||||||||
| return &DualWriterMode3{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode3"), dualWriterMetrics: dwm} | ||||||||||||||
| return &DualWriterMode3{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode3").WithValues("mode", mode3Str), dualWriterMetrics: dwm} | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Mode returns the mode of the dual writer. | ||||||||||||||
| func (d *DualWriterMode3) Mode() DualWriterMode { | ||||||||||||||
| return Mode3 | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| const mode3Str = "3" | ||||||||||||||
|
|
||||||||||||||
| // Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage. | ||||||||||||||
| func (d *DualWriterMode3) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { | ||||||||||||||
| log := klog.FromContext(ctx) | ||||||||||||||
| var method = "create" | ||||||||||||||
| log := d.Log.WithValues("kind", options.Kind, "method", method) | ||||||||||||||
| ctx = klog.NewContext(ctx, log) | ||||||||||||||
|
|
||||||||||||||
| startStorage := time.Now() | ||||||||||||||
| created, err := d.Storage.Create(ctx, obj, createValidation, options) | ||||||||||||||
| if err != nil { | ||||||||||||||
| log.Error(err, "unable to create object in storage") | ||||||||||||||
| d.recordLegacyDuration(true, mode3Str, options.Kind, method, startStorage) | ||||||||||||||
| return created, err | ||||||||||||||
| } | ||||||||||||||
| d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage) | ||||||||||||||
|
|
||||||||||||||
| if _, err := d.Legacy.Create(ctx, obj, createValidation, options); err != nil { | ||||||||||||||
| log.WithValues("object", created).Error(err, "unable to create object in legacy storage") | ||||||||||||||
| } | ||||||||||||||
| return created, nil | ||||||||||||||
| go func() { | ||||||||||||||
| ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy create timeout")) | ||||||||||||||
| defer cancel() | ||||||||||||||
|
|
||||||||||||||
| startLegacy := time.Now() | ||||||||||||||
| _, errObjectSt := d.Legacy.Create(ctx, obj, createValidation, options) | ||||||||||||||
| d.recordLegacyDuration(errObjectSt != nil, mode3Str, options.Kind, method, startLegacy) | ||||||||||||||
| }() | ||||||||||||||
|
|
||||||||||||||
| return created, err | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Get overrides the behavior of the generic DualWriter and retrieves an object from Storage. | ||||||||||||||
| func (d *DualWriterMode3) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { | ||||||||||||||
| return d.Storage.Get(ctx, name, &metav1.GetOptions{}) | ||||||||||||||
| var method = "get" | ||||||||||||||
| log := d.Log.WithValues("kind", options.Kind, "name", name, "method", method) | ||||||||||||||
| ctx = klog.NewContext(ctx, log) | ||||||||||||||
|
|
||||||||||||||
| startStorage := time.Now() | ||||||||||||||
| res, err := d.Storage.Get(ctx, name, options) | ||||||||||||||
| if err != nil { | ||||||||||||||
| log.Error(err, "unable to get object in storage") | ||||||||||||||
| } | ||||||||||||||
| d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startStorage) | ||||||||||||||
|
|
||||||||||||||
| return res, err | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { | ||||||||||||||
| log := d.Log.WithValues("name", name) | ||||||||||||||
| // List overrides the behavior of the generic DualWriter and reads only from Unified Store. | ||||||||||||||
| func (d *DualWriterMode3) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { | ||||||||||||||
| var method = "list" | ||||||||||||||
| log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "method", method) | ||||||||||||||
| ctx = klog.NewContext(ctx, log) | ||||||||||||||
|
|
||||||||||||||
| deleted, async, err := d.Storage.Delete(ctx, name, deleteValidation, options) | ||||||||||||||
| startStorage := time.Now() | ||||||||||||||
| res, err := d.Storage.List(ctx, options) | ||||||||||||||
| if err != nil { | ||||||||||||||
| if !apierrors.IsNotFound(err) { | ||||||||||||||
| log.Error(err, "could not delete from unified store") | ||||||||||||||
| return deleted, async, err | ||||||||||||||
| } | ||||||||||||||
| log.Error(err, "unable to list object in storage") | ||||||||||||||
| } | ||||||||||||||
| d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startStorage) | ||||||||||||||
|
|
||||||||||||||
| _, _, errLS := d.Legacy.Delete(ctx, name, deleteValidation, options) | ||||||||||||||
| if errLS != nil { | ||||||||||||||
| if !apierrors.IsNotFound(errLS) { | ||||||||||||||
| log.WithValues("deleted", deleted).Error(errLS, "could not delete from legacy store") | ||||||||||||||
| } | ||||||||||||||
| return res, err | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { | ||||||||||||||
| var method = "delete" | ||||||||||||||
| log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method) | ||||||||||||||
| ctx = klog.NewContext(ctx, d.Log) | ||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inconsistent logger usage in context. This line uses 🐛 Proposed fix- ctx = klog.NewContext(ctx, d.Log)
+ ctx = klog.NewContext(ctx, log)🤖 Prompt for AI Agents |
||||||||||||||
|
|
||||||||||||||
| startStorage := time.Now() | ||||||||||||||
| res, async, err := d.Storage.Delete(ctx, name, deleteValidation, options) | ||||||||||||||
| if err != nil { | ||||||||||||||
| log.Error(err, "unable to delete object in storage") | ||||||||||||||
| d.recordStorageDuration(true, mode3Str, options.Kind, method, startStorage) | ||||||||||||||
| return res, async, err | ||||||||||||||
| } | ||||||||||||||
| d.recordStorageDuration(false, mode3Str, name, method, startStorage) | ||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inconsistent metric label: Line 103 correctly uses 🐛 Proposed fix- d.recordStorageDuration(false, mode3Str, name, method, startStorage)
+ d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage)📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||
|
|
||||||||||||||
| return deleted, async, err | ||||||||||||||
| go func() { | ||||||||||||||
| startLegacy := time.Now() | ||||||||||||||
| ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy delete timeout")) | ||||||||||||||
| defer cancel() | ||||||||||||||
| _, _, err := d.Legacy.Delete(ctx, name, deleteValidation, options) | ||||||||||||||
| d.recordLegacyDuration(err != nil, mode3Str, options.Kind, method, startLegacy) | ||||||||||||||
| }() | ||||||||||||||
|
|
||||||||||||||
| return res, async, err | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Update overrides the behavior of the generic DualWriter and writes first to Storage and then to LegacyStorage. | ||||||||||||||
| func (d *DualWriterMode3) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { | ||||||||||||||
| log := d.Log.WithValues("name", name) | ||||||||||||||
| var method = "update" | ||||||||||||||
| log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method) | ||||||||||||||
| ctx = klog.NewContext(ctx, log) | ||||||||||||||
| old, err := d.Storage.Get(ctx, name, &metav1.GetOptions{}) | ||||||||||||||
| if err != nil { | ||||||||||||||
| log.WithValues("object", old).Error(err, "could not get object to update") | ||||||||||||||
| return nil, false, err | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| updated, err := objInfo.UpdatedObject(ctx, old) | ||||||||||||||
| startStorage := time.Now() | ||||||||||||||
| res, async, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) | ||||||||||||||
| if err != nil { | ||||||||||||||
| log.WithValues("object", updated).Error(err, "could not update or create object") | ||||||||||||||
| return nil, false, err | ||||||||||||||
| } | ||||||||||||||
| objInfo = &updateWrapper{ | ||||||||||||||
| upstream: objInfo, | ||||||||||||||
| updated: updated, | ||||||||||||||
| log.Error(err, "unable to update in storage") | ||||||||||||||
| d.recordLegacyDuration(true, mode3Str, options.Kind, method, startStorage) | ||||||||||||||
| return res, async, err | ||||||||||||||
|
Comment on lines
+128
to
+130
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incorrect metric recording on storage error. Same issue as in 🐛 Proposed fix if err != nil {
log.Error(err, "unable to update in storage")
- d.recordLegacyDuration(true, mode3Str, options.Kind, method, startStorage)
+ d.recordStorageDuration(true, mode3Str, options.Kind, method, startStorage)
return res, async, err
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||
| } | ||||||||||||||
| d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage) | ||||||||||||||
|
|
||||||||||||||
| obj, created, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) | ||||||||||||||
| if err != nil { | ||||||||||||||
| log.WithValues("object", obj).Error(err, "could not write to US") | ||||||||||||||
| return obj, created, err | ||||||||||||||
| } | ||||||||||||||
| go func() { | ||||||||||||||
| ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy update timeout")) | ||||||||||||||
|
|
||||||||||||||
| _, _, errLeg := d.Legacy.Update(ctx, name, &updateWrapper{ | ||||||||||||||
| upstream: objInfo, | ||||||||||||||
| updated: obj, | ||||||||||||||
| }, createValidation, updateValidation, forceAllowCreate, options) | ||||||||||||||
| if errLeg != nil { | ||||||||||||||
| log.Error(errLeg, "could not update object in legacy store") | ||||||||||||||
| } | ||||||||||||||
| return obj, created, err | ||||||||||||||
| startLegacy := time.Now() | ||||||||||||||
| defer cancel() | ||||||||||||||
| _, _, errObjectSt := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) | ||||||||||||||
| d.recordLegacyDuration(errObjectSt != nil, mode3Str, options.Kind, method, startLegacy) | ||||||||||||||
| }() | ||||||||||||||
|
|
||||||||||||||
| return res, async, err | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // DeleteCollection overrides the behavior of the generic DualWriter and deletes from both LegacyStorage and Storage. | ||||||||||||||
| func (d *DualWriterMode3) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) { | ||||||||||||||
| log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion) | ||||||||||||||
| var method = "delete-collection" | ||||||||||||||
| log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion, "method", method) | ||||||||||||||
| ctx = klog.NewContext(ctx, log) | ||||||||||||||
|
|
||||||||||||||
| deleted, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) | ||||||||||||||
| startStorage := time.Now() | ||||||||||||||
| res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) | ||||||||||||||
| if err != nil { | ||||||||||||||
| log.Error(err, "failed to delete collection successfully from Storage") | ||||||||||||||
| log.Error(err, "unable to delete collection in storage") | ||||||||||||||
| d.recordStorageDuration(true, mode3Str, options.Kind, method, startStorage) | ||||||||||||||
| return res, err | ||||||||||||||
| } | ||||||||||||||
| d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage) | ||||||||||||||
|
|
||||||||||||||
| if deleted, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions); err != nil { | ||||||||||||||
| log.WithValues("deleted", deleted).Error(err, "failed to delete collection successfully from LegacyStorage") | ||||||||||||||
| } | ||||||||||||||
| go func() { | ||||||||||||||
| startLegacy := time.Now() | ||||||||||||||
| ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy deletecollection timeout")) | ||||||||||||||
| defer cancel() | ||||||||||||||
| _, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions) | ||||||||||||||
| d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startLegacy) | ||||||||||||||
|
Comment on lines
+165
to
+166
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incorrect metric function for legacy duration. This calls 🐛 Proposed fix _, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
- d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startLegacy)
+ d.recordLegacyDuration(err != nil, mode3Str, options.Kind, method, startLegacy)📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||
| }() | ||||||||||||||
|
|
||||||||||||||
| return deleted, err | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func (d *DualWriterMode3) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { | ||||||||||||||
| //TODO: implement List | ||||||||||||||
| klog.Error("List not implemented") | ||||||||||||||
| return nil, nil | ||||||||||||||
| return res, err | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func (d *DualWriterMode3) Destroy() { | ||||||||||||||
|
|
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect metric recording on storage error.
When
Storage.Createfails, the code callsrecordLegacyDurationbut should callrecordStorageDuration. The error occurred in storage, not legacy.🐛 Proposed fix
if err != nil { log.Error(err, "unable to create object in storage") - d.recordLegacyDuration(true, mode3Str, options.Kind, method, startStorage) + d.recordStorageDuration(true, mode3Str, options.Kind, method, startStorage) return created, err }📝 Committable suggestion
🤖 Prompt for AI Agents