Automated Test: span-flusher-multiprocess #324
📝 Walkthrough
Changes introduce configurable multiprocessing support for span flushing with per-shard distribution.
Sequence Diagram

```mermaid
sequenceDiagram
    participant CLI as CLI Configuration
    participant Factory as ProcessSpansStrategyFactory
    participant Flusher as SpanFlusher
    participant Proc1 as Flusher Process 1
    participant Proc2 as Flusher Process 2
    participant Shards as Shard Buffer
    CLI->>Factory: Create with flusher_processes=2
    Factory->>Flusher: Initialize(max_processes=2, shards=[0,1,2,3])
    Flusher->>Flusher: Map shards to processes (shard_to_process)
    Flusher->>Proc1: Create process for shards [0,1]
    Flusher->>Proc2: Create process for shards [2,3]
    Note over Proc1,Proc2: Per-process monitoring loop
    Flusher->>Proc1: Monitor health/backpressure
    Flusher->>Proc2: Monitor health/backpressure
    Proc1->>Shards: Poll shard 0 buffer
    Proc1->>Shards: Poll shard 1 buffer
    Proc2->>Shards: Poll shard 2 buffer
    Proc2->>Shards: Poll shard 3 buffer
    alt Process unhealthy
        Flusher->>Proc1: Restart process
    else Backpressure detected
        Flusher->>Flusher: Update process_backpressure_since
    end
```
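The shard-to-process mapping sketched in the diagram (four shards split across two flusher processes) can be illustrated with a small, self-contained sketch. The function name `build_process_to_shards_map` and the chunking strategy here are illustrative assumptions, not the actual Sentry implementation:

```python
def build_process_to_shards_map(shards: list[int], max_processes: int) -> dict[int, list[int]]:
    """Assign shards to process indices in contiguous, near-even chunks.

    Illustrative only: mirrors the diagram's split of shards [0,1,2,3]
    into process 0 -> [0,1] and process 1 -> [2,3].
    """
    n = min(max_processes, len(shards))
    size, extra = divmod(len(shards), n)
    mapping: dict[int, list[int]] = {}
    start = 0
    for i in range(n):
        # Earlier processes absorb one extra shard when the split is uneven.
        end = start + size + (1 if i < extra else 0)
        mapping[i] = shards[start:end]
        start = end
    return mapping

print(build_process_to_shards_map([0, 1, 2, 3], 2))  # {0: [0, 1], 1: [2, 3]}
```

Each process then polls only the buffers for its own shards, which is what allows the monitor loop to restart a single process without disturbing the others.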
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 1 inconclusive
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/sentry/spans/consumers/process/flusher.py`:
- Around line 185-199: The metric tag key is inconsistent: earlier calls use
tags={"shard": shard_tag} while the wait_produce timer uses tags={"shards":
shard_tag}; update the metrics.timer call for
"spans.buffer.flusher.wait_produce" to use the same tag key ("shard") as the
other metrics (i.e., change the tags dict for the wait_produce metrics.timer to
{"shard": shard_tag}) so all metrics (see the metrics.timer and metrics.timing
calls around flushed_segments, KafkaPayload, and produce) use a consistent tag
name.
- Around line 127-132: Remove the dead helper method _create_process_for_shard:
it's never called (the restart logic in _ensure_processes_alive already uses
_create_process_for_shards), so delete the entire _create_process_for_shard
definition and any related unused imports or references; keep
_create_process_for_shards and process_to_shards_map intact to preserve the
actual restart flow.
🧹 Nitpick comments (2)
src/sentry/spans/consumers/process/flusher.py (2)
247-251: Consider extracting the error message to a constant or using a custom exception. Per static analysis (TRY003), long messages in exceptions are better handled via custom exception classes for consistency and maintainability.
Proposed refactor

```diff
+class FlusherProcessCrashedError(RuntimeError):
+    def __init__(self, shards: list[int], cause: str):
+        super().__init__(
+            f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
+        )

 # In _ensure_processes_alive:
-if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
-    raise RuntimeError(
-        f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
-    )
+if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
+    raise FlusherProcessCrashedError(shards, cause)
```
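The refactor above can be sketched as a runnable example. The class mirrors the proposed `FlusherProcessCrashedError`; `MAX_PROCESS_RESTARTS` and the `check_restarts` helper are stand-ins for the flusher's real state, introduced only for illustration:

```python
# Illustrative, self-contained version of the custom-exception refactor.
# check_restarts is a hypothetical helper, not part of the flusher module.

MAX_PROCESS_RESTARTS = 3


class FlusherProcessCrashedError(RuntimeError):
    def __init__(self, shards: list[int], cause: str):
        super().__init__(
            f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
        )
        self.shards = shards
        self.cause = cause


def check_restarts(restart_count: int, shards: list[int], cause: str) -> None:
    # Raise the typed exception once the restart budget is exhausted.
    if restart_count > MAX_PROCESS_RESTARTS:
        raise FlusherProcessCrashedError(shards, cause)


try:
    check_restarts(4, [0, 1], "exitcode=1")
except FlusherProcessCrashedError as exc:
    print(exc)
```

Keeping the message inside the exception class means every raise site stays a one-liner, which is the maintainability point TRY003 is making.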
337-347: Rename unused loop variable per static analysis. The `process_index` variable is not used within the loop body (B007).
Proposed fix

```diff
-for process_index, process in self.processes.items():
+for _process_index, process in self.processes.items():
```
```python
def _create_process_for_shard(self, shard: int):
    # Find which process this shard belongs to and restart that process
    for process_index, shards in self.process_to_shards_map.items():
        if shard in shards:
            self._create_process_for_shards(process_index, shards)
            break
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Search for any usages of _create_process_for_shard in the codebase
rg -n '_create_process_for_shard' --type=py
```

Repository: admin-coderabbit/coderabbit-sentry-mirror
Length of output: 671
Unused method _create_process_for_shard.
This method is defined at lines 127-132 but never called anywhere in the codebase. The process restart logic in _ensure_processes_alive (line 259) directly calls the plural version _create_process_for_shards instead. Remove this unused method.
```diff
 with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
     for flushed_segment in flushed_segments.values():
         if not flushed_segment.spans:
             continue

         spans = [span.payload for span in flushed_segment.spans]
         kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
-        metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
+        metrics.timing(
+            "spans.buffer.segment_size_bytes",
+            len(kafka_payload.value),
+            tags={"shard": shard_tag},
+        )
         produce(kafka_payload)

-with metrics.timer("spans.buffer.flusher.wait_produce"):
+with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
```
Inconsistent metric tag naming: shard vs shards.
The metric tags use inconsistent naming - "shard" on lines 185 and 195-196, but "shards" on line 199. This inconsistency could make metrics aggregation and querying more difficult.
Proposed fix for consistent tag naming

```diff
-with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
    for flushed_segment in flushed_segments.values():
        if not flushed_segment.spans:
            continue

        spans = [span.payload for span in flushed_segment.spans]
        kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
        metrics.timing(
            "spans.buffer.segment_size_bytes",
            len(kafka_payload.value),
            tags={"shard": shard_tag},
        )
        produce(kafka_payload)

with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
```
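Why the consistent tag key matters: a toy sketch (an in-memory sample list, not Sentry's metrics backend) showing that a query grouped on the `shard` tag key silently misses series that were emitted under `shards`:

```python
# Toy illustration: samples emitted under mismatched tag keys cannot be
# aggregated together by a single tag-key query.

samples = [
    {"name": "spans.buffer.flusher.produce", "tags": {"shard": "0"}, "value": 1.2},
    {"name": "spans.buffer.flusher.wait_produce", "tags": {"shards": "0"}, "value": 3.4},
]


def values_for_tag(samples: list[dict], key: str, value: str) -> list[float]:
    """Return values of all samples whose tag `key` equals `value`."""
    return [s["value"] for s in samples if s["tags"].get(key) == value]


# Only the produce timer matches; the wait_produce sample is invisible
# to any dashboard filtering on shard="0".
print(values_for_tag(samples, "shard", "0"))  # [1.2]
```

After the suggested fix, both timers carry `shard` and a single filter covers them.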
This pull request was automatically created by @coderabbitai/e2e-reviewer. Batch created pull request.
Summary by CodeRabbit

New Features
- New `--flusher-processes` CLI option to control the number of span processing workers.

Improvements

Documentation