Automated Test: db-cleanup-optimized #342

Changes from all commits
```diff
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/grafana/grafana/pkg/services/annotations/accesscontrol"
+	"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
 
 	"github.com/grafana/grafana/pkg/infra/db"
 	"github.com/grafana/grafana/pkg/infra/log"
@@ -519,52 +520,135 @@ func (r *xormRepositoryImpl) CleanAnnotations(ctx context.Context, cfg setting.A
 	var totalAffected int64
 	if cfg.MaxAge > 0 {
 		cutoffDate := timeNow().Add(-cfg.MaxAge).UnixNano() / int64(time.Millisecond)
-		deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s AND created < %v ORDER BY id DESC %s) a)`
-		sql := fmt.Sprintf(deleteQuery, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
-
-		affected, err := r.executeUntilDoneOrCancelled(ctx, sql)
+		// Single-statement approaches, specifically ones using batched sub-queries, seem to deadlock with concurrent inserts on MySQL.
+		// We have a bounded batch size, so work around this by first loading the IDs into memory and allowing any locks to flush inside each batch.
+		// This may under-delete when concurrent inserts happen, but any such annotations will simply be cleaned on the next cycle.
+		//
+		// We execute the following batched operation repeatedly until either we run out of objects, the context is cancelled, or there is an error.
+		affected, err := untilDoneOrCancelled(ctx, func() (int64, error) {
+			cond := fmt.Sprintf(`%s AND created < %v ORDER BY id DESC %s`, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
+			ids, err := r.fetchIDs(ctx, "annotation", cond)
+			if err != nil {
+				return 0, err
+			}
+			r.log.Error("Annotations to clean by time", "count", len(ids), "ids", ids, "cond", cond, "err", err)
+
+			x, y := r.deleteByIDs(ctx, "annotation", ids)
+			r.log.Error("cleaned annotations by time", "count", len(ids), "affected", x, "err", y)
+			return x, y
+		})
 		totalAffected += affected
 		if err != nil {
 			return totalAffected, err
 		}
 	}
 
 	if cfg.MaxCount > 0 {
-		deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s ORDER BY id DESC %s) a)`
-		sql := fmt.Sprintf(deleteQuery, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount))
-		affected, err := r.executeUntilDoneOrCancelled(ctx, sql)
+		// Similar strategy as the above cleanup process, to avoid deadlocks.
+		affected, err := untilDoneOrCancelled(ctx, func() (int64, error) {
+			cond := fmt.Sprintf(`%s ORDER BY id DESC %s`, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount))
+			ids, err := r.fetchIDs(ctx, "annotation", cond)
+			if err != nil {
+				return 0, err
+			}
+			r.log.Error("Annotations to clean by count", "count", len(ids), "ids", ids, "cond", cond, "err", err)
+
+			x, y := r.deleteByIDs(ctx, "annotation", ids)
+			r.log.Error("cleaned annotations by count", "count", len(ids), "affected", x, "err", y)
+			return x, y
+		})
 		totalAffected += affected
-		return totalAffected, err
+		if err != nil {
+			return totalAffected, err
+		}
 	}
 
 	return totalAffected, nil
 }
 
 func (r *xormRepositoryImpl) CleanOrphanedAnnotationTags(ctx context.Context) (int64, error) {
-	deleteQuery := `DELETE FROM annotation_tag WHERE id IN ( SELECT id FROM (SELECT id FROM annotation_tag WHERE NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s) a)`
-	sql := fmt.Sprintf(deleteQuery, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
-	return r.executeUntilDoneOrCancelled(ctx, sql)
+	return untilDoneOrCancelled(ctx, func() (int64, error) {
+		cond := fmt.Sprintf(`NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s`, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
+		ids, err := r.fetchIDs(ctx, "annotation_tag", cond)
+		if err != nil {
+			return 0, err
+		}
+		r.log.Error("Tags to clean", "count", len(ids), "ids", ids, "cond", cond, "err", err)
+
+		x, y := r.deleteByIDs(ctx, "annotation_tag", ids)
+		r.log.Error("cleaned tags", "count", len(ids), "affected", x, "err", y)
+		return x, y
+	})
 }
```
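The deadlock-avoidance comment above is the heart of the change: one DELETE wrapping a batched sub-query is replaced by a plain SELECT of victim IDs followed by a primary-key IN-list DELETE. A minimal, self-contained sketch of one such round, using only database/sql rather than the repository's xorm session API (table and column names mirror the diff; placeholder style and the helper itself are illustrative, not the committed code):

```go
package cleanup

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// cleanupRound deletes at most one batch of expired annotations and reports
// how many rows it removed. Returning 0 signals that the caller's loop is done.
func cleanupRound(ctx context.Context, db *sql.DB, cutoffMillis int64, batchSize int) (int64, error) {
	// Step 1: read one batch of victim IDs in its own statement. A plain
	// SELECT avoids the gap locks a DELETE-with-subquery can take on MySQL.
	rows, err := db.QueryContext(ctx,
		`SELECT id FROM annotation WHERE created < ? ORDER BY id DESC LIMIT ?`,
		cutoffMillis, batchSize)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	var ids []int64
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return 0, err
		}
		ids = append(ids, id)
	}
	if err := rows.Err(); err != nil {
		return 0, err
	}
	if len(ids) == 0 {
		return 0, nil // nothing left: the batch loop can stop
	}

	// Step 2: delete exactly those IDs. A primary-key IN-list delete locks
	// only the target rows, so concurrent inserts are not blocked.
	placeholders := "?" + strings.Repeat(",?", len(ids)-1)
	args := make([]any, len(ids))
	for i, id := range ids {
		args[i] = id
	}
	res, err := db.ExecContext(ctx,
		fmt.Sprintf("DELETE FROM annotation WHERE id IN (%s)", placeholders), args...)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}
```

As the diff's comment notes, the trade-off is that rows inserted between the two steps are missed for one cycle, which is acceptable for a recurring cleanup job.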
Comment on lines 569 to +582 — coderabbitai[bot]

🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# First, find migration files related to annotation_tag
find . -type f \( -name "*.sql" -o -name "*.go" \) | grep -i migr | head -20
```

🏁 Script executed:

```bash
#!/bin/bash
# Search for the annotationTag struct definition
rg -n "type.*annotationTag" --type=go
```

🏁 Script executed:

```bash
#!/bin/bash
# Look for annotation_tag table creation in migrations
rg -n "annotation_tag" -A5 -B5 --type=go | grep -A5 -B5 "CREATE\|AddTable\|table" | head -50
```

🏁 Script executed:

```bash
#!/bin/bash
# Search for the specific struct definition around lines 663-666 mentioned in the review
rg -n "annotation_id|tag_id" --type=go -A2 -B2 | grep -A5 -B5 "type"
```

🏁 Script executed:

```bash
#!/bin/bash
# Get the full annotation_tag table definition from the migration
rg -A20 "annotationTagTableV3 := Table{" pkg/services/sqlstore/migrations/annotation_mig.go
```

🏁 Script executed:

```bash
#!/bin/bash
# Also check the v2 definition
rg -B5 -A15 "Create annotation_tag table v2" pkg/services/sqlstore/migrations/annotation_mig.go
```

🏁 Script executed:

```bash
#!/bin/bash
# Look for Column definitions in the migration
rg -A30 'annotationTagTable := Table' pkg/services/sqlstore/migrations/annotation_mig.go | head -60
```

Add missing field: the current `annotationTag` struct definition does not match the `annotation_tag` table schema created by the migrations. Update the struct to match the schema.
```diff
+func (r *xormRepositoryImpl) fetchIDs(ctx context.Context, table, condition string) ([]int64, error) {
+	sql := fmt.Sprintf(`SELECT id FROM %s`, table)
+	if condition == "" {
+		return nil, fmt.Errorf("condition must be supplied; cannot fetch IDs from entire table")
+	}
+	sql += fmt.Sprintf(` WHERE %s`, condition)
+	ids := make([]int64, 0)
+	err := r.db.WithDbSession(ctx, func(session *db.Session) error {
+		return session.SQL(sql).Find(&ids)
+	})
+	return ids, err
+}
+
-func (r *xormRepositoryImpl) executeUntilDoneOrCancelled(ctx context.Context, sql string) (int64, error) {
+func (r *xormRepositoryImpl) deleteByIDs(ctx context.Context, table string, ids []int64) (int64, error) {
+	if len(ids) == 0 {
+		return 0, nil
+	}
+
+	sql := ""
+	args := make([]any, 0)
+
+	// SQLite has a parameter limit of 999.
+	// If the batch size is bigger than that, and we're on SQLite, we have to put the IDs directly into the statement.
+	const sqliteParameterLimit = 999
+	if r.db.GetDBType() == migrator.SQLite && r.cfg.AnnotationCleanupJobBatchSize > sqliteParameterLimit {
+		values := fmt.Sprint(ids[0])
+		for _, v := range ids[1:] {
+			values = fmt.Sprintf("%s, %d", values, v)
+		}
+		sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, values)
+	} else {
+		placeholders := "?" + strings.Repeat(",?", len(ids)-1)
+		sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, placeholders)
+		args = asAny(ids)
+	}
+
+	var affected int64
+	err := r.db.WithDbSession(ctx, func(session *db.Session) error {
+		res, err := session.Exec(append([]any{sql}, args...)...)
+		if err != nil {
+			return err
+		}
+		affected, err = res.RowsAffected()
+		return err
+	})
+	return affected, err
+}
+
+func asAny(vs []int64) []any {
+	r := make([]any, len(vs))
+	for i, v := range vs {
+		r[i] = v
+	}
+	return r
+}
```
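A side note on the 999 constant above: that is SQLite's historical default for SQLITE_MAX_VARIABLE_NUMBER (raised to 32,766 as of SQLite 3.32.0), which is why the inlining fallback only triggers on SQLite with oversized batches. If that branch ever handles very large batches, the repeated fmt.Sprintf concatenation is quadratic in the number of IDs; a strings.Builder variant stays linear. A sketch of that alternative (not part of the PR; requires the strconv import):

```go
// Linear-time construction of "id1, id2, ..." for the SQLite inlining path.
var b strings.Builder
for i, id := range ids {
	if i > 0 {
		b.WriteString(", ")
	}
	b.WriteString(strconv.FormatInt(id, 10))
}
sql := fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, b.String())
```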
```diff
+// untilDoneOrCancelled repeatedly executes batched work until that work is either done (i.e., returns zero affected objects),
+// a batch produces an error, or the provided context is cancelled.
+// The work to be done is given as a callback that returns the number of affected objects for each batch, plus that batch's errors.
+func untilDoneOrCancelled(ctx context.Context, batchWork func() (int64, error)) (int64, error) {
 	var totalAffected int64
 	for {
 		select {
 		case <-ctx.Done():
 			return totalAffected, ctx.Err()
 		default:
-			var affected int64
-			err := r.db.WithDbSession(ctx, func(session *db.Session) error {
-				res, err := session.Exec(sql)
-				if err != nil {
-					return err
-				}
-
-				affected, err = res.RowsAffected()
-				totalAffected += affected
-
-				return err
-			})
+			affected, err := batchWork()
+			totalAffected += affected
 			if err != nil {
 				return totalAffected, err
 			}
```
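The diff is cut off before the loop's termination check, but per the doc comment the loop exits once a batch reports zero affected objects. Assuming that behavior, the refactor's benefit is that untilDoneOrCancelled is now a free function over a closure and can be exercised in isolation; a hypothetical usage:

```go
calls := 0
total, err := untilDoneOrCancelled(context.Background(), func() (int64, error) {
	calls++
	if calls == 3 {
		return 0, nil // nothing left to delete: the loop stops here
	}
	return 5, nil // pretend each earlier batch removed five rows
})
// Expected: total == 10, err == nil, calls == 3.
```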
Error-level logs used for routine operational messages; terse variable names.

Lines 534, 537, 554, 557, 576, and 579 all use `r.log.Error(...)` for what are clearly informational/debug messages (e.g., "Annotations to clean by time"). In production this will fire alerts and pollute error dashboards. Additionally, logging the full `ids` slice at any level will be extremely verbose for large batches (up to 32,767 entries).

The variables `x, y` on lines 536–538 (and 556–558, 578–579) should be given descriptive names.

Proposed fix (showing the MaxAge block; apply the same pattern to MaxCount and orphaned tags).
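A sketch of what that fix could look like for the MaxAge block, assuming the repository's logger exposes a Debug level and that logging only counts (not full ID slices) is acceptable:

```go
affected, err := untilDoneOrCancelled(ctx, func() (int64, error) {
	cond := fmt.Sprintf(`%s AND created < %v ORDER BY id DESC %s`,
		annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
	ids, err := r.fetchIDs(ctx, "annotation", cond)
	if err != nil {
		return 0, err
	}
	// Debug level: routine progress, not an error condition.
	r.log.Debug("Annotations to clean by time", "count", len(ids))

	deleted, deleteErr := r.deleteByIDs(ctx, "annotation", ids)
	r.log.Debug("Cleaned annotations by time", "affected", deleted, "err", deleteErr)
	return deleted, deleteErr
})
```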