chore: execute goimports to format the code #2

Open
wants to merge 2 commits into base: replay-7983-base-e67973d
Changes from all commits
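For context, goimports applies gofmt formatting (which, among other things, strips trailing whitespace) and regroups import blocks so that standard-library packages come first, followed by a blank line and then third-party packages; that regrouping is what the diffs below show. A minimal sketch of the resulting layout follows. The file is hypothetical and not part of this PR, the usual invocation is goimports -w ./... (the exact command used for this change is not shown), and the AWS SDK import is used only as a representative third-party package from the changed files.

    // Hypothetical example file, laid out the way goimports leaves it.
    package main

    import (
    	// Standard-library imports stay in the first group.
    	"fmt"
    	"time"

    	// Third-party imports move into their own group after a blank line,
    	// mirroring what happens to the seaweedfs imports in this PR.
    	"github.com/aws/aws-sdk-go-v2/aws"
    )

    func main() {
    	// Trivial usage so the imports are not flagged as unused.
    	fmt.Println(*aws.String("imports grouped by goimports"), time.Now().Unix())
    }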
7 changes: 4 additions & 3 deletions other/mq_client_example/agent_pub_record/agent_pub_record.go
@@ -3,13 +3,14 @@ package main
import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/other/mq_client_example/example"
"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"log"
"sync"
"sync/atomic"
"time"

"github.com/seaweedfs/seaweedfs/other/mq_client_example/example"
"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
)

var (
5 changes: 3 additions & 2 deletions other/mq_client_example/agent_sub_record/agent_sub_record.go
@@ -3,13 +3,14 @@ package main
import (
"flag"
"fmt"
"log"
"time"

"github.com/seaweedfs/seaweedfs/other/mq_client_example/example"
"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"log"
"time"
)

var (
4 changes: 2 additions & 2 deletions seaweedfs-rdma-sidecar/pkg/rdma/client.go
@@ -419,7 +419,7 @@ func (c *Client) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, err
}).Info("✅ RDMA read completed successfully")

// MOCK DATA IMPLEMENTATION - FOR DEVELOPMENT/TESTING ONLY
//
//
// This section generates placeholder data for the mock RDMA implementation.
// In a production RDMA implementation, this should be replaced with:
//
@@ -472,7 +472,7 @@ func (c *Client) ReadFileRange(ctx context.Context, fileID string, offset, size
if err != nil {
return nil, fmt.Errorf("invalid file ID %s: %w", fileID, err)
}

req := &ReadRequest{
VolumeID: volumeID,
NeedleID: needleID,
1 change: 1 addition & 0 deletions seaweedfs-rdma-sidecar/pkg/seaweedfs/client.go
@@ -8,6 +8,7 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"time"

"seaweedfs-rdma-sidecar/pkg/rdma"
5 changes: 3 additions & 2 deletions telemetry/proto/telemetry.pb.go

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions test/fuse_integration/framework.go
@@ -3,6 +3,7 @@ package fuse_test
import (
"fmt"
"io/fs"
"net"
"os"
"os/exec"
"path/filepath"
2 changes: 1 addition & 1 deletion test/fuse_integration/minimal_test.go
@@ -3,5 +3,5 @@ package fuse_test
import "testing"

func TestMinimal(t *testing.T) {
t.Log("minimal test")
t.Log("minimal test")
}
10 changes: 5 additions & 5 deletions test/kafka/integration/rebalancing_test.go
@@ -152,7 +152,7 @@ func testTwoConsumersRebalance(t *testing.T, addr, topicName, groupID string) {

// Wait for rebalancing to occur - both consumers should get new assignments
var rebalancedAssignment1, rebalancedAssignment2 []int32

// Consumer1 should get a rebalance assignment
select {
case partitions := <-handler1.assignments:
@@ -372,7 +372,7 @@ func testMultipleConsumersJoin(t *testing.T, addr, topicName, groupID string) {
t.Errorf("Partition %d assigned to multiple consumers", partition)
}
}

// Each consumer should get exactly 1 partition (4 partitions / 4 consumers)
if len(assignment) != 1 {
t.Errorf("Consumer%d should get exactly 1 partition, got %d", i, len(assignment))
@@ -408,7 +408,7 @@ func (h *RebalanceTestHandler) Setup(session sarama.ConsumerGroupSession) error
h.readyOnce.Do(func() {
close(h.ready)
})

// Send partition assignment
partitions := make([]int32, 0)
for topic, partitionList := range session.Claims() {
@@ -417,13 +417,13 @@ func (h *RebalanceTestHandler) Setup(session sarama.ConsumerGroupSession) error
partitions = append(partitions, partition)
}
}

select {
case h.assignments <- partitions:
default:
// Channel might be full, that's ok
}

return nil
}

5 changes: 2 additions & 3 deletions test/kafka/integration/schema_end_to_end_test.go
@@ -86,7 +86,7 @@ func TestSchemaEndToEnd_AvroRoundTrip(t *testing.T) {
// Verify all fields
assert.Equal(t, int32(12345), decodedMap["id"])
assert.Equal(t, "Alice Johnson", decodedMap["name"])

// Verify union fields
emailUnion, ok := decodedMap["email"].(map[string]interface{})
require.True(t, ok, "Email should be a union")
@@ -126,7 +126,7 @@ func TestSchemaEndToEnd_ProtobufRoundTrip(t *testing.T) {
require.Equal(t, uint32(2), envelope.SchemaID, "Schema ID should match")
// Note: ParseConfluentEnvelope defaults to FormatAvro; format detection requires schema registry
require.Equal(t, schema.FormatAvro, envelope.Format, "Format defaults to Avro without schema registry lookup")

// For Protobuf with indexes, we need to use the specialized parser
protobufEnvelope, ok := schema.ParseConfluentProtobufEnvelopeWithIndexCount(confluentMsg, 1)
require.True(t, ok, "Message should be a valid Protobuf envelope")
@@ -269,7 +269,6 @@ func createMockSchemaRegistryForE2E(t *testing.T) *httptest.Server {
}))
}


func getUserAvroSchemaForE2E() string {
return `{
"type": "record",
@@ -5,7 +5,7 @@ import (
)

// TestConsumerStallingPattern is a REPRODUCER for the consumer stalling bug.
//
//
// This test simulates the exact pattern that causes consumers to stall:
// 1. Consumer reads messages in batches
// 2. Consumer commits offset after each batch
@@ -24,7 +24,7 @@ import (
// If the test PASSES, it means consumer successfully fetches all messages (bug fixed)
func TestConsumerStallingPattern(t *testing.T) {
t.Skip("REPRODUCER TEST: Requires running load test infrastructure. See comments for setup.")

// This test documents the exact stalling pattern:
// - Consumers consume messages 0-163, commit offset 163
// - Next iteration: fetch offset 164+
@@ -36,7 +36,7 @@ func TestConsumerStallingPattern(t *testing.T) {
// 2. Empty fetch doesn't mean "end of partition" (could be transient)
// 3. Consumer retries on empty fetch instead of giving up
// 4. Logging shows why fetch stopped

t.Logf("=== CONSUMER STALLING REPRODUCER ===")
t.Logf("")
t.Logf("Setup Steps:")
@@ -72,27 +72,27 @@ func TestConsumerStallingPattern(t *testing.T) {
// This is a UNIT reproducer that can run standalone
func TestOffsetPlusOneCalculation(t *testing.T) {
testCases := []struct {
name string
committedOffset int64
name string
committedOffset int64
expectedNextOffset int64
}{
{"Offset 0", 0, 1},
{"Offset 99", 99, 100},
{"Offset 163", 163, 164}, // The exact stalling point!
{"Offset 163", 163, 164}, // The exact stalling point!
{"Offset 999", 999, 1000},
{"Large offset", 10000, 10001},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// This is the critical calculation
nextOffset := tc.committedOffset + 1

if nextOffset != tc.expectedNextOffset {
t.Fatalf("OFFSET MATH BUG: committed=%d, next=%d (expected %d)",
tc.committedOffset, nextOffset, tc.expectedNextOffset)
}

t.Logf("✓ offset %d → next fetch at %d", tc.committedOffset, nextOffset)
})
}
@@ -105,18 +105,18 @@ func TestEmptyFetchShouldNotStopConsumer(t *testing.T) {
// Scenario: Consumer committed offset 163, then fetches 164+
committedOffset := int64(163)
nextFetchOffset := committedOffset + 1

// First attempt: get empty (transient - data might not be available yet)
// WRONG behavior (bug): Consumer sees 0 bytes and stops
// wrongConsumerLogic := (firstFetchResult == 0) // gives up!

// CORRECT behavior: Consumer should retry
correctConsumerLogic := true // continues retrying
correctConsumerLogic := true // continues retrying

if !correctConsumerLogic {
t.Fatalf("Consumer incorrectly gave up after empty fetch at offset %d", nextFetchOffset)
}

t.Logf("✓ Empty fetch doesn't stop consumer, continues retrying")
})
}

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion test/kafka/loadtest/resume_million_test.go
@@ -205,4 +205,3 @@ func TestResumeMillionRecords_Fixed(t *testing.T) {

glog.Infof("🏆 MILLION RECORD KAFKA INTEGRATION TEST COMPLETED SUCCESSFULLY!")
}

16 changes: 8 additions & 8 deletions test/kafka/unit/gateway_test.go
@@ -16,7 +16,7 @@ func TestGatewayBasicFunctionality(t *testing.T) {
defer gateway.CleanupAndClose()

addr := gateway.StartAndWait()

// Give the gateway a bit more time to be fully ready
time.Sleep(200 * time.Millisecond)

@@ -32,17 +32,17 @@ func TestGatewayBasicFunctionality(t *testing.T) {
func testGatewayAcceptsConnections(t *testing.T, addr string) {
// Test basic TCP connection to gateway
t.Logf("Testing connection to gateway at %s", addr)

conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
if err != nil {
t.Fatalf("Failed to connect to gateway: %v", err)
}
defer conn.Close()

// Test that we can establish a connection and the gateway is listening
// We don't need to send a full Kafka request for this basic test
t.Logf("Successfully connected to gateway at %s", addr)

// Optional: Test that we can write some data without error
testData := []byte("test")
conn.SetWriteDeadline(time.Now().Add(1 * time.Second))
@@ -57,19 +57,19 @@ func testGatewayRefusesAfterClose(t *testing.T, gateway *testutil.GatewayTestSer
// Get the address from the gateway's listener
host, port := gateway.GetListenerAddr()
addr := fmt.Sprintf("%s:%d", host, port)

// Close the gateway
gateway.CleanupAndClose()

t.Log("Testing that gateway refuses connections after close")

// Attempt to connect - should fail
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
if err == nil {
conn.Close()
t.Fatal("Expected connection to fail after gateway close, but it succeeded")
}

// Verify it's a connection refused error
if !strings.Contains(err.Error(), "connection refused") && !strings.Contains(err.Error(), "connect: connection refused") {
t.Logf("Connection failed as expected with error: %v", err)
11 changes: 5 additions & 6 deletions test/s3/etag/s3_etag_test.go
@@ -63,11 +63,11 @@ const (
autoChunkSize = 8 * 1024 * 1024

// Test sizes
smallFileSize = 1 * 1024 // 1KB - single chunk
mediumFileSize = 256 * 1024 // 256KB - single chunk (at threshold)
largeFileSize = 10 * 1024 * 1024 // 10MB - triggers auto-chunking (2 chunks)
xlFileSize = 25 * 1024 * 1024 // 25MB - triggers auto-chunking (4 chunks)
multipartSize = 5 * 1024 * 1024 // 5MB per part for multipart uploads
smallFileSize = 1 * 1024 // 1KB - single chunk
mediumFileSize = 256 * 1024 // 256KB - single chunk (at threshold)
largeFileSize = 10 * 1024 * 1024 // 10MB - triggers auto-chunking (2 chunks)
xlFileSize = 25 * 1024 * 1024 // 25MB - triggers auto-chunking (4 chunks)
multipartSize = 5 * 1024 * 1024 // 5MB per part for multipart uploads
)

// ETag format patterns
@@ -540,4 +540,3 @@ func TestMultipleLargeFileUploads(t *testing.T) {
assert.NoError(t, err, "File %d ETag should be valid hex", i)
}
}

6 changes: 3 additions & 3 deletions test/s3/iam/s3_iam_integration_test.go
@@ -443,7 +443,7 @@ func TestS3IAMBucketPolicyIntegration(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, testObjectData, string(data))
result.Body.Close()

// Clean up bucket policy after this test
_, err = adminClient.DeleteBucketPolicy(&s3.DeleteBucketPolicyInput{
Bucket: aws.String(bucketName),
@@ -481,7 +481,7 @@ func TestS3IAMBucketPolicyIntegration(t *testing.T) {
assert.Contains(t, *policyResult.Policy, "Deny")

// NOTE: Enforcement test is commented out due to known architectural limitation:
//
//
// KNOWN LIMITATION: DeleteObject uses the coarse-grained ACTION_WRITE constant,
// which convertActionToS3Format maps to "s3:PutObject" (not "s3:DeleteObject").
// This means the policy engine evaluates the deny policy against "s3:PutObject",
@@ -499,7 +499,7 @@ func TestS3IAMBucketPolicyIntegration(t *testing.T) {
// awsErr, ok := err.(awserr.Error)
// require.True(t, ok, "Error should be an awserr.Error")
// assert.Equal(t, "AccessDenied", awsErr.Code(), "Expected AccessDenied error code")

// Clean up bucket policy after this test
_, err = adminClient.DeleteBucketPolicy(&s3.DeleteBucketPolicyInput{
Bucket: aws.String(bucketName),
3 changes: 2 additions & 1 deletion test/s3/s3client/s3client.go
@@ -2,12 +2,13 @@ package main

import (
"context"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"time"
)

func main() {
1 change: 0 additions & 1 deletion test/s3/sse/github_7562_copy_test.go
@@ -502,4 +502,3 @@ func TestGitHub7562LargeFile(t *testing.T) {

t.Log("Large file test passed!")
}

1 change: 0 additions & 1 deletion test/s3/versioning/s3_versioning_multipart_test.go
@@ -518,4 +518,3 @@ func TestMultipartUploadDeleteMarkerListBehavior(t *testing.T) {

t.Logf("Object restored after delete marker removal, ETag=%s", multipartETag)
}

1 change: 0 additions & 1 deletion test/s3/versioning/s3_versioning_pagination_stress_test.go
@@ -319,4 +319,3 @@ func listAllVersions(t *testing.T, client *s3.Client, bucketName, objectKey stri
t.Logf("Total: %d versions in %d pages", len(allVersions), pageCount)
return allVersions
}
