SqlBak Blog

Building Automated Backup Monitoring with the SqlBak API

Database backups are often configured once and then quietly forgotten — until something goes wrong.

In modern environments, teams may operate dozens or hundreds of database servers supporting ERP, CRM, and other critical systems. At the same time, they are managing many other priorities — sales, inventory, procurement, HR, production, and more — each with its own dashboards and alerts.

In this landscape, backups don’t become less important — but they do become less visible.

The real challenge is not backups themselves, but the environment they exist in. Backup health is often monitored in a separate dashboard, making it easy to overlook alongside other operational tools.

As infrastructure evolves — with new servers, changing configurations, and ongoing updates — gaps in backup coverage can appear. Servers may lack backups, configurations may drift, and failures may go unnoticed.

What’s needed is not another dashboard, but a way to integrate backup visibility into existing monitoring workflows, so that critical backup signals appear alongside other business-critical metrics.

To ensure reliability, teams need automated monitoring that continuously verifies:

  • backups are running successfully

  • servers are online

  • retention policies are correctly applied

  • new servers are protected

The SqlBak API enables IT teams to integrate backup infrastructure into their existing monitoring systems and automatically audit backups across the entire environment.

In this article, we’ll explore how organizations use the SqlBak API to build automated backup monitoring and compliance checks. You can find the full SqlBak.com API documentation here: https://sqlbak.docs.apiary.io/#


A real-world use case

One of our customers integrated the SqlBak API into their internal monitoring system to bring backup visibility into their broader operational workflow.

Instead of relying solely on the SqlBak dashboard, they periodically query the API and analyze backup configuration programmatically.

Their automation generates internal reports that include:

  • total number of servers

  • number of configured backup jobs

  • backup success rate

  • servers that are up to date with backups

More importantly, the system proactively detects operational issues such as:

  • servers without backups

  • failed backup jobs

  • offline backup agents

  • retention policy violations

  • abnormal backup sizes

By embedding backup monitoring into their existing infrastructure, they ensure that backup health is continuously validated — without requiring manual checks or additional dashboards.

This approach allows them to maintain reliability and control even as their infrastructure scales.


SqlBak API basics

The SqlBak API exposes key information about backup infrastructure, including:

  • servers

  • backup jobs

  • job run history

  • job settings

  • backup destinations

  • DBMS connections

Authentication is performed using an API token. You can generate an API token in your SqlBak account by logging in and navigating to the Developers page:
https://sqlbak.com/account/developers

Example header:

X-SqlBak-Token: YOUR_API_TOKEN

Example request:

GET https://sqlbak.com/public-api/v1/servers

Example response:

{
  "data": {
    "list": [
      {
        "entity": "server",
        "id": 76,
        "name": "My Server1",
        "status": "online",
        "app_type": "sqlbak_windows"
      }
    ],
    "page": 1,
    "page_size": 50,
    "total": 89,
    "has_more": true
  }
}

These endpoints allow infrastructure teams to query backup configuration and integrate it directly into their internal monitoring systems.


Typical backup monitoring architecture

Most organizations implement backup monitoring using scheduled scripts or lightweight services that integrate with their existing observability stack.

A typical architecture looks like this:

Backup monitoring architecture
Backup monitoring architecture

These scripts periodically query the SqlBak API and transform backup data into metrics, alerts, or reports that fit into the company’s standard monitoring pipeline.

As a result, backup infrastructure becomes part of the same ecosystem as application and database monitoring — rather than a separate system that needs to be checked independently.

This approach improves visibility, reduces operational overhead, and ensures that backup-related issues are surfaced alongside other critical signals.


Production monitoring checks and scripts example

In real-world environments, these checks are typically combined into a single monitoring script that runs on a schedule and reports issues automatically.

Below is an example of what such a script may return during a monitoring run, along with simplified code snippets that implement each check.

All examples use the same basic setup and helper functions.


1. Common setup

import os
import sys
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import requests

API_BASE = "https://sqlbak.com/public-api/v1"
API_TOKEN = os.getenv("SQLBAK_API_TOKEN")

REQUEST_TIMEOUT = 30
OFFLINE_STATUS_VALUE = "offline"

# defines how old the latest successful backup can be before an alert is triggered
NO_BACKUP_THRESHOLD_HOURS = 24

