-
Notifications
You must be signed in to change notification settings - Fork 0
Boostrap persistent state for volume servers. #1
base: replay-7984-base-e67973d
Are you sure you want to change the base?
Boostrap persistent state for volume servers. #1
Conversation
This PR implements logic load/save persistent state information for storages associated with volume servers, and reporting state changes back to masters via heartbeat messages. More work ensues! See https://github.com/seaweedfs/seaweedfs/issues/7977 for details.
📝 WalkthroughWalkthroughThe changes introduce persistent state management for volume servers with new protobuf message definitions, state storage lifecycle methods in the storage layer, and integration of state updates into heartbeat communication to the master server. Changes
Sequence Diagram(s)sequenceDiagram
participant VS as Volume Server<br/>(Store)
participant CH as State Update<br/>Channel
participant HB as Heartbeat Loop<br/>(gRPC Client)
participant MS as Master Server
Note over VS: LoadState() or SaveState()
VS->>VS: State lifecycle operation
VS->>CH: Push VolumeServerState<br/>to StateUpdateChan
HB->>CH: Monitor StateUpdateChan
activate HB
CH-->>HB: State update received
HB->>HB: Build Heartbeat message<br/>with State field
HB->>MS: Send Heartbeat to Master
MS-->>HB: Acknowledge
deactivate HB
Note over HB: Continue regular heartbeat<br/>interval processing
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 2✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @weed/server/volume_grpc_client_to_master.go:
- Around line 215-227: Store.Close() currently doesn't close StateUpdateChan nor
the delta channels (NewVolumesChan, DeletedVolumesChan, NewEcShardsChan,
DeletedEcShardsChan) and StopHeartbeat() is never invoked, risking goroutine
leaks and panics from sends after close; fix by closing those channels inside
Store.Close() (or call StopHeartbeat() first to stop the heartbeat loop), and
update producers (LoadState(), SaveState(), and any other senders) to do
non-blocking/select sends or check a store-closed flag before sending so they
won't panic when channels are closed; locate the heartbeat loop that reads
StateUpdateChan (and the StopHeartbeat implementation) and ensure shutdown
ordering: StopHeartbeat() -> drain/stop producers -> close channels in
Store.Close().
🧹 Nitpick comments (4)
weed/storage/disk_location.go (1)
51-51: Consider usingfilepath.Joininstead ofpath.Joinfor filesystem paths.Since this constructs a filesystem path for file I/O operations,
filepath.Joinwould be more appropriate as it's OS-aware and handles platform-specific path separators correctly. Thepathpackage is intended for URL-like paths with forward slashes.♻️ Proposed refactor
- fileName := path.Join(dir, UUIDFileName) + fileName := filepath.Join(dir, UUIDFileName)This would also allow removing the
pathimport on line 6.weed/storage/store_state.go (2)
61-64: Use consistent format verb for error.Line 63 uses
%sfor the error value while line 66 uses%v. For consistency and to handle errors that may have structured information, use%vfor both.🔧 Suggested fix
binPb, err := proto.Marshal(st.Pb) if err != nil { - return fmt.Errorf("failed to serialize store state %v: %s", st.Pb, err) + return fmt.Errorf("failed to serialize store state %v: %v", st.Pb, err) }
8-8: Migrate from deprecated protobuf module.The
github.com/golang/protobuf/protopackage is deprecated and scheduled for removal in 2025. Switch togoogle.golang.org/protobuf/protofor runtime imports. The protobuf project provides the open2opaque migration tool to automate most of the changes.weed/storage/store.go (1)
161-180: Consider non-blocking sends to StateUpdateChan.Both
LoadState()andSaveState()use blocking sends toStateUpdateChan. While the 1024 buffer makes blocking unlikely, other channels in this file (e.g.,NewEcShardsChanat lines 128-141) use non-blocking sends withselect/defaultto avoid potential deadlocks during startup or high-load scenarios.For consistency and safety, consider applying the same pattern here.
♻️ Suggested non-blocking send pattern
func (s *Store) LoadState() error { err := s.State.Load() if s.State.Pb != nil && err == nil { - s.StateUpdateChan <- s.State.Pb + select { + case s.StateUpdateChan <- s.State.Pb: + default: + glog.V(2).Infof("StateUpdateChan full during LoadState, state will be reported in heartbeat") + } } return err } func (s *Store) SaveState() error { if s.State.Pb == nil { glog.Warningf("tried to save empty state for store %s", s.Id) return nil } err := s.State.Save() if s.State.Pb != nil && err == nil { - s.StateUpdateChan <- s.State.Pb + select { + case s.StateUpdateChan <- s.State.Pb: + default: + glog.V(2).Infof("StateUpdateChan full during SaveState, state will be reported in heartbeat") + } } return err }
📜 Review details
Configuration used: Environment YAML
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (3)
weed/pb/master_pb/master.pb.gois excluded by!**/*.pb.goweed/pb/volume_server_pb/volume_server.pb.gois excluded by!**/*.pb.goweed/pb/volume_server_pb/volume_server_grpc.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (7)
weed/pb/master.protoweed/pb/volume_server.protoweed/server/master_grpc_server.goweed/server/volume_grpc_client_to_master.goweed/storage/disk_location.goweed/storage/store.goweed/storage/store_state.go
🔇 Additional comments (15)
weed/pb/volume_server.proto (3)
10-14: LGTM! Clean protobuf message definition.The
VolumeServerStatemessage is well-defined with clear documentation. The singlemaintenancefield appropriately represents the read-only mode state.
56-56: Appropriate placeholder for future work.The TODO correctly references the tracking issue and indicates planned RPC additions for state control.
581-581: Appropriate placeholder for future integration.The TODO correctly marks where the state will be included in status responses, aligning with the incremental rollout approach described in the PR objectives.
weed/storage/disk_location.go (2)
23-26: LGTM! Well-defined constants.Exporting
UUIDFileNameandUUIDFileModpromotes reusability across the codebase and eliminates magic values.
71-71: LGTM! Using named constant improves readability.Replacing the magic number with
UUIDFileModmakes the code more maintainable and self-documenting.weed/pb/master.proto (2)
7-8: LGTM! Necessary import added.The import of
volume_server.protois correctly added to support the newVolumeServerStatetype in the Heartbeat message.
89-91: LGTM! State field properly integrated into Heartbeat.The
statefield is correctly added with an appropriate tag number and clear documentation. The cross-package reference tovolume_server_pb.VolumeServerStateis properly formatted.weed/server/master_grpc_server.go (1)
168-168: Appropriate placeholder for future state handling.The TODO is well-positioned in the heartbeat processing flow and correctly references the tracking issue. This aligns with the PR's incremental approach.
weed/storage/store_state.go (3)
14-17: LGTM!Constants are appropriately defined with sensible defaults for file naming and permissions.
19-32: LGTM!The
Statestruct andNewStateconstructor are well-structured. The constructor properly derives the file path and delegates toLoad()for initialization.
34-54: LGTM!The
Load()method handles all cases correctly:
- Missing file is treated as first-time initialization (no error)
- Initializes
Pbbefore attempting unmarshal- Properly cleans up
Pbon error to avoid partial stateweed/storage/store.go (4)
19-19: LGTM!Import added correctly for the new state-related protobuf types.
27-30: LGTM!The
HEARTBEAT_CHAN_SIZEconstant provides consistent sizing for heartbeat-related channels and improves maintainability.
74-75: LGTM!New fields integrate well with the existing Store struct. The
StateUpdateChanenables propagation of state changes to heartbeat processing.
152-156: Verify fatal error handling aligns with operational expectations.Using
glog.Fatalffor state initialization failure means the volume server will not start if the state file exists but is corrupted. This is a reasonable fail-fast approach, but verify this matches the desired operational behavior. An alternative would be logging an error, initializing with empty state, and moving/renaming the corrupted file.
| case stateMessage := <-vs.store.StateUpdateChan: | ||
| stateBeat := &master_pb.Heartbeat{ | ||
| Ip: ip, | ||
| Port: port, | ||
| DataCenter: dataCenter, | ||
| Rack: rack, | ||
| State: stateMessage, | ||
| } | ||
| glog.V(0).Infof("volume server %s:%d updates state to %v", vs.store.Ip, vs.store.Port, stateMessage) | ||
| if err = stream.Send(stateBeat); err != nil { | ||
| glog.V(0).Infof("Volume Server Failed to update state to master %s: %v", masterAddress, err) | ||
| return "", err | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find the Store struct definition and look for StateUpdateChan
echo "=== Searching for StateUpdateChan in Store struct ==="
rg -n "StateUpdateChan" --type=go -B 2 -A 2
echo -e "\n=== Searching for Store struct definition ==="
fd . --type f -name "*.go" -path "*/store*" | head -20Repository: admin-coderabbit/seaweedfs
Length of output: 2118
🏁 Script executed:
#!/bin/bash
# Search for HEARTBEAT_CHAN_SIZE definition
echo "=== HEARTBEAT_CHAN_SIZE definition ==="
rg -n "HEARTBEAT_CHAN_SIZE" --type=go -B 1 -A 1
# Search for channel close operations on StateUpdateChan or all channels
echo -e "\n=== Channel close operations in store context ==="
rg -n "close.*Chan|StateUpdateChan" weed/storage/store.go
# Look for store shutdown/cleanup methods
echo -e "\n=== Store shutdown/cleanup methods ==="
rg -n "func.*store.*Close|func.*store.*Shutdown" --type=go -A 3Repository: admin-coderabbit/seaweedfs
Length of output: 32645
🏁 Script executed:
#!/bin/bash
# Search for Store struct close/cleanup methods more broadly
echo "=== Store struct methods (weed/storage/store.go) ==="
rg -n "func.*Store\)" weed/storage/store.go | head -20
# Search for any close() or shutdown pattern in the file
echo -e "\n=== Close operations in store.go ==="
rg -n "close|Close" weed/storage/store.go
# Check if there's a destructor or cleanup anywhere
echo -e "\n=== Looking for volume server shutdown patterns ==="
rg -n "StateUpdateChan|store.*shutdown|store.*close" weed/server/ --type=go -A 2 -B 2Repository: admin-coderabbit/seaweedfs
Length of output: 2155
🏁 Script executed:
#!/bin/bash
# Read the Store.Close() method implementation
echo "=== Store.Close() implementation ==="
sed -n '553,570p' weed/storage/store.go
# Look for any channel closing logic
echo -e "\n=== Check for channel closing in entire store.go ==="
rg -n "close(" weed/storage/store.goRepository: admin-coderabbit/seaweedfs
Length of output: 800
🏁 Script executed:
#!/bin/bash
# Look for the heartbeat goroutine that reads from these channels
echo "=== Find the heartbeat goroutine that consumes StateUpdateChan ==="
rg -n "func.*SendHeartbeat" weed/server/volume_grpc_client_to_master.go -A 50 | head -100
# Check the complete Close() implementation more carefully
echo -e "\n=== Store.Close() method (exact lines) ==="
cat -n weed/storage/store.go | sed -n '553,560p'
# Look for where the heartbeat goroutine is started
echo -e "\n=== Look for goroutine starting patterns ==="
rg -n "go.*Heartbeat|SendHeartbeat|StateUpdateChan" weed/server/ --type=goRepository: admin-coderabbit/seaweedfs
Length of output: 1865
🏁 Script executed:
#!/bin/bash
# Get the complete heartbeat sending function context
echo "=== Complete heartbeat sending logic (around line 215) ==="
sed -n '100,250p' weed/server/volume_grpc_client_to_master.go | cat -n
# Look for where the heartbeat goroutine is launched
echo -e "\n=== Search for where SendHeartbeat function is called ==="
rg -n "SendHeartbeat\(" weed/server/volume_server.go
# Check if there's a shutdown/stop mechanism for the volume server
echo -e "\n=== Look for volume server stopping patterns ==="
rg -n "func.*Stop|func.*Shutdown|vs.store.Close" weed/server/ --type=go -B 2 -A 2Repository: admin-coderabbit/seaweedfs
Length of output: 8500
🏁 Script executed:
#!/bin/bash
# Look for stopChan handling in the select loop
echo "=== Complete select loop in doHeartbeatWithRetry (lines 114-250) ==="
sed -n '114,250p' weed/server/volume_grpc_client_to_master.go | cat -n
# Check how StopHeartbeat is used
echo -e "\n=== StopHeartbeat and stopChan handling ==="
rg -n "stopChan\|StopHeartbeat" weed/server/ --type=go -B 2 -A 2Repository: admin-coderabbit/seaweedfs
Length of output: 6132
🏁 Script executed:
#!/bin/bash
# Get more complete lines from the select statement
echo "=== Complete select statement (lines 100-300) ==="
sed -n '100,300p' weed/server/volume_grpc_client_to_master.go | cat -n
# Look for where vs.stopChan is defined and how it's used
echo -e "\n=== VolumeServer struct definition to find stopChan ==="
rg -n "type VolumeServer struct" weed/server/volume_server.go -A 50
# Search for all stopChan references
echo -e "\n=== All stopChan references ==="
rg -n "stopChan" weed/server/ --type=goRepository: admin-coderabbit/seaweedfs
Length of output: 12019
🏁 Script executed:
#!/bin/bash
# Get the complete select statement with stopChan case
echo "=== Complete select statement with stopChan (lines 114-310) ==="
sed -n '114,310p' weed/server/volume_grpc_client_to_master.go | tail -50
# Verify if there's a closeFunc or similar for channels
echo -e "\n=== Check if Store has any close methods for channels ==="
rg -n "func.*Store.*Close\|func.*Store.*Stop" weed/storage/store.go -A 5Repository: admin-coderabbit/seaweedfs
Length of output: 2313
🏁 Script executed:
#!/bin/bash
# Check if LoadState/SaveState are ever called after shutdown
echo "=== Where LoadState and SaveState are called ==="
rg -n "LoadState\|SaveState" weed/server/ --type=go -B 2 -A 2
# Check the complete shutdown flow in VolumeServer
echo -e "\n=== VolumeServer.SetStopping() and Shutdown() methods ==="
sed -n '151,165p' weed/server/volume_server.go | cat -n
# Check if there's any synchronization with heartbeat goroutine
echo -e "\n=== Heartbeat goroutine lifecycle management ==="
rg -n "isHeartbeating\|StartHeartbeat" weed/server/ --type=goRepository: admin-coderabbit/seaweedfs
Length of output: 722
🏁 Script executed:
#!/bin/bash
# Find where LoadState and SaveState might be invoked
echo "=== Search in entire codebase for LoadState/SaveState ==="
rg -rn "LoadState\(\)|SaveState\(\)" --type=go | head -20
# Check if StopHeartbeat is called anywhere
echo -e "\n=== Where StopHeartbeat is called ==="
rg -n "StopHeartbeat\|stopChan" weed/server/volume_server.go
# Verify the heartbeat loop fully
echo -e "\n=== Check complete stopChan handler ==="
sed -n '305,320p' weed/server/volume_grpc_client_to_master.go | cat -nRepository: admin-coderabbit/seaweedfs
Length of output: 1044
StateUpdateChan is properly initialized and buffered, but channel lifecycle management is incomplete.
The channel is correctly defined with buffer size 1024 (HEARTBEAT_CHAN_SIZE) to prevent blocking state updates. However, verification reveals a critical gap:
Store.Close()does not closeStateUpdateChanor related delta update channels (NewVolumesChan,DeletedVolumesChan,NewEcShardsChan,DeletedEcShardsChan)StopHeartbeat()exists but is never invoked, leaving the graceful shutdown path (stopChan) unused- Risk of goroutine leak if the heartbeat loop is blocked on a channel receive when the store shuts down
- Potential panic if
LoadState()/SaveState()attempt to send afterStore.Close()
Channels should be closed in Store.Close(), or shutdown should explicitly trigger StopHeartbeat() before closing the store to ensure clean goroutine termination.
🤖 Prompt for AI Agents
In @weed/server/volume_grpc_client_to_master.go around lines 215 - 227,
Store.Close() currently doesn't close StateUpdateChan nor the delta channels
(NewVolumesChan, DeletedVolumesChan, NewEcShardsChan, DeletedEcShardsChan) and
StopHeartbeat() is never invoked, risking goroutine leaks and panics from sends
after close; fix by closing those channels inside Store.Close() (or call
StopHeartbeat() first to stop the heartbeat loop), and update producers
(LoadState(), SaveState(), and any other senders) to do non-blocking/select
sends or check a store-closed flag before sending so they won't panic when
channels are closed; locate the heartbeat loop that reads StateUpdateChan (and
the StopHeartbeat implementation) and ensure shutdown ordering: StopHeartbeat()
-> drain/stop producers -> close channels in Store.Close().
What problem are we solving?
Add support for maintenance (read-only) mode on volume servers.
See https://github.com/seaweedfs/seaweedfs/issues/7977 for details.
How are we solving the problem?
This PR implements logic to load/save persistent state information for storages associated with volume servers, and reporting state changes back to masters via heartbeat messages.
Master and node full support will follow in subsequent PRs.
How is the PR tested?
No functional changes, as nothing is updating states yet.
Checks
Summary by CodeRabbit
New Features
Refactor
✏️ Tip: You can customize this high-level summary in your review settings.