-
Notifications
You must be signed in to change notification settings - Fork 0
Automated Test: error-upsampling-race-condition #327
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,140 @@ | ||||||||||||||||||||||||||||||||||||
| from collections.abc import Sequence | ||||||||||||||||||||||||||||||||||||
| from types import ModuleType | ||||||||||||||||||||||||||||||||||||
| from typing import Any | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| from rest_framework.request import Request | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| from sentry import options | ||||||||||||||||||||||||||||||||||||
| from sentry.models.organization import Organization | ||||||||||||||||||||||||||||||||||||
| from sentry.search.events.types import SnubaParams | ||||||||||||||||||||||||||||||||||||
| from sentry.utils.cache import cache | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| def is_errors_query_for_error_upsampled_projects( | ||||||||||||||||||||||||||||||||||||
| snuba_params: SnubaParams, | ||||||||||||||||||||||||||||||||||||
| organization: Organization, | ||||||||||||||||||||||||||||||||||||
| dataset: ModuleType, | ||||||||||||||||||||||||||||||||||||
| request: Request, | ||||||||||||||||||||||||||||||||||||
| ) -> bool: | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| Determine if this query should use error upsampling transformations. | ||||||||||||||||||||||||||||||||||||
| Only applies when ALL projects are allowlisted and we're querying error events. | ||||||||||||||||||||||||||||||||||||
| Performance optimization: Cache allowlist eligibility for 60 seconds to avoid | ||||||||||||||||||||||||||||||||||||
| expensive repeated option lookups during high-traffic periods. This is safe | ||||||||||||||||||||||||||||||||||||
| because allowlist changes are infrequent and eventual consistency is acceptable. | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| cache_key = f"error_upsampling_eligible:{organization.id}:{hash(tuple(sorted(snuba_params.project_ids)))}" | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| # Check cache first for performance optimization | ||||||||||||||||||||||||||||||||||||
| cached_result = cache.get(cache_key) | ||||||||||||||||||||||||||||||||||||
| if cached_result is not None: | ||||||||||||||||||||||||||||||||||||
| return cached_result and _should_apply_sample_weight_transform(dataset, request) | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| # Cache miss - perform fresh allowlist check | ||||||||||||||||||||||||||||||||||||
| is_eligible = _are_all_projects_error_upsampled(snuba_params.project_ids, organization) | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| # Cache for 60 seconds to improve performance during traffic spikes | ||||||||||||||||||||||||||||||||||||
| cache.set(cache_key, is_eligible, 60) | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| return is_eligible and _should_apply_sample_weight_transform(dataset, request) | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| def _are_all_projects_error_upsampled( | ||||||||||||||||||||||||||||||||||||
| project_ids: Sequence[int], organization: Organization | ||||||||||||||||||||||||||||||||||||
| ) -> bool: | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| Check if ALL projects in the query are allowlisted for error upsampling. | ||||||||||||||||||||||||||||||||||||
| Only returns True if all projects pass the allowlist condition. | ||||||||||||||||||||||||||||||||||||
| NOTE: This function reads the allowlist configuration fresh each time, | ||||||||||||||||||||||||||||||||||||
| which means it can return different results between calls if the | ||||||||||||||||||||||||||||||||||||
| configuration changes during request processing. This is intentional | ||||||||||||||||||||||||||||||||||||
| to ensure we always have the latest configuration state. | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| if not project_ids: | ||||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| allowlist = options.get("issues.client_error_sampling.project_allowlist", []) | ||||||||||||||||||||||||||||||||||||
| if not allowlist: | ||||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| # All projects must be in the allowlist | ||||||||||||||||||||||||||||||||||||
| result = all(project_id in allowlist for project_id in project_ids) | ||||||||||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| def invalidate_upsampling_cache(organization_id: int, project_ids: Sequence[int]) -> None: | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| Invalidate the upsampling eligibility cache for the given organization and projects. | ||||||||||||||||||||||||||||||||||||
| This should be called when the allowlist configuration changes to ensure | ||||||||||||||||||||||||||||||||||||
| cache consistency across the system. | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| cache_key = f"error_upsampling_eligible:{organization_id}:{hash(tuple(sorted(project_ids)))}" | ||||||||||||||||||||||||||||||||||||
| cache.delete(cache_key) | ||||||||||||||||||||||||||||||||||||
|
Comment on lines
+67
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same This must use the same deterministic key derivation as 🐛 Proposed fix- cache_key = f"error_upsampling_eligible:{organization_id}:{hash(tuple(sorted(project_ids)))}"
+ project_key = ",".join(str(pid) for pid in sorted(project_ids))
+ cache_key = f"error_upsampling_eligible:{organization_id}:{project_key}"📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| def transform_query_columns_for_error_upsampling( | ||||||||||||||||||||||||||||||||||||
| query_columns: Sequence[str], | ||||||||||||||||||||||||||||||||||||
| ) -> list[str]: | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| Transform aggregation functions to use sum(sample_weight) instead of count() | ||||||||||||||||||||||||||||||||||||
| for error upsampling. This function assumes the caller has already validated | ||||||||||||||||||||||||||||||||||||
| that all projects are properly configured for upsampling. | ||||||||||||||||||||||||||||||||||||
| Note: We rely on the database schema to ensure sample_weight exists for all | ||||||||||||||||||||||||||||||||||||
| events in allowlisted projects, so no additional null checks are needed here. | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| transformed_columns = [] | ||||||||||||||||||||||||||||||||||||
| for column in query_columns: | ||||||||||||||||||||||||||||||||||||
| column_lower = column.lower().strip() | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| if column_lower == "count()": | ||||||||||||||||||||||||||||||||||||
| # Transform to upsampled count - assumes sample_weight column exists | ||||||||||||||||||||||||||||||||||||
| # for all events in allowlisted projects per our data model requirements | ||||||||||||||||||||||||||||||||||||
| transformed_columns.append("upsampled_count() as count") | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||
| transformed_columns.append(column) | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| return transformed_columns | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| def _should_apply_sample_weight_transform(dataset: Any, request: Request) -> bool: | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| Determine if we should apply sample_weight transformations based on the dataset | ||||||||||||||||||||||||||||||||||||
| and query context. Only apply for error events since sample_weight doesn't exist | ||||||||||||||||||||||||||||||||||||
| for transactions. | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| from sentry.snuba import discover, errors | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| # Always apply for the errors dataset | ||||||||||||||||||||||||||||||||||||
| if dataset == errors: | ||||||||||||||||||||||||||||||||||||
| return True | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| from sentry.snuba import transactions | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| # Never apply for the transactions dataset | ||||||||||||||||||||||||||||||||||||
| if dataset == transactions: | ||||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| # For the discover dataset, check if we're querying errors specifically | ||||||||||||||||||||||||||||||||||||
| if dataset == discover: | ||||||||||||||||||||||||||||||||||||
| result = _is_error_focused_query(request) | ||||||||||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| # For other datasets (spans, metrics, etc.), don't apply | ||||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| def _is_error_focused_query(request: Request) -> bool: | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| Check if a query is focused on error events. | ||||||||||||||||||||||||||||||||||||
| Reduced to only check for event.type:error to err on the side of caution. | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| query = request.GET.get("query", "").lower() | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| if "event.type:error" in query: | ||||||||||||||||||||||||||||||||||||
| return True | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||||
|
Comment on lines
+130
to
+140
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Substring match for A query like 🐛 Proposed fix — guard against negation and partial matches def _is_error_focused_query(request: Request) -> bool:
query = request.GET.get("query", "").lower()
- if "event.type:error" in query:
+ # Ensure we match "event.type:error" but not "!event.type:error"
+ # or "event.type:error_something"
+ import re
+ if re.search(r'(?<!\!)event\.type:error\b', query):
return True
return False🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,7 @@ | |
| import zipfile | ||
| from base64 import b64encode | ||
| from binascii import hexlify | ||
| from collections.abc import Mapping, Sequence | ||
| from collections.abc import Mapping, MutableMapping, Sequence | ||
| from datetime import UTC, datetime | ||
| from enum import Enum | ||
| from hashlib import sha1 | ||
|
|
@@ -341,6 +341,22 @@ def _patch_artifact_manifest(path, org=None, release=None, project=None, extra_f | |
| return orjson.dumps(manifest).decode() | ||
|
|
||
|
|
||
| def _set_sample_rate_from_error_sampling(normalized_data: MutableMapping[str, Any]) -> None: | ||
| """Set 'sample_rate' on normalized_data if contexts.error_sampling.client_sample_rate is present and valid.""" | ||
| client_sample_rate = None | ||
| try: | ||
| client_sample_rate = ( | ||
| normalized_data.get("contexts", {}).get("error_sampling", {}).get("client_sample_rate") | ||
| ) | ||
| except Exception: | ||
| pass | ||
| if client_sample_rate: | ||
| try: | ||
| normalized_data["sample_rate"] = float(client_sample_rate) | ||
| except Exception: | ||
| pass | ||
|
|
||
|
Comment on lines
+344
to
+358
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Two concerns:
Proposed fix def _set_sample_rate_from_error_sampling(normalized_data: MutableMapping[str, Any]) -> None:
"""Set 'sample_rate' on normalized_data if contexts.error_sampling.client_sample_rate is present and valid."""
client_sample_rate = None
try:
client_sample_rate = (
normalized_data.get("contexts", {}).get("error_sampling", {}).get("client_sample_rate")
)
- except Exception:
+ except (AttributeError, TypeError):
pass
- if client_sample_rate:
+ if client_sample_rate is not None:
try:
normalized_data["sample_rate"] = float(client_sample_rate)
- except Exception:
+ except (TypeError, ValueError):
pass🧰 Tools🪛 Ruff (0.14.14)[error] 351-352: (S110) [warning] 351-351: Do not catch blind exception: (BLE001) [error] 356-357: (S110) [warning] 356-356: Do not catch blind exception: (BLE001) 🤖 Prompt for AI Agents |
||
|
|
||
| # TODO(dcramer): consider moving to something more scalable like factoryboy | ||
| class Factories: | ||
| @staticmethod | ||
|
|
@@ -1029,6 +1045,9 @@ def store_event( | |
| assert not errors, errors | ||
|
|
||
| normalized_data = manager.get_data() | ||
|
|
||
| _set_sample_rate_from_error_sampling(normalized_data) | ||
|
|
||
| event = None | ||
|
|
||
| # When fingerprint is present on transaction, inject performance problems | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hash()is non-deterministic across Python processes — cache key will vary per worker.Python 3 randomizes hash seeds by default (
PYTHONHASHSEED), sohash(tuple(sorted(project_ids)))produces different values in different processes. Sincecacheis likely a shared store (Redis/memcached), different workers will compute different keys for the same project set, causing cache misses and failed invalidations.Use a deterministic representation instead, e.g., a joined string of sorted IDs.
🐛 Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents