Automated Test: span-flusher-multiprocess #333
Conversation
📝 Walkthrough
This change introduces per-shard multiprocessing to the span flusher.

Changes
Sequence Diagram(s)
sequenceDiagram
participant CLI as CLI/Config
participant Factory as ProcessSpansStrategyFactory
participant Flusher as SpanFlusher
participant Proc1 as Worker Process 1
participant Proc2 as Worker Process 2
participant Buf as SpansBuffer (per-shard)
CLI->>Factory: Initialize with flusher_processes=2
Factory->>Flusher: Create with max_processes=2
Flusher->>Flusher: Compute shard-to-process mapping
Flusher->>Flusher: Distribute 4 shards across 2 processes
Flusher->>Proc1: Create for shards [0,1]
Flusher->>Proc2: Create for shards [2,3]
Proc1->>Buf: Operate on dedicated buffer (shards 0,1)
Proc2->>Buf: Operate on dedicated buffer (shards 2,3)
Flusher->>Flusher: Track per-process health & backpressure
Flusher->>Proc1: Emit shard-tagged metrics
Flusher->>Proc2: Emit shard-tagged metrics
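The shard-distribution step above can be pictured with a small sketch. This is an illustration only, assuming a contiguous chunking scheme that matches the [0, 1] / [2, 3] split in the diagram; assign_shards is a hypothetical name, not code from the PR.

def assign_shards(shards: list[int], max_processes: int) -> dict[int, list[int]]:
    # Map each worker process index to a contiguous block of shards.
    num_processes = min(max_processes, len(shards)) or 1
    chunk = -(-len(shards) // num_processes)  # ceiling division
    return {i: shards[i * chunk : (i + 1) * chunk] for i in range(num_processes)}

# assign_shards([0, 1, 2, 3], 2) == {0: [0, 1], 1: [2, 3]}, matching the
# Proc1/Proc2 assignment shown in the diagram.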
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 1 | ❌ 1
❌ Failed checks (1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/sentry/consumers/__init__.py`:
- Around lines 430-438: The click option "--flusher-processes" (click.Option with
dest "flusher_processes") lacks validation, allowing 0 or negative values; update
the option to enforce a minimum of 1 by adding a callback validator on that
click.Option (or clamp the value in the span flusher factory where
flusher_processes is consumed) so any input < 1 is rejected or coerced to 1, and
ensure error text clearly states it must be >= 1.
In `@src/sentry/spans/consumers/process/flusher.py`:
- Around line 185-199: The metric tag key is inconsistent: change the tags on
the metrics.timer call that currently uses tags={"shards": shard_tag} to use the
same key as the others (tags={"shard": shard_tag}) so
metrics.timer("spans.buffer.flusher.wait_produce", ...) aligns with the earlier
metrics.timer and metrics.timing calls that use shard_tag; update the call near
the produce loop that references wait_produce to use "shard" (symbols to locate:
metrics.timer, metrics.timing, produce, flushed_segments, shard_tag,
KafkaPayload).
- Around line 336-347: The loop over self.processes currently breaks out when
deadline expires, which skips calling process.terminate() for remaining entries;
change the logic so that when remaining_time <= 0 you do not break the for-loop
but instead skip the waiting while-loop and continue to the termination step for
each remaining process. Concretely, in the for process_index, process in
self.processes.items() loop remove the break and implement a branch (e.g., if
remaining_time <= 0: skip waiting) so the subsequent isinstance(process,
multiprocessing.Process): process.terminate() still runs for every process even
after the deadline has passed.
🧹 Nitpick comments (3)
tests/sentry/spans/consumers/process/test_consumer.py (1)
60-62: Sleep-based synchronization is fragile. The time.sleep(0.1) used to allow flusher threads to process drift changes could flake on slow CI. Consider polling with a bounded retry loop instead, as in the sketch below.
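A minimal sketch of such a bounded polling helper, assuming a two-second budget; wait_for and the drift assertion below are illustrative names, not taken from the Sentry test suite.

import time

def wait_for(condition, timeout=2.0, interval=0.05):
    # Poll `condition` until it returns True or the time budget runs out.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return condition()

# In the test, instead of a bare time.sleep(0.1), something like (hypothetical):
# assert wait_for(lambda: flusher.current_drift.value == expected_drift)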
src/sentry/spans/consumers/process/flusher.py (2)
337-337: Unused loop variable process_index. Per static analysis, process_index is unused in the loop body. Use _ or .values().
♻️ Proposed fix
- for process_index, process in self.processes.items():
+ for process in self.processes.values():
127-132: Remove unused _create_process_for_shard method. This method is defined but never called within the codebase. Remove it per the YAGNI principle.
| "click_options": [ | ||
| *multiprocessing_options(default_max_batch_size=100), | ||
| click.Option( | ||
| ["--flusher-processes", "flusher_processes"], | ||
| default=1, | ||
| type=int, | ||
| help="Maximum number of processes for the span flusher. Defaults to 1.", | ||
| ), | ||
| ], |
No validation that --flusher-processes is ≥ 1.
A user could pass --flusher-processes 0 or a negative value, which would cause unexpected behavior downstream (e.g., min(0, num_shards) → 0 processes, leading to empty dicts and no flushing). Consider adding a callback or clamping in the factory.
🛡️ Proposed fix: add a callback validator
+def _validate_positive_int(ctx, param, value):
+    if value < 1:
+        raise click.BadParameter("must be >= 1")
+    return value
+
"process-spans": {
    "topic": Topic.INGEST_SPANS,
    "dlq_topic": Topic.INGEST_SPANS_DLQ,
    "strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
    "click_options": [
        *multiprocessing_options(default_max_batch_size=100),
        click.Option(
            ["--flusher-processes", "flusher_processes"],
            default=1,
            type=int,
+           callback=_validate_positive_int,
            help="Maximum number of processes for the span flusher. Defaults to 1.",
        ),
    ],
},
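As an alternative to the click callback, the comment also allows clamping where the value is consumed. A rough sketch of that option, assuming a simplified ProcessSpansStrategyFactory constructor (the real signature takes more arguments and is not shown in this PR excerpt):

class ProcessSpansStrategyFactory:
    def __init__(self, flusher_processes: int = 1, **kwargs) -> None:
        # Coerce values below 1 up to 1 so a bad CLI value cannot disable flushing.
        self.flusher_processes = max(1, flusher_processes)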
with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
    for flushed_segment in flushed_segments.values():
        if not flushed_segment.spans:
            continue

        spans = [span.payload for span in flushed_segment.spans]
        kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
-       metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
+       metrics.timing(
+           "spans.buffer.segment_size_bytes",
+           len(kafka_payload.value),
+           tags={"shard": shard_tag},
+       )
        produce(kafka_payload)

-with metrics.timer("spans.buffer.flusher.wait_produce"):
+with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
Inconsistent metric tag key: "shard" vs "shards".
Lines 185 and 195 use tags={"shard": shard_tag} (singular), while line 199 uses tags={"shards": shard_tag} (plural). This will create split metrics in your dashboards.
🔧 Proposed fix
- with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+ with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
    for flushed_segment in flushed_segments.values():
        if not flushed_segment.spans:
            continue

        spans = [span.payload for span in flushed_segment.spans]
        kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
        metrics.timing(
            "spans.buffer.segment_size_bytes",
            len(kafka_payload.value),
            tags={"shard": shard_tag},
        )
        produce(kafka_payload)

with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
# Wait for all processes to finish
for process_index, process in self.processes.items():
    if deadline is not None:
        remaining_time = deadline - time.time()
        if remaining_time <= 0:
            break

    while process.is_alive() and (deadline is None or deadline > time.time()):
        time.sleep(0.1)

-   if isinstance(self.process, multiprocessing.Process):
-       self.process.terminate()
+   if isinstance(process, multiprocessing.Process):
+       process.terminate()
break on deadline expiry skips terminate() for remaining processes, risking zombie processes.
If the deadline is exceeded while waiting for the first process, the break on line 341 exits the loop entirely, leaving remaining processes un-terminated. Since stopped.value = True is a cooperative signal, a stuck process could linger.
🐛 Proposed fix: terminate all processes regardless of deadline
def join(self, timeout: float | None = None):
    # set stopped flag first so we can "flush" the background threads while
    # next_step is also shutting down. we can do two things at once!
    self.stopped.value = True
    deadline = time.time() + timeout if timeout else None
    self.next_step.join(timeout)

    # Wait for all processes to finish
-   for process_index, process in self.processes.items():
+   for process in self.processes.values():
        if deadline is not None:
            remaining_time = deadline - time.time()
            if remaining_time <= 0:
                break

        while process.is_alive() and (deadline is None or deadline > time.time()):
            time.sleep(0.1)

+   # Terminate all remaining processes regardless of deadline
+   for process in self.processes.values():
        if isinstance(process, multiprocessing.Process):
            process.terminate()
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 337-337: Loop control variable process_index not used within loop body
Rename unused process_index to _process_index
(B007)
This pull request was automatically created by @coderabbitai/e2e-reviewer.
Batch created pull request.
Summary by CodeRabbit
Release Notes
New Features
--flusher-processes CLI flag for process-spans consumer to control the maximum number of parallel processing instances (default: 1).
Documentation