Baseline Drift and Long-Term Performance Tracking
The performance gate from Section 1 catches regressions within a single PR. It does not catch drift. Drift is the slow accumulation of latency over weeks and months, where each individual change is within tolerance but the cumulative effect is not.
Consider the article endpoint. The baseline says p95 should be 120ms. The tolerance is 10%, so anything up to 132ms passes.
| Week | p95 (ms) | vs Baseline | Gate Result |
|---|---|---|---|
| 1 | 118 | -1.7% | PASS |
| 4 | 122 | +1.7% | PASS |
| 8 | 126 | +5.0% | PASS |
| 12 | 129 | +7.5% | PASS |
| 16 | 131 | +9.2% | PASS |
| 20 | 130 | +8.3% | PASS |
| 24 | 132 | +10.0% | PASS (barely) |
Every PR passed the gate. Every PR was within tolerance. But over six months, the endpoint got 12% slower, from 118ms to 132ms. No single commit caused it. Dozens of commits each added a fraction of a millisecond: an extra log statement, a new middleware, a slightly larger response payload, one more database column in the SELECT.
The baseline file from the main chapter is a point-in-time snapshot. It does not move. The application moves around it. This is drift.
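The arithmetic behind the table can be reproduced in a few lines. This is a minimal sketch using the weekly p95 values above; the names `BASELINE_MS`, `TOLERANCE_PCT`, `gate_passes`, and `pct_change` are illustrative, and the inclusive comparison is an assumption drawn from week 24 passing at exactly 132ms.

```python
# Reproduce the drift table: every weekly value passes the gate,
# yet the endpoint is noticeably slower than where it started.
BASELINE_MS = 120
TOLERANCE_PCT = 10  # assumed inclusive: exactly +10% still passes

# Weekly p95 values (ms) taken from the table above
weekly_p95 = {1: 118, 4: 122, 8: 126, 12: 129, 16: 131, 20: 130, 24: 132}


def gate_passes(p95_ms: float) -> bool:
    # Multiplying before dividing keeps the limit at exactly 132.0 here
    limit_ms = BASELINE_MS * (100 + TOLERANCE_PCT) / 100
    return p95_ms <= limit_ms


def pct_change(new: float, old: float) -> float:
    return round((new - old) / old * 100, 1)


for week, p95 in weekly_p95.items():
    verdict = "PASS" if gate_passes(p95) else "FAIL"
    print(f"week {week:>2}: {p95}ms  {pct_change(p95, BASELINE_MS):+.1f}%  {verdict}")

# Drift measured against where the endpoint actually started
first, last = weekly_p95[1], weekly_p95[24]
print(f"week 1 -> week 24: {pct_change(last, first):+.1f}%")
```

Measured against the static baseline, week 24 is at the edge of tolerance. Measured against week 1, the change is larger than the tolerance the gate was meant to enforce.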
Detecting Drift Programmatically
Drift detection requires comparing current performance against a rolling window, not a static baseline. The script reads archived CI results and computes the trend.
# drift_detector.py: Detect performance drift over time
import argparse
import json
import statistics
from datetime import datetime, timedelta
from pathlib import Path


def load_history(results_dir: str, days: int = 90) -> list[dict]:
    """Load archived CI result files from the last `days` days, oldest first."""
    cutoff = datetime.now() - timedelta(days=days)
    history = []
    for path in sorted(Path(results_dir).glob("*.json")):
        # Filename format: 20250115-143022-a1b2c3d4.json
        date_str = path.stem.split("-")[0]
        try:
            file_date = datetime.strptime(date_str, "%Y%m%d")
        except ValueError:
            continue  # skip files that do not follow the naming scheme
        if file_date < cutoff:
            continue
        data = json.loads(path.read_text())
        data["_date"] = file_date
        data["_file"] = path.name
        history.append(data)
    return history


def extract_values(records: list[dict], endpoint: str, metric: str) -> list[float]:
    """Collect one metric for one endpoint, skipping runs that lack it."""
    values = []
    for r in records:
        ep_data = r.get("endpoints", {}).get(endpoint)
        if ep_data and metric in ep_data:
            values.append(ep_data[metric])
    return values


def detect_drift(
    history: list[dict],
    endpoint: str,
    metric: str = "p95",
    window_days: int = 14,  # currently unused: the comparison below splits the history in half
) -> dict:
    if len(history) < 10:
        return {"status": "insufficient_data", "count": len(history)}

    # Split into early window and recent window.
    # sorted() returns a copy, so the caller's list is not reordered.
    ordered = sorted(history, key=lambda x: x["_date"])
    midpoint = len(ordered) // 2
    early = ordered[:midpoint]
    recent = ordered[midpoint:]

    early_values = extract_values(early, endpoint, metric)
    recent_values = extract_values(recent, endpoint, metric)
    if not early_values or not recent_values:
        return {"status": "no_data_for_endpoint"}

    early_median = statistics.median(early_values)
    recent_median = statistics.median(recent_values)
    drift_pct = ((recent_median - early_median) / early_median) * 100

    # Linear regression for trend direction (x is the run index, y is the metric)
    all_values = extract_values(ordered, endpoint, metric)
    n = len(all_values)
    x_mean = (n - 1) / 2
    y_mean = statistics.mean(all_values)
    numerator = sum((i - x_mean) * (v - y_mean) for i, v in enumerate(all_values))
    denominator = sum((i - x_mean) ** 2 for i in range(n))
    slope = numerator / denominator if denominator != 0 else 0

    return {
        "status": "analyzed",
        "endpoint": endpoint,
        "metric": metric,
        "early_median_ms": round(early_median, 1),
        "recent_median_ms": round(recent_median, 1),
        "drift_pct": round(drift_pct, 1),
        "trend_slope_ms_per_run": round(slope, 3),
        "data_points": n,
        "verdict": classify_drift(drift_pct, slope),
    }


def classify_drift(drift_pct: float, slope: float) -> str:
    if drift_pct > 15:
        return "CRITICAL: significant drift detected"
    if drift_pct > 8:
        return "WARNING: moderate drift, baseline update recommended"
    if slope > 0.5:
        return "WATCH: upward trend, monitor next 2 weeks"
    if drift_pct < -5:
        return "IMPROVED: performance has gotten better, update baseline"
    return "STABLE: no significant drift"


if __name__ == "__main__":
    # Minimal entry point matching the --dir/--output flags used by the workflow.
    # Assumption: every endpoint found in the history is analyzed, and the
    # report is written as a JSON list of per-endpoint results.
    parser = argparse.ArgumentParser(description="Detect performance drift")
    parser.add_argument("--dir", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args()

    history = load_history(args.dir)
    endpoints = sorted({ep for run in history for ep in run.get("endpoints", {})})
    report = [detect_drift(history, ep) for ep in endpoints]
    Path(args.output).write_text(json.dumps(report, indent=2))
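The drift detector splits the history into halves and compares medians. It also computes a least-squares slope over all runs as a trend indicator. The intent is to separate “jumped once and stabilized” from “steadily increasing”. The second pattern is more dangerous because it will continue. Treat the full-window slope as a rough signal only: a single jump in the middle of the window also produces a positive slope, so confirm a suspected trend by checking whether the most recent runs are still climbing.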
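One way to tell the two patterns apart in practice is to compute the slope over the most recent runs only: after a one-time jump the recent values are flat, while a steady climb keeps a positive slope. The sketch below is illustrative and not part of the script above; `least_squares_slope` and the synthetic series are assumptions made for the example.

```python
# Compare a one-time jump with a steady climb using the slope of the recent half.
import statistics


def least_squares_slope(values: list[float]) -> float:
    """Slope of the least-squares line through (0, v0), (1, v1), ..."""
    n = len(values)
    if n < 2:
        return 0.0
    x_mean = (n - 1) / 2
    y_mean = statistics.mean(values)
    numerator = sum((i - x_mean) * (v - y_mean) for i, v in enumerate(values))
    denominator = sum((i - x_mean) ** 2 for i in range(n))
    return numerator / denominator


# Synthetic p95 series (ms): both finish at least 10ms above where they started
jumped = [100.0] * 10 + [110.0] * 10       # step change, then stable
climbing = [100.0 + i for i in range(20)]  # +1ms every run

for name, series in [("jumped", jumped), ("climbing", climbing)]:
    recent = series[len(series) // 2:]
    print(name, round(least_squares_slope(recent), 3))
```

A recent-half slope near zero suggests the change has already settled; a clearly positive one suggests the endpoint is still getting slower.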
Run this weekly as a scheduled GitHub Action:
name: Performance Drift Report
on:
  schedule:
    - cron: '0 9 * * 1'  # Every Monday at 9am UTC
jobs:
  drift-check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Download recent results from S3
        run: |
          mkdir -p perf-history
          aws s3 sync s3://perf-results-bucket/content-platform/ perf-history/ \
            --exclude "*" \
            --include "202*.json"
      - name: Run drift detection
        run: python tests/perf/drift_detector.py --dir perf-history --output drift-report.json
      - name: Post to Slack
        if: always()
        run: |
          python tests/perf/format_drift_slack.py drift-report.json | \
            curl -X POST -H 'Content-type: application/json' \
              -d @- "${{ secrets.SLACK_WEBHOOK_URL }}"
The Monday morning Slack message tells the team whether performance is stable, drifting, or improved. If drift is detected, the message includes the top 3 endpoints by drift percentage and the date range of the trend.
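The workflow refers to `format_drift_slack.py`, which is not shown. A minimal sketch of what it might look like follows. It assumes the drift report is a JSON list of the dictionaries returned by `detect_drift`, one per endpoint, and it emits the simple `{"text": ...}` payload that Slack incoming webhooks accept. The function name `build_slack_payload` and the message layout are illustrative. The date range is left out because `detect_drift` as written does not return one.

```python
# format_drift_slack.py (sketch): turn a drift report into a Slack webhook payload
import json
import sys


def build_slack_payload(results: list[dict], top_n: int = 3) -> dict:
    """Summarize the endpoints with the largest drift as a plain-text message."""
    analyzed = [r for r in results if r.get("status") == "analyzed"]
    if not analyzed:
        return {"text": "Performance drift report: not enough data to analyze."}

    # Largest positive drift first, keep only the top N endpoints
    worst = sorted(analyzed, key=lambda r: r["drift_pct"], reverse=True)[:top_n]
    lines = ["Performance drift report"]
    for r in worst:
        lines.append(
            f"- {r['endpoint']} {r['metric']}: "
            f"{r['early_median_ms']}ms -> {r['recent_median_ms']}ms "
            f"({r['drift_pct']:+.1f}%) {r['verdict']}"
        )
    return {"text": "\n".join(lines)}


if __name__ == "__main__" and len(sys.argv) > 1:
    # Assumed input: a JSON list of detect_drift() results
    with open(sys.argv[1]) as fh:
        report = json.load(fh)
    # Print the payload so the workflow can pipe it into curl
    print(json.dumps(build_slack_payload(report)))
```

Printing to stdout keeps the script independent of the webhook URL, which stays in the workflow's secrets.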
Pushing CI Results to Prometheus
Archived JSON files in S3 work for batch analysis. For real-time dashboards and alerting, push CI results to Prometheus using the Pushgateway.
# push_metrics.py: Push Locust CI results to Prometheus Pushgateway
import json
import sys
from pathlib import Path

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

PUSHGATEWAY_URL = "http://pushgateway.internal:9091"
JOB_NAME = "ci_performance_test"


def push_results(results_path: str, commit_sha: str, branch: str):
    results = json.loads(Path(results_path).read_text())
    registry = CollectorRegistry()

    p50_gauge = Gauge(
        "ci_perf_p50_ms",
        "CI performance test p50 latency",
        ["endpoint", "branch"],
        registry=registry,
    )
    p95_gauge = Gauge(
        "ci_perf_p95_ms",
        "CI performance test p95 latency",
        ["endpoint", "branch"],
        registry=registry,
    )
    p99_gauge = Gauge(
        "ci_perf_p99_ms",
        "CI performance test p99 latency",
        ["endpoint", "branch"],
        registry=registry,
    )
    error_gauge = Gauge(
        "ci_perf_error_rate",
        "CI performance test error rate",
        ["endpoint", "branch"],
        registry=registry,
    )

    for endpoint, metrics in results["endpoints"].items():
        labels = [endpoint, branch]
        p50_gauge.labels(*labels).set(metrics["p50"])
        p95_gauge.labels(*labels).set(metrics["p95"])
        p99_gauge.labels(*labels).set(metrics["p99"])
        # Error rate as a fraction (0.0 to 1.0); max() guards against a zero count
        error_rate = metrics.get("failures", 0) / max(metrics.get("count", 1), 1)
        error_gauge.labels(*labels).set(error_rate)

    # Each commit becomes its own Pushgateway group. Groups persist until
    # they are deleted, so old commits need periodic cleanup.
    push_to_gateway(
        PUSHGATEWAY_URL,
        job=JOB_NAME,
        grouping_key={"commit": commit_sha[:8]},
        registry=registry,
    )
    print(f"Pushed metrics for {len(results['endpoints'])} endpoints to Pushgateway")


if __name__ == "__main__":
    # Positional arguments match the CI step: results file, commit SHA, branch
    if len(sys.argv) != 4:
        sys.exit("usage: push_metrics.py <results.json> <commit_sha> <branch>")
    push_results(sys.argv[1], sys.argv[2], sys.argv[3])
Add this to the CI workflow after the comparison step:
- name: Push metrics to Prometheus
  if: github.ref == 'refs/heads/main'
  run: |
    pip install prometheus-client
    python tests/perf/push_metrics.py \
      perf-results.json \
      "${{ github.sha }}" \
      "main"
Only push from the main branch. PR branches would pollute the time series with data from code that may never ship.
Grafana Dashboard for Performance Trends
With CI results in Prometheus, build a Grafana dashboard that shows performance over time. The dashboard has three panels.
Panel 1: p95 Latency Trend by Endpoint. A time series graph with one line per endpoint. The Y axis is milliseconds. The X axis is time. Each data point is one CI run on main. Add the baseline as a constant horizontal line using Grafana’s threshold feature.
PromQL for the article endpoint trend:
ci_perf_p95_ms{endpoint="/api/articles/[slug]", branch="main"}
Add a threshold line at the gate limit (baseline plus tolerance):
# Baseline: 120ms with 10% tolerance = 132ms threshold
vector(132)
Panel 2: Drift Rate (Week-over-Week). A bar chart showing the percentage change in p95 between this week and last week, per endpoint. Green bars mean improvement. Red bars mean regression. This is the dashboard equivalent of the drift detector script.
# This week's average p95
avg_over_time(ci_perf_p95_ms{endpoint="/api/articles/[slug]", branch="main"}[7d])
/
# Last week's average p95
avg_over_time(ci_perf_p95_ms{endpoint="/api/articles/[slug]", branch="main"}[7d] offset 7d)
- 1
Multiply by 100 for percentage. Values above 0 are regressions. Values below 0 are improvements.
Panel 3: Error Rate Over Time. A stacked area chart of error rates per endpoint. Should be near zero. Any sustained increase indicates a reliability regression, not just a latency regression.
ci_perf_error_rate{branch="main"} * 100
Automated Baseline Updates
The baseline should not be static forever. As the application grows, some endpoints will legitimately get slower. New features add complexity. Larger datasets increase query time. The baseline needs to track intentional changes.
Automated baseline updates prevent the baseline from becoming so stale that every PR fails the gate. But fully automated updates defeat the purpose of the gate. The solution is semi-automated: a script proposes a new baseline, and a human reviews it.
# update_baseline.py: Propose new baseline from recent CI data
import argparse
import json
import statistics
from pathlib import Path


def propose_baseline(
    history_dir: str,
    current_baseline_path: str,
    output_path: str,
    lookback_runs: int = 20,
):
    current = json.loads(Path(current_baseline_path).read_text())
    # Filenames start with a timestamp, so sorting by name is chronological
    history_files = sorted(Path(history_dir).glob("*.json"))[-lookback_runs:]
    if len(history_files) < 10:
        print(f"Only {len(history_files)} runs available, need at least 10")
        return

    runs = [json.loads(f.read_text()) for f in history_files]
    proposed = json.loads(json.dumps(current))  # deep copy
    changes = []

    for endpoint in current["thresholds"]:
        for metric_key, result_key in [
            ("p50_ms", "p50"),
            ("p95_ms", "p95"),
            ("p99_ms", "p99"),
        ]:
            if metric_key not in current["thresholds"][endpoint]:
                continue
            values = []
            for run in runs:
                ep_data = run.get("endpoints", {}).get(endpoint, {})
                if result_key in ep_data:
                    values.append(ep_data[result_key])
            if not values:
                continue

            current_threshold = current["thresholds"][endpoint][metric_key]
            # Index 18 of 20 sorted values (second-highest). With exactly
            # 10 values the index is 9, which is the highest value.
            observed_p90 = sorted(values)[int(len(values) * 0.9)]
            proposed_value = round(observed_p90 * 1.05)  # 5% headroom

            # Only propose changes larger than 5% of the current threshold
            if abs(proposed_value - current_threshold) / current_threshold > 0.05:
                proposed["thresholds"][endpoint][metric_key] = proposed_value
                direction = "UP" if proposed_value > current_threshold else "DOWN"
                changes.append(
                    f"  {direction} {endpoint} {metric_key}: "
                    f"{current_threshold}ms -> {proposed_value}ms"
                )

    if changes:
        Path(output_path).write_text(json.dumps(proposed, indent=2))
        print(f"Proposed baseline changes ({len(changes)}):")
        for change in changes:
            print(change)
        print(f"\nWritten to {output_path}")
        print("Review the changes and copy to perf-baseline.json if acceptable.")
    else:
        print("No significant changes detected. Baseline is current.")


if __name__ == "__main__":
    # Entry point matching the flags used in the command shown after this script
    parser = argparse.ArgumentParser(description="Propose a new performance baseline")
    parser.add_argument("--history-dir", required=True)
    parser.add_argument("--current", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args()
    propose_baseline(args.history_dir, args.current, args.output)
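The script takes approximately the 90th percentile of recent runs rather than the maximum, which might be a flaky outlier, and adds 5% headroom. With the default 20 runs that is the second-highest value; with only 10 runs the index lands on the highest value, so the full lookback is preferable. It writes a proposed baseline to a separate file. A developer reviews the changes, checks whether the regressions are intentional, and commits the update.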
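To make the proposal arithmetic concrete, here is a small worked example using the same expressions as the script. The 20 sample values and the current threshold of 120ms are made up for illustration.

```python
# Worked example of the baseline proposal arithmetic (sample values are made up)
current_threshold = 120  # ms, current p95 threshold for one endpoint

# p95 from the last 20 runs on main, including one flaky outlier (190)
values = [128, 131, 127, 133, 130, 129, 132, 134, 131, 128,
          130, 135, 129, 133, 190, 132, 131, 136, 130, 134]

ordered = sorted(values)
index = int(len(ordered) * 0.9)              # 18 for 20 runs: the second-highest value
observed_p90 = ordered[index]                # the outlier at index 19 is ignored
proposed_value = round(observed_p90 * 1.05)  # 5% headroom

relative_change = abs(proposed_value - current_threshold) / current_threshold
print(f"observed p90: {observed_p90}ms")
print(f"proposed:     {proposed_value}ms")
print(f"change:       {relative_change:.1%} (proposed only if above 5%)")
```

Here the outlier does not influence the result, and the change is large enough that the script would list it for review.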
Run this monthly or when the drift detector reports a WARNING:
python tests/perf/update_baseline.py \
  --history-dir perf-history/ \
  --current perf-baseline.json \
  --output perf-baseline-proposed.json
Capacity Planning from CI Data
CI performance data is a rough proxy for production capacity. If the article endpoint’s p95 increases by 2ms per month in CI, where the environment has 2 CPU cores, production with 8 cores will likely see an increase in the same direction. The absolute numbers differ and the scaling is not guaranteed to be proportional, but the trend is usually transferable.
A capacity planning script extrapolates the CI trend to estimate when an SLO would be breached:
# capacity_forecast.py: Project when performance SLOs will be breached
import statistics
from datetime import datetime, timedelta


def forecast_breach(
    history: list[dict],
    endpoint: str,
    metric: str,
    slo_ms: float,
) -> dict:
    values = []
    for record in sorted(history, key=lambda x: x["_date"]):
        ep_data = record.get("endpoints", {}).get(endpoint, {})
        if metric in ep_data:
            values.append({
                "date": record["_date"],
                "value": ep_data[metric],
            })
    if len(values) < 10:
        return {"status": "insufficient_data"}

    # Linear regression
    n = len(values)
    x_values = list(range(n))
    y_values = [v["value"] for v in values]
    x_mean = statistics.mean(x_values)
    y_mean = statistics.mean(y_values)
    numerator = sum(
        (x - x_mean) * (y - y_mean)
        for x, y in zip(x_values, y_values)
    )
    denominator = sum((x - x_mean) ** 2 for x in x_values)
    slope = numerator / denominator if denominator else 0
    intercept = y_mean - slope * x_mean
    current_value = slope * (n - 1) + intercept

    if slope <= 0:
        return {
            "status": "no_breach",
            "reason": "performance is stable or improving",
            "current_ms": round(current_value, 1),
            "slope_ms_per_run": round(slope, 3),
        }

    # How many more runs until SLO is breached?
    runs_to_breach = (slo_ms - current_value) / slope
    if runs_to_breach < 0:
        return {
            "status": "already_breached",
            "current_ms": round(current_value, 1),
            "slo_ms": slo_ms,
        }

    # Estimate days (assuming ~1 main branch merge per day)
    days_between_runs = 1
    days_to_breach = runs_to_breach * days_between_runs
    breach_date = datetime.now() + timedelta(days=days_to_breach)

    return {
        "status": "projected_breach",
        "current_ms": round(current_value, 1),
        "slo_ms": slo_ms,
        "slope_ms_per_run": round(slope, 3),
        "runs_to_breach": int(runs_to_breach),
        "estimated_breach_date": breach_date.strftime("%Y-%m-%d"),
        "days_remaining": int(days_to_breach),
    }
Sample output:
{
  "status": "projected_breach",
  "current_ms": 145.2,
  "slo_ms": 200,
  "slope_ms_per_run": 0.34,
  "runs_to_breach": 161,
  "estimated_breach_date": "2025-11-15",
  "days_remaining": 161
}
This says: at the current rate of drift, the article endpoint will breach its 200ms SLO in 161 days. That is five months of warning. Enough time to plan optimization work, allocate engineering resources, or adjust the SLO.
Without this data, the team discovers the SLO breach when it happens in production. With this data, the tech lead can put “article endpoint optimization” on the Q3 roadmap and point to the trend line as justification.
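The forecast script hard-codes one run per day. If merges to main are less regular, the interval can be estimated from the archived run dates instead. A hedged sketch follows; `mean_days_between_runs` is a hypothetical helper, not part of the script above, and the final lines redo the sample-output arithmetic with the estimated interval.

```python
# Estimate the average gap between CI runs from their dates (hypothetical helper)
from datetime import datetime


def mean_days_between_runs(dates: list[datetime]) -> float:
    """Average spacing in days between consecutive runs."""
    if len(dates) < 2:
        return 1.0  # fall back to the one-run-per-day assumption
    ordered = sorted(dates)
    span_days = (ordered[-1] - ordered[0]).total_seconds() / 86400
    return span_days / (len(ordered) - 1)


# Example: 11 runs spread over 20 days, so one run every 2 days on average
run_dates = [datetime(2025, 1, 1 + 2 * i) for i in range(11)]
days_between_runs = mean_days_between_runs(run_dates)

# Same arithmetic as the sample output: (200 - 145.2) / 0.34 is about 161 runs
runs_to_breach = (200 - 145.2) / 0.34
days_to_breach = runs_to_breach * days_between_runs
print(int(runs_to_breach), int(days_to_breach))
```

With a run every two days, the same 161 runs stretch to roughly twice as many calendar days, which shows how sensitive the projected date is to the merge cadence.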
The Feedback Loop
The complete system creates a closed loop:
- Developer opens a PR
- CI runs Locust test
- Comparison script checks against baseline
- Gate blocks or warns on regression
- Results archived to S3 and pushed to Prometheus
- Weekly drift detector checks for cumulative regression
- Monthly baseline proposer suggests updates
- Quarterly capacity forecast projects SLO breach dates
- Team prioritizes optimization work based on projections
Each layer catches what the previous layer misses. The CI gate catches acute regressions. The drift detector catches chronic regressions. The capacity forecast catches trend-based risks. No single tool is sufficient. The system works because the tools compose.
Trade-offs
| Decision | Benefit | Cost |
|---|---|---|
| Prometheus + Grafana for CI data | Real-time dashboards, alerting | Infrastructure to maintain |
| Semi-automated baseline updates | Prevents stale baselines | Requires human review |
| Weekly drift reports | Catches cumulative regression | Alert fatigue if thresholds are too sensitive |
| Capacity forecasting | Months of advance warning | Linear extrapolation is naive; real degradation is rarely linear |
| Pushgateway for CI metrics | Simple integration | Single point of failure, stale metrics if CI stops running |
The biggest risk is false confidence. A green CI gate and a stable drift report do not mean production is fast. CI tests run on synthetic data, with synthetic traffic patterns, on hardware that does not match production. CI performance testing answers one question: “did this change make things slower?” It does not answer “is production fast enough?” Production monitoring, covered in earlier chapters, answers that question. The two systems complement each other. Neither replaces the other.