Baseline Drift and Long-Term Performance Tracking

11 min read Chapter 90 of 90

The performance gate from Section 1 catches regressions within a single PR. It does not catch drift. Drift is the slow accumulation of latency over weeks and months, where each individual change is within tolerance but the cumulative effect is not.

Consider the article endpoint. The baseline says p95 should be 120ms. The tolerance is 10%, so anything up to and including 132ms passes.

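| Week | p95 (ms) | vs Baseline | Gate Result   |
|------|----------|-------------|---------------|
| 1    | 118      | -1.7%       | PASS          |
| 4    | 122      | +1.7%       | PASS          |
| 8    | 126      | +5.0%       | PASS          |
| 12   | 129      | +7.5%       | PASS          |
| 16   | 131      | +9.2%       | PASS          |
| 20   | 130      | +8.3%       | PASS          |
| 24   | 132      | +10.0%      | PASS (barely) |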

Every PR passed the gate. Every PR was within tolerance. But over six months, the endpoint got 12% slower (118ms in week 1, 132ms in week 24). No single commit caused it. Dozens of commits each added a fraction of a millisecond: an extra log statement, a new middleware, a slightly larger response payload, one more database column in the SELECT.

The baseline file from the main chapter is a point-in-time snapshot. It does not move. The application moves around it. This is drift.
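The arithmetic behind the table can be reproduced in a few lines. This is an illustrative sketch only: the helper name `gate_result` is made up here, and the weekly values are copied from the example table rather than from real CI output. It shows every week passing the 10% gate while the change from week 1 to week 24 is about 12%.

```python
BASELINE_MS = 120.0
TOLERANCE = 0.10  # 10% gate tolerance
LIMIT_MS = round(BASELINE_MS * (1 + TOLERANCE), 6)  # 132.0

# (week, observed p95 in ms), copied from the example table
weekly_p95 = [(1, 118), (4, 122), (8, 126), (12, 129), (16, 131), (20, 130), (24, 132)]


def gate_result(p95_ms: float) -> tuple[float, bool]:
    """Return (percent change vs the baseline, whether the gate passes)."""
    pct = (p95_ms - BASELINE_MS) / BASELINE_MS * 100
    return round(pct, 1), p95_ms <= LIMIT_MS


for week, p95 in weekly_p95:
    pct, passed = gate_result(p95)
    print(f"week {week:>2}: {p95}ms {pct:+.1f}% {'PASS' if passed else 'FAIL'}")

# Change between the first and last observation, ignoring the baseline
first, last = weekly_p95[0][1], weekly_p95[-1][1]
cumulative_pct = round((last - first) / first * 100, 1)
print(f"cumulative change: {cumulative_pct:+.1f}%")
```

The gate only ever compares against the fixed 120ms figure, so it has no way to notice the 118ms to 132ms movement.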

Detecting Drift Programmatically

Drift detection requires comparing recent performance against earlier runs from a rolling lookback window, not against a static baseline. The script reads archived CI results and computes the trend.

# drift_detector.py: Detect performance drift over time

import json
import statistics
from pathlib import Path
from datetime import datetime, timedelta


def load_history(results_dir: str, days: int = 90) -> list[dict]:
    cutoff = datetime.now() - timedelta(days=days)
    history = []

    for path in sorted(Path(results_dir).glob("*.json")):
        # Filename format: 20250115-143022-a1b2c3d4.json
        date_str = path.stem.split("-")[0]
        try:
            file_date = datetime.strptime(date_str, "%Y%m%d")
        except ValueError:
            continue
        if file_date < cutoff:
            continue
        data = json.loads(path.read_text())
        data["_date"] = file_date
        data["_file"] = path.name
        history.append(data)

    return history


def detect_drift(
    history: list[dict],
    endpoint: str,
    metric: str = "p95",
    window_days: int = 14,  # currently unused: the comparison below splits the history in half
) -> dict:
    if len(history) < 10:
        return {"status": "insufficient_data", "count": len(history)}

    # Sort a copy so the caller's list is left untouched, then split it
    # into an early half and a recent half
    history = sorted(history, key=lambda x: x["_date"])
    midpoint = len(history) // 2
    early = history[:midpoint]
    recent = history[midpoint:]

    def extract_values(records, ep, m):
        values = []
        for r in records:
            ep_data = r.get("endpoints", {}).get(ep)
            if ep_data and m in ep_data:
                values.append(ep_data[m])
        return values

    early_values = extract_values(early, endpoint, metric)
    recent_values = extract_values(recent, endpoint, metric)

    if not early_values or not recent_values:
        return {"status": "no_data_for_endpoint"}

    early_median = statistics.median(early_values)
    recent_median = statistics.median(recent_values)
    drift_pct = ((recent_median - early_median) / early_median) * 100

    # Linear regression for trend direction
    all_values = extract_values(history, endpoint, metric)
    n = len(all_values)
    x_mean = (n - 1) / 2
    y_mean = statistics.mean(all_values)
    numerator = sum((i - x_mean) * (v - y_mean) for i, v in enumerate(all_values))
    denominator = sum((i - x_mean) ** 2 for i in range(n))
    slope = numerator / denominator if denominator != 0 else 0

    return {
        "status": "analyzed",
        "endpoint": endpoint,
        "metric": metric,
        "early_median_ms": round(early_median, 1),
        "recent_median_ms": round(recent_median, 1),
        "drift_pct": round(drift_pct, 1),
        "trend_slope_ms_per_run": round(slope, 3),
        "data_points": n,
        "verdict": classify_drift(drift_pct, slope),
    }


def classify_drift(drift_pct: float, slope: float) -> str:
    if drift_pct > 15:
        return "CRITICAL: significant drift detected"
    if drift_pct > 8:
        return "WARNING: moderate drift, baseline update recommended"
    if slope > 0.5:
        return "WATCH: upward trend, monitor next 2 weeks"
    if drift_pct < -5:
        return "IMPROVED: performance has gotten better, update baseline"
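    return "STABLE: no significant drift"


if __name__ == "__main__":
    # Minimal CLI matching the --dir/--output flags used by the scheduled
    # workflow. The report layout (a JSON list with one result per endpoint)
    # is an assumption.
    import argparse

    parser = argparse.ArgumentParser(description="Detect performance drift")
    parser.add_argument("--dir", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args()

    runs = load_history(args.dir)
    endpoints = sorted({ep for r in runs for ep in r.get("endpoints", {})})
    report = [detect_drift(runs, ep) for ep in endpoints]
    Path(args.output).write_text(json.dumps(report, indent=2))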

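The drift detector splits the history into halves and compares medians. It also fits a least-squares line through every run to estimate the trend in milliseconds per run. The goal is to distinguish “jumped once and stabilized” from “steadily increasing”. The second pattern is more dangerous because it will continue. One caveat: a single jump in the middle of the history also produces a positive slope over the full window, so treat the slope as a hint and confirm it by checking whether the most recent runs are still climbing.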
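To make the two shapes concrete, here is a small self-contained sketch on synthetic data. The series and the helper names (`slope_per_run`, `half_split_drift_pct`) are invented for illustration; the formulas mirror the median split and least-squares slope used in the script.

```python
import statistics


def slope_per_run(values: list[float]) -> float:
    """Least-squares slope of the values against their run index."""
    n = len(values)
    x_mean = (n - 1) / 2
    y_mean = statistics.mean(values)
    num = sum((i - x_mean) * (v - y_mean) for i, v in enumerate(values))
    den = sum((i - x_mean) ** 2 for i in range(n))
    return num / den if den else 0.0


def half_split_drift_pct(values: list[float]) -> float:
    """Percent change between the median of each half of the series."""
    mid = len(values) // 2
    early = statistics.median(values[:mid])
    recent = statistics.median(values[mid:])
    return (recent - early) / early * 100


# Synthetic p95 series in ms, invented for this example
step = [100.0] * 10 + [110.0] * 10           # jumped once, then flat
ramp = [100.0 + 0.5 * i for i in range(20)]  # rises 0.5ms on every run

for name, series in (("step", step), ("ramp", ramp)):
    print(
        name,
        f"drift={half_split_drift_pct(series):.1f}%",
        f"full_slope={slope_per_run(series):.3f}",
        f"recent_slope={slope_per_run(series[10:]):.3f}",
    )
```

On this data the full-history slope is positive for both series (about 0.75 for the step, 0.5 for the ramp). The slope of the recent half is what separates them: 0 for the step, 0.5 for the ramp.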

Run this weekly as a scheduled GitHub Action:

name: Performance Drift Report
on:
  schedule:
    - cron: '0 9 * * 1'  # Every Monday at 9am UTC

jobs:
  drift-check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Download recent results from S3
        run: |
          mkdir -p perf-history
          aws s3 sync s3://perf-results-bucket/content-platform/ perf-history/ \
            --exclude "*" \
            --include "202*.json"

      - name: Run drift detection
        run: python tests/perf/drift_detector.py --dir perf-history --output drift-report.json

      - name: Post to Slack
        if: always()
        run: |
          python tests/perf/format_drift_slack.py drift-report.json | \
            curl -X POST -H 'Content-type: application/json' \
              -d @- "${{ secrets.SLACK_WEBHOOK_URL }}"

The Monday morning Slack message tells the team whether performance is stable, drifting, or improved. If drift is detected, the message includes the top 3 endpoints by drift percentage and the date range of the trend.
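The workflow calls `format_drift_slack.py`, which is not shown in this chapter. The following is a minimal sketch of what such a formatter might look like, not the actual script. It assumes the report is a JSON list of `detect_drift` results, and it builds the simple `{"text": ...}` payload that Slack incoming webhooks accept. The function name and sample data are hypothetical, and the date range is omitted because `detect_drift` does not return one.

```python
import json


def format_drift_message(results: list[dict], top_n: int = 3) -> dict:
    """Build a Slack incoming-webhook payload from drift results."""
    # Keep only endpoints that had enough data to be analyzed
    analyzed = [r for r in results if r.get("status") == "analyzed"]
    worst = sorted(analyzed, key=lambda r: r["drift_pct"], reverse=True)[:top_n]

    if not worst:
        return {"text": "Performance drift report: no endpoints had enough data."}

    lines = ["Performance drift report (top endpoints by drift):"]
    for r in worst:
        lines.append(
            f"- {r['endpoint']} {r['metric']}: "
            f"{r['early_median_ms']}ms -> {r['recent_median_ms']}ms "
            f"({r['drift_pct']:+.1f}%) {r['verdict']}"
        )
    return {"text": "\n".join(lines)}


# Hypothetical report contents, shaped like detect_drift() output
sample = [
    {"status": "analyzed", "endpoint": "/api/articles/[slug]", "metric": "p95",
     "early_median_ms": 120.0, "recent_median_ms": 132.0, "drift_pct": 10.0,
     "verdict": "WARNING: moderate drift, baseline update recommended"},
    {"status": "analyzed", "endpoint": "/api/search", "metric": "p95",
     "early_median_ms": 200.0, "recent_median_ms": 204.0, "drift_pct": 2.0,
     "verdict": "STABLE: no significant drift"},
    {"status": "analyzed", "endpoint": "/api/tags", "metric": "p95",
     "early_median_ms": 50.0, "recent_median_ms": 47.0, "drift_pct": -6.0,
     "verdict": "IMPROVED: performance has gotten better, update baseline"},
    {"status": "analyzed", "endpoint": "/api/authors", "metric": "p95",
     "early_median_ms": 80.0, "recent_median_ms": 84.0, "drift_pct": 5.0,
     "verdict": "STABLE: no significant drift"},
    {"status": "insufficient_data", "count": 4},
]

payload = format_drift_message(sample)
print(json.dumps(payload, indent=2))
```

A real script would read the report path from its first command-line argument and print the payload to stdout so that `curl -d @-` can pick it up.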

Pushing CI Results to Prometheus

Archived JSON files in S3 work for batch analysis. For real-time dashboards and alerting, push CI results to Prometheus using the Pushgateway.

# push_metrics.py: Push Locust CI results to Prometheus Pushgateway

import json
from pathlib import Path
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

PUSHGATEWAY_URL = "http://pushgateway.internal:9091"
JOB_NAME = "ci_performance_test"


def push_results(results_path: str, commit_sha: str, branch: str):
    results = json.loads(Path(results_path).read_text())
    registry = CollectorRegistry()

    p50_gauge = Gauge(
        "ci_perf_p50_ms",
        "CI performance test p50 latency",
        ["endpoint", "branch"],
        registry=registry,
    )
    p95_gauge = Gauge(
        "ci_perf_p95_ms",
        "CI performance test p95 latency",
        ["endpoint", "branch"],
        registry=registry,
    )
    p99_gauge = Gauge(
        "ci_perf_p99_ms",
        "CI performance test p99 latency",
        ["endpoint", "branch"],
        registry=registry,
    )
    error_gauge = Gauge(
        "ci_perf_error_rate",
        "CI performance test error rate",
        ["endpoint", "branch"],
        registry=registry,
    )

    for endpoint, metrics in results["endpoints"].items():
        labels = [endpoint, branch]
        p50_gauge.labels(*labels).set(metrics["p50"])
        p95_gauge.labels(*labels).set(metrics["p95"])
        p99_gauge.labels(*labels).set(metrics["p99"])
        error_rate = metrics.get("failures", 0) / max(metrics.get("count", 1), 1)
        error_gauge.labels(*labels).set(error_rate)

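    # Note: each distinct grouping key creates its own metric group, and the
    # Pushgateway keeps every group until it is explicitly deleted, so
    # per-commit groups accumulate over time.
    push_to_gateway(
        PUSHGATEWAY_URL,
        job=JOB_NAME,
        grouping_key={"commit": commit_sha[:8]},
        registry=registry,
    )
    print(f"Pushed metrics for {len(results['endpoints'])} endpoints to Pushgateway")


if __name__ == "__main__":
    import sys

    # Usage: push_metrics.py <results.json> <commit_sha> <branch>
    push_results(sys.argv[1], sys.argv[2], sys.argv[3])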

Add this to the CI workflow after the comparison step:

- name: Push metrics to Prometheus
  if: github.ref == 'refs/heads/main'
  run: |
    pip install prometheus-client
    python tests/perf/push_metrics.py \
      perf-results.json \
      "${{ github.sha }}" \
      "main"

Only push from the main branch. PR branches would pollute the time series with data from code that may never ship.

With CI results in Prometheus, build a Grafana dashboard that shows performance over time. The dashboard has three panels.

Panel 1: p95 Latency Trend by Endpoint. A time series graph with one line per endpoint. The Y axis is milliseconds. The X axis is time. Each data point is one CI run on main. Add the baseline as a constant horizontal line using Grafana’s threshold feature.

PromQL for the article endpoint trend:

ci_perf_p95_ms{endpoint="/api/articles/[slug]", branch="main"}

Add a constant line at the gate threshold (baseline plus tolerance):

# Baseline: 120ms with 10% tolerance = 132ms threshold
vector(132)

Panel 2: Drift Rate (Week-over-Week). A bar chart showing the percentage change in p95 between this week and last week, per endpoint. Green bars mean improvement. Red bars mean regression. This is the dashboard equivalent of the drift detector script.

# This week's average p95
avg_over_time(ci_perf_p95_ms{endpoint="/api/articles/[slug]", branch="main"}[7d])
/
# Last week's average p95
avg_over_time(ci_perf_p95_ms{endpoint="/api/articles/[slug]", branch="main"}[7d] offset 7d)
- 1

Multiply by 100 for percentage. Values above 0 are regressions. Values below 0 are improvements.
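The same calculation can be checked by hand in Python. This is a small sketch with made-up sample values. `week_over_week_pct` is a hypothetical helper, not part of the scripts in this chapter.

```python
import statistics


def week_over_week_pct(this_week: list[float], last_week: list[float]) -> float:
    """Percent change in average p95 between two weeks of CI runs."""
    return (statistics.mean(this_week) / statistics.mean(last_week) - 1) * 100


# Invented p95 values (ms) for five CI runs per week
last_week = [128, 130, 129, 131, 130]  # mean 129.6
this_week = [131, 132, 130, 133, 132]  # mean 131.6

change = week_over_week_pct(this_week, last_week)
print(f"{change:+.2f}%")  # positive means regression
```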

Panel 3: Error Rate Over Time. A stacked area chart of error rates per endpoint. It should stay near zero. Any sustained increase indicates a reliability regression, not just a latency regression.

ci_perf_error_rate{branch="main"} * 100

Automated Baseline Updates

The baseline should not be static forever. As the application grows, some endpoints will legitimately get slower. New features add complexity. Larger datasets increase query time. The baseline needs to track intentional changes.

Automated baseline updates prevent the baseline from becoming so stale that every PR fails the gate. But fully automated updates defeat the purpose of the gate. The solution is semi-automated: a script proposes a new baseline, and a human reviews it.

# update_baseline.py: Propose new baseline from recent CI data

import json
import statistics
from pathlib import Path


def propose_baseline(
    history_dir: str,
    current_baseline_path: str,
    output_path: str,
    lookback_runs: int = 20,
):
    current = json.loads(Path(current_baseline_path).read_text())
    history_files = sorted(Path(history_dir).glob("*.json"))[-lookback_runs:]

    if len(history_files) < 10:
        print(f"Only {len(history_files)} runs available, need at least 10")
        return

    runs = [json.loads(f.read_text()) for f in history_files]
    proposed = json.loads(json.dumps(current))  # deep copy

    changes = []
    for endpoint in current["thresholds"]:
        for metric_key, result_key in [
            ("p50_ms", "p50"),
            ("p95_ms", "p95"),
            ("p99_ms", "p99"),
        ]:
            if metric_key not in current["thresholds"][endpoint]:
                continue

            values = []
            for run in runs:
                ep_data = run.get("endpoints", {}).get(endpoint, {})
                if result_key in ep_data:
                    values.append(ep_data[result_key])

            if not values:
                continue

            current_threshold = current["thresholds"][endpoint][metric_key]
            observed_p90 = sorted(values)[int(len(values) * 0.9)]
            proposed_value = round(observed_p90 * 1.05)  # 5% headroom

            if abs(proposed_value - current_threshold) / current_threshold > 0.05:
                proposed["thresholds"][endpoint][metric_key] = proposed_value
                direction = "UP" if proposed_value > current_threshold else "DOWN"
                changes.append(
                    f"  {direction} {endpoint} {metric_key}: "
                    f"{current_threshold}ms -> {proposed_value}ms"
                )

    if changes:
        Path(output_path).write_text(json.dumps(proposed, indent=2))
        print(f"Proposed baseline changes ({len(changes)}):")
        for change in changes:
            print(change)
        print(f"\nWritten to {output_path}")
        print("Review the changes and copy to perf-baseline.json if acceptable.")
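    else:
        print("No significant changes detected. Baseline is current.")


if __name__ == "__main__":
    import argparse

    # Command-line flags matching the invocation shown after this script
    parser = argparse.ArgumentParser(description="Propose a new perf baseline")
    parser.add_argument("--history-dir", required=True)
    parser.add_argument("--current", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args()
    propose_baseline(args.history_dir, args.current, args.output)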

The script takes the 90th percentile of recent runs (not the maximum, which might be a flaky outlier) and adds 5% headroom. It writes a proposed baseline to a separate file. A developer reviews the changes, checks whether the regressions are intentional, and commits the update.
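A short worked example of that calculation, using invented run values. `propose_threshold` is a hypothetical helper that isolates the two relevant lines of the script.

```python
def propose_threshold(values: list[float], headroom: float = 0.05) -> int:
    """90th-percentile-by-index of the observed values, plus headroom."""
    ordered = sorted(values)
    observed_p90 = ordered[int(len(ordered) * 0.9)]
    return round(observed_p90 * (1 + headroom))


# 20 invented p95 readings: 120ms, 121ms, ..., 139ms
steady = list(range(120, 140))
# Same runs, but the slowest one is a 400ms flaky outlier
with_outlier = list(range(120, 139)) + [400]
# Only 10 runs: index int(10 * 0.9) == 9 is the last (largest) element
ten_runs = list(range(120, 130))

current_threshold = 132
proposed = propose_threshold(steady)
needs_update = abs(proposed - current_threshold) / current_threshold > 0.05

print(proposed, propose_threshold(with_outlier), propose_threshold(ten_runs), needs_update)
```

With 20 runs the outlier is ignored, because index 18 of the sorted list is still 138ms. With exactly 10 runs the index lands on the maximum, so the outlier protection only applies once more than 10 runs are available.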

Run this monthly or when the drift detector reports a WARNING:

python tests/perf/update_baseline.py \
  --history-dir perf-history/ \
  --current perf-baseline.json \
  --output perf-baseline-proposed.json

Capacity Planning from CI Data

CI performance data is a rough proxy for production capacity. If the article endpoint’s p95 increases by 2ms per month in CI on a 2-core runner, production on 8 cores will likely show a similar upward trend, though not necessarily the same number of milliseconds. The absolute numbers differ, but the direction of the trend usually transfers.

A capacity forecast extrapolates the CI trend to estimate when an SLO will be breached:

# capacity_forecast.py: Project when performance SLOs will be breached

import statistics
from datetime import datetime, timedelta


def forecast_breach(
    history: list[dict],
    endpoint: str,
    metric: str,
    slo_ms: float,
) -> dict:
    values = []
    for record in sorted(history, key=lambda x: x["_date"]):
        ep_data = record.get("endpoints", {}).get(endpoint, {})
        if metric in ep_data:
            values.append({
                "date": record["_date"],
                "value": ep_data[metric],
            })

    if len(values) < 10:
        return {"status": "insufficient_data"}

    # Linear regression
    n = len(values)
    x_values = list(range(n))
    y_values = [v["value"] for v in values]
    x_mean = statistics.mean(x_values)
    y_mean = statistics.mean(y_values)

    numerator = sum(
        (x - x_mean) * (y - y_mean)
        for x, y in zip(x_values, y_values)
    )
    denominator = sum((x - x_mean) ** 2 for x in x_values)
    slope = numerator / denominator if denominator else 0
    intercept = y_mean - slope * x_mean

    current_value = slope * (n - 1) + intercept

    if slope <= 0:
        return {
            "status": "no_breach",
            "reason": "performance is stable or improving",
            "current_ms": round(current_value, 1),
            "slope_ms_per_run": round(slope, 3),
        }

    # How many more runs until SLO is breached?
    runs_to_breach = (slo_ms - current_value) / slope
    if runs_to_breach < 0:
        return {
            "status": "already_breached",
            "current_ms": round(current_value, 1),
            "slo_ms": slo_ms,
        }

    # Estimate days (assuming ~1 main branch merge per day)
    days_between_runs = 1
    days_to_breach = runs_to_breach * days_between_runs
    breach_date = datetime.now() + timedelta(days=days_to_breach)

    return {
        "status": "projected_breach",
        "current_ms": round(current_value, 1),
        "slo_ms": slo_ms,
        "slope_ms_per_run": round(slope, 3),
        "runs_to_breach": int(runs_to_breach),
        "estimated_breach_date": breach_date.strftime("%Y-%m-%d"),
        "days_remaining": int(days_to_breach),
    }

Sample output:

{
  "status": "projected_breach",
  "current_ms": 145.2,
  "slo_ms": 200,
  "slope_ms_per_run": 0.34,
  "runs_to_breach": 161,
  "estimated_breach_date": "2025-11-15",
  "days_remaining": 161
}

This says: at the current rate of drift, the article endpoint will breach its 200ms SLO in 161 days. That is five months of warning. Enough time to plan optimization work, allocate engineering resources, or adjust the SLO.
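The 161-day figure depends on the assumption of one main-branch run per day. The sketch below redoes the arithmetic from the sample output and shows how the estimate shifts with merge frequency. The `runs_per_day` parameter is hypothetical and stands in for the hard-coded `days_between_runs = 1` in the script.

```python
from typing import Optional


def days_to_breach(
    current_ms: float,
    slo_ms: float,
    slope_ms_per_run: float,
    runs_per_day: float = 1.0,
) -> Optional[float]:
    """Days until the fitted line crosses the SLO, or None if it never does."""
    if slope_ms_per_run <= 0:
        return None  # flat or improving trend: no projected breach
    runs = (slo_ms - current_ms) / slope_ms_per_run
    return max(runs, 0.0) / runs_per_day


# Values from the sample output: (200 - 145.2) / 0.34 is about 161.2 runs
one_per_day = days_to_breach(145.2, 200, 0.34)
three_per_day = days_to_breach(145.2, 200, 0.34, runs_per_day=3)

print(int(one_per_day), int(three_per_day))
```

A team that merges to main three times a day reaches the same number of runs in roughly 53 days, so the calendar estimate should use the real merge rate.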

Without this data, the team discovers the SLO breach when it happens in production. With this data, the tech lead can put “article endpoint optimization” on the Q3 roadmap and point to the trend line as justification.

The Feedback Loop

The complete system creates a closed loop:

  1. Developer opens a PR
  2. CI runs Locust test
  3. Comparison script checks against baseline
  4. Gate blocks or warns on regression
  5. Results archived to S3 and pushed to Prometheus
  6. Weekly drift detector checks for cumulative regression
  7. Monthly baseline proposer suggests updates
  8. Quarterly capacity forecast projects SLO breach dates
  9. Team prioritizes optimization work based on projections

Each layer catches what the previous layer misses. The CI gate catches acute regressions. The drift detector catches chronic regressions. The capacity forecast catches trend-based risks. No single tool is sufficient. The system works because the tools compose.

Trade-offs

| Decision | Benefit | Cost |
|----------|---------|------|
| Prometheus + Grafana for CI data | Real-time dashboards, alerting | Infrastructure to maintain |
| Semi-automated baseline updates | Prevents stale baselines | Requires human review |
| Weekly drift reports | Catches cumulative regression | Alert fatigue if thresholds are too sensitive |
| Capacity forecasting | Months of advance warning | Linear extrapolation is naive, real degradation is rarely linear |
| Pushgateway for CI metrics | Simple integration | Single point of failure, stale metrics if CI stops running |

The biggest risk is false confidence. A green CI gate and a stable drift report do not mean production is fast. CI tests run on synthetic data, with synthetic traffic patterns, on hardware that does not match production. CI performance testing answers one question: “did this change make things slower?” It does not answer “is production fast enough?” Production monitoring, covered in earlier chapters, answers that question. The two systems complement each other. Neither replaces the other.