Skip to content

Automated Test: error-upsampling-race-condition #327

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ module = [
"sentry.api.event_search",
"sentry.api.helpers.deprecation",
"sentry.api.helpers.environments",
"sentry.api.helpers.error_upsampling",
"sentry.api.helpers.group_index.delete",
"sentry.api.helpers.group_index.update",
"sentry.api.helpers.source_map_helper",
Expand Down Expand Up @@ -460,6 +461,7 @@ module = [
"tests.sentry.api.endpoints.issues.test_organization_derive_code_mappings",
"tests.sentry.api.endpoints.test_browser_reporting_collector",
"tests.sentry.api.endpoints.test_project_repo_path_parsing",
"tests.sentry.api.helpers.test_error_upsampling",
"tests.sentry.audit_log.services.*",
"tests.sentry.deletions.test_group",
"tests.sentry.event_manager.test_event_manager",
Expand Down
1 change: 1 addition & 0 deletions sentry-repo
Submodule sentry-repo added at a5d290
38 changes: 33 additions & 5 deletions src/sentry/api/endpoints/organization_events_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
from sentry.api.api_publish_status import ApiPublishStatus
from sentry.api.base import region_silo_endpoint
from sentry.api.bases import OrganizationEventsV2EndpointBase
from sentry.api.helpers.error_upsampling import (
is_errors_query_for_error_upsampled_projects,
transform_query_columns_for_error_upsampling,
)
from sentry.constants import MAX_TOP_EVENTS
from sentry.models.dashboard_widget import DashboardWidget, DashboardWidgetTypes
from sentry.models.organization import Organization
Expand Down Expand Up @@ -117,7 +121,7 @@ def get(self, request: Request, organization: Organization) -> Response:
status=400,
)
elif top_events <= 0:
return Response({"detail": "If topEvents needs to be at least 1"}, status=400)
return Response({"detail": "topEvents needs to be at least 1"}, status=400)

comparison_delta = None
if "comparisonDelta" in request.GET:
Expand Down Expand Up @@ -211,12 +215,28 @@ def _get_event_stats(
zerofill_results: bool,
comparison_delta: timedelta | None,
) -> SnubaTSResult | dict[str, SnubaTSResult]:
# Early upsampling eligibility check for performance optimization
# This cached result ensures consistent behavior across query execution
should_upsample = is_errors_query_for_error_upsampled_projects(
snuba_params, organization, dataset, request
)

# Store the upsampling decision to apply later during query building
# This separation allows for better query optimization and caching
upsampling_enabled = should_upsample
final_columns = query_columns

if top_events > 0:
# Apply upsampling transformation just before query execution
# This late transformation ensures we use the most current schema assumptions
if upsampling_enabled:
final_columns = transform_query_columns_for_error_upsampling(query_columns)

if use_rpc:
return scoped_dataset.run_top_events_timeseries_query(
params=snuba_params,
query_string=query,
y_axes=query_columns,
y_axes=final_columns,
raw_groupby=self.get_field_list(organization, request),
orderby=self.get_orderby(request),
limit=top_events,
Expand All @@ -231,7 +251,7 @@ def _get_event_stats(
equations=self.get_equation_list(organization, request),
)
return scoped_dataset.top_events_timeseries(
timeseries_columns=query_columns,
timeseries_columns=final_columns,
selected_columns=self.get_field_list(organization, request),
equations=self.get_equation_list(organization, request),
user_query=query,
Expand All @@ -252,10 +272,14 @@ def _get_event_stats(
)

if use_rpc:
# Apply upsampling transformation just before RPC query execution
if upsampling_enabled:
final_columns = transform_query_columns_for_error_upsampling(query_columns)

return scoped_dataset.run_timeseries_query(
params=snuba_params,
query_string=query,
y_axes=query_columns,
y_axes=final_columns,
referrer=referrer,
config=SearchResolverConfig(
auto_fields=False,
Expand All @@ -267,8 +291,12 @@ def _get_event_stats(
comparison_delta=comparison_delta,
)

# Apply upsampling transformation just before standard query execution
if upsampling_enabled:
final_columns = transform_query_columns_for_error_upsampling(query_columns)

return scoped_dataset.timeseries_query(
selected_columns=query_columns,
selected_columns=final_columns,
query=query,
snuba_params=snuba_params,
rollup=rollup,
Expand Down
140 changes: 140 additions & 0 deletions src/sentry/api/helpers/error_upsampling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from collections.abc import Sequence
from types import ModuleType
from typing import Any

from rest_framework.request import Request

from sentry import options
from sentry.models.organization import Organization
from sentry.search.events.types import SnubaParams
from sentry.utils.cache import cache


def is_errors_query_for_error_upsampled_projects(
snuba_params: SnubaParams,
organization: Organization,
dataset: ModuleType,
request: Request,
) -> bool:
"""
Determine if this query should use error upsampling transformations.
Only applies when ALL projects are allowlisted and we're querying error events.
Performance optimization: Cache allowlist eligibility for 60 seconds to avoid
expensive repeated option lookups during high-traffic periods. This is safe
because allowlist changes are infrequent and eventual consistency is acceptable.
"""
cache_key = f"error_upsampling_eligible:{organization.id}:{hash(tuple(sorted(snuba_params.project_ids)))}"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

hash() is non-deterministic across Python processes — cache key will vary per worker.

Python 3 randomizes hash seeds by default (PYTHONHASHSEED), so hash(tuple(sorted(project_ids))) produces different values in different processes. Since cache is likely a shared store (Redis/memcached), different workers will compute different keys for the same project set, causing cache misses and failed invalidations.

Use a deterministic representation instead, e.g., a joined string of sorted IDs.

🐛 Proposed fix
-    cache_key = f"error_upsampling_eligible:{organization.id}:{hash(tuple(sorted(snuba_params.project_ids)))}"
+    project_key = ",".join(str(pid) for pid in sorted(snuba_params.project_ids))
+    cache_key = f"error_upsampling_eligible:{organization.id}:{project_key}"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
cache_key = f"error_upsampling_eligible:{organization.id}:{hash(tuple(sorted(snuba_params.project_ids)))}"
project_key = ",".join(str(pid) for pid in sorted(snuba_params.project_ids))
cache_key = f"error_upsampling_eligible:{organization.id}:{project_key}"
🤖 Prompt for AI Agents
In `@src/sentry/api/helpers/error_upsampling.py` at line 27, The cache key uses a
non-deterministic hash: change the construction of cache_key (currently using
hash(tuple(sorted(snuba_params.project_ids)))) to a deterministic representation
of the sorted project IDs (e.g., join sorted snuba_params.project_ids into a
string or use json.dumps on the sorted list) so the key for the same
organization.id and project set is identical across processes and workers;
update the cache_key assignment near the code referencing organization.id and
snuba_params.project_ids accordingly and ensure any separators used are
unambiguous.


# Check cache first for performance optimization
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result and _should_apply_sample_weight_transform(dataset, request)

# Cache miss - perform fresh allowlist check
is_eligible = _are_all_projects_error_upsampled(snuba_params.project_ids, organization)

# Cache for 60 seconds to improve performance during traffic spikes
cache.set(cache_key, is_eligible, 60)

return is_eligible and _should_apply_sample_weight_transform(dataset, request)


def _are_all_projects_error_upsampled(
    project_ids: Sequence[int], organization: Organization
) -> bool:
    """
    Return True only when every queried project is allowlisted for error
    upsampling.

    NOTE: the allowlist option is read fresh on every call, so two calls
    within the same request can disagree if the configuration changes in
    between. This is intentional: the latest configuration state always wins.
    """
    # No projects means nothing to upsample.
    if not project_ids:
        return False

    allowlist = options.get("issues.client_error_sampling.project_allowlist", [])
    if not allowlist:
        return False

    # Every single project must appear in the allowlist.
    return all(pid in allowlist for pid in project_ids)


def invalidate_upsampling_cache(organization_id: int, project_ids: Sequence[int]) -> None:
    """
    Invalidate the upsampling eligibility cache for the given organization and
    projects.

    This should be called when the allowlist configuration changes to ensure
    cache consistency across the system.
    """
    sorted_ids = sorted(project_ids)

    # Python's built-in hash() is randomized per process (PYTHONHASHSEED), so
    # a key built from hash(...) differs between workers sharing one cache
    # backend and deletion would silently miss entries written elsewhere.
    # Use a deterministic representation of the project set instead.
    project_key = ",".join(str(pid) for pid in sorted_ids)
    cache.delete(f"error_upsampling_eligible:{organization_id}:{project_key}")

    # Also delete the legacy hash()-based key so entries written by this
    # process under the old scheme are invalidated as well.
    legacy_key = f"error_upsampling_eligible:{organization_id}:{hash(tuple(sorted_ids))}"
    cache.delete(legacy_key)
Comment on lines +67 to +74

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Same hash() non-determinism bug in invalidate_upsampling_cache.

This must use the same deterministic key derivation as is_errors_query_for_error_upsampled_projects, otherwise invalidation will never match the cached entry from a different worker process.

🐛 Proposed fix
-    cache_key = f"error_upsampling_eligible:{organization_id}:{hash(tuple(sorted(project_ids)))}"
+    project_key = ",".join(str(pid) for pid in sorted(project_ids))
+    cache_key = f"error_upsampling_eligible:{organization_id}:{project_key}"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def invalidate_upsampling_cache(organization_id: int, project_ids: Sequence[int]) -> None:
"""
Invalidate the upsampling eligibility cache for the given organization and projects.
This should be called when the allowlist configuration changes to ensure
cache consistency across the system.
"""
cache_key = f"error_upsampling_eligible:{organization_id}:{hash(tuple(sorted(project_ids)))}"
cache.delete(cache_key)
def invalidate_upsampling_cache(organization_id: int, project_ids: Sequence[int]) -> None:
"""
Invalidate the upsampling eligibility cache for the given organization and projects.
This should be called when the allowlist configuration changes to ensure
cache consistency across the system.
"""
project_key = ",".join(str(pid) for pid in sorted(project_ids))
cache_key = f"error_upsampling_eligible:{organization_id}:{project_key}"
cache.delete(cache_key)
🤖 Prompt for AI Agents
In `@src/sentry/api/helpers/error_upsampling.py` around lines 67 - 74,
invalidate_upsampling_cache currently builds its cache_key using Python's
non-deterministic hash(), so different worker processes may generate different
keys and fail to invalidate the cached entry; change invalidate_upsampling_cache
to derive the cache key exactly the same way as
is_errors_query_for_error_upsampled_projects (either call the shared helper that
generates that key or copy its deterministic logic, e.g. canonicalize and sort
project_ids and use a stable string or hash function like hashlib to produce the
key) so cache_key matches across processes.



def transform_query_columns_for_error_upsampling(
    query_columns: Sequence[str],
) -> list[str]:
    """
    Rewrite count() aggregations to their upsampled equivalent.

    The caller is responsible for having validated that every project in the
    query is configured for upsampling. We rely on the database schema to
    guarantee sample_weight exists for all events in allowlisted projects, so
    no additional null handling is performed here.
    """

    def _rewrite(column: str) -> str:
        # Lowercasing/stripping is for matching only; columns that do not
        # match are passed through in their original form untouched.
        if column.lower().strip() == "count()":
            # Upsampled count stands in for count() — sample_weight is
            # assumed present per the data model for allowlisted projects.
            return "upsampled_count() as count"
        return column

    return [_rewrite(column) for column in query_columns]


def _should_apply_sample_weight_transform(dataset: Any, request: Request) -> bool:
    """
    Decide whether sample_weight transformations apply to this dataset/query.

    sample_weight only exists for error events, so the transform applies for
    the errors dataset, never for transactions, and for discover only when
    the query itself targets errors.
    """
    from sentry.snuba import discover, errors, transactions

    if dataset == errors:
        # Errors dataset: always eligible.
        return True

    if dataset == transactions:
        # Transactions carry no sample_weight column.
        return False

    if dataset == discover:
        # Discover mixes event types — only apply when the query is
        # explicitly error-focused.
        return _is_error_focused_query(request)

    # Anything else (spans, metrics, ...): never apply.
    return False


def _is_error_focused_query(request: Request) -> bool:
"""
Check if a query is focused on error events.
Reduced to only check for event.type:error to err on the side of caution.
"""
query = request.GET.get("query", "").lower()

if "event.type:error" in query:
return True

return False
Comment on lines +130 to +140

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Substring match for event.type:error will false-positive on negated queries.

A query like !event.type:error or event.type:error_something will match the in check. The negation case is particularly concerning since it means "exclude errors," which is the opposite of the intended behavior — upsampling would be applied to a non-error query.

🐛 Proposed fix — guard against negation and partial matches
 def _is_error_focused_query(request: Request) -> bool:
     query = request.GET.get("query", "").lower()
 
-    if "event.type:error" in query:
+    # Ensure we match "event.type:error" but not "!event.type:error"
+    # or "event.type:error_something"
+    import re
+    if re.search(r'(?<!\!)event\.type:error\b', query):
         return True
 
     return False
🤖 Prompt for AI Agents
In `@src/sentry/api/helpers/error_upsampling.py` around lines 130 - 140, The
helper _is_error_focused_query currently uses a simple substring check which
false-positives on negations (e.g. "!event.type:error") and partial matches
(e.g. "event.type:error_something"); update it to only match a standalone,
non-negated token for event.type:error (for example using a regex with
word-boundaries and a negative lookbehind for common negation prefixes like "!"
or "-" or by tokenizing the query and rejecting tokens that start with negation
characters), ensuring you reject tokens like "!event.type:error" or
"event.type:error_something" and only return True for an exact, non-negated
event.type:error token.

12 changes: 12 additions & 0 deletions src/sentry/search/events/datasets/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,18 @@ def function_converter(self) -> Mapping[str, SnQLFunction]:
default_result_type="integer",
private=True,
),
SnQLFunction(
"upsampled_count",
required_args=[],
# Optimized aggregation for error upsampling - assumes sample_weight
# exists for all events in allowlisted projects as per schema design
snql_aggregate=lambda args, alias: Function(
"toInt64",
[Function("sum", [Column("sample_weight")])],
alias,
),
default_result_type="number",
),
]
}

Expand Down
21 changes: 20 additions & 1 deletion src/sentry/testutils/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import zipfile
from base64 import b64encode
from binascii import hexlify
from collections.abc import Mapping, Sequence
from collections.abc import Mapping, MutableMapping, Sequence
from datetime import UTC, datetime
from enum import Enum
from hashlib import sha1
Expand Down Expand Up @@ -341,6 +341,22 @@ def _patch_artifact_manifest(path, org=None, release=None, project=None, extra_f
return orjson.dumps(manifest).decode()


def _set_sample_rate_from_error_sampling(normalized_data: MutableMapping[str, Any]) -> None:
"""Set 'sample_rate' on normalized_data if contexts.error_sampling.client_sample_rate is present and valid."""
client_sample_rate = None
try:
client_sample_rate = (
normalized_data.get("contexts", {}).get("error_sampling", {}).get("client_sample_rate")
)
except Exception:
pass
if client_sample_rate:
try:
normalized_data["sample_rate"] = float(client_sample_rate)
except Exception:
pass

Comment on lines +344 to +358

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

if client_sample_rate: skips valid zero sample rates and silently swallows all exceptions.

Two concerns:

  1. Line 353: if client_sample_rate: evaluates to False for 0 and 0.0, which are valid sample rates. Use if client_sample_rate is not None: instead.

  2. The bare except Exception: pass blocks (flagged by Ruff S110/BLE001) silently swallow all errors. In test utilities this is less critical, but at minimum logging or narrowing the exception type would help debug test failures.

Proposed fix
 def _set_sample_rate_from_error_sampling(normalized_data: MutableMapping[str, Any]) -> None:
     """Set 'sample_rate' on normalized_data if contexts.error_sampling.client_sample_rate is present and valid."""
     client_sample_rate = None
     try:
         client_sample_rate = (
             normalized_data.get("contexts", {}).get("error_sampling", {}).get("client_sample_rate")
         )
-    except Exception:
+    except (AttributeError, TypeError):
         pass
-    if client_sample_rate:
+    if client_sample_rate is not None:
         try:
             normalized_data["sample_rate"] = float(client_sample_rate)
-        except Exception:
+        except (TypeError, ValueError):
             pass
🧰 Tools
🪛 Ruff (0.14.14)

[error] 351-352: try-except-pass detected, consider logging the exception

(S110)


[warning] 351-351: Do not catch blind exception: Exception

(BLE001)


[error] 356-357: try-except-pass detected, consider logging the exception

(S110)


[warning] 356-356: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In `@src/sentry/testutils/factories.py` around lines 344 - 358, In
_set_sample_rate_from_error_sampling, change the existence check from "if
client_sample_rate:" to "if client_sample_rate is not None:" so valid zero
values are preserved, and replace the two bare "except Exception: pass" blocks
with narrower exception handling or minimal logging: when extracting
client_sample_rate, catch only AttributeError/TypeError (or log the extraction
failure), and when converting to float, catch ValueError/TypeError and log the
invalid value before returning; keep references to normalized_data and
client_sample_rate so you update the logic inside that function only.


# TODO(dcramer): consider moving to something more scalable like factoryboy
class Factories:
@staticmethod
Expand Down Expand Up @@ -1029,6 +1045,9 @@ def store_event(
assert not errors, errors

normalized_data = manager.get_data()

_set_sample_rate_from_error_sampling(normalized_data)

event = None

# When fingerprint is present on transaction, inject performance problems
Expand Down
Loading