Skip to content

Commit

Permalink
chore: execute goimports to format the code
Browse files Browse the repository at this point in the history
Signed-off-by: promalert <promalert@outlook.com>
  • Loading branch information
promalert committed Jan 7, 2026
1 parent e67973d commit 7c4db48
Show file tree
Hide file tree
Showing 275 changed files with 661 additions and 533 deletions.
7 changes: 4 additions & 3 deletions other/mq_client_example/agent_pub_record/agent_pub_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package main
import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/other/mq_client_example/example"
"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"log"
"sync"
"sync/atomic"
"time"

"github.com/seaweedfs/seaweedfs/other/mq_client_example/example"
"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
)

var (
Expand Down
5 changes: 3 additions & 2 deletions other/mq_client_example/agent_sub_record/agent_sub_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package main
import (
"flag"
"fmt"
"log"
"time"

"github.com/seaweedfs/seaweedfs/other/mq_client_example/example"
"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"log"
"time"
)

var (
Expand Down
4 changes: 2 additions & 2 deletions seaweedfs-rdma-sidecar/pkg/rdma/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (c *Client) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, err
}).Info("✅ RDMA read completed successfully")

// MOCK DATA IMPLEMENTATION - FOR DEVELOPMENT/TESTING ONLY
//
//
// This section generates placeholder data for the mock RDMA implementation.
// In a production RDMA implementation, this should be replaced with:
//
Expand Down Expand Up @@ -472,7 +472,7 @@ func (c *Client) ReadFileRange(ctx context.Context, fileID string, offset, size
if err != nil {
return nil, fmt.Errorf("invalid file ID %s: %w", fileID, err)
}

req := &ReadRequest{
VolumeID: volumeID,
NeedleID: needleID,
Expand Down
1 change: 1 addition & 0 deletions seaweedfs-rdma-sidecar/pkg/seaweedfs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"time"

"seaweedfs-rdma-sidecar/pkg/rdma"
Expand Down
1 change: 1 addition & 0 deletions test/fuse_integration/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fuse_test
import (
"fmt"
"io/fs"
"net"
"os"
"os/exec"
"path/filepath"
Expand Down
2 changes: 1 addition & 1 deletion test/fuse_integration/minimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package fuse_test
import "testing"

func TestMinimal(t *testing.T) {
t.Log("minimal test")
t.Log("minimal test")
}
10 changes: 5 additions & 5 deletions test/kafka/integration/rebalancing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func testTwoConsumersRebalance(t *testing.T, addr, topicName, groupID string) {

// Wait for rebalancing to occur - both consumers should get new assignments
var rebalancedAssignment1, rebalancedAssignment2 []int32

// Consumer1 should get a rebalance assignment
select {
case partitions := <-handler1.assignments:
Expand Down Expand Up @@ -372,7 +372,7 @@ func testMultipleConsumersJoin(t *testing.T, addr, topicName, groupID string) {
t.Errorf("Partition %d assigned to multiple consumers", partition)
}
}

// Each consumer should get exactly 1 partition (4 partitions / 4 consumers)
if len(assignment) != 1 {
t.Errorf("Consumer%d should get exactly 1 partition, got %d", i, len(assignment))
Expand Down Expand Up @@ -408,7 +408,7 @@ func (h *RebalanceTestHandler) Setup(session sarama.ConsumerGroupSession) error
h.readyOnce.Do(func() {
close(h.ready)
})

// Send partition assignment
partitions := make([]int32, 0)
for topic, partitionList := range session.Claims() {
Expand All @@ -417,13 +417,13 @@ func (h *RebalanceTestHandler) Setup(session sarama.ConsumerGroupSession) error
partitions = append(partitions, partition)
}
}

select {
case h.assignments <- partitions:
default:
// Channel might be full, that's ok
}

return nil
}

Expand Down
5 changes: 2 additions & 3 deletions test/kafka/integration/schema_end_to_end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestSchemaEndToEnd_AvroRoundTrip(t *testing.T) {
// Verify all fields
assert.Equal(t, int32(12345), decodedMap["id"])
assert.Equal(t, "Alice Johnson", decodedMap["name"])

// Verify union fields
emailUnion, ok := decodedMap["email"].(map[string]interface{})
require.True(t, ok, "Email should be a union")
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestSchemaEndToEnd_ProtobufRoundTrip(t *testing.T) {
require.Equal(t, uint32(2), envelope.SchemaID, "Schema ID should match")
// Note: ParseConfluentEnvelope defaults to FormatAvro; format detection requires schema registry
require.Equal(t, schema.FormatAvro, envelope.Format, "Format defaults to Avro without schema registry lookup")

// For Protobuf with indexes, we need to use the specialized parser
protobufEnvelope, ok := schema.ParseConfluentProtobufEnvelopeWithIndexCount(confluentMsg, 1)
require.True(t, ok, "Message should be a valid Protobuf envelope")
Expand Down Expand Up @@ -269,7 +269,6 @@ func createMockSchemaRegistryForE2E(t *testing.T) *httptest.Server {
}))
}


func getUserAvroSchemaForE2E() string {
return `{
"type": "record",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

// TestConsumerStallingPattern is a REPRODUCER for the consumer stalling bug.
//
//
// This test simulates the exact pattern that causes consumers to stall:
// 1. Consumer reads messages in batches
// 2. Consumer commits offset after each batch
Expand All @@ -24,7 +24,7 @@ import (
// If the test PASSES, it means consumer successfully fetches all messages (bug fixed)
func TestConsumerStallingPattern(t *testing.T) {
t.Skip("REPRODUCER TEST: Requires running load test infrastructure. See comments for setup.")

// This test documents the exact stalling pattern:
// - Consumers consume messages 0-163, commit offset 163
// - Next iteration: fetch offset 164+
Expand All @@ -36,7 +36,7 @@ func TestConsumerStallingPattern(t *testing.T) {
// 2. Empty fetch doesn't mean "end of partition" (could be transient)
// 3. Consumer retries on empty fetch instead of giving up
// 4. Logging shows why fetch stopped

t.Logf("=== CONSUMER STALLING REPRODUCER ===")
t.Logf("")
t.Logf("Setup Steps:")
Expand Down Expand Up @@ -72,27 +72,27 @@ func TestConsumerStallingPattern(t *testing.T) {
// This is a UNIT reproducer that can run standalone
func TestOffsetPlusOneCalculation(t *testing.T) {
testCases := []struct {
name string
committedOffset int64
name string
committedOffset int64
expectedNextOffset int64
}{
{"Offset 0", 0, 1},
{"Offset 99", 99, 100},
{"Offset 163", 163, 164}, // The exact stalling point!
{"Offset 163", 163, 164}, // The exact stalling point!
{"Offset 999", 999, 1000},
{"Large offset", 10000, 10001},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// This is the critical calculation
nextOffset := tc.committedOffset + 1

if nextOffset != tc.expectedNextOffset {
t.Fatalf("OFFSET MATH BUG: committed=%d, next=%d (expected %d)",
tc.committedOffset, nextOffset, tc.expectedNextOffset)
}

t.Logf("✓ offset %d → next fetch at %d", tc.committedOffset, nextOffset)
})
}
Expand All @@ -105,18 +105,18 @@ func TestEmptyFetchShouldNotStopConsumer(t *testing.T) {
// Scenario: Consumer committed offset 163, then fetches 164+
committedOffset := int64(163)
nextFetchOffset := committedOffset + 1

// First attempt: get empty (transient - data might not be available yet)
// WRONG behavior (bug): Consumer sees 0 bytes and stops
// wrongConsumerLogic := (firstFetchResult == 0) // gives up!

// CORRECT behavior: Consumer should retry
correctConsumerLogic := true // continues retrying
correctConsumerLogic := true // continues retrying

if !correctConsumerLogic {
t.Fatalf("Consumer incorrectly gave up after empty fetch at offset %d", nextFetchOffset)
}

t.Logf("✓ Empty fetch doesn't stop consumer, continues retrying")
})
}
1 change: 0 additions & 1 deletion test/kafka/loadtest/resume_million_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,3 @@ func TestResumeMillionRecords_Fixed(t *testing.T) {

glog.Infof("🏆 MILLION RECORD KAFKA INTEGRATION TEST COMPLETED SUCCESSFULLY!")
}

16 changes: 8 additions & 8 deletions test/kafka/unit/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestGatewayBasicFunctionality(t *testing.T) {
defer gateway.CleanupAndClose()

addr := gateway.StartAndWait()

// Give the gateway a bit more time to be fully ready
time.Sleep(200 * time.Millisecond)

Expand All @@ -32,17 +32,17 @@ func TestGatewayBasicFunctionality(t *testing.T) {
func testGatewayAcceptsConnections(t *testing.T, addr string) {
// Test basic TCP connection to gateway
t.Logf("Testing connection to gateway at %s", addr)

conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
if err != nil {
t.Fatalf("Failed to connect to gateway: %v", err)
}
defer conn.Close()

// Test that we can establish a connection and the gateway is listening
// We don't need to send a full Kafka request for this basic test
t.Logf("Successfully connected to gateway at %s", addr)

// Optional: Test that we can write some data without error
testData := []byte("test")
conn.SetWriteDeadline(time.Now().Add(1 * time.Second))
Expand All @@ -57,19 +57,19 @@ func testGatewayRefusesAfterClose(t *testing.T, gateway *testutil.GatewayTestSer
// Get the address from the gateway's listener
host, port := gateway.GetListenerAddr()
addr := fmt.Sprintf("%s:%d", host, port)

// Close the gateway
gateway.CleanupAndClose()

t.Log("Testing that gateway refuses connections after close")

// Attempt to connect - should fail
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
if err == nil {
conn.Close()
t.Fatal("Expected connection to fail after gateway close, but it succeeded")
}

// Verify it's a connection refused error
if !strings.Contains(err.Error(), "connection refused") && !strings.Contains(err.Error(), "connect: connection refused") {
t.Logf("Connection failed as expected with error: %v", err)
Expand Down
11 changes: 5 additions & 6 deletions test/s3/etag/s3_etag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ const (
autoChunkSize = 8 * 1024 * 1024

// Test sizes
smallFileSize = 1 * 1024 // 1KB - single chunk
mediumFileSize = 256 * 1024 // 256KB - single chunk (at threshold)
largeFileSize = 10 * 1024 * 1024 // 10MB - triggers auto-chunking (2 chunks)
xlFileSize = 25 * 1024 * 1024 // 25MB - triggers auto-chunking (4 chunks)
multipartSize = 5 * 1024 * 1024 // 5MB per part for multipart uploads
smallFileSize = 1 * 1024 // 1KB - single chunk
mediumFileSize = 256 * 1024 // 256KB - single chunk (at threshold)
largeFileSize = 10 * 1024 * 1024 // 10MB - triggers auto-chunking (2 chunks)
xlFileSize = 25 * 1024 * 1024 // 25MB - triggers auto-chunking (4 chunks)
multipartSize = 5 * 1024 * 1024 // 5MB per part for multipart uploads
)

// ETag format patterns
Expand Down Expand Up @@ -540,4 +540,3 @@ func TestMultipleLargeFileUploads(t *testing.T) {
assert.NoError(t, err, "File %d ETag should be valid hex", i)
}
}

6 changes: 3 additions & 3 deletions test/s3/iam/s3_iam_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func TestS3IAMBucketPolicyIntegration(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, testObjectData, string(data))
result.Body.Close()

// Clean up bucket policy after this test
_, err = adminClient.DeleteBucketPolicy(&s3.DeleteBucketPolicyInput{
Bucket: aws.String(bucketName),
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestS3IAMBucketPolicyIntegration(t *testing.T) {
assert.Contains(t, *policyResult.Policy, "Deny")

// NOTE: Enforcement test is commented out due to known architectural limitation:
//
//
// KNOWN LIMITATION: DeleteObject uses the coarse-grained ACTION_WRITE constant,
// which convertActionToS3Format maps to "s3:PutObject" (not "s3:DeleteObject").
// This means the policy engine evaluates the deny policy against "s3:PutObject",
Expand All @@ -499,7 +499,7 @@ func TestS3IAMBucketPolicyIntegration(t *testing.T) {
// awsErr, ok := err.(awserr.Error)
// require.True(t, ok, "Error should be an awserr.Error")
// assert.Equal(t, "AccessDenied", awsErr.Code(), "Expected AccessDenied error code")

// Clean up bucket policy after this test
_, err = adminClient.DeleteBucketPolicy(&s3.DeleteBucketPolicyInput{
Bucket: aws.String(bucketName),
Expand Down
3 changes: 2 additions & 1 deletion test/s3/s3client/s3client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"context"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"time"
)

func main() {
Expand Down
1 change: 0 additions & 1 deletion test/s3/sse/github_7562_copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,4 +502,3 @@ func TestGitHub7562LargeFile(t *testing.T) {

t.Log("Large file test passed!")
}

1 change: 0 additions & 1 deletion test/s3/versioning/s3_versioning_multipart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,4 +518,3 @@ func TestMultipartUploadDeleteMarkerListBehavior(t *testing.T) {

t.Logf("Object restored after delete marker removal, ETag=%s", multipartETag)
}

1 change: 0 additions & 1 deletion test/s3/versioning/s3_versioning_pagination_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,4 +319,3 @@ func listAllVersions(t *testing.T, client *s3.Client, bucketName, objectKey stri
t.Logf("Total: %d versions in %d pages", len(allVersions), pageCount)
return allVersions
}

5 changes: 2 additions & 3 deletions test/sftp/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,11 +540,11 @@ func TestPathEdgeCases(t *testing.T) {
// Therefore, we cannot trigger the server-side path traversal block with this client.
// Instead, we verify that the file is created successfully within the jail (contained).
// The server-side protection logic is verified in unit tests (sftpd/sftp_server_test.go).

file, err := sftpClient.Create(traversalPath)
require.NoError(t, err, "creation should succeed because client sanitizes path")
file.Close()

// Clean up
err = sftpClient.Remove(traversalPath)
require.NoError(t, err)
Expand Down Expand Up @@ -649,4 +649,3 @@ func TestFileContent(t *testing.T) {
sftpClient.Remove(filename)
})
}

1 change: 0 additions & 1 deletion test/sftp/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,4 +420,3 @@ func findTestDataPath() string {

return "./testdata"
}

Loading

0 comments on commit 7c4db48

Please sign in to comment.