Skip to content

Automated Test: unified-storage-enhancements #329

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/server/module_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func TestIntegrationWillRunInstrumentationServerWhenTargetHasNoHttpServer(t *tes
if dbType == "sqlite3" {
t.Skip("skipping - sqlite not supported for storage server target")
}
// TODO - fix this test for postgres
if dbType == "postgres" {
t.Skip("skipping - test not working with postgres in Drone. Works locally.")
}

_, cfg := db.InitTestDBWithCfg(t)
cfg.HTTPPort = "3001"
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/unified/resource/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"
"time"

"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
"github.com/hashicorp/golang-lru/v2/expirable"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -171,7 +170,7 @@ func (s *searchSupport) Search(ctx context.Context, req *ResourceSearchRequest)

// init is called during startup. any failure will block startup and continued execution
func (s *searchSupport) init(ctx context.Context) error {
_, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Init")
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Init")
defer span.End()
start := time.Now().Unix()

Expand Down Expand Up @@ -214,6 +213,7 @@ func (s *searchSupport) init(ctx context.Context) error {
}()

end := time.Now().Unix()
s.log.Info("search index initialized", "duration_secs", end-start, "total_docs", s.search.TotalDocs())
if IndexMetrics != nil {
IndexMetrics.IndexCreationTime.WithLabelValues().Observe(float64(end - start))
}
Expand Down Expand Up @@ -277,7 +277,7 @@ func (s *searchSupport) handleEvent(ctx context.Context, evt *WrittenEvent) {
// record latency from when event was created to when it was indexed
latencySeconds := float64(time.Now().UnixMicro()-evt.ResourceVersion) / 1e6
if latencySeconds > 5 {
logger.Warn("high index latency", "latency", latencySeconds)
s.log.Warn("high index latency", "latency", latencySeconds)
}
if IndexMetrics != nil {
IndexMetrics.IndexLatency.WithLabelValues(evt.Key.Resource).Observe(latencySeconds)
Expand Down Expand Up @@ -307,7 +307,7 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso
}

func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size int64, rv int64) (ResourceIndex, int64, error) {
_, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Build")
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Build")
defer span.End()

builder, err := s.builders.get(ctx, nsr)
Expand Down
57 changes: 11 additions & 46 deletions pkg/storage/unified/resource/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
}
}

err := s.Init(ctx)
if err != nil {
s.log.Error("error initializing resource server", "error", err)
return nil, err
}

return s, nil
}

Expand Down Expand Up @@ -294,16 +300,16 @@ func (s *server) Init(ctx context.Context) error {
}
}

// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
}

// initialize the search index
if s.initErr == nil && s.search != nil {
s.initErr = s.search.init(ctx)
}

// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
}

if s.initErr != nil {
s.log.Error("error initializing resource server", "error", s.initErr)
}
Expand Down Expand Up @@ -446,10 +452,6 @@ func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateRespons
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
defer span.End()

if err := s.Init(ctx); err != nil {
return nil, err
}

