Failure Context and Operational Intent
Collections managers deploy synchronous polling scripts to mirror CMS records into downstream data warehouses. The goal is near-real-time synchronization of object metadata, rights statements, and digital asset manifests. Production environments frequently fail under high-volume loads. HTTP 429 and 503 responses cascade across worker threads. Hardcoded polling intervals and in-memory pagination tracking cause state drift. Drift produces duplicate records, orphaned media references, and broken provenance chains. Deterministic polling requires strict API constraint adherence and boundary validation. Baseline architecture patterns are documented in Automated Record Ingestion & Sync Workflows.
Root Cause Analysis
Museum API instability originates from three intersecting failure vectors. First, each module-level requests.get() call builds a throwaway session, so every request opens and tears down its own TCP (and TLS) connection. Under load, closed sockets accumulate in TIME_WAIT and exhaust the client's ephemeral ports. Second, sliding-window rate limits communicated via X-RateLimit-Remaining headers are routinely ignored. Fixed time.sleep() intervals either waste available request budget or exhaust it and trip the server's throttling. Third, loading complete JSON arrays into Python lists makes memory grow linearly with collection size. On memory-constrained CI runners, the process is OOM-killed during bulk metadata extraction. Schema drift compounds these issues. Missing ObjectID fields, malformed ISO 8601 timestamps, and unescaped MARC fragments break downstream ETL pipelines. Validation must therefore occur at the ingestion boundary.
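The second failure vector can be addressed by deriving the pause from the response headers instead of a fixed sleep. The sketch below is a hypothetical helper, `pacing_delay`. It assumes that X-RateLimit-Remaining counts the requests left in the current window and that a companion X-RateLimit-Reset header gives the seconds until the window resets. Both are assumptions: header names and units vary by API, and some send an epoch timestamp instead.

```python
from collections.abc import Mapping


def pacing_delay(headers: Mapping[str, str], default: float = 0.0) -> float:
    """Return how many seconds to wait before the next request.

    Assumes X-RateLimit-Remaining = requests left in the window and
    X-RateLimit-Reset = seconds until the window resets (API-specific).
    """
    try:
        remaining = int(headers["X-RateLimit-Remaining"])
        reset_in = float(headers["X-RateLimit-Reset"])
    except (KeyError, ValueError):
        # Headers absent or malformed: fall back to the caller's default.
        return default
    if reset_in <= 0:
        return 0.0
    if remaining <= 0:
        # Budget exhausted: wait out the rest of the window.
        return reset_in
    # Spread the remaining budget evenly over the rest of the window.
    return reset_in / remaining
```

A polling loop would call `time.sleep(pacing_delay(response.headers))` after each page. Because Requests exposes headers as a case-insensitive mapping, the lookups work regardless of how the server capitalizes them.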
Production Polling Architecture
Resolving instability requires restructuring the request lifecycle. The architecture centers on persistent session pooling, adaptive exponential backoff, and streaming payload processing. State tracking shifts from volatile memory to durable filesystem checkpoints. Schema compliance is enforced before persistence. This pattern replaces brittle loops with a deterministic engine. Concurrent execution strategies are detailed in Building Async Ingestion Pipelines.
```mermaid
flowchart TD
    L["Load checkpoint<br/>cursor"] --> Fetch["GET page<br/>session + retry"]
    Fetch --> Str["Stream JSON lines"]
    Str --> V{"Validate record"}
    V -->|fail| Log["Log + skip"]
    V -->|ok| Sync["Sync record<br/>advance cursor"]
    Sync --> Save["Save checkpoint"]
    Log --> Save
    Save --> More{"More pages?"}
    More -->|yes| Fetch
    More -->|no| Done["Cycle complete"]
```

Step 1: Session Pooling and Adaptive Backoff
Initialize a persistent requests.Session to reuse TCP connections across requests. Configure urllib3 retry logic at the transport adapter level. This handles transient network drops without application-level intervention. Respect Retry-After headers for precise rate-limit compliance. Consult the official Requests Documentation for advanced adapter tuning.
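urllib3 applies Retry-After inside the adapter while retries remain. If the Retry object is built with `raise_on_status=False`, however, the final 429 or 503 response is returned to application code once retries are exhausted. Scheduling the next attempt then means parsing the header by hand. RFC 9110 allows either a delay in seconds or an HTTP-date. The following is a minimal sketch. `retry_after_seconds` is a hypothetical helper, not part of Requests or urllib3.

```python
import email.utils
from datetime import datetime, timezone
from typing import Optional


def retry_after_seconds(value: str, now: Optional[datetime] = None) -> Optional[float]:
    """Convert a Retry-After value (delay-seconds or HTTP-date) to seconds.

    Returns None if the value cannot be parsed, so the caller can fall back
    to its own backoff schedule.
    """
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        when = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):  # older Pythons raise TypeError, 3.10+ ValueError
        return None
    if when.tzinfo is None:
        # HTTP-dates are always GMT; treat a naive result as UTC.
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means "retry now", never a negative delay.
    return max(0.0, (when - now).total_seconds())
```

The `now` parameter exists only so the function can be tested deterministically. In production it defaults to the current UTC time.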
```python
import logging
from typing import Iterator

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger(__name__)


def configure_session(max_retries: int = 3) -> requests.Session:
    """Build a Session whose adapter pools connections and retries transient errors."""
    session = requests.Session()
    retry_strategy = Retry(
        total=max_retries,
        backoff_factor=1.0,  # exponential backoff between attempts
        status_forcelist=[429, 502, 503, 504],  # rate limits and gateway errors
        allowed_methods=["GET"],  # retry idempotent reads only (urllib3 >= 1.26)
        respect_retry_after_header=True,  # honor server-specified wait times
    )
    # pool_connections: host pools to cache; pool_maxsize: connections kept per host.
    adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=10, pool_maxsize=20)
    session.mount("https://", adapter)
    session.headers.update({"Accept": "application/json", "User-Agent": "MuseumSync/1.0"})
    return session
```

Step 2: Streaming Payloads and Memory Boundaries
Avoid buffering full response bodies in memory. Use response.iter_lines() or response.iter_content() to process records sequentially. Memory use is then bounded by the largest single record rather than by the size of the collection. A generator yields parsed records one at a time, and validation happens downstream in Step 3. The reader below assumes the endpoint streams newline-delimited JSON (JSON Lines), with one object per line. This is the common shape for bulk harvesting endpoints. For an endpoint that returns a single large JSON array, swap in an incremental parser such as ijson. The response is consumed inside a with block, so the connection is returned to the pool even if the caller stops iterating early (once the generator is closed or garbage-collected).
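The decode-and-skip logic can be exercised without a live endpoint. The sketch below uses a hypothetical helper, `iter_json_lines`. It reads from any binary file-like object, so an in-memory `io.BytesIO` stands in for the HTTP body. It parses lazily, one line per `next()`, and skips blank and malformed lines just as the streaming reader does.

```python
import io
import json
import logging
from typing import IO, Iterator

logger = logging.getLogger(__name__)


def iter_json_lines(stream: IO[bytes]) -> Iterator[dict]:
    """Lazily parse newline-delimited JSON, skipping blank and malformed lines."""
    for raw in stream:  # binary file objects iterate line by line
        line = raw.strip()
        if not line:
            continue
        try:
            yield json.loads(line)  # json.loads accepts UTF-8 bytes directly
        except json.JSONDecodeError as exc:
            logger.warning("Malformed JSON payload skipped: %s", exc)


# Simulated body: two valid records, one blank line, one malformed line.
payload = io.BytesIO(b'{"ObjectID": "A1"}\n\nnot-json\n{"ObjectID": "A2"}\n')
records = list(iter_json_lines(payload))
```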
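```python
import json


def stream_records(session: requests.Session, endpoint: str, params: dict[str, str]) -> Iterator[dict]:
    """Yield one parsed record per line of a newline-delimited JSON response."""
    # stream=True defers the body download; the with block returns the connection to the pool.
    # timeout=(connect, read) in seconds prevents an indefinite hang; values are illustrative.
    with session.get(endpoint, params=params, stream=True, timeout=(10, 60)) as response:
        response.raise_for_status()
        for line in response.iter_lines(decode_unicode=True):
            if not line:
                continue  # skip keep-alive blank lines
            try:
                yield json.loads(line)
            except json.JSONDecodeError as e:
                logger.warning("Malformed JSON payload skipped: %s", e)
```

Step 3: Schema Validation and Standards Compliance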
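Enforce LIDO and IIIF Presentation 3.0 compliance at the ingestion boundary. Use Pydantic models to validate required fields, data types, and controlled vocabularies. Reject non-conforming records before they enter the persistence layer. The model below covers a minimal subset: a required ObjectID and title, plus an optional ISO 8601 creation date and IIIF manifest URL. Controlled-vocabulary checks would be added as further validators. Once validated, map CMS fields to LIDO XML structures programmatically. Reference the IIIF Presentation API 3.0 and LIDO Schema Specification for structural requirements.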
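For the mapping step, Python's standard-library ElementTree is enough to emit namespaced LIDO elements. The sketch below builds only an identifier and a title. `to_lido_fragment` is a hypothetical helper, and the output is a fragment, not a schema-complete LIDO record. Validate real output against the LIDO XSD before publishing. The element path lidoRecID, descriptiveMetadata, objectIdentificationWrap, titleWrap, titleSet, appellationValue follows the LIDO schema's naming.

```python
import xml.etree.ElementTree as ET

LIDO_NS = "http://www.lido-schema.org"
XML_NS = "http://www.w3.org/XML/1998/namespace"
ET.register_namespace("lido", LIDO_NS)  # serialize with the conventional "lido:" prefix


def q(tag: str) -> str:
    """Qualify a tag or attribute name with the LIDO namespace."""
    return f"{{{LIDO_NS}}}{tag}"


def to_lido_fragment(object_id: str, title: str, lang: str = "en") -> ET.Element:
    """Build a minimal, NOT schema-complete LIDO fragment for one record."""
    root = ET.Element(q("lido"))
    rec_id = ET.SubElement(root, q("lidoRecID"), {q("type"): "local"})
    rec_id.text = object_id
    desc = ET.SubElement(root, q("descriptiveMetadata"), {f"{{{XML_NS}}}lang": lang})
    ident = ET.SubElement(desc, q("objectIdentificationWrap"))
    title_set = ET.SubElement(ET.SubElement(ident, q("titleWrap")), q("titleSet"))
    ET.SubElement(title_set, q("appellationValue")).text = title
    return root


xml_text = ET.tostring(to_lido_fragment("A1", "Untitled vessel"), encoding="unicode")
```

In the pipeline, the arguments would come from a validated `LIDORecord` (`object_id`, `title`). Mapping therefore only ever sees records that passed the boundary checks.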
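```python
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field, field_validator


class LIDORecord(BaseModel):
    """Minimal validated subset of a CMS record (Pydantic v2)."""

    object_id: str = Field(alias="ObjectID")  # required; raw payload key is "ObjectID"
    title: str
    creation_date_iso: Optional[str] = None
    iiif_manifest_url: Optional[str] = None

    @field_validator("creation_date_iso")
    @classmethod
    def validate_iso8601(cls, v: Optional[str]) -> Optional[str]:
        if v is None:
            return v
        # fromisoformat() accepts a trailing "Z" only from Python 3.11, so normalize it first.
        try:
            datetime.fromisoformat(v.replace("Z", "+00:00"))
        except ValueError as exc:
            raise ValueError(f"not an ISO 8601 timestamp: {v!r}") from exc
        return v


def validate_record(raw: dict) -> LIDORecord:
    """Raise pydantic.ValidationError if the raw payload does not conform."""
    return LIDORecord.model_validate(raw)
```

Step 4: Checkpointing and State Drift Prevention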
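Track the pagination cursor and last-sync timestamp in durable storage. Write the checkpoint to a local JSON file after every completed page. Resume from the last saved cursor after a failure. This bounds re-ingestion after a crash to the records of at most one page. To keep that replay from creating duplicates, downstream writes must be idempotent, for example an upsert keyed on ObjectID. Together, these measures prevent state drift and broken provenance chains.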
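Checkpoint replay is only duplicate-free if the write it replays is idempotent. The following is a minimal sketch using SQLite's upsert (available from SQLite 3.24). It assumes a hypothetical `records` table keyed on ObjectID. Writing the same record twice leaves one row holding the latest values.

```python
import sqlite3


def upsert_record(conn: sqlite3.Connection, object_id: str, title: str) -> None:
    """Insert or update one record keyed on ObjectID, so replaying a page is harmless."""
    conn.execute(
        """
        INSERT INTO records (object_id, title) VALUES (?, ?)
        ON CONFLICT(object_id) DO UPDATE SET title = excluded.title
        """,
        (object_id, title),
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (object_id TEXT PRIMARY KEY, title TEXT NOT NULL)")
# Simulate a crash-and-resume that replays the same record.
upsert_record(conn, "A1", "Untitled vessel")
upsert_record(conn, "A1", "Untitled vessel (revised)")
conn.commit()
```

Any warehouse with a merge or upsert primitive gives the same guarantee. The key design choice is to use the CMS's stable identifier, not a load-time surrogate key, as the conflict target.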
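```python
import os
from datetime import timezone
from pathlib import Path

CHECKPOINT_FILE = Path("sync_state.json")

def load_checkpoint() -> dict[str, str]:
    if CHECKPOINT_FILE.exists():
        return json.loads(CHECKPOINT_FILE.read_text(encoding="utf-8"))
    return {"cursor": "0", "last_sync": datetime.now(timezone.utc).isoformat()}

def save_checkpoint(cursor: str) -> None:
    state = {"cursor": cursor, "last_sync": datetime.now(timezone.utc).isoformat()}
    # Write a temp file, fsync it, then rename: a crash mid-write cannot corrupt the checkpoint.
    tmp = CHECKPOINT_FILE.with_name(CHECKPOINT_FILE.name + ".tmp")
    with tmp.open("w", encoding="utf-8") as fh:
        json.dump(state, fh, indent=2)
        fh.flush()
        os.fsync(fh.fileno())
    os.replace(tmp, CHECKPOINT_FILE)  # atomic rename on POSIX filesystems
```

Data Flow and Execution Pipeline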
The complete pipeline orchestrates session management, streaming, validation, and checkpointing in a single deterministic loop. Each record passes through the validation boundary before anything is persisted. Records that fail validation are logged and skipped, so a single malformed record never halts the cycle. Routing them to a durable quarantine queue instead is a straightforward extension of the except branch. In the loop below, successful records are only logged. A production deployment would hand them to IIIF manifest generation and LIDO serialization at that point. The loop ends when a page returns no records or the cursor stops advancing. Because the checkpoint is saved after every page, an interrupted run resumes from the last completed page.
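A quarantine can be as simple as an append-only JSON Lines file that keeps each rejected record together with its error, so it can be inspected and replayed later. The sketch below assumes local disk is an acceptable store. `quarantine_record` is a hypothetical helper, and a message broker or database table would serve the same role.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def quarantine_record(path: Path, record: dict, error: str) -> None:
    """Append a rejected record and its validation error to a JSON Lines file."""
    entry = {
        "quarantined_at": datetime.now(timezone.utc).isoformat(),
        "error": error,
        "record": record,
    }
    with path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry, ensure_ascii=False) + "\n")


# Demonstration in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    qpath = Path(tmp) / "quarantine.jsonl"
    quarantine_record(qpath, {"title": "No ID"}, "ObjectID: field required")
    quarantined = [json.loads(line) for line in qpath.read_text(encoding="utf-8").splitlines()]
```

In `run_sync`, the call would sit next to the `logger.error` line in the except branch, passing the raw record and the stringified exception.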
```python
from pydantic import ValidationError


def run_sync(session: requests.Session, endpoint: str) -> None:
    cursor = str(load_checkpoint().get("cursor", "0"))
    while True:
        params = {"cursor": cursor, "limit": "500"}
        page_cursor = cursor  # cursor this page was requested with
        received = False
        for record in stream_records(session, endpoint, params):
            received = True
            # Advance past every record, valid or not, so a page of rejects cannot stall the loop.
            cursor = str(record.get("next_cursor", cursor))
            try:
                validated = validate_record(record)
            except ValidationError as exc:
                logger.error("Validation failed for record %s: %s", record.get("ObjectID"), exc)
                continue
            logger.info("Synced record: %s", validated.object_id)
        save_checkpoint(cursor)  # persist progress once per page
        # Stop when a page returns nothing or the cursor stops advancing.
        if not received or cursor == page_cursor:
            break
    logger.info("Sync cycle complete. Next cursor: %s", cursor)
```

Operational Considerations
Monitor connection pool utilization and retry counts via Prometheus metrics. Implement circuit breakers to halt polling when upstream APIs degrade. Rotate API keys and refresh OAuth2 tokens before they expire. Align polling frequency with CMS batch processing windows to reduce contention. For large-scale deployments, transition to asynchronous HTTP clients. Review Building Async Ingestion Pipelines for httpx migration paths.
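A circuit breaker for a synchronous poller needs little more than a failure counter and a cool-down timer. The sketch below is a hypothetical `CircuitBreaker` class, not a library API, and its thresholds are illustrative. After `failure_threshold` consecutive failed cycles, it blocks further polling for `reset_timeout` seconds. It then lets a trial cycle through (half-open) and reopens if that cycle fails. The clock is injectable so the behaviour can be tested.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Minimal consecutive-failure circuit breaker: closed -> open -> half-open."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        """False while open; True when closed or once the cool-down has elapsed."""
        if self._opened_at is None:
            return True
        # Half-open: after the cool-down, let a trial cycle probe the upstream API.
        return self._clock() - self._opened_at >= self.reset_timeout

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            # Open (or reopen) the circuit and restart the cool-down window.
            self._opened_at = self._clock()
```

In the scheduler, call `allow_request()` before each `run_sync()` cycle. Call `record_success()` after a clean cycle and `record_failure()` when the cycle raises `requests.RequestException`.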
Conclusion
Reliable synchronous polling requires three invariants: reused TCP connections via a persistent Session, durable cursor checkpoints that survive process restarts, and schema validation at the ingestion boundary rather than after database commit. Without all three, high-volume collection syncs degrade into state-drifted, memory-exhausting loops that require manual reconciliation to recover.