# controls how many job runs are requested per API page
JOBRUNS_PAGE_SIZE = 100

DEBUG = os.getenv("SQLBAK_DEBUG", "0") == "1"

if not API_TOKEN:
    print("ERROR: SQLBAK_API_TOKEN is not set", file=sys.stderr)
    sys.exit(2)

2. API helper

class SqlBakClient:
    def __init__(self, base_url: str, token: str) -> None:
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({
            "X-SqlBak-Token": token,
            "Accept": "application/json",
        })

    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        url = f"{self.base_url}/{path.lstrip('/')}"
        if DEBUG:
            print(f"DEBUG GET {url} params={params}", file=sys.stderr)

        resp = self.session.get(url, params=params, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        return resp.json()

    def paged_list(self, path: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        items: List[Dict[str, Any]] = []
        page = 1
        base_params = dict(params or {})

        while True:
            query = dict(base_params)
            query["page"] = page
            query.setdefault("page_size", 100)

            payload = self.get(path, params=query)
            data = payload.get("data", {})
            items.extend(data.get("list", []))

            if not data.get("has_more"):
                break

            page += 1

        return items

    def list_servers(self) -> List[Dict[str, Any]]:
        return self.paged_list("/servers")

    def list_jobs(self) -> List[Dict[str, Any]]:
        return self.paged_list("/jobs")

    def list_job_runs(
        self,
        job_id: int,
        started_after_ms: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        params: Dict[str, Any] = {
            "job_id": job_id,
            "page_size": JOBRUNS_PAGE_SIZE,
        }

        if started_after_ms is not None:
            params["started_after"] = started_after_ms

        return self.paged_list("/jobruns", params=params)

3. Helper functions

def parse_unix_ms(value: Optional[Any]) -> Optional[datetime]:
    if value is None:
        return None
    try:
        return datetime.fromtimestamp(int(value) / 1000, tz=timezone.utc)
    except (ValueError, TypeError, OSError):
        return None


def normalize_status(value: Optional[str]) -> str:
    return (value or "").strip().lower()

4. Data loading

def main() -> int:
    client = SqlBakClient(API_BASE, API_TOKEN)

    try:
        servers = client.list_servers()
        jobs = client.list_jobs()

        now = datetime.now(timezone.utc)
        backup_cutoff = now - timedelta(hours=NO_BACKUP_THRESHOLD_HOURS)
        started_after_ms = int(backup_cutoff.timestamp() * 1000)

        runs: List[Dict[str, Any]] = []

        for job in jobs:
            job_id = job.get("id")
            if job_id is None:
                continue

            try:
                runs.extend(
                    client.list_job_runs(
                        job_id=int(job_id),
                        started_after_ms=started_after_ms
                    )
                )
            except requests.HTTPError as e:
                print(
                    f"WARNING: failed to fetch jobruns for job_id={job_id}: {e}",
                    file=sys.stderr
                )

    except requests.HTTPError as e:
        print(f"HTTP ERROR: {e}", file=sys.stderr)
        return 2

    except requests.RequestException as e:
        print(f"REQUEST ERROR: {e}", file=sys.stderr)
        return 2

    server_by_id = {str(s.get("id")): s for s in servers}
    job_by_id = {str(j.get("id")): j for j in jobs}

    jobs_by_server: Dict[str, List[Dict[str, Any]]] = {}

    for job in jobs:
        sid = str(job.get("server_id"))
        jobs_by_server.setdefault(sid, []).append(job)

5. Detect servers without recent backups

The most critical check ensures that every production server has a recent successful backup.

Monitoring logic:

  • retrieve recent backup runs

  • identify the latest successful backup for each server

  • trigger an alert if the backup is older than the defined threshold

Example alert:

ALERT: No successful backup in 24 hours
Server: prod-sql-03
Last backup: 2026-03-10

Example script (Python):

latest_success_by_server: Dict[str, datetime] = {}

for run in runs:
    job_id = str(run.get("job_id"))
    job = job_by_id.get(job_id, {})

    # Prefer the server_id from the job itself, since run.server_id may differ
    server_id = str(job.get("server_id") or run.get("server_id"))
    status = normalize_status(run.get("status"))
    finished_at = parse_unix_ms(run.get("finished_at")) or parse_unix_ms(run.get("started_at"))

    if status not in {"succeeded", "failed"}:
        continue

    if status == "succeeded" and finished_at and server_id:
        prev = latest_success_by_server.get(server_id)
        if prev is None or finished_at > prev:
            latest_success_by_server[server_id] = finished_at


no_recent_success = []

for server in servers:
    server_id = str(server.get("id"))
    if server_id not in jobs_by_server:
        continue

    last_success = latest_success_by_server.get(server_id)

    # Fallback to jobs[].last_run if recent history was not returned in the filtered window
    if last_success is None:
        for job in jobs_by_server.get(server_id, []):
            last_run = job.get("last_run") or {}
            if normalize_status(last_run.get("status")) == "succeeded":
                dt = parse_unix_ms(last_run.get("finished_at")) or parse_unix_ms(last_run.get("started_at"))
                if dt and (last_success is None or dt > last_success):
                    last_success = dt

    if last_success is None or last_success < backup_cutoff:
        no_recent_success.append({
            "server_id": server_id,
            "server_name": server.get("name", "unknown"),
            "last_success": last_success.isoformat() if last_success else "never",
        })

6. Detect failed backup jobs

Backup failures must be detected immediately to minimize the risk of data loss.

Example alert:

CRITICAL: Backup job failed
Server: prod-sql-01
Job: Nightly Backup
Time: 02:03 UTC

Example script (Python):

failed_runs: List[Dict[str, Any]] = []

for run in runs:
    job_id = str(run.get("job_id"))
    job = job_by_id.get(job_id, {})

    server_id = str(job.get("server_id") or run.get("server_id"))
    status = normalize_status(run.get("status"))
    finished_at = parse_unix_ms(run.get("finished_at")) or parse_unix_ms(run.get("started_at"))

    if status not in {"succeeded", "failed"}:
        continue

    if status == "failed":
        failed_runs.append({
            "job_id": job_id,
            "job_name": job.get("name", "unknown"),
            "server_id": server_id,
            "server_name": server_by_id.get(server_id, {}).get("name", "unknown"),
            "finished_at": finished_at.isoformat() if finished_at else "unknown",
        })

7. Detect offline servers

If a server reports an offline status in SqlBak, it means the agent is not responding. This may happen because the server itself is offline or unreachable, or because the backup agent is not running or cannot connect.

Example alert:

WARNING: Server offline
Server: prod-sql-02
Last seen: 48 hours ago

Example script (Python):

offline_servers = []

for server in servers:
    status = normalize_status(server.get("status"))
    if status == OFFLINE_STATUS_VALUE:
        offline_servers.append({
            "server_id": str(server.get("id")),
            "server_name": server.get("name", "unknown"),
            "status": status,
        })

8. Detect servers without backup jobs

When new servers are deployed, backup jobs may not be configured.

Example alert:

WARNING: Server without backup job
Server: analytics-sql-02

Example script (Python):

servers_without_jobs = []

for server in servers:
    server_id = str(server.get("id"))

    if server_id not in jobs_by_server:
        servers_without_jobs.append({
            "server_id": server_id,
            "server_name": server.get("name", "unknown"),
        })

9. Verify retention policy compliance

Organizations often enforce retention policies across environments.

Example policy:

Environment Full backups Incremental backups
Production 30 days 14 days
Staging 14 days 7 days
Development 7 days 3 days

Monitoring scripts can detect deviations from these policies.

Example alert:

POLICY VIOLATION
Job: prod-nightly-backup
Destination ID: 987
Configured full retention: 14 days
Expected full retention: 30 days

Example script (Python):

EXPECTED_RETENTION_BY_JOB_NAME = {
    "prod": {"full_days": 30, "inc_days": 14},
    "production": {"full_days": 30, "inc_days": 14},
    "staging": {"full_days": 14, "inc_days": 7},
    "stage": {"full_days": 14, "inc_days": 7},
    "dev": {"full_days": 7, "inc_days": 3},
    "development": {"full_days": 7, "inc_days": 3},
}


def expected_policy_for_job(name: str) -> Optional[Dict[str, int]]:
    n = normalize_status(name)
    for key, policy in EXPECTED_RETENTION_BY_JOB_NAME.items():
        if key in n:
            return policy
    return None


def months_days_to_days(months: Optional[int], days: Optional[int]) -> int:
    return int(months or 0) * 30 + int(days or 0)


retention_violations = []

for job in jobs:
    job_id = job.get("id")
    job_name = job.get("name", "unknown")
    expected = expected_policy_for_job(job_name)

    if expected is None or job_id is None:
        continue

    try:
        settings = client.get(f"/jobs/{job_id}/settings").get("data", {})
    except requests.HTTPError as e:
        print(f"WARNING: failed to fetch settings for job {job_name}: {e}", file=sys.stderr)
        continue

    destinations = settings.get("destinations", [])
    for dst in destinations:
        full_days = months_days_to_days(
            dst.get("full_backups_keep_months"),
            dst.get("full_backups_keep_days"),
        )
        inc_days = months_days_to_days(
            dst.get("inc_backups_keep_months"),
            dst.get("inc_backups_keep_days"),
        )

        if full_days < expected["full_days"] or inc_days < expected["inc_days"]:
            retention_violations.append({
                "job_name": job_name,
                "destination_id": dst.get("id"),
                "actual_full_days": full_days,
                "expected_full_days": expected["full_days"],
                "actual_inc_days": inc_days,
                "expected_inc_days": expected["inc_days"],
            })

10. Detect backup size anomalies

Unexpected changes in backup size may indicate underlying issues.

Possible causes include:

  • incomplete backups

  • compression failures

  • unexpected database growth

  • data loss

Example alert:

ANOMALY DETECTED
Server: prod-sql-04
Backup size: 2.1 GB
Expected range: ~6 GB

Example script (Python):

import statistics
from collections import defaultdict
# Defines acceptable backup size range compared to baseline
LOW_FACTOR = 0.5  
HIGH_FACTOR = 2.0 
# Minimum number of samples required to calculate baseline
MIN_BASELINE_SAMPLES = 5  


def extract_size_bytes(run: Dict[str, Any]) -> Optional[float]:
    value = run.get("archive_size")
    if isinstance(value, (int, float)) and value > 0:
        return float(value)

    value = run.get("size")
    if isinstance(value, (int, float)) and value > 0:
        return float(value)

    return None


sizes_by_job = defaultdict(list)

for run in runs:
    if normalize_status(run.get("status")) != "succeeded":
        continue

    job_id = str(run.get("job_id"))
    size = extract_size_bytes(run)
    if size is None:
        continue

    sizes_by_job[job_id].append(size)

size_anomalies = []

for job_id, samples in sizes_by_job.items():
    if len(samples) < MIN_BASELINE_SAMPLES:
        continue

    current = samples[0]
    baseline = statistics.median(samples[1:]) if len(samples) > 1 else statistics.median(samples)

    if baseline <= 0:
        continue

    if current < baseline * LOW_FACTOR or current > baseline * HIGH_FACTOR:
        size_anomalies.append({
            "job_id": job_id,
            "job_name": job_by_id.get(job_id, {}).get("name", "unknown"),
            "current": current,
            "baseline": baseline,
            "ratio": round(current / baseline, 2),
        })

Integrating backup monitoring with infrastructure monitoring systems

SqlBak monitoring can be integrated with observability and automation systems to provide visibility into backup status. Metrics such as backup success rate, size trends, and server state can be visualized in tools like Grafana or exported to systems like Prometheus for alerting and analysis.

For a step-by-step guide on setting this up, see our article on building automated backup monitoring with Grafana and Prometheus using the SqlBak API.

Alerts can be forwarded to external systems, including Slack, Microsoft Teams, PagerDuty, or OpsGenie, typically via webhooks or automation tools.

Automation platforms such as Make (formerly Integromat) can be used to route alerts, send notifications, create incidents, or store backup data without custom development.

The SqlBak API also provides access to detailed job run logs via /jobruns/{jobrun_id}/logs, which can be used to include error details in alerts and support troubleshooting.

Final thoughts

Backups are only reliable if they are continuously verified.

The SqlBak API allows infrastructure teams to build automated monitoring that continuously validates backup configuration and reliability.

Instead of relying on manual checks, organizations can automatically detect:

  • failed backups

  • offline servers

  • missing backup jobs

  • retention policy violations

  • abnormal backup sizes

For teams managing large database environments, integrating backup infrastructure into observability systems significantly improves reliability and reduces operational risk.

If you’re already using the SqlBak API and have built automation around it, we’d love to hear about your use case.

Leave a Comment