rsp := &CreateResponse{}
user, ok := claims.From(ctx)
if !ok || user == nil {
Expand Down Expand Up @@ -488,10 +490,6 @@ func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateRespons
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
defer span.End()

if err := s.Init(ctx); err != nil {
return nil, err
}

rsp := &UpdateResponse{}
user, ok := claims.From(ctx)
if !ok || user == nil {
Expand Down Expand Up @@ -542,10 +540,6 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
defer span.End()

if err := s.Init(ctx); err != nil {
return nil, err
}

rsp := &DeleteResponse{}
if req.ResourceVersion < 0 {
return nil, apierrors.NewBadRequest("update must include the previous version")
Expand Down Expand Up @@ -634,9 +628,6 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
}

func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
user, ok := claims.From(ctx)
if !ok || user == nil {
return &ReadResponse{
Expand Down Expand Up @@ -693,9 +684,6 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err
}}, nil
}

if err := s.Init(ctx); err != nil {
return nil, err
}
if req.Limit < 1 {
req.Limit = 50 // default max 50 items in a page
}
Expand Down Expand Up @@ -786,10 +774,6 @@ func (s *server) initWatcher() error {
func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
ctx := srv.Context()

if err := s.Init(ctx); err != nil {
return err
}

user, ok := claims.From(ctx)
if !ok || user == nil {
return apierrors.NewUnauthorized("no user found in context")
Expand Down Expand Up @@ -930,9 +914,6 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
}

func (s *server) Search(ctx context.Context, req *ResourceSearchRequest) (*ResourceSearchResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
Expand All @@ -941,25 +922,16 @@ func (s *server) Search(ctx context.Context, req *ResourceSearchRequest) (*Resou

// History implements ResourceServer.
func (s *server) History(ctx context.Context, req *HistoryRequest) (*HistoryResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
return s.search.History(ctx, req)
}

// Origin implements ResourceServer.
func (s *server) Origin(ctx context.Context, req *OriginRequest) (*OriginResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
return s.search.Origin(ctx, req)
}

// IsHealthy implements ResourceServer.
func (s *server) IsHealthy(ctx context.Context, req *HealthCheckRequest) (*HealthCheckResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
return s.diagnostics.IsHealthy(ctx, req)
}

Expand All @@ -971,9 +943,6 @@ func (s *server) PutBlob(ctx context.Context, req *PutBlobRequest) (*PutBlobResp
Code: http.StatusNotImplemented,
}}, nil
}
if err := s.Init(ctx); err != nil {
return nil, err
}

rsp, err := s.blob.PutResourceBlob(ctx, req)
if err != nil {
Expand Down Expand Up @@ -1016,10 +985,6 @@ func (s *server) GetBlob(ctx context.Context, req *GetBlobRequest) (*GetBlobResp
}}, nil
}

if err := s.Init(ctx); err != nil {
return nil, err
}

// The linked blob is stored in the resource metadata attributes
obj, status := s.getPartialObject(ctx, req.Resource, req.ResourceVersion)
if status != nil {
Expand Down
11 changes: 5 additions & 6 deletions pkg/storage/unified/search/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ func (b *bleveBackend) BuildIndex(ctx context.Context,
// The builder will write all documents before returning
builder func(index resource.ResourceIndex) (int64, error),
) (resource.ResourceIndex, error) {
b.cacheMu.Lock()
defer b.cacheMu.Unlock()

_, span := b.tracer.Start(ctx, tracingPrexfixBleve+"BuildIndex")
defer span.End()

Expand All @@ -99,9 +96,9 @@ func (b *bleveBackend) BuildIndex(ctx context.Context,
if size > b.opts.FileThreshold {
dir := filepath.Join(b.opts.Root, key.Namespace, fmt.Sprintf("%s.%s", key.Resource, key.Group))
index, err = bleve.New(dir, mapper)
if err == nil {
b.log.Info("TODO, check last RV so we can see if the numbers have changed", "dir", dir)
}

// TODO, check last RV so we can see if the numbers have changed

resource.IndexMetrics.IndexTenants.WithLabelValues(key.Namespace, "file").Inc()
} else {
index, err = bleve.NewMemOnly(mapper)
Expand Down Expand Up @@ -137,7 +134,9 @@ func (b *bleveBackend) BuildIndex(ctx context.Context,
return nil, err
}

b.cacheMu.Lock()
b.cache[key] = idx
b.cacheMu.Unlock()
return idx, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/unified/sql/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (b *backend) Stop(_ context.Context) error {

// GetResourceStats implements Backend.
func (b *backend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]resource.ResourceStats, error) {
_, span := b.tracer.Start(ctx, tracePrefix+".GetResourceStats")
ctx, span := b.tracer.Start(ctx, tracePrefix+".GetResourceStats")
defer span.End()
Comment on lines +126 to 127

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Double dot in span name: "sql.resource..GetResourceStats"

tracePrefix is "sql.resource." (line 27), so concatenating ".GetResourceStats" produces "sql.resource..GetResourceStats". Other methods in this file (e.g., WriteEvent, Create, Update) omit the leading dot. This also applies to the pre-existing ".Read" on line 353.

🔧 Proposed fix
-	ctx, span := b.tracer.Start(ctx, tracePrefix+".GetResourceStats")
+	ctx, span := b.tracer.Start(ctx, tracePrefix+"GetResourceStats")

And separately (pre-existing, line 353):

-	_, span := b.tracer.Start(ctx, tracePrefix+".Read")
+	_, span := b.tracer.Start(ctx, tracePrefix+"Read")
🤖 Prompt for AI Agents
In `@pkg/storage/unified/sql/backend.go` around lines 126 - 127, The span name
concatenation double-dot bug occurs because tracePrefix is "sql.resource." and
the code appends ".GetResourceStats" producing "sql.resource..GetResourceStats";
update the span creation in GetResourceStats (tracePrefix+".GetResourceStats")
to remove the extra leading dot by either appending "GetResourceStats"
(tracePrefix+"GetResourceStats") or trimming the trailing dot from tracePrefix
before concatenation, and apply the same fix to the other occurrences that use
".Read" so all span names follow the same pattern as WriteEvent/Create/Update.


req := &sqlStatsRequest{
Expand Down