Database backups are often configured once and then quietly forgotten — until something goes wrong.
In modern environments, teams may operate dozens or hundreds of database servers supporting ERP, CRM, and other critical systems. At the same time, they are managing many other priorities — sales, inventory, procurement, HR, production, and more — each with its own dashboards and alerts.
In this landscape, backups don’t become less important — but they do become less visible.
The real challenge is not backups themselves, but the environment they exist in. Backup health is often monitored in a separate dashboard, making it easy to overlook alongside other operational tools.
As infrastructure evolves — with new servers, changing configurations, and ongoing updates — gaps in backup coverage can appear. Servers may lack backups, configurations may drift, and failures may go unnoticed.
What’s needed is not another dashboard, but a way to integrate backup visibility into existing monitoring workflows, so that critical backup signals appear alongside other business-critical metrics.
To ensure reliability, teams need automated monitoring that continuously verifies:
-
backups are running successfully
-
servers are online
-
retention policies are correctly applied
-
new servers are protected
The SqlBak API enables IT teams to integrate backup infrastructure into their existing monitoring systems and automatically audit backups across the entire environment.
In this article, we’ll explore how organizations use the SqlBak API to build automated backup monitoring and compliance checks.
A real-world use case
One of our customers integrated the SqlBak API into their internal monitoring system to bring backup visibility into their broader operational workflow.
Instead of relying solely on the SqlBak dashboard, they periodically query the API and analyze backup configuration programmatically.
Their automation generates internal reports that include:
-
total number of servers
-
number of configured backup jobs
-
backup success rate
-
servers that are up to date with backups
More importantly, the system proactively detects operational issues such as:
-
servers without backups
-
failed backup jobs
-
offline backup agents
-
retention policy violations
-
abnormal backup sizes
By embedding backup monitoring into their existing infrastructure, they ensure that backup health is continuously validated — without requiring manual checks or additional dashboards.
This approach allows them to maintain reliability and control even as their infrastructure scales.
SqlBak API basics
The SqlBak API exposes key information about backup infrastructure, including:
-
servers
-
backup jobs
-
job run history
-
job settings
-
backup destinations
-
DBMS connections
Authentication is performed using an API token.
Example header:
X-SqlBak-Token: YOUR_API_TOKEN
Example request:
GET https://sqlbak.com/public-api/v1/servers
Example response:
{
"data": {
"list": [
{
"entity": "server",
"id": 76,
"name": "My Server1",
"status": "online",
"app_type": "sqlbak_windows"
}
],
"page": 1,
"page_size": 50,
"total": 89,
"has_more": true
}
}
These endpoints allow infrastructure teams to query backup configuration and integrate it directly into their internal monitoring systems.
Typical backup monitoring architecture
Most organizations implement backup monitoring using scheduled scripts or lightweight services that integrate with their existing observability stack.
A typical architecture looks like this:
These scripts periodically query the SqlBak API and transform backup data into metrics, alerts, or reports that fit into the company’s standard monitoring pipeline.
As a result, backup infrastructure becomes part of the same ecosystem as application and database monitoring — rather than a separate system that needs to be checked independently.
This approach improves visibility, reduces operational overhead, and ensures that backup-related issues are surfaced alongside other critical signals.
Production monitoring checks
Below are several practical monitoring checks commonly implemented using the SqlBak API.
1. Detect servers without recent backups
The most critical check ensures that every production server has a recent successful backup.
Monitoring logic:
-
retrieve recent backup runs
-
identify the latest successful backup for each server
-
trigger an alert if the backup is older than the defined threshold
Example alert:
ALERT: No successful backup in 24 hours Server: prod-sql-03 Last backup: 2026-03-10
2. Detect failed backup jobs
Backup failures must be detected immediately to minimize the risk of data loss.
Example alert:
CRITICAL: Backup job failed Server: prod-sql-01 Job: Nightly Backup Time: 02:03 UTC
3. Detect offline servers
If a server reports an offline status in SqlBak, backups may stop running.
Example alert:
WARNING: Server offline Server: prod-sql-02 Last seen: 48 hours ago
4. Detect servers without backup jobs
When new servers are deployed, backup jobs may not be configured.
Example alert:
WARNING: Server without backup job Server: analytics-sql-02
5. Detect disabled backup jobs
Backup jobs may remain disabled after maintenance or configuration changes.
Example alert:
WARNING: Backup job disabled Server: prod-sql-05 Job: Daily Backup
6. Verify retention policy compliance
Organizations often enforce retention policies across environments.
Example policy:
| Environment | Full backups | Incremental backups |
|---|---|---|
| Production | 30 days | 14 days |
| Staging | 14 days | 7 days |
| Development | 7 days | 3 days |
Monitoring scripts can detect deviations from these policies.
Example alert:
POLICY VIOLATION Job: prod-nightly-backup Destination ID: 987 Configured full retention: 14 days Expected full retention: 30 days
7. Detect backup size anomalies
Unexpected changes in backup size may indicate underlying issues.
Possible causes include:
-
incomplete backups
-
compression failures
-
unexpected database growth
-
data loss
Example alert:
ANOMALY DETECTED Server: prod-sql-04 Backup size: 2.1 GB Expected range: ~6 GB
Production monitoring scripts
Below are example scripts teams can use to build backup monitoring around the SqlBak API.
1) Backup monitoring
This script covers:
-
failed backup detection
-
offline server detection
-
servers that have not had a successful backup within the defined threshold
import os
import sys
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
import requests
API_BASE = os.getenv("SQLBAK_API_BASE", "https://sqlbak.com/public-api/v1")
API_TOKEN = os.getenv("SQLBAK_API_TOKEN")
REQUEST_TIMEOUT = 30
OFFLINE_STATUS_VALUE = "offline"
NO_BACKUP_THRESHOLD_HOURS = int(os.getenv("SQLBAK_NO_BACKUP_THRESHOLD_HOURS", "24"))
JOBRUNS_PAGE_SIZE = int(os.getenv("SQLBAK_JOBRUNS_PAGE_SIZE", "100"))
DEBUG = os.getenv("SQLBAK_DEBUG", "0") == "1"
if not API_TOKEN:
print("ERROR: SQLBAK_API_TOKEN is not set", file=sys.stderr)
sys.exit(2)
class SqlBakClient:
def __init__(self, base_url: str, token: str) -> None:
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"X-SqlBak-Token": token,
"Accept": "application/json",
})
def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
url = f"{self.base_url}/{path.lstrip('/')}"
if DEBUG:
print(f"DEBUG GET {url} params={params}", file=sys.stderr)
resp = self.session.get(url, params=params, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
return resp.json()
def paged_list(self, path: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
items: List[Dict[str, Any]] = []
page = 1
base_params = dict(params or {})
while True:
query = dict(base_params)
query["page"] = page
query.setdefault("page_size", 100)
payload = self.get(path, params=query)
data = payload.get("data", {})
items.extend(data.get("list", []))
if not data.get("has_more"):
break
page += 1
return items
def list_servers(self) -> List[Dict[str, Any]]:
return self.paged_list("/servers")
def list_jobs(self) -> List[Dict[str, Any]]:
return self.paged_list("/jobs")
def list_job_runs(self, job_id: int, started_after_ms: Optional[int] = None) -> List[Dict[str, Any]]:
params: Dict[str, Any] = {
"job_id": job_id,
"page_size": JOBRUNS_PAGE_SIZE,
}
if started_after_ms is not None:
params["started_after"] = started_after_ms
return self.paged_list("/jobruns", params=params)
def parse_unix_ms(value: Optional[Any]) -> Optional[datetime]:
if value is None:
return None
try:
return datetime.fromtimestamp(int(value) / 1000, tz=timezone.utc)
except (ValueError, TypeError, OSError):
return None
def normalize_status(value: Optional[str]) -> str:
return (value or "").strip().lower()
def main() -> int:
client = SqlBakClient(API_BASE, API_TOKEN)
try:
servers = client.list_servers()
jobs = client.list_jobs()
now = datetime.now(timezone.utc)
backup_cutoff = now - timedelta(hours=NO_BACKUP_THRESHOLD_HOURS)
started_after_ms = int(backup_cutoff.timestamp() * 1000)
runs: List[Dict[str, Any]] = []
for job in jobs:
job_id = job.get("id")
if job_id is None:
continue
try:
runs.extend(client.list_job_runs(job_id=int(job_id), started_after_ms=started_after_ms))
except requests.HTTPError as e:
print(f"WARNING: failed to fetch jobruns for job_id={job_id}: {e}", file=sys.stderr)
except requests.HTTPError as e:
print(f"HTTP ERROR: {e}", file=sys.stderr)
return 2
except requests.RequestException as e:
print(f"REQUEST ERROR: {e}", file=sys.stderr)
return 2
server_by_id = {str(s.get("id")): s for s in servers}
job_by_id = {str(j.get("id")): j for j in jobs}
jobs_by_server: Dict[str, List[Dict[str, Any]]] = {}
for job in jobs:
sid = str(job.get("server_id"))
jobs_by_server.setdefault(sid, []).append(job)
latest_success_by_server: Dict[str, datetime] = {}
failed_runs: List[Dict[str, Any]] = []
for run in runs:
job_id = str(run.get("job_id"))
job = job_by_id.get(job_id, {})
# Prefer the server_id from the job itself, since run.server_id may differ
server_id = str(job.get("server_id") or run.get("server_id"))
status = normalize_status(run.get("status"))
finished_at = parse_unix_ms(run.get("finished_at")) or parse_unix_ms(run.get("started_at"))
if status not in {"succeeded", "failed"}:
continue
if status == "failed":
failed_runs.append({
"job_id": job_id,
"job_name": job.get("name", "unknown"),
"server_id": server_id,
"server_name": server_by_id.get(server_id, {}).get("name", "unknown"),
"finished_at": finished_at.isoformat() if finished_at else "unknown",
})
if status == "succeeded" and finished_at and server_id:
prev = latest_success_by_server.get(server_id)
if prev is None or finished_at > prev:
latest_success_by_server[server_id] = finished_at
offline_servers = []
for server in servers:
status = normalize_status(server.get("status"))
if status == OFFLINE_STATUS_VALUE:
offline_servers.append({
"server_id": str(server.get("id")),
"server_name": server.get("name", "unknown"),
"status": status,
})
no_recent_success = []
for server in servers:
server_id = str(server.get("id"))
if server_id not in jobs_by_server:
continue
last_success = latest_success_by_server.get(server_id)
# Fallback to jobs[].last_run if recent history was not returned in the filtered window
if last_success is None:
for job in jobs_by_server.get(server_id, []):
last_run = job.get("last_run") or {}
if normalize_status(last_run.get("status")) == "succeeded":
dt = parse_unix_ms(last_run.get("finished_at")) or parse_unix_ms(last_run.get("started_at"))
if dt and (last_success is None or dt > last_success):
last_success = dt
if last_success is None or last_success < backup_cutoff:
no_recent_success.append({
"server_id": server_id,
"server_name": server.get("name", "unknown"),
"last_success": last_success.isoformat() if last_success else "never",
})
print("=== FAILED BACKUPS ===")
if failed_runs:
for item in failed_runs:
print(f"- {item['server_name']} | {item['job_name']} | finished_at={item['finished_at']}")
else:
print("No failed backups detected")
print("\n=== OFFLINE SERVERS ===")
if offline_servers:
for item in offline_servers:
print(f"- {item['server_name']} | status={item['status']}")
else:
print("No offline servers detected")
print("\n=== NO SUCCESSFUL BACKUPS IN THRESHOLD ===")
if no_recent_success:
for item in no_recent_success:
print(f"- {item['server_name']} | last_success={item['last_success']}")
else:
print("All monitored servers have recent successful backups")
return 0
if __name__ == "__main__":
raise SystemExit(main())
Example run
export SQLBAK_API_TOKEN="YOUR_TOKEN" export SQLBAK_API_BASE="https://sqlbak.com/public-api/v1" python sqlbak_backup_monitor.py
2) Verify retention policy compliance
This script checks retention compliance using job settings, where retention is actually defined in the SqlBak API.
import os
import sys
import requests
from typing import Any, Dict, List, Optional, Tuple
API_BASE = os.getenv("SQLBAK_API_BASE", "https://sqlbak.com/public-api/v1")
API_TOKEN = os.getenv("SQLBAK_API_TOKEN")
REQUEST_TIMEOUT = 30
EXPECTED_RETENTION_BY_JOB_NAME = {
"prod": {"full_days": 30, "inc_days": 14},
"production": {"full_days": 30, "inc_days": 14},
"staging": {"full_days": 14, "inc_days": 7},
"stage": {"full_days": 14, "inc_days": 7},
"dev": {"full_days": 7, "inc_days": 3},
"development": {"full_days": 7, "inc_days": 3},
}
if not API_TOKEN:
print("ERROR: SQLBAK_API_TOKEN is not set", file=sys.stderr)
sys.exit(2)
def normalize_name(value: Optional[str]) -> str:
return (value or "").strip().lower()
def months_days_to_days(months: Optional[int], days: Optional[int]) -> int:
return int(months or 0) * 30 + int(days or 0)
class SqlBakClient:
def __init__(self, base_url: str, token: str) -> None:
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"X-SqlBak-Token": token,
"Accept": "application/json",
})
def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
resp = self.session.get(
f"{self.base_url}/{path.lstrip('/')}",
params=params,
timeout=REQUEST_TIMEOUT,
)
resp.raise_for_status()
return resp.json()
def paged_list(self, path: str) -> List[Dict[str, Any]]:
items: List[Dict[str, Any]] = []
page = 1
while True:
payload = self.get(path, params={"page": page, "page_size": 100})
data = payload.get("data", {})
items.extend(data.get("list", []))
if not data.get("has_more"):
break
page += 1
return items
def list_jobs(self) -> List[Dict[str, Any]]:
return self.paged_list("/jobs")
def get_job_settings(self, job_id: int) -> Dict[str, Any]:
payload = self.get(f"/jobs/{job_id}/settings")
return payload.get("data", {})
def expected_policy_for_job(name: str) -> Optional[Dict[str, int]]:
n = normalize_name(name)
for key, policy in EXPECTED_RETENTION_BY_JOB_NAME.items():
if key in n:
return policy
return None
def main() -> int:
client = SqlBakClient(API_BASE, API_TOKEN)
try:
jobs = client.list_jobs()
except requests.RequestException as e:
print(f"ERROR: {e}", file=sys.stderr)
return 2
violations: List[Dict[str, Any]] = []
for job in jobs:
job_id = job.get("id")
job_name = job.get("name", "unknown")
expected = expected_policy_for_job(job_name)
if expected is None or job_id is None:
continue
try:
settings = client.get_job_settings(int(job_id))
except requests.RequestException as e:
print(f"WARNING: failed to fetch settings for job {job_name}: {e}", file=sys.stderr)
continue
destinations = settings.get("destinations", [])
for dst in destinations:
full_days = months_days_to_days(
dst.get("full_backups_keep_months"),
dst.get("full_backups_keep_days"),
)
inc_days = months_days_to_days(
dst.get("inc_backups_keep_months"),
dst.get("inc_backups_keep_days"),
)
if full_days < expected["full_days"] or inc_days < expected["inc_days"]:
violations.append({
"job_name": job_name,
"destination_id": dst.get("id"),
"actual_full_days": full_days,
"expected_full_days": expected["full_days"],
"actual_inc_days": inc_days,
"expected_inc_days": expected["inc_days"],
})
if violations:
print("=== RETENTION POLICY VIOLATIONS ===")
for v in violations:
print(
f"- {v['job_name']} | destination_id={v['destination_id']} | "
f"full={v['actual_full_days']}d (expected {v['expected_full_days']}d) | "
f"inc={v['actual_inc_days']}d (expected {v['expected_inc_days']}d)"
)
return 1
print("Retention policies are compliant")
return 0
if __name__ == "__main__":
raise SystemExit(main())
3) Check backup size anomalies
This script uses /jobruns and the API fields size and archive_size to detect unusual changes in backup size.
Note: if a single job produces multiple backup types with significantly different sizes, the baseline should be calculated separately per backup type. If backup type is not available in the job run payload, this example is best suited for jobs that primarily produce one backup type.
import os
import sys
import statistics
import requests
from collections import defaultdict
from typing import Any, Dict, List, Optional
API_BASE = os.getenv("SQLBAK_API_BASE", "https://sqlbak.com/public-api/v1")
API_TOKEN = os.getenv("SQLBAK_API_TOKEN")
REQUEST_TIMEOUT = 30
LOW_FACTOR = float(os.getenv("SQLBAK_SIZE_LOW_FACTOR", "0.5"))
HIGH_FACTOR = float(os.getenv("SQLBAK_SIZE_HIGH_FACTOR", "2.0"))
MIN_BASELINE_SAMPLES = int(os.getenv("SQLBAK_MIN_BASELINE_SAMPLES", "5"))
if not API_TOKEN:
print("ERROR: SQLBAK_API_TOKEN is not set", file=sys.stderr)
sys.exit(2)
class SqlBakClient:
def __init__(self, base_url: str, token: str) -> None:
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"X-SqlBak-Token": token,
"Accept": "application/json",
})
def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
resp = self.session.get(
f"{self.base_url}/{path.lstrip('/')}",
params=params,
timeout=REQUEST_TIMEOUT,
)
resp.raise_for_status()
return resp.json()
def paged_list(self, path: str) -> List[Dict[str, Any]]:
items: List[Dict[str, Any]] = []
page = 1
while True:
payload = self.get(path, params={"page": page, "page_size": 100})
data = payload.get("data", {})
items.extend(data.get("list", []))
if not data.get("has_more"):
break
page += 1
return items
def list_job_runs(self) -> List[Dict[str, Any]]:
return self.paged_list("/jobruns")
def list_jobs(self) -> List[Dict[str, Any]]:
return self.paged_list("/jobs")
def normalize_status(v: Optional[str]) -> str:
return (v or "").strip().lower()
def extract_size_bytes(run: Dict[str, Any]) -> Optional[float]:
value = run.get("archive_size")
if isinstance(value, (int, float)) and value > 0:
return float(value)
value = run.get("size")
if isinstance(value, (int, float)) and value > 0:
return float(value)
return None
def main() -> int:
client = SqlBakClient(API_BASE, API_TOKEN)
try:
runs = client.list_job_runs()
jobs = client.list_jobs()
except requests.RequestException as e:
print(f"ERROR: {e}", file=sys.stderr)
return 2
job_names = {str(j.get("id")): j.get("name", "unknown") for j in jobs}
sizes_by_job: Dict[str, List[float]] = defaultdict(list)
for run in runs:
if normalize_status(run.get("status")) != "succeeded":
continue
job_id = str(run.get("job_id"))
size = extract_size_bytes(run)
if size is None:
continue
sizes_by_job[job_id].append(size)
anomalies = []
for job_id, samples in sizes_by_job.items():
if len(samples) < MIN_BASELINE_SAMPLES:
continue
current = samples[0]
baseline = statistics.median(samples[1:]) if len(samples) > 1 else statistics.median(samples)
if baseline <= 0:
continue
if current < baseline * LOW_FACTOR or current > baseline * HIGH_FACTOR:
anomalies.append({
"job_id": job_id,
"job_name": job_names.get(job_id, "unknown"),
"current": current,
"baseline": baseline,
"ratio": round(current / baseline, 2),
})
if anomalies:
print("=== BACKUP SIZE ANOMALIES ===")
for a in anomalies:
print(
f"- {a['job_name']} | current={int(a['current'])} | "
f"baseline={int(a['baseline'])} | ratio={a['ratio']}"
)
return 1
print("No backup size anomalies detected")
return 0
if __name__ == "__main__":
raise SystemExit(main())
4) PowerShell: detect servers without backup jobs
This check detects servers that do not have any backup jobs assigned.
param(
[string]$ApiBase = "https://sqlbak.com/public-api/v1",
[string]$ApiToken = $env:SQLBAK_API_TOKEN
)
if (-not $ApiToken) {
Write-Error "SQLBAK_API_TOKEN is not set"
exit 2
}
$Headers = @{
"X-SqlBak-Token" = $ApiToken
"Accept" = "application/json"
}
try {
$serversResponse = Invoke-RestMethod -Method Get -Uri "$ApiBase/servers?page=1&page_size=100" -Headers $Headers -TimeoutSec 30
$jobsResponse = Invoke-RestMethod -Method Get -Uri "$ApiBase/jobs?page=1&page_size=100" -Headers $Headers -TimeoutSec 30
}
catch {
Write-Error $_
exit 2
}
$servers = $serversResponse.data.list
$jobs = $jobsResponse.data.list
$serverIdsWithJobs = @{}
foreach ($job in $jobs) {
if ($null -ne $job.server_id) {
$serverIdsWithJobs["$($job.server_id)"] = $true
}
}
$serversWithoutJobs = @()
foreach ($server in $servers) {
$serverId = "$($server.id)"
if (-not $serverIdsWithJobs.ContainsKey($serverId)) {
$serversWithoutJobs += $server
}
}
if ($serversWithoutJobs.Count -gt 0) {
Write-Host "=== SERVERS WITHOUT BACKUP JOBS ==="
foreach ($server in $serversWithoutJobs) {
Write-Host "- $($server.name) (id=$($server.id))"
}
exit 1
}
else {
Write-Host "All servers have at least one backup job"
exit 0
}
5) PowerShell: detect disabled backup jobs
This check uses job settings and schedule_settings.is_enabled.
param(
[string]$ApiBase = "https://sqlbak.com/public-api/v1",
[string]$ApiToken = $env:SQLBAK_API_TOKEN
)
if (-not $ApiToken) {
Write-Error "SQLBAK_API_TOKEN is not set"
exit 2
}
$Headers = @{
"X-SqlBak-Token" = $ApiToken
"Accept" = "application/json"
}
try {
$jobsResponse = Invoke-RestMethod -Method Get -Uri "$ApiBase/jobs?page=1&page_size=100" -Headers $Headers -TimeoutSec 30
}
catch {
Write-Error $_
exit 2
}
$jobs = $jobsResponse.data.list
$disabledJobs = @()
foreach ($job in $jobs) {
try {
$settings = Invoke-RestMethod -Method Get -Uri "$ApiBase/jobs/$($job.id)/settings" -Headers $Headers -TimeoutSec 30
if ($settings.data.schedule_settings.is_enabled -eq $false) {
$disabledJobs += $job
}
}
catch {
Write-Warning "Failed to fetch settings for job $($job.name): $_"
}
}
if ($disabledJobs.Count -gt 0) {
Write-Host "=== DISABLED BACKUP JOBS ==="
foreach ($job in $disabledJobs) {
Write-Host "- $($job.name) (id=$($job.id), server_id=$($job.server_id))"
}
exit 1
}
else {
Write-Host "No disabled backup jobs detected"
exit 0
}
Integrating backup monitoring with infrastructure monitoring systems
SqlBak monitoring can be integrated with observability and automation systems to provide visibility into backup status. Metrics such as backup success rate, size trends, and server state can be visualized in tools like Grafana or exported to systems like Prometheus for alerting and analysis.
For a step-by-step guide on setting this up, see our article on building automated backup monitoring with Grafana and Prometheus using the SqlBak API.
Alerts can be forwarded to external systems, including Slack, Microsoft Teams, PagerDuty, or OpsGenie, typically via webhooks or automation tools.
Automation platforms such as Make (formerly Integromat) can be used to route alerts, send notifications, create incidents, or store backup data without custom development.
The SqlBak API also provides access to detailed job run logs via /jobruns/{jobrun_id}/logs, which can be used to include error details in alerts and support troubleshooting.
Final thoughts
Backups are only reliable if they are continuously verified.
The SqlBak API allows infrastructure teams to build automated monitoring that continuously validates backup configuration and reliability.
Instead of relying on manual checks, organizations can automatically detect:
-
failed backups
-
offline servers
-
missing backup jobs
-
retention policy violations
-
abnormal backup sizes
For teams managing large database environments, integrating backup infrastructure into observability systems significantly improves reliability and reduces operational risk.
If you’re already using the SqlBak API and have built automation around it, we’d love to hear about your use case.
