diff --git a/CHANGELOG.md b/CHANGELOG.md index aa1964e3..36d603a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,16 @@ All notable user-facing changes to this project will be documented in this file. ## Unreleased +- Node version is no longer edited by hand in the portal — it is detected automatically from your running node and shown read-only on your profile, so it always reflects reality instead of whatever was last typed in + +- Node upgrades are now detected automatically from live node metrics instead of relying on validators to edit their profile: when the network's first validator is seen running a new stable release, that version becomes the official upgrade target (the number of validators required to confirm a new version is configurable via a setting) and all validators are notified, each validator's running version is kept in sync from what their node actually reports (it can only move forward, so a brief reporting gap never shows a downgrade), and the node-upgrade points (more for upgrading sooner) are awarded automatically once a validator is seen on the target version — no manual submission or steward review needed. Only versions reported by known, non-banned validator nodes count, and stewards can pause the automatic points at any time by removing the node-upgrade multiplier + +- The Wall of Shame now shows how many consecutive days each validator has gone without being shamed — a clean-uptime streak per node and per network (Asimov and Bradbury), alongside the reasons a streak was broken. 
An operator's network streak stays alive as long as at least one of their nodes was healthy that day, days spent quarantined or inactive break it, and days where the monitoring itself had no data are simply skipped so an outage on our side never resets anyone's streak + +- The portal now keeps a daily history of each validator's observability: every Grafana sync records whether each node was reporting metrics, reporting logs, and running an up-to-date version, and rolls it up per day (a day counts as shamed if the node was shamed at any point that day). This history powers the uptime streaks above and is the foundation for future days-in-shame reporting; it starts accumulating from deploy since past days were never recorded + +- The grace period before a validator running an outdated node version is flagged on the Wall of Shame is now configurable via a setting (default three days), instead of a fixed three-day window baked into the code + - Grafana dashboards can now read a dedicated minimal validator roster endpoint that lists every validator wallet with its network, on-chain node address, display name, status, operator address, and (for visible operators) linked account, kept intentionally small and separate from the Wall of Shame so monitoring can join validator identity onto live node metrics (2aaf68f) - Contribution types can now require evidence URLs in groups, so a single submission can be made to provide both a contract code link (GenLayer Studio import or GitHub repository) and a deployed-contract explorer link (Asimov, Bradbury, or Studio explorer) together; GenLayer explorer contract addresses are now recognized as their own evidence type (25f90869) diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index b2c5217f..19909bc8 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -45,13 +45,13 @@ backend/ - Validator model with node_version field (OneToOne with User) - Custom UserManager for email-based auth - **Views**: `users/views.py` - - 
`/api/v1/users/me/` - GET/PATCH current user profile (name and node_version editable) + - `/api/v1/users/me/` - GET/PATCH current user profile (name/description/website/socials editable; node_version is NOT editable — Grafana-sourced, display only) - `/api/v1/users/by-address/{address}/` - Get user by wallet address - `/api/v1/users/validators/` - Get validator list from blockchain - **Serializers**: `users/serializers.py` - UserSerializer - Full user data including validator info - ValidatorSerializer - Validator node version and target matching - - UserProfileUpdateSerializer - Allows name and node_version updates + - UserProfileUpdateSerializer - Allows name/description/website/socials updates (node_version removed — Grafana is source of truth) - UserCreateSerializer - Registration ### Authentication @@ -101,7 +101,21 @@ backend/ ### Node Upgrade (Sub-app) - **Models**: `contributions/node_upgrade/models.py` - - TargetNodeVersion - Active target version for node upgrades + - TargetNodeVersion - Active target version for node upgrades. Per-network, single + `is_active` per network. The version-shame grace period — how many days after + `target_date` a node still behind is marked version "shame" — is the global + `settings.NODE_VERSION_SHAME_GRACE_DAYS` (default 3, env-overridable), not a per-target field. +- **Version verdict**: `validators/version_status.py::compute_version_status(wallet, target, now, node_version=...)` + is the shared helper (used by the Wall of Shame view and the Grafana sync) that returns + `on`/`warning`/`shame`/`unknown` using the `NODE_VERSION_SHAME_GRACE_DAYS` setting. The viewset's + `ValidatorWalletViewSet._version_context` delegates to it; the Grafana sync passes the + Prometheus-observed version explicitly. 
+- **Node versions are Grafana-sourced.** Target creation and the `node-upgrade` award are + driven automatically by the Grafana sync (`GrafanaValidatorStatusService._sync_node_versions` + / `_award_node_upgrade`); the portal no longer lets users edit their node version. The old + `NodeVersionMixin.save()` auto-submission path has been removed — `NodeVersionMixin` now only + holds the version fields + validation + comparison helpers, and `calculate_early_upgrade_bonus` + (reused by the Grafana award). Dedup on the `version {v} [{network}]` notes key is preserved. - **Admin**: `contributions/node_upgrade/admin.py` - TargetNodeVersion admin interface - **Views**: `contributions/views.py` @@ -227,17 +241,20 @@ backend/ ### Validators - **Models**: `validators/models.py` - ValidatorWallet - Synced validator wallet metadata per network. Now also stores Wall of Shame observability state: `metrics_status`, `logs_status` (both `on` / `shame` / `unknown`), and `last_grafana_check_at`. - - ValidatorWalletStatusSnapshot - Daily wallet status snapshots for uptime lookback + - ValidatorWalletStatusSnapshot - Daily wallet rollup. On-chain `status` (owned by the on-chain sync, for uptime lookback) PLUS the latched observability verdict written by the Grafana sync: `metrics_status` / `logs_status` / `version_status`, `metrics_samples` / `logs_samples` counters, and `node_version`. **Metrics and logs latch pessimistically** (worst-of-day: shame at ANY observation → the day is shame). **Version latches optimistically** (best-of-day: a single up-to-date observation → the day is OK, since once a node upgrades that day an earlier stale reading must not shame it; `on` > `warning` > `shame`). A day is "clean" only if `status=='active'` and both sample counters are ≥1 and neither metrics nor logs is `shame` and version is not `shame`. The two syncs write disjoint columns (bulk_create update_conflicts on `(wallet, date)`), so neither clobbers the other. 
+ - ValidatorWalletObservation - Append-only raw log; one row per active wallet per Grafana sync run (`observed_at`, `onchain_status`, `metrics_status`, `logs_status`, `version_status`, `node_version`). Source of truth the daily rollup is materialised from and rebuildable via `rebuild_daily_snapshots`. - SyncLock - Database-backed sync coordination row with owner token for cross-worker locking - **Services**: `validators/grafana_service.py` - - GrafanaValidatorStatusService - Polls Grafana Cloud (`/api/ds/query`) Prometheus + Loki datasources and updates `ValidatorWallet.metrics_status` / `logs_status` for `status='active'` wallets, per network. Used by the Wall of Shame cron. + - GrafanaValidatorStatusService - Polls Grafana Cloud (`/api/ds/query`) Prometheus + Loki datasources and updates `ValidatorWallet.metrics_status` / `logs_status` for `status='active'` wallets, per network. The Prometheus query also reads the `version` label from `genlayer_node_info` — **normalised at ingest** in `parse_response` ('v' prefix stripped, capped to the 50-char column; when a node briefly reports two version series right after an upgrade, the higher parseable one wins). Each run writes a `ValidatorWalletObservation` and latches today's `ValidatorWalletStatusSnapshot` rollup (`_record_history`, best-effort — never breaks the live status sync). Observations are retained forever by explicit decision — no pruning in points. Used by the Wall of Shame cron. 
+ - GrafanaValidatorStatusService is also the **source of truth for node versions** (`_sync_node_versions`, best-effort, runs before the active-wallet early return so networks with zero active wallets are still covered): version detection covers **every reporting node on the network regardless of on-chain status** (a quarantined node can still record its upgrade), **except banned wallets**, and only counts versions observed on wallets known to the DB and linked to an operator — the `version` label is self-reported by the node being judged and rewarded, so unknown Prometheus series count for nothing. Only versions that are both semver-valid AND PEP 440-parseable drive comparisons (e.g. `0.6.0-genlayer.1` is excluded — `packaging` can't parse it; in the shame loop an unparseable observed version or an unparseable active target yields `version_status='unknown'`, never a lexicographic fallback verdict). It auto-creates a `TargetNodeVersion` when a STABLE release (bare `x.y.z`, no pre-release/build) higher than the active target is reported by **at least `NODE_VERSION_MIN_OPERATORS_FOR_AUTO_TARGET` (default 1: the first adopter creates the target) distinct operators** (`target_date=now`; an unparseable active target is never blindly superseded; a broadcast notification is emitted via `broadcast_target_node_version`), raises each linked operator's `node_version_{network}` to their highest observed version via a direct `.update()` (**monotonic** — a wallet skipping a scrape cycle can't transiently downgrade the field; genuine downgrades need admin correction), and directly awards an already-approved `node-upgrade` Contribution (`_award_node_upgrade`, early-bonus 4/3/2/1) when a visible operator first reaches the active target. **Removing the node-upgrade multiplier pauses the auto-award** (it is skipped with a warning, not created at 1.0). The per-operator loop is individually fault-isolated — one operator's failure never blocks the rest.
Dedup shares the exact `version {v} [{network}]` notes key with the old manual flow so nothing double-awards. A run where a whole datasource comes back empty (no Prometheus series or no Loki counts) still updates live wallet statuses (they self-heal) but **skips the permanent history latch** — a datasource blackout must not shame every validator's recorded day. +- **Commands**: `validators/management/commands/rebuild_daily_snapshots.py` (`--days N`) re-materialises the daily rollup's observability columns from the raw observation log (preserves the on-chain `status`). - **Views**: `validators/views.py` - - `/api/v1/validators/` - Validator profile CRUD for authenticated users - - `/api/v1/validators/me/` - GET/PATCH current validator profile + - `/api/v1/validators/` - Validator profile listing/detail for authenticated users; create/update/delete are staff-only (non-staff mutations get 403) + - `/api/v1/validators/me/` - GET current validator profile (read-only; PATCH removed — node versions are Grafana-sourced, not portal-editable) - `/api/v1/validators/wallets/` - Read-only validator wallet listing - `/api/v1/validators/wallets/sync/` - POST cron-protected background sync trigger with DB-backed lock (on-chain validator sync) - `/api/v1/validators/wallets/sync-grafana/` - POST cron-protected background sync trigger for Grafana observability cross-check (separate SyncLock row `grafana_status_sync` so it can run alongside the on-chain sync) - - `/api/v1/validators/wallets/wall-of-shame/` - Public read-only endpoint listing active validator wallets with `metrics_status` / `logs_status`. SHAME rows sort first. Cached 60s. Optional `?network=asimov|bradbury` filter. + - `/api/v1/validators/wallets/wall-of-shame/` - Public read-only endpoint listing active validator wallets with `metrics_status` / `logs_status`. SHAME rows sort first. Cached 60s. Optional `?network=asimov|bradbury` filter. 
Each wallet also carries `clean_streak_days` + `clean_streak_broken_by` (consecutive not-shamed days for that node, from `validators/streaks.py` over the daily rollup). The grouped `validators` output adds `network_streaks` — per-operator-per-network any-node-clean streaks (a network-day is clean if ≥1 of the operator's nodes was clean) — plus per-node `clean_streak_days` on each `networks` entry. Streaks start accumulating at deploy (history wasn't recorded before). Days with no Grafana data while the node was active (sync outage, pre-history) are SKIPPED — they neither count nor break, so an infra failure on our side never resets streaks; days spent non-active per the on-chain sync break the streak with `broken_by: ['status']`. - `/api/v1/validators/wallets/grafana/` - Public minimal roster for the Grafana Infinity datasource (`GrafanaValidatorSerializer`). Flat array, one row per wallet across ALL statuses; fields: `network` (Grafana label value e.g. `asimov-phase5`), `node` (on-chain validator address == Prometheus `genlayer_node_info` `node` label, lowercased), `name`, `status`, `operator`, `account`/`account_name` (only for visible operators), `explorer_url`. Excludes observability/shame fields by design. Cached 60s. Optional `?network=asimov|bradbury` filter. 
### Partners (Ecosystem Partners) @@ -366,7 +383,7 @@ POST /api/auth/logout/ # Users GET /api/v1/users/ (requires auth) GET /api/v1/users/me/ (requires auth) -PATCH /api/v1/users/me/ (requires auth, only name) +PATCH /api/v1/users/me/ (requires auth; name/description/website/socials — node_version NOT editable, Grafana-sourced) GET /api/v1/users/{address}/ (requires auth) GET /api/v1/users/by-address/{address}/ (requires auth) GET /api/v1/users/validators/ (requires auth) @@ -484,6 +501,8 @@ Located in `.env` file: - `GRAFANA_PROM_DS_UID` - Prometheus datasource UID (default `grafanacloud-prom`) - `GRAFANA_LOKI_DS_UID` - Loki datasource UID (default `grafanacloud-logs`) - `GRAFANA_ASIMOV_LABEL` / `GRAFANA_BRADBURY_LABEL` - Override the `network` label values Grafana queries use per testnet (defaults: `asimov-phase5`, `bradbury-phase1`) +- `NODE_VERSION_SHAME_GRACE_DAYS` - Grace period (days) after a target's `target_date` before a node still behind it is version-shamed, applied globally (default `3`) +- `NODE_VERSION_MIN_OPERATORS_FOR_AUTO_TARGET` - Minimum distinct operators that must be observed running a new stable node version before the Grafana sync auto-creates it as the fleet-wide upgrade target (default `1`: the first adopter creates the target; raise it to require corroboration if version spoofing ever becomes a concern) - `SORSA_API_BASE_URL` - Sorsa API base URL (default `https://api.sorsa.io/v3`); used for Twitter follow verification in social_tasks and X follower counts in overview metrics. - `SORSA_API_KEY` - Sorsa API key sent in the `ApiKey` header (secret, required). Store in AWS SSM (`/tally/{env}/sorsa_api_key`) for production. - Note: the Sorsa request timeout and follow endpoint path are intentionally code constants in `social_tasks/sorsa_client.py`, not env vars. Changing the endpoint requires a code deploy anyway because the response parser lives in the same file. 
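The daily-rollup rules documented in the `backend/CLAUDE.md` changes above (metrics and logs latch worst-of-day, version latches best-of-day, and a day is clean only under the stated conditions) can be illustrated with a minimal standalone sketch. It is not the project's implementation: `latch_worst`, `latch_best`, `roll_up_day`, and `is_clean_day` are hypothetical names, and plain dicts stand in for the Django models.

```python
# Hypothetical, self-contained sketch of the latching rules; not project code.
METRICS_SEVERITY = {'unknown': 0, 'on': 1, 'shame': 2}                 # higher = worse
VERSION_GOODNESS = {'unknown': 0, 'shame': 1, 'warning': 2, 'on': 3}   # higher = better


def latch_worst(prev, cur):
    """Keep the worse of two metrics/logs verdicts (worst-of-day)."""
    return prev if METRICS_SEVERITY.get(prev, 0) >= METRICS_SEVERITY.get(cur, 0) else cur


def latch_best(prev, cur):
    """Keep the better of two version verdicts (best-of-day)."""
    return prev if VERSION_GOODNESS.get(prev, 0) >= VERSION_GOODNESS.get(cur, 0) else cur


def roll_up_day(observations):
    """Fold one day's observations (assumed dict shape) into a single rollup."""
    day = {
        'metrics_status': 'unknown', 'logs_status': 'unknown',
        'version_status': 'unknown', 'metrics_samples': 0, 'logs_samples': 0,
    }
    for obs in observations:
        day['metrics_status'] = latch_worst(day['metrics_status'], obs['metrics_status'])
        day['logs_status'] = latch_worst(day['logs_status'], obs['logs_status'])
        day['version_status'] = latch_best(day['version_status'], obs['version_status'])
        # Sample counters only count healthy observations.
        day['metrics_samples'] += 1 if obs['metrics_status'] == 'on' else 0
        day['logs_samples'] += 1 if obs['logs_status'] == 'on' else 0
    return day


def is_clean_day(day, onchain_status):
    """Clean = active on-chain, at least one healthy sample each, and no shame verdict."""
    return (
        onchain_status == 'active'
        and day['metrics_samples'] >= 1
        and day['logs_samples'] >= 1
        and day['metrics_status'] != 'shame'
        and day['logs_status'] != 'shame'
        and day['version_status'] != 'shame'
    )
```

Under these assumptions, a node that reports a stale version in the morning and an up-to-date one later still has a clean day, whereas a single missing-metrics observation shames the whole day.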
diff --git a/backend/tally/settings.py b/backend/tally/settings.py index e0f0237c..cd87ace3 100644 --- a/backend/tally/settings.py +++ b/backend/tally/settings.py @@ -455,6 +455,21 @@ def get_port_from_argv(): # Uptime lookback window (days) - how many days back to check for active status UPTIME_LOOKBACK_DAYS = int(os.environ.get('UPTIME_LOOKBACK_DAYS', '7') or '7') +# Grace period (days) after a target's target_date before a node still running an +# older version is marked as version "shame". Applied globally at evaluation time +# (validators/version_status.py); changing it affects all targets, past and future. +NODE_VERSION_SHAME_GRACE_DAYS = int(os.environ.get('NODE_VERSION_SHAME_GRACE_DAYS', '3') or '3') + +# Minimum distinct operators that must be observed running a new stable node +# version before the Grafana sync auto-creates it as the fleet-wide upgrade +# target. Default 1: the first adopter creates the target (product decision). +# The version label is self-reported by the nodes being judged and rewarded, +# so raise this to require corroboration if spoofing ever becomes a concern +# (validators/grafana_service.py). +NODE_VERSION_MIN_OPERATORS_FOR_AUTO_TARGET = int( + os.environ.get('NODE_VERSION_MIN_OPERATORS_FOR_AUTO_TARGET', '1') or '1' +) + # AWS Health Check IPs - Allow these IPs to bypass ALLOWED_HOSTS # Required environment variable with AWS internal/metadata service IPs ALLOWED_CIDR_NETS = get_required_env('ALLOWED_CIDR_NETS').split(',') diff --git a/backend/users/serializers.py b/backend/users/serializers.py index 8b440505..e411aa96 100644 --- a/backend/users/serializers.py +++ b/backend/users/serializers.py @@ -305,19 +305,15 @@ def get_total_validators_count(self, obj): class UserProfileUpdateSerializer(serializers.ModelSerializer): """ Serializer for updating user profile. - Allows updating name, profile fields, and validator node versions per network. + Allows updating name and profile fields. 
Node versions are NOT editable here: + they are sourced from Grafana (see validators/grafana_service.py) and only + displayed on the profile, never written from the portal. """ - node_version_asimov = serializers.CharField( - required=False, allow_blank=True, allow_null=True, source='validator.node_version_asimov' - ) - node_version_bradbury = serializers.CharField( - required=False, allow_blank=True, allow_null=True, source='validator.node_version_bradbury' - ) website = serializers.CharField(required=False, allow_blank=True, max_length=200) class Meta: model = User - fields = ['name', 'node_version_asimov', 'node_version_bradbury', 'description', 'website', + fields = ['name', 'description', 'website', 'telegram_handle', 'linkedin_handle'] def to_internal_value(self, data): @@ -368,28 +364,6 @@ def validate_linkedin_handle(self, value): value = value.split('?')[0] return value - def update(self, instance, validated_data): - # Handle validator data if present - validator_data = validated_data.pop('validator', {}) - - # Update other user fields - for field, value in validated_data.items(): - setattr(instance, field, value) - - instance.save() - - # Update or create validator if any node_version field is provided - if 'node_version_asimov' in validator_data or 'node_version_bradbury' in validator_data: - validator, created = Validator.objects.get_or_create(user=instance) - if 'node_version_asimov' in validator_data: - validator.node_version_asimov = validator_data['node_version_asimov'] - if 'node_version_bradbury' in validator_data: - validator.node_version_bradbury = validator_data['node_version_bradbury'] - validator.save() - - return instance - - class BuilderSerializer(serializers.ModelSerializer): """ Serializer for Builder profile. 
diff --git a/backend/validators/genlayer_validators_service.py b/backend/validators/genlayer_validators_service.py index 5e74cfa3..5e550d3a 100644 --- a/backend/validators/genlayer_validators_service.py +++ b/backend/validators/genlayer_validators_service.py @@ -627,7 +627,10 @@ def _record_status_snapshots(self): """Record status snapshots for all wallets on this network for today.""" from .models import ValidatorWallet, ValidatorWalletStatusSnapshot - today = timezone.now().date() + # Same day-bucketing as the Grafana rollup (grafana_service._record_history): + # both writers must agree on the (wallet, date) key or each day splits into + # two rows and the streak logic never sees a complete one. + today = timezone.localdate() wallets = ValidatorWallet.objects.filter(network=self.network_key) snapshots = [ diff --git a/backend/validators/grafana_service.py b/backend/validators/grafana_service.py index d821f3ea..8a941ee7 100644 --- a/backend/validators/grafana_service.py +++ b/backend/validators/grafana_service.py @@ -10,15 +10,70 @@ """ import logging +import re import requests from django.conf import settings from django.utils import timezone +from packaging.version import parse as parse_version -from .models import ValidatorWallet +from .models import ( + Validator, + ValidatorWallet, + ValidatorWalletObservation, + ValidatorWalletStatusSnapshot, +) +from .version_status import compute_version_status, safe_parse_version as _safe_parse logger = logging.getLogger(__name__) +# Metrics/logs latch PESSIMISTICALLY (worst-of-day): one shame sample shames the +# whole day. Higher number = worse; _latch keeps the worse of prev/cur. +_METRICS_SEVERITY = {'unknown': 0, 'on': 1, 'shame': 2} + +# Version latches OPTIMISTICALLY (best-of-day): a single up-to-date sample makes +# the day OK — once a node has upgraded that day, an earlier stale reading must +# not shame it. Higher number = better; _latch_version keeps the better of prev/cur. 
+_VERSION_GOODNESS = {'unknown': 0, 'shame': 1, 'warning': 2, 'on': 3} + +# Node versions from Prometheus arrive as e.g. "v0.5.12"; the profile field forbids +# the leading "v". A stable release is a bare x.y.z (no -prerelease / +build). +_SEMVER_RE = re.compile(r'^\d+\.\d+\.\d+(-[a-zA-Z0-9\-.]+)?(\+[a-zA-Z0-9\-.]+)?$') +_STABLE_RE = re.compile(r'^\d+\.\d+\.\d+$') + +# node_version columns are varchar(50); anything longer is operator-controlled junk. +_VERSION_MAX_LENGTH = 50 + +def min_operators_for_auto_target(): + """ + Distinct operators that must report a stable release before it auto-creates a + target. Default 1: the first adopter creates the target (product decision). + The version label is self-reported by the node being judged and rewarded, so + raise the setting to require corroboration if spoofing ever becomes a concern. + Read at call time (not import) so the setting is tunable without a code deploy. + """ + return getattr(settings, 'NODE_VERSION_MIN_OPERATORS_FOR_AUTO_TARGET', 1) + + +def _latch(prev, cur, severity): + """Return whichever of prev/cur is worse (higher severity) — worst-of-day.""" + return prev if severity.get(prev, 0) >= severity.get(cur, 0) else cur + + +def _latch_version(prev, cur): + """Return the better of prev/cur (higher goodness) — best-of-day for version.""" + return prev if _VERSION_GOODNESS.get(prev, 0) >= _VERSION_GOODNESS.get(cur, 0) else cur + + +def _normalize_version(raw): + """Strip a leading 'v' so a Prometheus 'v0.5.12' matches the profile field format.""" + if not raw: + return '' + v = raw.strip() + if v[:1] in ('v', 'V'): + v = v[1:] + return v + class GrafanaValidatorStatusService: HTTP_TIMEOUT_SECONDS = 10 @@ -27,7 +82,7 @@ class GrafanaValidatorStatusService: def _build_query_body(network_label, prom_ds_uid, loki_ds_uid): """Build the /api/ds/query body matching the dashboard's panel-1 query B.""" prom_expr = ( - 'max by(validator_name, node) ' + 'max by(validator_name, node, version) ' 
'(genlayer_node_info{job="prometheus.scrape.genlayer_node", ' f'network="{network_label}"}})' ) @@ -61,12 +116,17 @@ def parse_response(body): """ Parse a Grafana /api/ds/query response. - Returns a 3-tuple: + Returns a 4-tuple: - prom_addresses: set of lowercased validator addresses reporting metrics - validator_name_by_address: {address_lower: validator_name} - log_counts_by_name: {validator_name: log_count} - - Matches the JQ in the dashboard panel-1 query B exactly. + - version_by_address: {address_lower: node_version} (from the `version` + label, normalised — 'v' prefix stripped, capped to the column length). + Right after an upgrade Prometheus can return BOTH the old and new + version series for ~5 minutes; the higher parseable version wins (and + a parseable one always beats an unparseable one) so frame order can + neither transiently downgrade a freshly-upgraded node nor pin a + garbage label. """ results = (body or {}).get('results') or {} prom_frames = ((results.get('prom') or {}).get('frames')) or [] @@ -74,6 +134,7 @@ def parse_response(body): prom_addresses = set() validator_name_by_address = {} + version_by_address = {} for frame in prom_frames: schema = frame.get('schema') or {} for field in schema.get('fields') or []: @@ -84,6 +145,21 @@ def parse_response(body): addr = node.lower() prom_addresses.add(addr) validator_name_by_address[addr] = vname + version = _normalize_version(labels.get('version'))[:_VERSION_MAX_LENGTH] + if version: + prev = version_by_address.get(addr) + if prev is None: + version_by_address[addr] = version + else: + prev_parsed = _safe_parse(prev) + cur_parsed = _safe_parse(version) + # A parseable version always beats an unparseable one + # (frame order must not pin a garbage label); between + # two parseable ones the higher wins. 
+ if cur_parsed is not None and ( + prev_parsed is None or cur_parsed > prev_parsed + ): + version_by_address[addr] = version log_counts_by_name = {} for frame in loki_frames: @@ -106,7 +182,7 @@ def parse_response(body): count = 0 log_counts_by_name[vname] = count - return prom_addresses, validator_name_by_address, log_counts_by_name + return prom_addresses, validator_name_by_address, log_counts_by_name, version_by_address @classmethod def sync_network(cls, network): @@ -166,7 +242,15 @@ def sync_network(cls, network): logger.warning("Grafana returned non-JSON for %s: %s", network, exc) return {'network': network, 'error': 'invalid_json'} - prom_addresses, name_by_addr, log_counts = cls.parse_response(data) + prom_addresses, name_by_addr, log_counts, version_by_addr = cls.parse_response(data) + + now = timezone.now() + + # Version detection covers every reporting node on the network (any on-chain + # status), not just the active wallets checked for shame below — so it runs + # before the no-active-wallets early return. It can auto-create a target, so + # it also runs before the shame loop reads the active target. 
+ cls._sync_node_versions(network, version_by_addr, now) wallets = list( ValidatorWallet.objects @@ -174,6 +258,7 @@ def sync_network(cls, network): .only( 'id', 'address', + 'status', 'metrics_status', 'logs_status', 'metrics_shame_started_at', @@ -185,18 +270,30 @@ def sync_network(cls, network): logger.info("Grafana sync for %s: no active wallets to update", network) return {'network': network, 'wallets': 0} - now = timezone.now() + from contributions.node_upgrade.models import TargetNodeVersion + target = TargetNodeVersion.get_active(network=network) + on_count = 0 shame_count = 0 + samples = [] for wallet in wallets: addr_lower = (wallet.address or '').lower() metrics_ok = addr_lower in prom_addresses vname = name_by_addr.get(addr_lower) logs_ok = bool(vname and log_counts.get(vname, 0) > 0) + observed_version = version_by_addr.get(addr_lower) or '' metrics_status = 'on' if metrics_ok else 'shame' logs_status = 'on' if logs_ok else 'shame' + # Only assess version when we actually observed a running version; + # a non-reporting node is already metrics/logs shame. Unparseable + # versions read as 'unknown' inside compute_version_status (exact + # target match excepted) — never a lexicographic verdict. 
+ version_status = ( + compute_version_status(wallet, target, now, node_version=observed_version)['status'] + if observed_version else 'unknown' + ) if metrics_status == 'shame': if wallet.metrics_status != 'shame' or not wallet.metrics_shame_started_at: @@ -214,6 +311,17 @@ def sync_network(cls, network): wallet.logs_status = logs_status wallet.last_grafana_check_at = now + samples.append({ + 'wallet': wallet, + 'onchain_status': wallet.status, + 'metrics_status': metrics_status, + 'logs_status': logs_status, + 'version_status': version_status, + 'node_version': observed_version, + 'metrics_ok': metrics_ok, + 'logs_ok': logs_ok, + }) + if metrics_ok and logs_ok: on_count += 1 else: @@ -230,6 +338,20 @@ def sync_network(cls, network): ], ) + # A blackout of a whole datasource (no Prometheus series at all, or no Loki + # log counts at all) is an infra/config failure (rotated datasource UID, + # token scope, renamed network label), not the entire fleet failing at once. + # The live wallet statuses above self-heal on the next good run, but the + # daily rollup latches worst-of-day permanently — so skip the history write + # for suspect runs instead of shaming every validator's recorded day. + if not prom_addresses or not log_counts: + logger.error( + "Grafana sync for %s returned no %s data; skipping history latch for this run", + network, 'Prometheus' if not prom_addresses else 'Loki', + ) + else: + cls._record_history(samples, now) + logger.info( "Grafana sync for %s: %d on, %d shame (%d total)", network, on_count, shame_count, len(wallets), @@ -241,6 +363,289 @@ def sync_network(cls, network): 'shame': shame_count, } + @classmethod + def _record_history(cls, samples, now): + """ + Persist the raw observations for this sync run and latch them into today's + per-day rollup (worst-of-day for metrics/logs, best-of-day for version). Never raises: history is best-effort and must + not break the live status sync.
+ """ + if not samples: + return + try: + today = timezone.localdate(now) + + ValidatorWalletObservation.objects.bulk_create([ + ValidatorWalletObservation( + wallet=s['wallet'], + observed_at=now, + onchain_status=s['onchain_status'], + metrics_status=s['metrics_status'], + logs_status=s['logs_status'], + version_status=s['version_status'], + node_version=s['node_version'], + ) + for s in samples + ]) + + wallet_ids = [s['wallet'].id for s in samples] + existing = { + snap.wallet_id: snap + for snap in ValidatorWalletStatusSnapshot.objects.filter( + wallet_id__in=wallet_ids, date=today + ) + } + + rollups = [] + for s in samples: + wallet = s['wallet'] + prev = existing.get(wallet.id) + prev_metrics = prev.metrics_status if prev else 'unknown' + prev_logs = prev.logs_status if prev else 'unknown' + prev_version = prev.version_status if prev else 'unknown' + prev_m_samples = prev.metrics_samples if prev else 0 + prev_l_samples = prev.logs_samples if prev else 0 + + rollups.append(ValidatorWalletStatusSnapshot( + wallet=wallet, + date=today, + status=wallet.status, + metrics_status=_latch(prev_metrics, s['metrics_status'], _METRICS_SEVERITY), + logs_status=_latch(prev_logs, s['logs_status'], _METRICS_SEVERITY), + version_status=_latch_version(prev_version, s['version_status']), + node_version=s['node_version'] or (prev.node_version if prev else ''), + metrics_samples=prev_m_samples + (1 if s['metrics_ok'] else 0), + logs_samples=prev_l_samples + (1 if s['logs_ok'] else 0), + )) + + # On insert, `status` is set from the wallet; on conflict only the + # observability columns update, so the on-chain sync's `status` is preserved. 
+ ValidatorWalletStatusSnapshot.objects.bulk_create( + rollups, + update_conflicts=True, + unique_fields=['wallet', 'date'], + update_fields=[ + 'metrics_status', 'logs_status', 'version_status', + 'node_version', 'metrics_samples', 'logs_samples', + ], + ) + except Exception: # pragma: no cover - defensive + logger.exception("Failed to record validator observation history") + + @classmethod + def _sync_node_versions(cls, network, version_by_address, now): + """ + Grafana is the source of truth for node versions. From the versions observed + this run ({address_lower: version}): + 1. auto-create a TargetNodeVersion when a STABLE release higher than the + active target is reported by at least min_operators_for_auto_target() + distinct linked operators (target_date=now); + 2. raise each linked operator's node_version_{network} to the highest + valid version across their nodes (bypassing the profile-save path via + a direct .update()); + 3. directly award the node-upgrade contribution when an operator first + reaches the active target. + + Trust boundary: the `version` label is self-reported by the node being + judged and rewarded. Only versions observed on wallets known to this DB and + linked to an operator count for anything, and banned wallets and wallets of + banned users count for nothing. The operator quorum for auto-targets is + configurable (default 1: the first adopter creates the target). + + Any other wallet qualifies for (2)/(3) — a quarantined/inactive node + that still reports can record its upgrade and earn the award. + + Best-effort at two levels: the whole step never raises, and one operator's + failure never blocks version updates or awards for the others. + """ + try: + from contributions.node_upgrade.models import TargetNodeVersion + + # Normalise + keep only field-valid, PEP 440-parseable semvers — anything + # unparseable can't be compared, so it can't drive targets or awards.
+ normalized = {} + for addr, raw in version_by_address.items(): + v = _normalize_version(raw) + if v and _SEMVER_RE.match(v) and _safe_parse(v) is not None: + normalized[addr.lower()] = v + if not normalized: + return + + # Versions only count when observed on a known, operator-linked, + # non-banned wallet owned by a non-banned user: an unknown Prometheus + # series (test rig, stale/spoofed node) or a suspended account must + # not be able to move targets or earn points. + wallets = ( + ValidatorWallet.objects + .filter( + network=network, + operator__isnull=False, + operator__user__is_banned=False, + ) + .exclude(status='banned') + .select_related('operator', 'operator__user') + ) + by_operator = {} + for wallet in wallets: + version = normalized.get((wallet.address or '').lower()) + if version: + by_operator.setdefault(wallet.operator, []).append(version) + if not by_operator: + return + + # (1) Auto-create target from the highest stable release reported by + # enough distinct operators (min_operators_for_auto_target, default 1: + # the first adopter creates the target; raise the setting to require + # corroboration if spoofing ever becomes a concern). An active target + # with an unparseable version is never superseded blindly. 
+ operators_by_stable = {} + for operator, versions in by_operator.items(): + for v in versions: + if _STABLE_RE.match(v): + operators_by_stable.setdefault(v, set()).add(operator.pk) + min_operators = min_operators_for_auto_target() + corroborated = [ + v for v, ops in operators_by_stable.items() + if len(ops) >= min_operators + ] + if corroborated: + highest = max(corroborated, key=parse_version) + active = TargetNodeVersion.get_active(network=network) + active_parsed = _safe_parse(active.version) if active else None + if not active or ( + active_parsed is not None and parse_version(highest) > active_parsed + ): + target = TargetNodeVersion.objects.create( + version=highest, network=network, target_date=now, is_active=True, + ) + logger.info( + "Auto-created node version target %s for %s from Grafana", + highest, network, + ) + cls._broadcast_auto_target(target) + + active = TargetNodeVersion.get_active(network=network) + active_parsed = _safe_parse(active.version) if active else None + + # (2) + (3): per linked operator, using their highest observed version. + field = f'node_version_{network}' + for operator, versions in by_operator.items(): + try: + highest = max(versions, key=parse_version) + current = getattr(operator, field, None) + current_parsed = _safe_parse(current) if current else None + # (2) Monotonic write: only raise the stored version. A wallet + # that skips one scrape must not transiently downgrade the + # operator's version (and flip their wall-of-shame verdict). + # Genuine downgrades are not reflected until admin-corrected. + if current != highest and ( + current_parsed is None or parse_version(highest) > current_parsed + ): + # Direct update bypasses NodeVersionMixin.save() so we control + # the award path (direct/approved). + Validator.objects.filter(pk=operator.pk).update(**{field: highest}) + setattr(operator, field, highest) + # (3) Award once the operator (visible) reaches the active target. 
+ if ( + active + and active_parsed is not None + and operator.user_id + and getattr(operator.user, 'visible', False) + and not getattr(operator.user, 'is_banned', False) + and parse_version(highest) >= active_parsed + ): + cls._award_node_upgrade(operator, network, active, now) + except Exception: + logger.exception( + "Node version sync failed for operator %s on %s", + operator.pk, network, + ) + except Exception: # pragma: no cover - defensive + logger.exception("Failed to sync node versions for %s", network) + + @staticmethod + def _broadcast_auto_target(target): + """Notify validators of an auto-created target; the grace/bonus clock starts now.""" + try: + from notifications.services import broadcast_target_node_version + + broadcast_target_node_version( + target, + message=( + f"Target node version for {target.get_network_display()} is now " + f"{target.version} (detected on the network). Upgrade within the " + "grace period to avoid the Wall of Shame; upgrading sooner earns " + "more points." + ), + ) + except Exception: + logger.exception( + "Failed to broadcast auto-created target %s", target.pk, + ) + + @staticmethod + def _award_node_upgrade(operator, network, target, now): + """ + Create a direct, already-approved node-upgrade Contribution for a Grafana- + observed upgrade. Dedup shares the exact key with the manual profile flow + (`version {v} [{network}]`), so the two paths never double-award. 
+ """ + from django.db import transaction + + from contributions.models import Contribution, ContributionType, SubmittedContribution + from leaderboard.models import GlobalLeaderboardMultiplier + from .node_version import calculate_early_upgrade_bonus + + contribution_type = ContributionType.objects.filter(slug='node-upgrade').first() + if not contribution_type: + return + + user = operator.user + dedup_key = f"version {target.version} [{network}]" + + points = calculate_early_upgrade_bonus(target.target_date, now) + try: + _, multiplier_value = GlobalLeaderboardMultiplier.get_active_for_type( + contribution_type, at_date=now, + ) + except GlobalLeaderboardMultiplier.DoesNotExist: + # Removing the multiplier is how stewards pause a points program; the + # auto-award must respect that kill switch, not award at 1.0 anyway. + logger.warning( + "Skipping node-upgrade award for %s on %s: no active multiplier", + user.pk, network, + ) + return + + with transaction.atomic(): + # The grafana sync lock already serializes runs; locking the user row + # here closes the residual stale-lock-takeover window so the dedup + # check-then-create below can never double-award (no-op on SQLite). 
+ type(user).objects.select_for_update().get(pk=user.pk) + + already_awarded = Contribution.objects.filter( + user=user, contribution_type=contribution_type, notes__contains=dedup_key, + ).exists() + pending = SubmittedContribution.objects.filter( + user=user, contribution_type=contribution_type, + state__in=['pending', 'accepted'], notes__contains=dedup_key, + ).exists() + if already_awarded or pending: + return + + Contribution( + user=user, + contribution_type=contribution_type, + points=points, + contribution_date=now, + multiplier_at_creation=multiplier_value, + frozen_global_points=round(points * float(multiplier_value)), + notes=( + f"Automatic node upgrade to version {target.version} " + f"[{network}] (detected via Grafana)" + ), + ).save() # post_save signal updates the leaderboard + @classmethod def sync_all_networks(cls): """Sync every configured network. Returns a list of per-network stats.""" diff --git a/backend/validators/management/commands/rebuild_daily_snapshots.py b/backend/validators/management/commands/rebuild_daily_snapshots.py new file mode 100644 index 00000000..15f1bf81 --- /dev/null +++ b/backend/validators/management/commands/rebuild_daily_snapshots.py @@ -0,0 +1,102 @@ +""" +Rebuild the observability columns of ValidatorWalletStatusSnapshot from the raw +ValidatorWalletObservation log (worst-of-day latch + sample counters + latest +node version). Useful after a code change to the rollup logic or to repair a gap. + +The on-chain `status` column is preserved on existing rows (only set on insert from +the latest observation's on-chain status), so this never disturbs the on-chain sync. 
+""" +from datetime import datetime, time, timedelta + +from django.core.management.base import BaseCommand +from django.utils import timezone + +from validators.grafana_service import ( + _METRICS_SEVERITY, + _latch, + _latch_version, +) +from validators.models import ValidatorWalletObservation, ValidatorWalletStatusSnapshot + + +class Command(BaseCommand): + help = 'Rebuild daily validator status rollups from the raw observation log' + + def add_arguments(self, parser): + parser.add_argument( + '--days', type=int, default=None, + help='Only rebuild the last N days of rollups (default: all observations)' + ) + + def handle(self, *args, **options): + days = options.get('days') + observations = ValidatorWalletObservation.objects.all() + if days is not None: + # Snap the cutoff to a local-day boundary: a mid-day cutoff would rebuild + # the oldest day in range from only part of its observations and overwrite + # that day's correctly-latched rollup with wrong values. + cutoff_date = timezone.localdate() - timedelta(days=days) + cutoff = timezone.make_aware( + datetime.combine(cutoff_date, time.min), + timezone.get_current_timezone(), + ) + observations = observations.filter(observed_at__gte=cutoff) + observations = observations.order_by('wallet_id', 'observed_at') + + acc = {} + obs_count = 0 + for obs in observations.iterator(): + obs_count += 1 + key = (obs.wallet_id, timezone.localdate(obs.observed_at)) + agg = acc.get(key) + if agg is None: + agg = { + 'metrics_status': 'unknown', + 'logs_status': 'unknown', + 'version_status': 'unknown', + 'metrics_samples': 0, + 'logs_samples': 0, + 'node_version': '', + 'status': obs.onchain_status, + } + acc[key] = agg + agg['metrics_status'] = _latch(agg['metrics_status'], obs.metrics_status, _METRICS_SEVERITY) + agg['logs_status'] = _latch(agg['logs_status'], obs.logs_status, _METRICS_SEVERITY) + agg['version_status'] = _latch_version(agg['version_status'], obs.version_status) + if obs.metrics_status == 'on': + 
agg['metrics_samples'] += 1 + if obs.logs_status == 'on': + agg['logs_samples'] += 1 + if obs.node_version: + agg['node_version'] = obs.node_version # ascending order → latest wins + agg['status'] = obs.onchain_status + + rollups = [ + ValidatorWalletStatusSnapshot( + wallet_id=wallet_id, + date=date, + status=agg['status'], + metrics_status=agg['metrics_status'], + logs_status=agg['logs_status'], + version_status=agg['version_status'], + node_version=agg['node_version'], + metrics_samples=agg['metrics_samples'], + logs_samples=agg['logs_samples'], + ) + for (wallet_id, date), agg in acc.items() + ] + + if rollups: + ValidatorWalletStatusSnapshot.objects.bulk_create( + rollups, + update_conflicts=True, + unique_fields=['wallet', 'date'], + update_fields=[ + 'metrics_status', 'logs_status', 'version_status', + 'node_version', 'metrics_samples', 'logs_samples', + ], + ) + + self.stdout.write(self.style.SUCCESS( + f'Rebuilt {len(rollups)} daily rollup(s) from {obs_count} observation(s).' + )) diff --git a/backend/validators/migrations/0015_validatorwalletstatussnapshot_logs_samples_and_more.py b/backend/validators/migrations/0015_validatorwalletstatussnapshot_logs_samples_and_more.py new file mode 100644 index 00000000..5ce8d360 --- /dev/null +++ b/backend/validators/migrations/0015_validatorwalletstatussnapshot_logs_samples_and_more.py @@ -0,0 +1,63 @@ +# Generated by Django 6.0.6 on 2026-07-02 09:34 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('validators', '0014_validatorwallet_assets_under_management_usd_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='validatorwalletstatussnapshot', + name='logs_samples', + field=models.PositiveIntegerField(default=0, help_text='Observations that day where the node was reporting logs'), + ), + migrations.AddField( + model_name='validatorwalletstatussnapshot', + name='logs_status', + 
field=models.CharField(choices=[('on', 'On'), ('shame', 'Shame'), ('unknown', 'Unknown')], default='unknown', help_text='Worst-of-day logs verdict: shame at any observation shames the day', max_length=10), + ), + migrations.AddField( + model_name='validatorwalletstatussnapshot', + name='metrics_samples', + field=models.PositiveIntegerField(default=0, help_text='Observations that day where the node was reporting metrics'), + ), + migrations.AddField( + model_name='validatorwalletstatussnapshot', + name='metrics_status', + field=models.CharField(choices=[('on', 'On'), ('shame', 'Shame'), ('unknown', 'Unknown')], default='unknown', help_text='Worst-of-day metrics verdict: shame at any observation shames the day', max_length=10), + ), + migrations.AddField( + model_name='validatorwalletstatussnapshot', + name='node_version', + field=models.CharField(blank=True, help_text='Last node version observed by the Grafana sync that day', max_length=50), + ), + migrations.AddField( + model_name='validatorwalletstatussnapshot', + name='version_status', + field=models.CharField(choices=[('on', 'On'), ('warning', 'Warning'), ('shame', 'Shame'), ('unknown', 'Unknown')], default='unknown', help_text='Best-of-day version verdict vs the active target (an upgrade clears the day)', max_length=10), + ), + migrations.CreateModel( + name='ValidatorWalletObservation', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ('observed_at', models.DateTimeField(db_index=True, help_text='When the Grafana sync recorded this observation')), + ('onchain_status', models.CharField(choices=[('active', 'Active'), ('quarantined', 'Quarantined'), ('banned', 'Banned'), ('inactive', 'Inactive')], help_text="Wallet's on-chain status at observation time", max_length=20)), + ('metrics_status', models.CharField(choices=[('on', 'On'), 
('shame', 'Shame'), ('unknown', 'Unknown')], help_text='Whether the node was reporting metrics at this observation', max_length=10)), + ('logs_status', models.CharField(choices=[('on', 'On'), ('shame', 'Shame'), ('unknown', 'Unknown')], help_text='Whether the node was reporting logs at this observation', max_length=10)), + ('version_status', models.CharField(choices=[('on', 'On'), ('warning', 'Warning'), ('shame', 'Shame'), ('unknown', 'Unknown')], default='unknown', help_text='Version verdict vs the active target at this observation', max_length=10)), + ('node_version', models.CharField(blank=True, help_text='Node version reported to Prometheus at this observation', max_length=50)), + ('wallet', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='observations', to='validators.validatorwallet')), + ], + options={ + 'ordering': ['-observed_at'], + 'indexes': [models.Index(fields=['wallet', 'observed_at'], name='validators__wallet__8b42c1_idx')], + }, + ), + ] diff --git a/backend/validators/models.py b/backend/validators/models.py index 33cd1c97..26347765 100644 --- a/backend/validators/models.py +++ b/backend/validators/models.py @@ -116,7 +116,20 @@ class ValidatorWalletStatusSnapshot(BaseModel): Daily snapshot of a validator wallet's status. Used for uptime lookback logic to determine if a wallet was active within a rolling window of days. + + The observability columns (metrics/logs/version status + sample counters + + node_version) are a per-day rollup of ValidatorWalletObservation rows: metrics and + logs latch worst-of-day ('shame' at ANY observation shames the day), version best-of-day. + The on-chain `status` column is owned by the on-chain sync; the Grafana sync only + writes the observability columns (so the two syncs never clobber each other).
""" + VERSION_STATUS_CHOICES = [ + ('on', 'On'), + ('warning', 'Warning'), + ('shame', 'Shame'), + ('unknown', 'Unknown'), + ] + wallet = models.ForeignKey( ValidatorWallet, on_delete=models.CASCADE, @@ -125,6 +138,32 @@ class ValidatorWalletStatusSnapshot(BaseModel): date = models.DateField(db_index=True) status = models.CharField(max_length=20, choices=ValidatorWallet.STATUS_CHOICES) + # Latched worst-of-day observability verdict (from ValidatorWalletObservation). + metrics_status = models.CharField( + max_length=10, choices=ValidatorWallet.GRAFANA_STATUS_CHOICES, default='unknown', + help_text="Worst-of-day metrics verdict: shame at any observation shames the day" + ) + logs_status = models.CharField( + max_length=10, choices=ValidatorWallet.GRAFANA_STATUS_CHOICES, default='unknown', + help_text="Worst-of-day logs verdict: shame at any observation shames the day" + ) + version_status = models.CharField( + max_length=10, choices=VERSION_STATUS_CHOICES, default='unknown', + help_text="Best-of-day version verdict vs the active target (an upgrade clears the day)" + ) + node_version = models.CharField( + max_length=50, blank=True, + help_text="Last node version observed by the Grafana sync that day" + ) + metrics_samples = models.PositiveIntegerField( + default=0, + help_text="Observations that day where the node was reporting metrics" + ) + logs_samples = models.PositiveIntegerField( + default=0, + help_text="Observations that day where the node was reporting logs" + ) + class Meta: ordering = ['-date'] constraints = [ @@ -135,6 +174,55 @@ def __str__(self): return f"{self.wallet.address[:10]}... {self.date} ({self.status})" +class ValidatorWalletObservation(BaseModel): + """ + Append-only log of a single Grafana-sync observation for a validator wallet. + + One row is written per active wallet per Grafana sync run, capturing the + point-in-time observability verdict plus the on-chain status and the node + version reported to Prometheus. 
This is the raw source of truth from which the + daily ValidatorWalletStatusSnapshot rollup is materialised (and rebuildable). + """ + wallet = models.ForeignKey( + ValidatorWallet, + on_delete=models.CASCADE, + related_name='observations' + ) + observed_at = models.DateTimeField( + db_index=True, + help_text="When the Grafana sync recorded this observation" + ) + onchain_status = models.CharField( + max_length=20, choices=ValidatorWallet.STATUS_CHOICES, + help_text="Wallet's on-chain status at observation time" + ) + metrics_status = models.CharField( + max_length=10, choices=ValidatorWallet.GRAFANA_STATUS_CHOICES, + help_text="Whether the node was reporting metrics at this observation" + ) + logs_status = models.CharField( + max_length=10, choices=ValidatorWallet.GRAFANA_STATUS_CHOICES, + help_text="Whether the node was reporting logs at this observation" + ) + version_status = models.CharField( + max_length=10, choices=ValidatorWalletStatusSnapshot.VERSION_STATUS_CHOICES, default='unknown', + help_text="Version verdict vs the active target at this observation" + ) + node_version = models.CharField( + max_length=50, blank=True, + help_text="Node version reported to Prometheus at this observation" + ) + + class Meta: + ordering = ['-observed_at'] + indexes = [ + models.Index(fields=['wallet', 'observed_at']), + ] + + def __str__(self): + return f"{self.wallet.address[:10]}... @ {self.observed_at:%Y-%m-%d %H:%M} ({self.metrics_status}/{self.logs_status})" + + class SyncLock(models.Model): """ Database-backed advisory lock for cross-process sync coordination. diff --git a/backend/validators/node_version.py b/backend/validators/node_version.py index 1c484b4f..538959fd 100644 --- a/backend/validators/node_version.py +++ b/backend/validators/node_version.py @@ -2,7 +2,6 @@ Node version tracking and points calculation for validators. 
""" from django.db import models -from django.utils import timezone from django.core.exceptions import ValidationError from packaging import version import re @@ -89,84 +88,6 @@ def version_matches_or_higher(self, target_version, node_version=None): nv = node_version if node_version is not None else self.node_version_asimov return self._compare_versions(nv, target_version) - def save(self, *args, **kwargs): - """ - Override save to create SubmittedContribution with calculated points based on early upgrade bonus. - Checks each network independently. - """ - # Store old versions before saving - old_version_asimov = None - old_version_bradbury = None - is_new = not self.pk - if self.pk: - try: - old_obj = self.__class__.objects.get(pk=self.pk) - old_version_asimov = old_obj.node_version_asimov - old_version_bradbury = old_obj.node_version_bradbury - except self.__class__.DoesNotExist: - pass - - # Save the object first - super().save(*args, **kwargs) - - # Only create submissions if user is visible - if not self.user.visible: - return - - # Check each network for version changes - networks = [ - ('asimov', self.node_version_asimov, old_version_asimov), - ('bradbury', self.node_version_bradbury, old_version_bradbury), - ] - - for network, new_version, old_version in networks: - version_changed = (is_new and new_version) or (old_version != new_version and new_version) - if not version_changed: - continue - - self._create_upgrade_submission(network, new_version) - - def _create_upgrade_submission(self, network, new_version): - """Create a SubmittedContribution for a node upgrade on a given network.""" - from contributions.node_upgrade.models import TargetNodeVersion - from contributions.models import SubmittedContribution, ContributionType, Contribution - - target = TargetNodeVersion.get_active(network=network) - if not target or not self._compare_versions(new_version, target.version): - return - - contribution_type = 
ContributionType.objects.filter(slug='node-upgrade').first() - if not contribution_type: - return - - # Include network in dedup key to allow separate submissions per network - dedup_key = f"version {target.version} [{network}]" - - existing_contribution = Contribution.objects.filter( - user=self.user, - contribution_type=contribution_type, - notes__contains=dedup_key - ).exists() - - existing_submission = SubmittedContribution.objects.filter( - user=self.user, - contribution_type=contribution_type, - state__in=['pending', 'accepted'], - notes__contains=dedup_key - ).exists() - - if not existing_contribution and not existing_submission: - points = calculate_early_upgrade_bonus(target.target_date, timezone.now()) - - SubmittedContribution.objects.create( - user=self.user, - contribution_type=contribution_type, - proposed_points=points, - contribution_date=timezone.now(), - notes=f"Automatic submission for node upgrade to version {target.version} [{network}]", - state='pending' - ) - def calculate_early_upgrade_bonus(target_availability_date, upgrade_date): """ diff --git a/backend/validators/serializers.py b/backend/validators/serializers.py index 41e7760d..6462a927 100644 --- a/backend/validators/serializers.py +++ b/backend/validators/serializers.py @@ -123,6 +123,8 @@ class WallOfShameSerializer(serializers.ModelSerializer): """ operator_user = serializers.SerializerMethodField() explorer_url = serializers.SerializerMethodField() + clean_streak_days = serializers.SerializerMethodField() + clean_streak_broken_by = serializers.SerializerMethodField() class Meta: model = ValidatorWallet @@ -142,9 +144,23 @@ class Meta: 'metrics_shame_started_at', 'logs_shame_started_at', 'version_shame_started_at', + 'clean_streak_days', + 'clean_streak_broken_by', ] read_only_fields = fields + def _streak(self, obj): + # Per-node streak, injected by the view via context to avoid N+1 queries. 
+ return (self.context.get('streaks_by_wallet_id') or {}).get(obj.id) + + def get_clean_streak_days(self, obj): + streak = self._streak(obj) + return streak['days'] if streak else None + + def get_clean_streak_broken_by(self, obj): + streak = self._streak(obj) + return streak['broken_by'] if streak else [] + def get_operator_user(self, obj): if obj.operator and obj.operator.user and obj.operator.user.visible: user = obj.operator.user diff --git a/backend/validators/streaks.py b/backend/validators/streaks.py new file mode 100644 index 00000000..a464ea75 --- /dev/null +++ b/backend/validators/streaks.py @@ -0,0 +1,139 @@ +""" +Consecutive "not shamed" uptime streaks, derived from the daily +ValidatorWalletStatusSnapshot rollup. + +A day counts as CLEAN for a wallet when the node was on-chain active, reported +both metrics and logs at least once, and was not shamed on any dimension: + + status == 'active' + AND metrics_samples >= 1 AND logs_samples >= 1 + AND metrics_status != 'shame' + AND logs_status != 'shame' + AND version_status != 'shame' + +Operator roll-up is "any-node-clean": an operator is not shamed on a network for +a day if AT LEAST ONE of their wallets on that network was clean that day. + +History only starts at deploy (past days were never recorded), so a streak's +`since` marks the first counted clean day, not necessarily the true start. +""" +from datetime import timedelta + +from django.utils import timezone + +from .models import ValidatorWalletStatusSnapshot + +DEFAULT_MAX_DAYS = 180 + +# Fields the streak logic needs; keeps the prefetch lean. 
+_SNAP_FIELDS = ( + 'wallet_id', 'date', 'status', + 'metrics_status', 'logs_status', 'version_status', + 'metrics_samples', 'logs_samples', +) + + +def _is_clean(snap): + return bool( + snap is not None + and snap.status == 'active' + and snap.metrics_samples >= 1 + and snap.logs_samples >= 1 + and snap.metrics_status != 'shame' + and snap.logs_status != 'shame' + and snap.version_status != 'shame' + ) + + +def _has_observation(snap): + """Whether the Grafana sync recorded anything for this day (vs not-yet-synced).""" + return bool( + snap is not None + and ( + snap.metrics_samples > 0 + or snap.logs_samples > 0 + or snap.metrics_status != 'unknown' + or snap.logs_status != 'unknown' + or snap.version_status != 'unknown' + ) + ) + + +def _shame_dims(snap): + """ + Dimensions that made a day non-clean, for explaining a broken streak. + + The on-chain `status` is trusted whenever a snapshot row exists (the on-chain + sync owns that column); the observability dimensions are only attributed when + the Grafana sync actually observed the day — we can't claim a node was shamed + on a day we have no data for. 
+ """ + if snap is None: + return [] + dims = [] + if snap.status != 'active': + dims.append('status') + if _has_observation(snap): + if snap.metrics_status == 'shame' or snap.metrics_samples < 1: + dims.append('metrics') + if snap.logs_status == 'shame' or snap.logs_samples < 1: + dims.append('logs') + if snap.version_status == 'shame': + dims.append('version') + return dims + + +def load_snapshot_index(wallet_ids, now, max_days=DEFAULT_MAX_DAYS): + """One query → {(wallet_id, date): snapshot} for all given wallets in the window.""" + if not wallet_ids: + return {} + cutoff = timezone.localdate(now) - timedelta(days=max_days) + rows = ( + ValidatorWalletStatusSnapshot.objects + .filter(wallet_id__in=wallet_ids, date__gte=cutoff) + .only(*_SNAP_FIELDS) + ) + return {(row.wallet_id, row.date): row for row in rows} + + +def clean_streak(wallet_ids, now, index, max_days=DEFAULT_MAX_DAYS): + """ + Consecutive clean days ending today, over `wallet_ids` with any-node-clean + semantics (one wallet id = per-node; several = per-operator-per-network). + + Returns {'days': int, 'broken_by': [dims], 'since': date | None}. + + A day with no Grafana data while the node was active (partial today, a sync + outage, or pre-history) is SKIPPED — it neither counts nor breaks, so an infra + failure on our side can't reset every validator's streak. A day the node spent + non-active (per the on-chain sync) or was observed shamed breaks the streak. 
+ """ + wallet_ids = list(wallet_ids) + today = timezone.localdate(now) + + def dims_on(snaps): + dims = [] + for s in snaps: + for d in _shame_dims(s): + if d not in dims: + dims.append(d) + return dims + + days = 0 + since = None + broken_by = [] + for offset in range(max_days): + day = today - timedelta(days=offset) + snaps = [index.get((wid, day)) for wid in wallet_ids] + if any(_is_clean(s) for s in snaps): + days += 1 + since = day + continue + non_active = any(s is not None and s.status != 'active' for s in snaps) + observed = any(_has_observation(s) for s in snaps) + if non_active or observed: + broken_by = dims_on(snaps) + break + # No data for this day: skip it, don't break. + + return {'days': days, 'broken_by': broken_by, 'since': since} diff --git a/backend/validators/tests/test_api.py b/backend/validators/tests/test_api.py index 01f2c261..473c586e 100644 --- a/backend/validators/tests/test_api.py +++ b/backend/validators/tests/test_api.py @@ -39,21 +39,12 @@ def test_get_validator_profile_not_exists(self): response = self.client.get('/api/v1/validators/me/') self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - def test_patch_validator_profile_does_not_create_missing_profile(self): - """PATCH /me must not create a validator profile.""" + def test_patch_validator_profile_is_not_allowed(self): + """/me is read-only: node versions come from Grafana, not the portal.""" response = self.client.patch('/api/v1/validators/me/', { 'node_version_asimov': '1.2.3' }) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - self.assertFalse(Validator.objects.filter(user=self.user).exists()) - - def test_patch_validator_profile_rejects_unsupported_keys(self): - """PATCH /me rejects arbitrary keys and does not create a profile.""" - response = self.client.patch('/api/v1/validators/me/', { - 'unsupported_field': 'x', - }) - - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.status_code, 
status.HTTP_405_METHOD_NOT_ALLOWED) self.assertFalse(Validator.objects.filter(user=self.user).exists()) def test_regular_user_cannot_mutate_arbitrary_validator_profile(self): @@ -81,20 +72,17 @@ def test_regular_user_cannot_create_validator_profile(self): self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) self.assertFalse(Validator.objects.filter(user=self.user).exists()) - def test_update_validator_profile(self): - """Test updating existing validator profile""" - # Create profile first + def test_patch_does_not_change_node_version(self): + """A PATCH to /me cannot overwrite the Grafana-sourced node version.""" Validator.objects.create(user=self.user, node_version_asimov='1.0.0') - # Update it response = self.client.patch('/api/v1/validators/me/', { 'node_version_asimov': '2.0.0' }) - self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED) - # Verify update validator = Validator.objects.get(user=self.user) - self.assertEqual(validator.node_version_asimov, '2.0.0') + self.assertEqual(validator.node_version_asimov, '1.0.0') def test_get_validator_profile_exists(self): """Test getting existing validator profile""" @@ -105,19 +93,6 @@ def test_get_validator_profile_exists(self): self.assertIn('node_version_asimov', response.data) self.assertEqual(response.data['node_version_asimov'], '1.2.3') - def test_update_bradbury_version(self): - """Test updating bradbury version""" - Validator.objects.create(user=self.user, node_version_asimov='1.0.0') - - response = self.client.patch('/api/v1/validators/me/', { - 'node_version_bradbury': '2.0.0' - }) - self.assertEqual(response.status_code, status.HTTP_200_OK) - - validator = Validator.objects.get(user=self.user) - self.assertEqual(validator.node_version_bradbury, '2.0.0') - self.assertEqual(validator.node_version_asimov, '1.0.0') - @override_settings(CRON_SYNC_TOKEN='test-cron-token') class ValidatorSyncLockTestCase(APITestCase): 
diff --git a/backend/validators/tests/test_grafana_service.py b/backend/validators/tests/test_grafana_service.py index 522c01ce..ce61fabe 100644 --- a/backend/validators/tests/test_grafana_service.py +++ b/backend/validators/tests/test_grafana_service.py @@ -16,10 +16,59 @@ from contributions.node_upgrade.models import TargetNodeVersion from users.models import User -from validators.models import Validator, ValidatorWallet +from validators.models import ( + Validator, + ValidatorWallet, + ValidatorWalletObservation, + ValidatorWalletStatusSnapshot, +) from validators.grafana_service import GrafanaValidatorStatusService +EMPTY_GRAFANA_RESPONSE = {"results": {"prom": {"frames": []}, "loki": {"frames": []}}} + + +# Only bob reporting (metrics + logs); alice absent. A partial-but-plausible run, +# unlike EMPTY_GRAFANA_RESPONSE which reads as a datasource blackout. +BOB_ONLY_GRAFANA_RESPONSE = { + "results": { + "prom": { + "frames": [ + { + "schema": { + "fields": [ + { + "name": "Value", + "labels": { + "validator_name": "bob-validator", + "node": "0xBBBBbbbbBBBBbbbbBBBBbbbbBBBBbbbbBBBBbbbb", + "network": "bradbury-phase1", + "version": "v0.5.11", + }, + } + ] + }, + "data": {"values": [[1700000000000], [1]]}, + }, + ] + }, + "loki": { + "frames": [ + { + "schema": { + "fields": [ + {"name": "Time", "labels": None}, + {"name": "Value", "labels": {"validator_name": "bob-validator"}}, + ] + }, + "data": {"values": [[1700000000000], [7]]}, + } + ] + }, + } +} + + # Two known validators reporting, one not. 
GRAFANA_RESPONSE_FIXTURE = { "results": { @@ -34,6 +83,7 @@ "validator_name": "alice-validator", "node": "0xAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaa", "network": "bradbury-phase1", + "version": "v0.5.12", }, } ] @@ -49,6 +99,7 @@ "validator_name": "bob-validator", "node": "0xBBBBbbbbBBBBbbbbBBBBbbbbBBBBbbbbBBBBbbbb", "network": "bradbury-phase1", + "version": "v0.5.11", }, } ] @@ -86,8 +137,8 @@ class GrafanaParseResponseTests(TestCase): """Lock down the parser against the dashboard's expected response shape.""" def test_parses_prometheus_addresses_lowercased(self): - prom_addrs, _name_by_addr, _log_counts = GrafanaValidatorStatusService.parse_response( - GRAFANA_RESPONSE_FIXTURE + prom_addrs, _name_by_addr, _log_counts, _versions = ( + GrafanaValidatorStatusService.parse_response(GRAFANA_RESPONSE_FIXTURE) ) # All addresses should be lowercased. self.assertIn('0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', prom_addrs) @@ -95,7 +146,7 @@ def test_parses_prometheus_addresses_lowercased(self): self.assertEqual(len(prom_addrs), 2) def test_maps_address_to_validator_name(self): - _, name_by_addr, _ = GrafanaValidatorStatusService.parse_response( + _, name_by_addr, _, _ = GrafanaValidatorStatusService.parse_response( GRAFANA_RESPONSE_FIXTURE ) self.assertEqual( @@ -107,8 +158,82 @@ def test_maps_address_to_validator_name(self): 'bob-validator', ) + def test_parses_node_version_label_normalized(self): + # Versions are normalised at ingest ('v' prefix stripped) so every + # persisted copy (observations, rollups, profile) shares one format. 
+ _, _, _, version_by_addr = GrafanaValidatorStatusService.parse_response( + GRAFANA_RESPONSE_FIXTURE + ) + self.assertEqual( + version_by_addr['0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], '0.5.12', + ) + self.assertEqual( + version_by_addr['0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'], '0.5.11', + ) + + def test_duplicate_version_series_keeps_the_higher_version(self): + # Right after an upgrade Prometheus returns BOTH the old and new version + # series for ~5 minutes; the stale frame must not win regardless of order. + def frame(version): + return { + "schema": {"fields": [{"name": "Value", "labels": { + "validator_name": "alice-validator", + "node": "0xAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaa", + "version": version, + }}]}, + "data": {"values": [[1700000000000], [1]]}, + } + + for order in (["v0.6.0", "v0.5.11"], ["v0.5.11", "v0.6.0"]): + body = {"results": {"prom": {"frames": [frame(v) for v in order]}, + "loki": {"frames": []}}} + _, _, _, version_by_addr = GrafanaValidatorStatusService.parse_response(body) + self.assertEqual( + version_by_addr['0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], + '0.6.0', + f"order {order} let the stale version win", + ) + + def test_parseable_version_beats_unparseable_regardless_of_order(self): + # '0.6.0-genlayer.1' is valid semver but PEP 440-unparseable; frame order + # must not let it pin the address and suppress the comparable version. 
+ def frame(version): + return { + "schema": {"fields": [{"name": "Value", "labels": { + "validator_name": "alice-validator", + "node": "0xAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaa", + "version": version, + }}]}, + "data": {"values": [[1700000000000], [1]]}, + } + + for order in (["v0.6.0-genlayer.1", "v0.6.5"], ["v0.6.5", "v0.6.0-genlayer.1"]): + body = {"results": {"prom": {"frames": [frame(v) for v in order]}, + "loki": {"frames": []}}} + _, _, _, version_by_addr = GrafanaValidatorStatusService.parse_response(body) + self.assertEqual( + version_by_addr['0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], + '0.6.5', + f"order {order} let the unparseable version win", + ) + + def test_overlong_version_label_is_truncated_not_fatal(self): + long_version = 'v0.5.12-rc.1+build.' + 'a' * 60 + body = {"results": {"prom": {"frames": [{ + "schema": {"fields": [{"name": "Value", "labels": { + "validator_name": "alice-validator", + "node": "0xAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaa", + "version": long_version, + }}]}, + "data": {"values": [[1700000000000], [1]]}, + }]}, "loki": {"frames": []}}} + _, _, _, version_by_addr = GrafanaValidatorStatusService.parse_response(body) + stored = version_by_addr['0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'] + # Capped to the node_version column length so bulk_create never DataErrors. 
+ self.assertLessEqual(len(stored), 50) + def test_loki_log_count_pulled_from_values_index_1(self): - _, _, log_counts = GrafanaValidatorStatusService.parse_response( + _, _, log_counts, _ = GrafanaValidatorStatusService.parse_response( GRAFANA_RESPONSE_FIXTURE ) self.assertEqual(log_counts['alice-validator'], 42) @@ -118,11 +243,11 @@ def test_loki_log_count_pulled_from_values_index_1(self): def test_empty_response_does_not_crash(self): self.assertEqual( GrafanaValidatorStatusService.parse_response({}), - (set(), {}, {}), + (set(), {}, {}, {}), ) self.assertEqual( GrafanaValidatorStatusService.parse_response(None), - (set(), {}, {}), + (set(), {}, {}, {}), ) def test_zero_log_count_is_recorded(self): @@ -144,7 +269,7 @@ def test_zero_log_count_is_recorded(self): }, } } - _, _, log_counts = GrafanaValidatorStatusService.parse_response(body) + _, _, log_counts, _ = GrafanaValidatorStatusService.parse_response(body) self.assertEqual(log_counts['quiet-validator'], 0) @@ -255,6 +380,224 @@ def test_sync_handles_request_exception_without_corrupting_data(self, mock_post) self.assertEqual(self.alice.metrics_status, 'on') self.assertEqual(self.alice.logs_status, 'on') + def test_failure_records_no_observations(self): + import requests as _requests + with patch('validators.grafana_service.requests.post', + side_effect=_requests.RequestException('boom')): + with self.settings( + GRAFANA_BASE_URL='https://grafana.test', + GRAFANA_API_TOKEN='test-token', + GRAFANA_NETWORK_LABELS={'bradbury': 'bradbury-phase1'}, + ): + GrafanaValidatorStatusService.sync_network('bradbury') + self.assertEqual(ValidatorWalletObservation.objects.count(), 0) + + +class GrafanaHistoryTests(TestCase): + """Observation log + latched daily rollup captured by the Grafana sync.""" + + def setUp(self): + self.alice = ValidatorWallet.objects.create( + address='0xAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaa', + network='bradbury', + operator_address='0x1111111111111111111111111111111111111111', + 
status='active', moniker='alice', + ) + self.bob = ValidatorWallet.objects.create( + address='0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb', + network='bradbury', + operator_address='0x2222222222222222222222222222222222222222', + status='active', moniker='bob', + ) + self.carol = ValidatorWallet.objects.create( + address='0xcccccccccccccccccccccccccccccccccccccccc', + network='bradbury', + operator_address='0x3333333333333333333333333333333333333333', + status='active', moniker='carol', + ) + + def _sync(self, fixture): + mock_response = MagicMock() + mock_response.ok = True + mock_response.json.return_value = fixture + with patch('validators.grafana_service.requests.post', return_value=mock_response): + with self.settings( + GRAFANA_BASE_URL='https://grafana.test', + GRAFANA_API_TOKEN='test-token', + GRAFANA_NETWORK_LABELS={'bradbury': 'bradbury-phase1', 'asimov': 'asimov-phase5'}, + GRAFANA_PROM_DS_UID='grafanacloud-prom', + GRAFANA_LOKI_DS_UID='grafanacloud-logs', + ): + return GrafanaValidatorStatusService.sync_network('bradbury') + + def test_first_sync_writes_observations_and_rollup(self): + self._sync(GRAFANA_RESPONSE_FIXTURE) + + # One observation per active wallet. + self.assertEqual(ValidatorWalletObservation.objects.count(), 3) + + alice_snap = ValidatorWalletStatusSnapshot.objects.get(wallet=self.alice) + self.assertEqual(alice_snap.metrics_status, 'on') + self.assertEqual(alice_snap.logs_status, 'on') + self.assertEqual(alice_snap.metrics_samples, 1) + self.assertEqual(alice_snap.logs_samples, 1) + self.assertEqual(alice_snap.node_version, '0.5.12') # normalised at ingest + + # Bob reports metrics but no logs. + bob_snap = ValidatorWalletStatusSnapshot.objects.get(wallet=self.bob) + self.assertEqual(bob_snap.metrics_status, 'on') + self.assertEqual(bob_snap.logs_status, 'shame') + self.assertEqual(bob_snap.logs_samples, 0) + + def test_shame_latches_for_the_whole_day(self): + # Run 1: alice healthy. Run 2 (same day): others report, alice doesn't. 
+ self._sync(GRAFANA_RESPONSE_FIXTURE) + self._sync(BOB_ONLY_GRAFANA_RESPONSE) + + self.assertEqual(ValidatorWalletObservation.objects.filter(wallet=self.alice).count(), 2) + + alice_snap = ValidatorWalletStatusSnapshot.objects.get(wallet=self.alice) + # Once shamed at any sample, the whole day is shamed and does not revert. + self.assertEqual(alice_snap.metrics_status, 'shame') + self.assertEqual(alice_snap.logs_status, 'shame') + # The healthy sample is still counted. + self.assertEqual(alice_snap.metrics_samples, 1) + self.assertEqual(alice_snap.logs_samples, 1) + # Latest observed version is retained even after the node stops reporting. + self.assertEqual(alice_snap.node_version, '0.5.12') + + def test_datasource_blackout_does_not_latch_history(self): + """A run where a whole datasource comes back empty (rotated UID, token + scope, renamed label) is an infra failure: the permanent daily rollup must + not shame every validator's day, even though live statuses still flip + (they self-heal on the next good run).""" + self._sync(GRAFANA_RESPONSE_FIXTURE) + self._sync(EMPTY_GRAFANA_RESPONSE) + + # The blackout run recorded no observations. + self.assertEqual(ValidatorWalletObservation.objects.filter(wallet=self.alice).count(), 1) + + alice_snap = ValidatorWalletStatusSnapshot.objects.get(wallet=self.alice) + self.assertEqual(alice_snap.metrics_status, 'on') + self.assertEqual(alice_snap.logs_status, 'on') + + # Live status still reflects the (suspect) run — pre-existing behavior. 
+ self.alice.refresh_from_db() + self.assertEqual(self.alice.metrics_status, 'shame') + + def test_unparseable_observed_version_records_unknown_status(self): + """A garbage `version` label must not string-compare its way into an 'on' + verdict (or shame a valid one) — it reads as version-unknown.""" + TargetNodeVersion.objects.create( + version='0.6.0', network='bradbury', + target_date=timezone.now() - timedelta(days=30), is_active=True, + ) + fixture = { + "results": { + "prom": { + "frames": [ + { + "schema": { + "fields": [ + { + "name": "Value", + "labels": { + "validator_name": "alice-validator", + "node": "0xAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaaAAAAaaaa", + "network": "bradbury-phase1", + "version": "zzz-not-a-version", + }, + } + ] + }, + "data": {"values": [[1700000000000], [1]]}, + }, + ] + }, + "loki": GRAFANA_RESPONSE_FIXTURE["results"]["loki"], + } + } + self._sync(fixture) + + obs = ValidatorWalletObservation.objects.get(wallet=self.alice) + self.assertEqual(obs.version_status, 'unknown') + + def test_no_active_wallets_still_syncs_versions(self): + """Version detection covers every reporting node regardless of on-chain + status — including when a network has zero active wallets.""" + user = User.objects.create( + email='alice@x.com', address='0x9999999999999999999999999999999999999999', + visible=True, + ) + operator = Validator.objects.create(user=user) + ValidatorWallet.objects.filter(pk=self.alice.pk).update( + status='quarantined', operator=operator, + ) + ValidatorWallet.objects.exclude(pk=self.alice.pk).update(status='inactive') + + stats = self._sync(GRAFANA_RESPONSE_FIXTURE) + + self.assertEqual(stats['wallets'], 0) + operator.refresh_from_db() + self.assertEqual(operator.node_version_bradbury, '0.5.12') + + def _obs(self, wallet, *, metrics='on', logs='on', version='on'): + return { + 'wallet': wallet, 'onchain_status': 'active', + 'metrics_status': metrics, 'logs_status': logs, 'version_status': version, + 'node_version': '0.6.0', + 'metrics_ok': 
metrics == 'on', 'logs_ok': logs == 'on', + } + + def test_version_latches_optimistically_within_a_day(self): + now = timezone.now() + # Same day: version shame first (running old build), then on (upgraded). + GrafanaValidatorStatusService._record_history( + [self._obs(self.alice, version='shame')], now) + GrafanaValidatorStatusService._record_history( + [self._obs(self.alice, version='on')], now) + + snap = ValidatorWalletStatusSnapshot.objects.get(wallet=self.alice) + # One up-to-date sample makes the whole day OK for version. + self.assertEqual(snap.version_status, 'on') + + def test_metrics_latches_pessimistically_within_a_day(self): + now = timezone.now() + # Same day: metrics on first, then shame — one shame shames the day. + GrafanaValidatorStatusService._record_history( + [self._obs(self.alice, metrics='on')], now) + GrafanaValidatorStatusService._record_history( + [self._obs(self.alice, metrics='shame')], now) + + snap = ValidatorWalletStatusSnapshot.objects.get(wallet=self.alice) + self.assertEqual(snap.metrics_status, 'shame') + + def test_rebuild_days_cutoff_covers_full_days(self): + """--days N must snap to a day boundary: a mid-day cutoff would rebuild the + oldest day from partial observations and un-latch a real shame.""" + from datetime import datetime as dt, time as t + from django.core.management import call_command + + yesterday = timezone.localdate() - timedelta(days=1) + tz = timezone.get_current_timezone() + early = timezone.make_aware(dt.combine(yesterday, t(3, 0)), tz) + late = timezone.make_aware(dt.combine(yesterday, t(12, 0)), tz) + + # 03:00 shame + 12:00 clean → the day's rollup must latch to shame. 
+ for observed_at, metrics in ((early, 'shame'), (late, 'on')): + ValidatorWalletObservation.objects.create( + wallet=self.alice, observed_at=observed_at, + onchain_status='active', metrics_status=metrics, logs_status='on', + ) + + call_command('rebuild_daily_snapshots', '--days', '1') + + snap = ValidatorWalletStatusSnapshot.objects.get(wallet=self.alice, date=yesterday) + # A now()-based cutoff (mid-day) would have dropped the 03:00 observation + # and rebuilt the day as clean. + self.assertEqual(snap.metrics_status, 'shame') + self.assertEqual(snap.logs_samples, 2) + class WallOfShameEndpointTests(TestCase): def setUp(self): @@ -347,9 +690,32 @@ def test_response_includes_observability_fields(self): first = response.data['wallets'][0] for field in ('metrics_status', 'logs_status', 'last_grafana_check_at', 'address', 'network', 'operator_address', 'moniker', - 'operator_user', 'explorer_url'): + 'operator_user', 'explorer_url', + 'clean_streak_days', 'clean_streak_broken_by'): self.assertIn(field, first) + def test_clean_streak_days_reflects_history(self): + from datetime import timedelta as _td + from django.utils import timezone as _tz + from validators.models import ValidatorWalletStatusSnapshot + today = _tz.localdate(_tz.now()) + for i in range(3): + ValidatorWalletStatusSnapshot.objects.create( + wallet=self.ok, date=today - _td(days=i), status='active', + metrics_status='on', logs_status='on', version_status='on', + metrics_samples=2, logs_samples=2, + ) + response = self.client.get('/api/v1/validators/wallets/wall-of-shame/') + ok_row = next(w for w in response.data['wallets'] if w['id'] == self.ok.id) + self.assertEqual(ok_row['clean_streak_days'], 3) + # Grouped output exposes the per-network operator streak too. 
+ group = next( + g for g in response.data['validators'] + if any(n['wallet_id'] == self.ok.id for n in g['networks']) + ) + self.assertIn('network_streaks', group) + self.assertEqual(group['network_streaks'][self.ok.network]['clean_streak_days'], 3) + def test_stats_counts(self): response = self.client.get('/api/v1/validators/wallets/wall-of-shame/') stats = response.data['stats'] diff --git a/backend/validators/tests/test_node_version_sync.py b/backend/validators/tests/test_node_version_sync.py new file mode 100644 index 00000000..5642bdb5 --- /dev/null +++ b/backend/validators/tests/test_node_version_sync.py @@ -0,0 +1,338 @@ +""" +Tests for Grafana-driven node version detection: auto-target creation, +node_version write-back, and direct (auto-approved) node-upgrade awards. +""" +from datetime import timedelta + +from django.contrib.auth import get_user_model +from django.test import TestCase, override_settings +from django.utils import timezone + +from contributions.models import Category, Contribution, ContributionType +from contributions.node_upgrade.models import TargetNodeVersion +from leaderboard.models import GlobalLeaderboardMultiplier +from validators.grafana_service import GrafanaValidatorStatusService +from validators.models import Validator, ValidatorWallet + +User = get_user_model() + + +class NodeVersionSyncTests(TestCase): + def setUp(self): + self.now = timezone.now() + self.category, _ = Category.objects.get_or_create(name='Validator', slug='validator') + self.ctype, _ = ContributionType.objects.get_or_create( + slug='node-upgrade', + defaults={'name': 'Node Upgrade', 'category': self.category, + 'min_points': 1, 'max_points': 100, 'is_submittable': True}, + ) + GlobalLeaderboardMultiplier.objects.create( + contribution_type=self.ctype, + multiplier_value=1.0, + valid_from=self.now - timedelta(days=30), + ) + + def _operator(self, email, address, visible=True, is_banned=False): + user = User.objects.create( + email=email, + address=address, + 
visible=visible, + is_banned=is_banned, + ) + return Validator.objects.create(user=user) + + def _wallet(self, address, operator, network='asimov', status='active'): + return ValidatorWallet.objects.create( + address=address, network=network, operator=operator, + operator_address=operator.user.address, status=status, + ) + + def _sync(self, network, version_by_wallet): + # _sync_node_versions takes {address_lower: version} + GrafanaValidatorStatusService._sync_node_versions( + network, + {w.address.lower(): v for w, v in version_by_wallet.items()}, + self.now, + ) + + def test_first_stable_release_creates_active_target(self): + """Default quorum is 1: the first adopter's stable release becomes the target.""" + op = self._operator('a@x.com', '0x' + 'a' * 40) + w = self._wallet('0x' + '1' * 40, op) + self._sync('asimov', {w: 'v0.6.0'}) + + target = TargetNodeVersion.get_active(network='asimov') + self.assertIsNotNone(target) + self.assertEqual(target.version, '0.6.0') # 'v' stripped + + @override_settings(NODE_VERSION_MIN_OPERATORS_FOR_AUTO_TARGET=2) + def test_auto_target_operator_quorum_is_configurable(self): + """Raising the quorum setting requires corroboration: one operator alone + can no longer pin a fleet-wide target, but a second operator confirms it.""" + op = self._operator('solo@x.com', '0x' + 'a' * 39 + '2') + w1 = self._wallet('0x' + '1' * 39 + '2', op) + w2 = self._wallet('0x' + '1' * 39 + '3', op) # same operator, two nodes + self._sync('asimov', {w1: 'v9.9.9', w2: 'v9.9.9'}) + + self.assertIsNone(TargetNodeVersion.get_active(network='asimov')) + # ...but their own running version is still recorded. 
+ op.refresh_from_db() + self.assertEqual(op.node_version_asimov, '9.9.9') + + op2 = self._operator('cfg@x.com', '0x' + 'f' * 39 + '2') + w3 = self._wallet('0x' + 'f' * 39 + '3', op2) + self._sync('asimov', {w1: 'v9.9.9', w3: 'v9.9.9'}) + + target = TargetNodeVersion.get_active(network='asimov') + self.assertIsNotNone(target) + self.assertEqual(target.version, '9.9.9') + + def test_unknown_addresses_cannot_create_target(self): + """Prometheus series for addresses we don't know (test rigs, spoofed or + stale nodes) must not move targets or earn anything.""" + GrafanaValidatorStatusService._sync_node_versions( + 'asimov', + {'0x' + 'd' * 39 + '1': '9.9.9', '0x' + 'd' * 39 + '2': '9.9.9'}, + self.now, + ) + self.assertIsNone(TargetNodeVersion.get_active(network='asimov')) + + def test_prerelease_does_not_create_target(self): + op = self._operator('b@x.com', '0x' + 'b' * 40) + w = self._wallet('0x' + '2' * 40, op) + self._sync('asimov', {w: 'v0.6.0-rc1'}) + + self.assertIsNone(TargetNodeVersion.get_active(network='asimov')) + # ...but the running (pre-release) version is still recorded on the profile. 
+ op.refresh_from_db() + self.assertEqual(op.node_version_asimov, '0.6.0-rc1') + + def test_build_metadata_version_is_ignored_for_target(self): + op = self._operator('c@x.com', '0x' + 'c' * 40) + w = self._wallet('0x' + '3' * 40, op) + self._sync('asimov', {w: 'v0.6.0+dev'}) + self.assertIsNone(TargetNodeVersion.get_active(network='asimov')) + + def test_node_version_is_max_across_operator_nodes(self): + op = self._operator('d@x.com', '0x' + 'd' * 40) + w1 = self._wallet('0x' + '4' * 40, op) + w2 = self._wallet('0x' + '5' * 40, op) + self._sync('asimov', {w1: 'v0.5.10', w2: 'v0.6.0'}) + + op.refresh_from_db() + self.assertEqual(op.node_version_asimov, '0.6.0') + + def test_reaching_target_awards_direct_contribution_once(self): + TargetNodeVersion.objects.create( + version='0.6.0', network='asimov', target_date=self.now, is_active=True, + ) + op = self._operator('e@x.com', '0x' + 'e' * 40) + w = self._wallet('0x' + '6' * 40, op) + self._sync('asimov', {w: 'v0.6.0'}) + + awards = Contribution.objects.filter( + user=op.user, contribution_type=self.ctype, + notes__contains='version 0.6.0 [asimov]', + ) + self.assertEqual(awards.count(), 1) + # target_date == now → same-day bonus of 4. + self.assertEqual(awards.first().points, 4) + + # Running the sync again must not double-award. 
+ self._sync('asimov', {w: 'v0.6.0'}) + self.assertEqual(awards.count(), 1) + + def test_award_skipped_when_no_multiplier(self): + """Removing the multiplier is the stewards' kill switch for a points + program; the auto-award must respect it instead of awarding at 1.0.""" + GlobalLeaderboardMultiplier.objects.all().delete() + TargetNodeVersion.objects.create( + version='0.6.0', network='asimov', target_date=self.now, is_active=True, + ) + op = self._operator('nm@x.com', '0x' + 'b' * 39 + '2') + w = self._wallet('0x' + 'b' * 39 + '3', op) + self._sync('asimov', {w: 'v0.6.0'}) + + self.assertFalse( + Contribution.objects.filter(user=op.user, contribution_type=self.ctype).exists() + ) + # The version itself is still recorded. + op.refresh_from_db() + self.assertEqual(op.node_version_asimov, '0.6.0') + + def test_banned_wallet_gets_no_version_or_award(self): + TargetNodeVersion.objects.create( + version='0.6.0', network='asimov', target_date=self.now, is_active=True, + ) + op = self._operator('ban@x.com', '0x' + 'c' * 39 + '2') + w = self._wallet('0x' + 'c' * 39 + '3', op, status='banned') + self._sync('asimov', {w: 'v0.6.0'}) + + op.refresh_from_db() + self.assertIsNone(op.node_version_asimov) + self.assertFalse( + Contribution.objects.filter(user=op.user, contribution_type=self.ctype).exists() + ) + + def test_banned_operator_user_gets_no_target_version_or_award(self): + op = self._operator('suspended@x.com', '0x' + 'c' * 39 + '4', is_banned=True) + w = self._wallet('0x' + 'c' * 39 + '5', op) + + self._sync('asimov', {w: 'v0.6.0'}) + + self.assertIsNone(TargetNodeVersion.get_active(network='asimov')) + op.refresh_from_db() + self.assertIsNone(op.node_version_asimov) + + TargetNodeVersion.objects.create( + version='0.6.0', network='asimov', target_date=self.now, is_active=True, + ) + self._sync('asimov', {w: 'v0.6.0'}) + + op.refresh_from_db() + self.assertIsNone(op.node_version_asimov) + self.assertFalse( + Contribution.objects.filter(user=op.user, 
contribution_type=self.ctype).exists() + ) + + def test_missing_scrape_does_not_downgrade_version(self): + """When one of an operator's wallets skips a scrape cycle, the recorded + version must not regress to the remaining (lower) wallet's version.""" + op = self._operator('mono@x.com', '0x' + 'd' * 39 + '3') + op.node_version_asimov = '0.6.0' + op.save() + w_low = self._wallet('0x' + 'd' * 39 + '4', op) + # Only the 0.5.0 wallet reports this cycle. + self._sync('asimov', {w_low: 'v0.5.0'}) + + op.refresh_from_db() + self.assertEqual(op.node_version_asimov, '0.6.0') + + def test_auto_created_target_broadcasts_notification(self): + from notifications.models import Notification + + op1 = self._operator('n1@x.com', '0x' + 'e' * 39 + '2') + w1 = self._wallet('0x' + 'e' * 39 + '3', op1) + op2 = self._operator('n2@x.com', '0x' + 'e' * 39 + '4') + w2 = self._wallet('0x' + 'e' * 39 + '5', op2) + self._sync('asimov', {w1: 'v0.6.0', w2: 'v0.6.0'}) + + self.assertTrue( + Notification.objects.filter(title__contains='0.6.0').exists() + ) + + def test_invisible_operator_gets_version_but_no_award(self): + op = self._operator('f@x.com', '0x' + 'f' * 40, visible=False) + w = self._wallet('0x' + '7' * 40, op) + self._sync('asimov', {w: 'v0.6.0'}) + + op.refresh_from_db() + self.assertEqual(op.node_version_asimov, '0.6.0') + self.assertFalse( + Contribution.objects.filter(user=op.user, contribution_type=self.ctype).exists() + ) + + def test_higher_stable_version_supersedes_active_target(self): + TargetNodeVersion.objects.create( + version='0.5.0', network='asimov', + target_date=self.now - timedelta(days=10), is_active=True, + ) + op1 = self._operator('g@x.com', '0x' + '9' * 40) + w1 = self._wallet('0x' + '8' * 40, op1) + op2 = self._operator('g2@x.com', '0x' + '9' * 39 + '1') + w2 = self._wallet('0x' + '8' * 39 + '1', op2) + self._sync('asimov', {w1: 'v0.6.0', w2: 'v0.6.0'}) + + active = TargetNodeVersion.get_active(network='asimov') + self.assertEqual(active.version, '0.6.0') + 
self.assertEqual( + TargetNodeVersion.objects.filter(network='asimov', is_active=True).count(), 1 + ) + + def test_no_new_target_when_fleet_matches_active(self): + TargetNodeVersion.objects.create( + version='0.6.0', network='asimov', + target_date=self.now - timedelta(days=1), is_active=True, + ) + op = self._operator('h@x.com', '0x' + '7' * 40) + w = self._wallet('0x' + 'aa'[0] * 40, op) + self._sync('asimov', {w: 'v0.6.0'}) + self.assertEqual(TargetNodeVersion.objects.filter(network='asimov').count(), 1) + + def test_quarantined_wallet_still_gets_version_and_award(self): + """Version detection covers every reporting node, not just active wallets — + a quarantined validator that upgrades must still record it and be awarded.""" + TargetNodeVersion.objects.create( + version='0.6.0', network='asimov', target_date=self.now, is_active=True, + ) + op = self._operator('q@x.com', '0x' + '1' * 39 + '2') + w = self._wallet('0x' + '2' * 39 + '3', op, status='quarantined') + self._sync('asimov', {w: 'v0.6.0'}) + + op.refresh_from_db() + self.assertEqual(op.node_version_asimov, '0.6.0') + self.assertTrue( + Contribution.objects.filter( + user=op.user, contribution_type=self.ctype, + notes__contains='version 0.6.0 [asimov]', + ).exists() + ) + + def test_pep440_invalid_semver_does_not_abort_other_operators(self): + """'0.6.0-genlayer.1' is valid semver but packaging.parse raises + InvalidVersion on it; one such node must not abort the whole network.""" + TargetNodeVersion.objects.create( + version='0.6.0', network='asimov', target_date=self.now, is_active=True, + ) + bad_op = self._operator('bad@x.com', '0x' + '3' * 39 + '4') + bad_w = self._wallet('0x' + '4' * 39 + '5', bad_op) + good_op = self._operator('good@x.com', '0x' + '5' * 39 + '6') + good_w = self._wallet('0x' + '6' * 39 + '7', good_op) + + self._sync('asimov', {bad_w: 'v0.6.0-genlayer.1', good_w: 'v0.6.0'}) + + # The good operator's version + award went through untouched. 
+ good_op.refresh_from_db() + self.assertEqual(good_op.node_version_asimov, '0.6.0') + self.assertTrue( + Contribution.objects.filter(user=good_op.user, contribution_type=self.ctype).exists() + ) + # The unparseable version is excluded (can't be compared), not crashed on. + bad_op.refresh_from_db() + self.assertIsNone(bad_op.node_version_asimov) + + def test_one_failing_operator_does_not_block_the_rest(self): + """A per-operator failure (e.g. Contribution validation) must not stop + version updates and awards for the other operators in the same run.""" + from unittest.mock import patch + + op1 = self._operator('p1@x.com', '0x' + '7' * 39 + '8') + w1 = self._wallet('0x' + '8' * 39 + '9', op1) + op2 = self._operator('p2@x.com', '0x' + '9' * 39 + 'a') + w2 = self._wallet('0x' + 'a' * 39 + 'b', op2) + + real_award = GrafanaValidatorStatusService._award_node_upgrade + calls = [] + + def flaky_award(operator, network, target, now): + calls.append(operator.pk) + if len(calls) == 1: + raise RuntimeError('boom') + return real_award(operator, network, target, now) + + with patch.object( + GrafanaValidatorStatusService, '_award_node_upgrade', + side_effect=flaky_award, + ): + self._sync('asimov', {w1: 'v0.6.0', w2: 'v0.6.0'}) + + # Both operators were attempted; exactly one award landed despite the failure. + self.assertEqual(len(calls), 2) + self.assertEqual( + Contribution.objects.filter(contribution_type=self.ctype).count(), 1 + ) + # Both got their version recorded (the update happens before the award). 
+ op1.refresh_from_db() + op2.refresh_from_db() + self.assertEqual(op1.node_version_asimov, '0.6.0') + self.assertEqual(op2.node_version_asimov, '0.6.0') diff --git a/backend/validators/tests/test_node_version_tracking.py b/backend/validators/tests/test_node_version_tracking.py index c4b37f7d..b8448457 100644 --- a/backend/validators/tests/test_node_version_tracking.py +++ b/backend/validators/tests/test_node_version_tracking.py @@ -9,7 +9,7 @@ from django.forms import modelform_factory from validators.models import Validator from validators.node_version import calculate_early_upgrade_bonus -from contributions.models import ContributionType, SubmittedContribution, Category +from contributions.models import ContributionType, Category from contributions.node_upgrade.models import TargetNodeVersion from leaderboard.models import GlobalLeaderboardMultiplier @@ -84,71 +84,6 @@ def test_early_upgrade_bonus(self): self.assertEqual(calculate_early_upgrade_bonus(now, now + timedelta(days=10)), 1) - def test_submission_creation_with_calculated_points(self): - """Test that updating node version creates a submission with calculated points.""" - # Update validator to target version - self.validator.node_version_asimov = '2.0.0' - self.validator.save() - - # Check that a submission was created - submissions = SubmittedContribution.objects.filter(user=self.user) - self.assertEqual(submissions.count(), 1) - - submission = submissions.first() - self.assertEqual(submission.contribution_type, self.contribution_type) - self.assertEqual(submission.state, 'pending') - - # Check suggested points (should be 4 for same-day upgrade) - self.assertEqual(submission.proposed_points, 4) - - # Check that no evidence was created for automatic submission - self.assertEqual(submission.evidence_items.count(), 0) - - def test_submission_with_delayed_upgrade(self): - """Test points calculation with delayed upgrade.""" - # Update target to have been available 3 days ago - self.target.target_date = 
timezone.now() - timedelta(days=3) - self.target.save() - - # Update to target version - self.validator.node_version_asimov = '2.0.0' - self.validator.save() - - # Check submission - submission = SubmittedContribution.objects.filter(user=self.user).first() - self.assertIsNotNone(submission) - - # Should have 1 point (minimum for 3+ days delay) - self.assertEqual(submission.proposed_points, 1) - - def test_no_duplicate_submissions(self): - """Test that duplicate submissions are not created for the same target.""" - # First upgrade - self.validator.node_version_asimov = '2.0.0' - self.validator.save() - - # Try to create another submission for the same target - self.validator.node_version_asimov = '2.0.1' # Minor version change - self.validator.save() - - # Should still only have one submission - submissions = SubmittedContribution.objects.filter(user=self.user) - self.assertEqual(submissions.count(), 1) - - def test_invisible_user_no_submission(self): - """Test that invisible users don't get submissions.""" - # Make user invisible - self.user.visible = False - self.user.save() - - # Update validator version - self.validator.node_version_asimov = '2.0.0' - self.validator.save() - - # Should not create submission - submissions = SubmittedContribution.objects.filter(user=self.user) - self.assertEqual(submissions.count(), 0) - def test_per_network_targets(self): """Test that targets are per-network and deactivation is scoped.""" # Create a bradbury target @@ -169,30 +104,6 @@ def test_per_network_targets(self): self.assertEqual(asimov_active.version, '2.0.0') self.assertEqual(bradbury_active.version, '3.0.0') - def test_bradbury_version_creates_submission(self): - """Test that updating bradbury version creates a separate submission.""" - # Create bradbury target - bradbury_target = TargetNodeVersion.objects.create( - version='3.0.0', - network='bradbury', - target_date=timezone.now() + timedelta(days=7), - is_active=True - ) - - # Update both versions - 
self.validator.node_version_asimov = '2.0.0' - self.validator.node_version_bradbury = '3.0.0' - self.validator.save() - - # Should have two submissions (one per network) - submissions = SubmittedContribution.objects.filter(user=self.user) - self.assertEqual(submissions.count(), 2) - - # Check dedup keys in notes - notes = list(submissions.values_list('notes', flat=True)) - self.assertTrue(any('[asimov]' in n for n in notes)) - self.assertTrue(any('[bradbury]' in n for n in notes)) - def test_deactivate_only_same_network(self): """Test that activating a new target only deactivates same network.""" # Create a bradbury target diff --git a/backend/validators/tests/test_streaks.py b/backend/validators/tests/test_streaks.py new file mode 100644 index 00000000..90f7eca5 --- /dev/null +++ b/backend/validators/tests/test_streaks.py @@ -0,0 +1,156 @@ +""" +Tests for consecutive "not shamed" uptime streaks derived from the daily rollup. +""" +from datetime import timedelta + +from django.test import TestCase +from django.utils import timezone + +from validators import streaks as streaks_lib +from validators.models import ValidatorWallet, ValidatorWalletStatusSnapshot + + +def _snap(wallet, day, *, status='active', metrics='on', logs='on', + version='on', m_samples=3, l_samples=3): + return ValidatorWalletStatusSnapshot.objects.create( + wallet=wallet, date=day, status=status, + metrics_status=metrics, logs_status=logs, version_status=version, + metrics_samples=m_samples, logs_samples=l_samples, + ) + + +class CleanStreakTests(TestCase): + def setUp(self): + self.now = timezone.now() + self.today = timezone.localdate(self.now) + self.wallet = ValidatorWallet.objects.create( + address='0xaaaa000000000000000000000000000000000000', + network='asimov', + operator_address='0x1111111111111111111111111111111111111111', + status='active', moniker='alice', + ) + + def _streak(self, wallet_ids=None): + wallet_ids = wallet_ids or [self.wallet.id] + index = 
streaks_lib.load_snapshot_index(wallet_ids, self.now) + return streaks_lib.clean_streak(wallet_ids, self.now, index) + + def test_counts_consecutive_clean_days_including_today(self): + for i in range(5): + _snap(self.wallet, self.today - timedelta(days=i)) + result = self._streak() + self.assertEqual(result['days'], 5) + self.assertEqual(result['broken_by'], []) + self.assertEqual(result['since'], self.today - timedelta(days=4)) + + def test_shame_day_breaks_streak_and_reports_dim(self): + _snap(self.wallet, self.today) # clean + _snap(self.wallet, self.today - timedelta(days=1)) # clean + _snap(self.wallet, self.today - timedelta(days=2), logs='shame', l_samples=0) # break + _snap(self.wallet, self.today - timedelta(days=3)) # (behind the break) + result = self._streak() + self.assertEqual(result['days'], 2) + self.assertIn('logs', result['broken_by']) + + def test_zero_sample_day_is_not_clean(self): + # A day the node never reported metrics counts as shamed. + _snap(self.wallet, self.today, metrics='shame', m_samples=0) + result = self._streak() + self.assertEqual(result['days'], 0) + self.assertIn('metrics', result['broken_by']) + + def test_unsynced_today_does_not_break_streak(self): + # No snapshot for today yet; yesterday+ are clean. 
+ _snap(self.wallet, self.today - timedelta(days=1)) + _snap(self.wallet, self.today - timedelta(days=2)) + result = self._streak() + self.assertEqual(result['days'], 2) + + def test_missing_past_day_is_skipped_not_broken(self): + """A day with no data (sync outage on our side) must not reset the fleet's + streaks: it is skipped — it neither counts nor breaks.""" + _snap(self.wallet, self.today) + # gap at today-1 (no snapshot at all) + _snap(self.wallet, self.today - timedelta(days=2)) + result = self._streak() + self.assertEqual(result['days'], 2) + self.assertEqual(result['broken_by'], []) + + def test_non_active_day_breaks_streak_with_status(self): + """A day spent quarantined breaks the streak even without Grafana data — + the on-chain sync owns the status column and its verdict is trusted.""" + _snap(self.wallet, self.today) + _snap(self.wallet, self.today - timedelta(days=1), + status='quarantined', metrics='unknown', logs='unknown', + version='unknown', m_samples=0, l_samples=0) + result = self._streak() + self.assertEqual(result['days'], 1) + self.assertEqual(result['broken_by'], ['status']) + + def test_version_shame_breaks_streak(self): + _snap(self.wallet, self.today, version='shame') + result = self._streak() + self.assertEqual(result['days'], 0) + self.assertIn('version', result['broken_by']) + + def test_version_only_observation_counts_as_observed(self): + """A rollup carrying only a version verdict (metrics/logs unknown, zero + samples) is still an observed day: it must break the streak, not be + skipped as not-yet-synced.""" + _snap(self.wallet, self.today, metrics='unknown', logs='unknown', + version='shame', m_samples=0, l_samples=0) + result = self._streak() + self.assertEqual(result['days'], 0) + self.assertIn('version', result['broken_by']) + + +class OperatorRollupTests(TestCase): + """Any-node-clean: an operator is clean on a day if >=1 of their nodes was.""" + + def setUp(self): + self.now = timezone.now() + self.today = 
timezone.localdate(self.now) + self.node_a = ValidatorWallet.objects.create( + address='0xaaaa000000000000000000000000000000000000', + network='asimov', + operator_address='0x1111111111111111111111111111111111111111', + status='active', moniker='a', + ) + self.node_b = ValidatorWallet.objects.create( + address='0xbbbb000000000000000000000000000000000000', + network='asimov', + operator_address='0x1111111111111111111111111111111111111111', + status='active', moniker='b', + ) + + def test_any_node_clean_keeps_operator_streak_alive(self): + # Each node is shamed on alternating days, but at least one is clean every day. + for i in range(4): + a_shamed = (i % 2 == 0) + _snap(self.node_a, self.today - timedelta(days=i), + logs='shame' if a_shamed else 'on', + l_samples=0 if a_shamed else 3) + _snap(self.node_b, self.today - timedelta(days=i), + logs='on' if a_shamed else 'shame', + l_samples=3 if a_shamed else 0) + + ids = [self.node_a.id, self.node_b.id] + index = streaks_lib.load_snapshot_index(ids, self.now) + operator = streaks_lib.clean_streak(ids, self.now, index) + self.assertEqual(operator['days'], 4) + + # But each individual node's streak is broken early. + node_a = streaks_lib.clean_streak([self.node_a.id], self.now, index) + self.assertLess(node_a['days'], 4) + + def test_operator_shamed_only_when_all_nodes_shamed(self): + # Day 0: both clean. Day 1: both shamed → operator break here. 
+ _snap(self.node_a, self.today) + _snap(self.node_b, self.today) + _snap(self.node_a, self.today - timedelta(days=1), logs='shame', l_samples=0) + _snap(self.node_b, self.today - timedelta(days=1), logs='shame', l_samples=0) + ids = [self.node_a.id, self.node_b.id] + index = streaks_lib.load_snapshot_index(ids, self.now) + operator = streaks_lib.clean_streak(ids, self.now, index) + self.assertEqual(operator['days'], 1) + self.assertIn('logs', operator['broken_by']) diff --git a/backend/validators/tests/test_version_status.py b/backend/validators/tests/test_version_status.py new file mode 100644 index 00000000..afbfb2bb --- /dev/null +++ b/backend/validators/tests/test_version_status.py @@ -0,0 +1,100 @@ +from datetime import timedelta + +from django.test import TestCase, override_settings +from django.utils import timezone + +from contributions.node_upgrade.models import TargetNodeVersion +from users.models import User +from validators.models import Validator, ValidatorWallet +from validators.version_status import compute_version_status + + +class ComputeVersionStatusTests(TestCase): + def _wallet(self, node_version='1.0.0', network='asimov'): + user = User.objects.create_user( + email=f'v-{node_version}-{network}@example.com', + password='password', + name='Op', + address=f'0xop{node_version}{network}'.ljust(42, '0')[:42], + ) + validator = Validator.objects.create( + user=user, **{f'node_version_{network}': node_version} + ) + return ValidatorWallet.objects.create( + address=f'0xw{node_version}{network}'.ljust(42, '0')[:42], + network=network, + operator=validator, + operator_address=user.address, + status='active', + ) + + def test_no_target_is_unknown(self): + ctx = compute_version_status(self._wallet(), None, timezone.now()) + self.assertEqual(ctx['status'], 'unknown') + + def test_up_to_date_is_on(self): + now = timezone.now() + target = TargetNodeVersion.objects.create( + version='1.0.0', network='asimov', + target_date=now - timedelta(days=10), 
is_active=True, + ) + ctx = compute_version_status(self._wallet('1.2.0'), target, now) + self.assertEqual(ctx['status'], 'on') + + def test_within_grace_is_warning(self): + now = timezone.now() + target = TargetNodeVersion.objects.create( + version='2.0.0', network='asimov', + target_date=now - timedelta(days=2), is_active=True, + ) + ctx = compute_version_status(self._wallet('1.0.0'), target, now) + self.assertEqual(ctx['status'], 'warning') + self.assertEqual(ctx['grace_days_remaining'], 1) # default grace 3, 2 elapsed + + def test_after_grace_is_shame_at_target_plus_grace(self): + now = timezone.now() + target = TargetNodeVersion.objects.create( + version='2.0.0', network='asimov', + target_date=now - timedelta(days=5), is_active=True, + ) + ctx = compute_version_status(self._wallet('1.0.0'), target, now) + self.assertEqual(ctx['status'], 'shame') + self.assertEqual( + ctx['shame_started_at'].replace(microsecond=0), + (target.target_date + timedelta(days=3)).replace(microsecond=0), + ) + + def test_unparseable_version_is_unknown_not_string_compared(self): + """Legacy/vendor version strings packaging can't parse must never get a + lexicographic verdict ('zzz' >= '1.0.0' is True; '0.10.0' < '0.9.0').""" + now = timezone.now() + target = TargetNodeVersion.objects.create( + version='1.0.0', network='asimov', + target_date=now - timedelta(days=10), is_active=True, + ) + ctx = compute_version_status(self._wallet('zzz-not-a-version'), target, now) + self.assertEqual(ctx['status'], 'unknown') + + def test_unparseable_version_matching_target_exactly_is_on(self): + """Vendor-format fleets ('0.6.0-genlayer.1') where the steward-set target + uses the same format still verdict 'on' on exact equality.""" + now = timezone.now() + target = TargetNodeVersion.objects.create( + version='0.6.0-genlayer.1', network='asimov', + target_date=now - timedelta(days=10), is_active=True, + ) + ctx = compute_version_status(self._wallet('0.6.0-genlayer.1'), target, now) + 
self.assertEqual(ctx['status'], 'on') + + @override_settings(NODE_VERSION_SHAME_GRACE_DAYS=7) + def test_grace_period_is_configurable_via_setting(self): + now = timezone.now() + # 5 days elapsed would be shame under the default 3, but the setting grants 7. + target = TargetNodeVersion.objects.create( + version='2.0.0', network='asimov', + target_date=now - timedelta(days=5), is_active=True, + ) + ctx = compute_version_status(self._wallet('1.0.0'), target, now) + self.assertEqual(ctx['status'], 'warning') + self.assertEqual(ctx['grace_days'], 7) + self.assertEqual(ctx['grace_days_remaining'], 2) diff --git a/backend/validators/version_status.py b/backend/validators/version_status.py new file mode 100644 index 00000000..a666ca9f --- /dev/null +++ b/backend/validators/version_status.py @@ -0,0 +1,99 @@ +""" +Version-shame verdict for a validator wallet against the active target node version. + +Extracted from ValidatorWalletViewSet._version_context so the same logic is shared by +the Wall of Shame view and the Grafana sync. The grace period is the global +settings.NODE_VERSION_SHAME_GRACE_DAYS (default 3). + +Versions are compared with `packaging` (PEP 440). When either side is unparseable +(legacy operator values or vendor formats like '0.6.0-genlayer.1'), the verdict is +'on' only on exact string equality with the target, otherwise 'unknown' — never a +lexicographic string comparison, which misorders versions ('0.10.0' < '0.9.0'). +""" +from datetime import timedelta + +from django.conf import settings +from packaging.version import InvalidVersion, parse as parse_version + +_UNSET = object() + + +def default_grace_days(): + return getattr(settings, 'NODE_VERSION_SHAME_GRACE_DAYS', 3) + + +def safe_parse_version(v): + """ + packaging Version, or None when the string is not PEP 440-parseable. + + Semver strings like '0.6.0-genlayer.1' are valid semver but invalid PEP 440; + they must read as "can't compare", never abort a sync. 
+ """ + if not v: + return None + try: + return parse_version(v) + except InvalidVersion: + return None + + +def compute_version_status(wallet, target, now, node_version=_UNSET): + """ + Return a context dict describing the wallet's version status vs the target: + status: 'unknown' | 'on' | 'warning' | 'shame' + node_version, target_version, target_date, target_elapsed_days, + grace_days, grace_days_remaining, shame_started_at + + By default the node version is read from the linked operator + (node_version_<network>). Callers that already know the running version (e.g. + the Grafana sync, which reads it from Prometheus) can pass `node_version` + explicitly (including None to mean "unknown") to bypass the operator lookup. + """ + if node_version is _UNSET: + field_name = f'node_version_{wallet.network}' + node_version = getattr(wallet.operator, field_name, None) if wallet.operator else None + target_version = target.version if target else None + target_date = target.target_date if target else None + grace_days = default_grace_days() + + context = { + 'status': 'unknown' if not target else 'on', + 'node_version': node_version, + 'target_version': target_version, + 'target_date': target_date, + 'target_elapsed_days': None, + 'grace_days': grace_days, + 'grace_days_remaining': None, + 'shame_started_at': None, + } + if not target or not target_date or target_date > now: + return context + + context['target_elapsed_days'] = max(0, (now - target_date).days) + + if node_version: + node_parsed = safe_parse_version(node_version) + target_parsed = safe_parse_version(target.version) + if node_parsed is not None and target_parsed is not None: + if node_parsed >= target_parsed: + context['status'] = 'on' + return context + # Parseable and behind → fall through to warning/shame. + elif node_version.strip() == (target.version or '').strip(): + # Unparseable but exactly the target string (vendor-format fleets). 
+ context['status'] = 'on' + return context + else: + # Either side unparseable and not an exact match: we can't honestly + # rank them, so we neither clear nor shame. + context['status'] = 'unknown' + return context + + if context['target_elapsed_days'] <= grace_days: + context['status'] = 'warning' + context['grace_days_remaining'] = max(0, grace_days - context['target_elapsed_days']) + return context + + context['status'] = 'shame' + context['shame_started_at'] = target_date + timedelta(days=grace_days) + return context diff --git a/backend/validators/views.py b/backend/validators/views.py index 714fb6f2..385e5dd3 100644 --- a/backend/validators/views.py +++ b/backend/validators/views.py @@ -1,7 +1,6 @@ import logging import re import uuid -from datetime import timedelta from rest_framework import viewsets, status from rest_framework.decorators import action from rest_framework.response import Response @@ -23,6 +22,8 @@ from .permissions import IsCronToken from .genlayer_validators_service import GenLayerValidatorsService from .grafana_service import GrafanaValidatorStatusService +from .version_status import compute_version_status +from . import streaks as streaks_lib from users.models import User from users.serializers import ValidatorSerializer, UserSerializer from contributions.models import Contribution, ContributionType @@ -84,45 +85,23 @@ def get_permissions(self): return [AllowAny()] return [IsAuthenticated()] - @action(detail=False, methods=['get', 'patch'], url_path='me') + @action(detail=False, methods=['get'], url_path='me') def my_profile(self, request): """ - Get or update current user's validator profile. 
- """ - if request.method == 'GET': - try: - validator = Validator.objects.get(user=request.user) - serializer = self.get_serializer(validator) - return Response(serializer.data) - except Validator.DoesNotExist: - return Response( - {'detail': 'Validator profile not found for current user.'}, - status=status.HTTP_404_NOT_FOUND - ) - - elif request.method == 'PATCH': - unsupported_fields = set(request.data) - { - 'node_version_asimov', - 'node_version_bradbury', - } - if unsupported_fields: - return Response( - {'detail': 'Only node version fields can be updated here.'}, - status=status.HTTP_400_BAD_REQUEST, - ) + Get current user's validator profile. - try: - validator = Validator.objects.get(user=request.user) - except Validator.DoesNotExist: - return Response( - {'detail': 'Validator profile not found for current user.'}, - status=status.HTTP_404_NOT_FOUND - ) - serializer = self.get_serializer(validator, data=request.data, partial=True) - if serializer.is_valid(): - serializer.save() - return Response(serializer.data) - return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + Read-only: node versions are sourced from Grafana (see + validators/grafana_service.py) and are not editable from the portal. 
+ """ + try: + validator = Validator.objects.get(user=request.user) + serializer = self.get_serializer(validator) + return Response(serializer.data) + except Validator.DoesNotExist: + return Response( + {'detail': 'Validator profile not found for current user.'}, + status=status.HTTP_404_NOT_FOUND + ) @action(detail=False, methods=['get'], url_path='all') def all_validators(self, request): @@ -325,7 +304,6 @@ class ValidatorWalletViewSet(viewsets.ReadOnlyModelViewSet): SYNC_LOCK_STALE_AFTER_SECONDS = 1800 SYNC_LOCK_HEARTBEAT_INTERVAL_SECONDS = 60 WALL_OF_SHAME_CACHE_TTL_SECONDS = 60 - VERSION_SHAME_GRACE_DAYS = 3 def get_queryset(self): """ @@ -719,45 +697,8 @@ def _operator_user_payload(wallet): @classmethod def _version_context(cls, wallet, target, now): - field_name = f'node_version_{wallet.network}' - node_version = getattr(wallet.operator, field_name, None) if wallet.operator else None - target_version = target.version if target else None - target_date = target.target_date if target else None - - context = { - 'status': 'unknown' if not target else 'on', - 'node_version': node_version, - 'target_version': target_version, - 'target_date': target_date, - 'target_elapsed_days': None, - 'grace_days': cls.VERSION_SHAME_GRACE_DAYS, - 'grace_days_remaining': None, - 'shame_started_at': None, - } - if not target or not target_date or target_date > now: - return context - - context['target_elapsed_days'] = max(0, (now - target_date).days) - matches_target = bool( - wallet.operator - and node_version - and wallet.operator.version_matches_or_higher(target.version, node_version=node_version) - ) - if matches_target: - context['status'] = 'on' - return context - - if context['target_elapsed_days'] <= cls.VERSION_SHAME_GRACE_DAYS: - context['status'] = 'warning' - context['grace_days_remaining'] = max( - 0, - cls.VERSION_SHAME_GRACE_DAYS - context['target_elapsed_days'], - ) - return context - - context['status'] = 'shame' - context['shame_started_at'] = target_date + 
timedelta(days=cls.VERSION_SHAME_GRACE_DAYS) - return context + # Grace-aware version verdict; shared with the Grafana sync. + return compute_version_status(wallet, target, now) @classmethod def _sync_shame_started_at(cls, wallets, targets, now): @@ -861,8 +802,13 @@ def _wallet_reasons(cls, wallet, target, now): return reasons @classmethod - def _build_validator_groups(cls, wallets, targets, now): + def _build_validator_groups(cls, wallets, targets, now, snapshot_index=None, + streaks_by_wallet_id=None): groups = {} + snapshot_index = snapshot_index or {} + streaks_by_wallet_id = streaks_by_wallet_id or {} + # Collect each operator+network's wallet ids for the any-node-clean roll-up. + operator_network_wallet_ids = {} for wallet in wallets: operator_key = ( @@ -908,6 +854,11 @@ def _build_validator_groups(cls, wallets, targets, now): **reason, }) + node_streak = streaks_by_wallet_id.get(wallet.id) or {} + operator_network_wallet_ids.setdefault( + (operator_key, wallet.network), [] + ).append(wallet.id) + group['networks'].append({ 'network': wallet.network, 'wallet_id': wallet.id, @@ -921,8 +872,16 @@ def _build_validator_groups(cls, wallets, targets, now): 'target_version': version_context['target_version'], 'status': network_status, 'reasons': reasons, + 'clean_streak_days': node_streak.get('days'), + 'clean_streak_broken_by': node_streak.get('broken_by', []), }) + # One pass: (operator, network) pairs → {op_key: {net: [wallet_ids]}} so the + # per-group rollup below doesn't rescan every pair for every group. 
+ wallet_ids_by_operator = {} + for (op_key, net), ids in operator_network_wallet_ids.items(): + wallet_ids_by_operator.setdefault(op_key, {})[net] = ids + priority = {'shame': 0, 'warning': 1, 'unknown': 2, 'on': 3} network_order = { network: index for index, network in enumerate(settings.TESTNET_NETWORKS.keys()) @@ -939,6 +898,25 @@ def _build_validator_groups(cls, wallets, targets, now): else: group['status'] = 'on' group['networks'].sort(key=lambda item: network_order.get(item['network'], 99)) + + # Per-network operator streak, any-node-clean across the operator's + # wallets on that network (a network-day is clean if ≥1 node was clean). + network_streaks = { + net: streaks_lib.clean_streak(wallet_ids, now, snapshot_index) + for net, wallet_ids in wallet_ids_by_operator.get(group['id'], {}).items() + } + group['network_streaks'] = { + net: { + 'network': net, + 'clean_streak_days': streak['days'], + 'clean_streak_broken_by': streak['broken_by'], + 'since': streak['since'], + } + for net, streak in sorted( + network_streaks.items(), + key=lambda kv: network_order.get(kv[0], 99), + ) + } group['shame_reasons'].sort( key=lambda item: ( network_order.get(item['network'], 99), @@ -1054,8 +1032,21 @@ def wall_of_shame(self, request): default=None, ) - serializer = WallOfShameSerializer(wallets, many=True) - validators = self._build_validator_groups(wallets, targets, now) + # Consecutive "not shamed" uptime streaks, from the stored daily rollup. + # One snapshot query for every wallet on the page, then compute in memory. 
+ snapshot_index = streaks_lib.load_snapshot_index([w.id for w in wallets], now) + streaks_by_wallet_id = { + w.id: streaks_lib.clean_streak([w.id], now, snapshot_index) + for w in wallets + } + + serializer = WallOfShameSerializer( + wallets, many=True, + context={'streaks_by_wallet_id': streaks_by_wallet_id}, + ) + validators = self._build_validator_groups( + wallets, targets, now, snapshot_index, streaks_by_wallet_id, + ) on_count = sum(1 for validator in validators if validator['status'] == 'on') shame_count = sum(1 for validator in validators if validator['status'] == 'shame') warning_count = sum(1 for validator in validators if validator['status'] == 'warning') diff --git a/frontend/CLAUDE.md b/frontend/CLAUDE.md index adfb6f59..4d5a671f 100644 --- a/frontend/CLAUDE.md +++ b/frontend/CLAUDE.md @@ -437,8 +437,8 @@ const routes = { - Shows participant stats, contributions, validator status - Shows "Edit Profile" button if viewing own profile - **`/profile`** - Edit profile page (authenticated users only) - - Component: `EditProfile.svelte` - - Only allows editing display name and node version + - Component: `ProfileEdit.svelte` + - Allows editing display name and profile fields; node version is shown read-only (Grafana-sourced, auto-detected from node metrics — not editable) - Redirects to public profile after save with success message ### API Integration (`src/lib/api.js`) @@ -604,7 +604,7 @@ Investor-oriented home page (`routes/Overview.svelte`), top to bottom: hero → - reCAPTCHA token validated on backend before submission - Uses VITE_RECAPTCHA_SITE_KEY from environment (falls back to test key) - `EditSubmission.svelte` - Edit submitted contributions (supports URL and description evidence only - no file uploads) -- `EditProfile.svelte` - User profile editing (name and node version) +- `ProfileEdit.svelte` - User profile editing (name and profile fields; node version shown read-only, Grafana-sourced) - `Profile.svelte` - Public participant profile view ### 
Wallet Integration (`src/lib/wallet/`) diff --git a/frontend/src/routes/ProfileEdit.svelte b/frontend/src/routes/ProfileEdit.svelte index 0c1cb2b2..37212e57 100644 --- a/frontend/src/routes/ProfileEdit.svelte +++ b/frontend/src/routes/ProfileEdit.svelte @@ -17,8 +17,6 @@ // Form fields let name = $state(""); - let nodeVersionAsimov = $state(""); - let nodeVersionBradbury = $state(""); // New profile fields let email = $state(""); @@ -69,8 +67,6 @@ let hasChanges = $derived( user && (name !== (user.name || "") || - (user.validator && nodeVersionAsimov !== (user.validator?.node_version_asimov || "")) || - (user.validator && nodeVersionBradbury !== (user.validator?.node_version_bradbury || "")) || description !== (user.description || "") || website !== (user.website || "") || telegramHandle !== (user.telegram_handle || "") || @@ -98,8 +94,6 @@ const userData = await getCurrentUser(); user = userData; name = userData.name || ""; - nodeVersionAsimov = userData.validator?.node_version_asimov || ""; - nodeVersionBradbury = userData.validator?.node_version_bradbury || ""; // Load profile fields // Always show the email if it exists, regardless of verification status @@ -186,14 +180,6 @@ linkedin_handle: linkedinHandle.trim(), }; - // Only include node versions if they have changed - if (nodeVersionAsimov !== (user.validator?.node_version_asimov || "")) { - updateData.node_version_asimov = nodeVersionAsimov; - } - if (nodeVersionBradbury !== (user.validator?.node_version_bradbury || "")) { - updateData.node_version_bradbury = nodeVersionBradbury; - } - const updatedUser = await updateUserProfile(updateData); // Update the user store with new data userStore.updateUser(updatedUser); @@ -855,20 +841,11 @@ {/if}
- - -

Your Asimov network node version

+ Node Version +
+ {user.validator?.node_version_asimov || "Not detected yet"} +
+

Detected automatically from your node's metrics

{#if asimovWallets.length > 0} @@ -918,20 +895,11 @@ {/if}
- - -

Your Bradbury network node version

+ Node Version +
+ {user.validator?.node_version_bradbury || "Not detected yet"} +
+

Detected automatically from your node's metrics

{#if bradburyWallets.length > 0}