Automated Test: span-flusher-multiprocess #315
📝 Walkthrough
The PR introduces multi-process parallelism to the span flusher, so that shard processing can be distributed across a configurable number of processes. It adds a CLI option, propagates the setting through the factory, and refactors the flusher to manage per-process buffers, health tracking, and backpressure handling. Tests are updated accordingly.
Sequence Diagram
sequenceDiagram
participant CLI as CLI Argument Parser
participant Factory as ProcessSpansStrategyFactory
participant Flusher as SpanFlusher
participant ProcessMgr as Process Manager
participant Worker as Worker Process
participant Buffer as SpansBuffer
CLI->>Factory: Initialize with max_processes=N
Factory->>Flusher: Create instance with flusher_processes=N
Flusher->>ProcessMgr: Create process-to-shards mapping
Flusher->>ProcessMgr: Initialize per-process structures<br/>(buffers, health, backpressure)
loop For each shard group
ProcessMgr->>Worker: Spawn worker process<br/>with assigned shards
Worker->>Buffer: Create dedicated SpansBuffer
end
Note over Flusher,Worker: Message Processing Loop
Flusher->>Flusher: Receive message
Flusher->>Flusher: Check backpressure<br/>across all processes
Flusher->>Buffer: Route to correct<br/>process buffer
Flusher->>Worker: Signal buffer availability
Worker->>Buffer: Process and flush segments
Worker->>Flusher: Update per-process<br/>health_since/backpressure_since
Note over Flusher: Health & Lifecycle
Flusher->>Worker: Periodic health check
alt Process unhealthy
Flusher->>Worker: Terminate
Flusher->>ProcessMgr: Restart for shard group
end
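The "Create process-to-shards mapping" step in the diagram can be sketched roughly as follows. This is a minimal illustration that assumes a round-robin assignment; the helper name `assign_shards` and the round-robin policy are hypothetical and not taken from the PR, which may distribute shards differently.

```python
from collections import defaultdict


def assign_shards(shards: list[int], max_processes: int) -> dict[int, list[int]]:
    """Distribute shards across at most max_processes groups, round-robin.

    Hypothetical helper for illustration only; not the PR's implementation.
    """
    if max_processes < 1:
        raise ValueError("max_processes must be >= 1")
    # Never create more process groups than there are shards.
    num_processes = min(max_processes, len(shards))
    mapping: dict[int, list[int]] = defaultdict(list)
    for i, shard in enumerate(shards):
        mapping[i % num_processes].append(shard)
    return dict(mapping)


# Five shards split across two process groups.
example_mapping = assign_shards([0, 1, 2, 3, 4], max_processes=2)
```

Each key in the result would correspond to one worker process, and each value to the shards that worker owns.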
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 1
❌ Failed checks (1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/sentry/consumers/__init__.py`:
- Around line 430-438: The --flusher-processes click.Option currently allows 0
or negative values which lead to silent misconfiguration; update the Option
definition for "flusher_processes" (the click.Option in the "click_options"
list) to enforce a minimum of 1—either by using click.IntRange(min=1) as the
type or adding min=1/type validation or a click callback that raises a
click.BadParameter when value < 1; ensure this validation is applied alongside
multiprocessing_options so the flusher code that reads flusher_processes (the
path using max_processes or len(...)) always receives a positive integer.
In `@src/sentry/spans/consumers/process/flusher.py`:
- Line 199: The metric invocation using metrics.timer for
"spans.buffer.flusher.wait_produce" has an inconsistent tag key ("shards")
compared to other calls that use "shard"; update the tags argument in that
metrics.timer call to use tags={"shard": shard_tag} so the key matches the other
uses (see metrics.timer, shard_tag, and the metric name
"spans.buffer.flusher.wait_produce") to keep dashboard aggregation consistent.
🧹 Nitpick comments (2)
src/sentry/spans/consumers/process/flusher.py (2)
336-347: Unused loop variable `process_index`.
Per the static analysis hint, `process_index` is not used within the loop body. Use `_` or iterate over `.values()` directly.
Proposed fix
- for process_index, process in self.processes.items():
+ for process in self.processes.values():
127-132: Remove unused `_create_process_for_shard` method.
This method is never called anywhere in the codebase. A search across the repository shows only the method definition exists with no callers. The similar method `_create_process_for_shards` (plural) is the one actually in use. Since there's no TODO comment or indication of future use, this appears to be dead code from an earlier API design and should be removed.
"click_options": [
    *multiprocessing_options(default_max_batch_size=100),
    click.Option(
        ["--flusher-processes", "flusher_processes"],
        default=1,
        type=int,
        help="Maximum number of processes for the span flusher. Defaults to 1.",
    ),
],
Consider adding a minimum-value constraint for --flusher-processes.
A value of 0 is silently treated as "use default" (due to max_processes or len(...) in the flusher), and a negative value would result in no processes being created at all — a silent failure with no error message. Adding a min=1 constraint or a click callback to validate would prevent misconfiguration.
Proposed fix
  click.Option(
      ["--flusher-processes", "flusher_processes"],
      default=1,
      type=int,
+     callback=lambda ctx, param, value: value if value >= 1 else (_ for _ in ()).throw(click.BadParameter("--flusher-processes must be >= 1")),
      help="Maximum number of processes for the span flusher. Defaults to 1.",
  ),
Or more cleanly with click.IntRange:
  click.Option(
      ["--flusher-processes", "flusher_processes"],
      default=1,
-     type=int,
+     type=click.IntRange(min=1),
      help="Maximum number of processes for the span flusher. Defaults to 1.",
  ),
🤖 Prompt for AI Agents
In `@src/sentry/consumers/__init__.py` around lines 430 - 438, The
--flusher-processes click.Option currently allows 0 or negative values which
lead to silent misconfiguration; update the Option definition for
"flusher_processes" (the click.Option in the "click_options" list) to enforce a
minimum of 1—either by using click.IntRange(min=1) as the type or adding
min=1/type validation or a click callback that raises a click.BadParameter when
value < 1; ensure this validation is applied alongside multiprocessing_options
so the flusher code that reads flusher_processes (the path using max_processes
or len(...)) always receives a positive integer.
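To make the failure mode concrete, here is a small sketch of how the `max_processes or len(...)` pattern treats 0 and negative values. The function `resolve_process_count` and the `range()`-based worker loop are assumptions made for illustration; the flusher's actual code may be structured differently.

```python
from typing import Optional


def resolve_process_count(max_processes: Optional[int], shards: list[int]) -> int:
    # Mirrors the `max_processes or len(...)` pattern described in the comment:
    # any falsy value (None or 0) falls through to the shard count.
    return max_processes or len(shards)


shards = [0, 1, 2, 3]

# 0 is falsy, so it silently behaves like "not set".
count_for_zero = resolve_process_count(0, shards)

# A negative value is truthy, so it is returned unchanged.
count_for_negative = resolve_process_count(-1, shards)

# Assumption: workers are started by iterating over range(count).
# range() over a negative count is empty, so no worker would be started.
workers_for_negative = list(range(count_for_negative))
```

Neither case raises an error, which is why rejecting values below 1 at the CLI boundary is the safer choice.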
  produce(kafka_payload)

- with metrics.timer("spans.buffer.flusher.wait_produce"):
+ with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
Inconsistent metrics tag name: "shards" (plural) vs "shard" (singular) elsewhere.
Lines 185 and 192–196 use tags={"shard": shard_tag}, but this line uses tags={"shards": shard_tag}. This will cause the metric spans.buffer.flusher.wait_produce to be tracked under a different tag key, breaking dashboard queries and aggregation.
Proposed fix
- with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+ with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+ with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
🤖 Prompt for AI Agents
In `@src/sentry/spans/consumers/process/flusher.py` at line 199, The metric
invocation using metrics.timer for "spans.buffer.flusher.wait_produce" has an
inconsistent tag key ("shards") compared to other calls that use "shard"; update
the tags argument in that metrics.timer call to use tags={"shard": shard_tag} so
the key matches the other uses (see metrics.timer, shard_tag, and the metric
name "spans.buffer.flusher.wait_produce") to keep dashboard aggregation
consistent.
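The effect of a mismatched tag key can be illustrated with a toy in-memory recorder. Everything here is a hypothetical stand-in rather than Sentry's metrics backend: the `record_timing` and `timings_by_tag` helpers, the `example.flush` metric name, and the `"0-1"` tag value are all assumptions made for the sketch.

```python
from collections import defaultdict

# Hypothetical in-memory stand-in for a metrics backend: each distinct
# (metric name, tag set) pair is stored as its own series.
series: dict[tuple, list[float]] = defaultdict(list)


def record_timing(name: str, value: float, tags: dict[str, str]) -> None:
    key = (name, tuple(sorted(tags.items())))
    series[key].append(value)


def timings_by_tag(name: str, tag_key: str) -> dict[str, list[float]]:
    """Group recorded values for `name` by the value of `tag_key`."""
    grouped: dict[str, list[float]] = defaultdict(list)
    for (metric, tags), values in series.items():
        tag_map = dict(tags)
        if metric == name and tag_key in tag_map:
            grouped[tag_map[tag_key]].extend(values)
    return dict(grouped)


# One timer is tagged with "shards", another (hypothetical) one with "shard".
record_timing("spans.buffer.flusher.wait_produce", 0.12, {"shards": "0-1"})
record_timing("example.flush", 0.05, {"shard": "0-1"})

# A query that groups by "shard" finds nothing for wait_produce.
missing = timings_by_tag("spans.buffer.flusher.wait_produce", "shard")
```

In this sketch, grouping by `"shard"` returns an empty result for `wait_produce` while the other metric groups as expected, which is the kind of gap a dashboard built around the singular key would show.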
This pull request was automatically created by @coderabbitai/e2e-reviewer.
Batch created pull request.
Summary by CodeRabbit
New Features
`--flusher-processes` CLI option to configure the maximum number of parallel processes for span flushing (defaults to 1).
Documentation