Workflow Context
Museum digital asset pipelines ingest heterogeneous manifests from legacy TMS exports, donor spreadsheets, and digitization workstations. Without strict schema enforcement, malformed dates, missing rights flags, or invalid asset URIs corrupt downstream CMS records. Pydantic provides runtime type coercion, constraint validation, and structured error reporting. It acts as the primary gatekeeper in Automated Record Ingestion & Sync Workflows. Validation must occur before any database transaction or DAM upload. Only compliant records advance to the staging layer. This eliminates silent data degradation and enforces institutional metadata standards across batch operations.
flowchart LR
    R["Raw manifest records"] --> M["DigitalAssetRecord<br/>model_validate (thread pool)"]
    M --> V{"Valid?"}
    V -->|yes| Y["Yield to sync layer"]
    V -->|no| E["format_validation_error<br/>→ quarantine table"]

Core Model Architecture
Production validation requires a deterministic parsing sequence. Define a Pydantic v2 model that mirrors your institutional metadata schema. Use strict typing, explicit field aliases for legacy column names, and custom validators for domain-specific logic. Align fields with LIDO XML element definitions to ensure downstream interoperability. Configure the model to reject unknown fields and enforce type strictness.
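To make the effect of strict typing, unknown-field rejection, and aliases concrete, here is a minimal sketch on a reduced stand-in model. `MiniRecord` and `error_types` are hypothetical names introduced only for illustration, and the sketch assumes Pydantic v2 is installed.

```python
from pydantic import BaseModel, ConfigDict, Field, ValidationError


class MiniRecord(BaseModel):
    """Hypothetical two-field model, used only to show the config flags."""

    model_config = ConfigDict(strict=True, extra="forbid")

    accession_number: str
    # Assumption: the legacy export names this column "rights_code".
    rights_statement: str = Field(..., alias="rights_code")


def error_types(payload: dict) -> list[str]:
    """Return the Pydantic error type codes raised for a payload."""
    try:
        MiniRecord.model_validate(payload)
    except ValidationError as exc:
        return [err["type"] for err in exc.errors()]
    return []


# Accepted: the alias is used and every value already has the right type.
ok = error_types({"accession_number": "AB.2020.1", "rights_code": "InC"})

# Rejected: strict mode does not turn the integer 42 into a string.
wrong_type = error_types({"accession_number": 42, "rights_code": "InC"})

# Rejected: an unknown column is reported instead of being dropped.
unknown = error_types(
    {"accession_number": "AB.2020.1", "rights_code": "InC", "notes": "x"}
)
```

The strings collected here are Pydantic's own error type codes, which makes them convenient keys when routing rejections to different remediation queues.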
from datetime import date, datetime, timezone
from typing import Optional, List
from pydantic import BaseModel, Field, field_validator, ConfigDict, ValidationError
import logging

logger = logging.getLogger("museum_pipeline.validation")

class DigitalAssetRecord(BaseModel):
    model_config = ConfigDict(strict=True, extra="forbid")

    accession_number: str = Field(..., pattern=r"^[A-Z]{2,4}\.\d{4}\.\d{1,4}$")
    title: str = Field(..., min_length=2, max_length=255)
    creator: Optional[str] = None
    creation_date: Optional[date] = None
    rights_statement: str = Field(..., alias="rights_code")
    asset_uris: List[str] = Field(default_factory=list)
    checksum: Optional[str] = Field(None, pattern=r"^[a-f0-9]{64}$")

Domain-Specific Validation Logic
Legacy museum data frequently contains inconsistent date formats and non-standard URI schemes. Implement field_validator decorators to normalize inputs before type coercion. Parse ISO 8601 variants and legacy year-only strings. Validate asset paths against institutional DAM routing rules. Ensure rights statements match controlled vocabularies like RightsStatements.org.
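The controlled-vocabulary check mentioned above can be sketched on its own. This is a minimal illustration, not the institution's actual rule set: `RightsOnly`, `ALLOWED_RIGHTS_CODES`, and `is_accepted` are hypothetical names, the listed codes are only a small subset of the RightsStatements.org identifiers, and storing short codes rather than full URIs is an assumption.

```python
from pydantic import BaseModel, Field, ValidationError, field_validator

# Illustrative subset of RightsStatements.org identifiers. A real pipeline
# would load the institution's approved list from configuration.
ALLOWED_RIGHTS_CODES = frozenset({"InC", "InC-EDU", "NoC-US", "CNE", "UND"})


class RightsOnly(BaseModel):
    """Hypothetical one-field model isolating the vocabulary check."""

    rights_statement: str = Field(..., alias="rights_code")

    @field_validator("rights_statement")
    @classmethod
    def check_controlled_vocabulary(cls, v: str) -> str:
        # Trim stray whitespace from spreadsheet cells before comparing.
        code = v.strip()
        if code not in ALLOWED_RIGHTS_CODES:
            raise ValueError(f"Unknown rights statement code: {v!r}")
        return code


def is_accepted(code: str) -> bool:
    """Report whether a rights code passes the vocabulary check."""
    try:
        RightsOnly.model_validate({"rights_code": code})
    except ValidationError:
        return False
    return True
```

The same validator body could be attached to the `rights_statement` field of the full record model; it is kept separate here so the check can be read in isolation.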
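# Complete class: the fields from the listing above plus the validators.
# This definition supersedes the earlier one.
class DigitalAssetRecord(BaseModel):
    model_config = ConfigDict(strict=True, extra="forbid")

    accession_number: str = Field(..., pattern=r"^[A-Z]{2,4}\.\d{4}\.\d{1,4}$")
    title: str = Field(..., min_length=2, max_length=255)
    creator: Optional[str] = None
    creation_date: Optional[date] = None
    rights_statement: str = Field(..., alias="rights_code")
    asset_uris: List[str] = Field(default_factory=list)
    checksum: Optional[str] = Field(None, pattern=r"^[a-f0-9]{64}$")

    @field_validator("creation_date", mode="before")
    @classmethod
    def parse_iso_or_legacy_date(cls, v: object) -> object:
        # Runs before strict type checking, so strings can become dates here.
        if isinstance(v, str):
            for fmt in ("%Y-%m-%d", "%Y-%m", "%Y"):
                try:
                    # Partial dates resolve to the first day of the month or year.
                    return datetime.strptime(v, fmt).date()
                except ValueError:
                    continue
            raise ValueError(f"Unrecognized date format: {v}")
        return v

    @field_validator("asset_uris")
    @classmethod
    def validate_dam_paths(cls, v: List[str]) -> List[str]:
        valid_schemes = ("https://", "s3://", "file://")
        for uri in v:
            if not uri.startswith(valid_schemes):
                raise ValueError(f"Invalid asset URI scheme: {uri}")
        return v

Async Batch Processing Integration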
Large museum manifests can exceed available memory when loaded in full. Stream records through an async generator and offload validation to a thread pool so that it runs off the event loop. Yield compliant records directly to the sync layer as they complete. This keeps the event loop free to service I/O while validation runs. Refer to Building Async Ingestion Pipelines for executor pool configuration.
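Streaming implies a bound on how many validation tasks exist at once. One way to get that bound, sketched here under assumptions, is to pull the manifest in fixed-size chunks and validate each chunk in the thread pool before reading the next. `chunked`, `validate_stream`, the `validate` callable, and the default `chunk_size` of 500 are all illustrative; in this pipeline the callable would typically be the model's `model_validate`.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Any, AsyncIterator, Callable, Iterable, Iterator


def chunked(items: Iterable[dict], size: int) -> Iterator[list[dict]]:
    """Yield successive lists of at most `size` items from any iterable."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


async def validate_stream(
    records: Iterable[dict],
    validate: Callable[[dict], Any],
    executor: ThreadPoolExecutor,
    chunk_size: int = 500,
) -> AsyncIterator[Any]:
    """Validate records chunk by chunk, yielding only the successes."""
    loop = asyncio.get_running_loop()
    for chunk in chunked(records, chunk_size):
        # At most `chunk_size` futures exist at any moment.
        futures = [loop.run_in_executor(executor, validate, r) for r in chunk]
        results = await asyncio.gather(*futures, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                # Rejected record; a real pipeline would route it to quarantine.
                continue
            yield result
```

Unlike an `asyncio.as_completed` loop, this variant yields records in input order, at the cost of waiting for the slowest record in each chunk. Pulling from `records` happens on the event loop thread, so a source that blocks on disk or network reads would need its own offloading.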
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncGenerator, Dict, Any

executor = ThreadPoolExecutor(max_workers=4)

async def validate_batch(records: list[Dict[str, Any]]) -> AsyncGenerator[DigitalAssetRecord, None]:
    loop = asyncio.get_running_loop()
    # Submit every record to the thread pool so validation runs off the event loop.
    tasks = []
    for record in records:
        task = loop.run_in_executor(executor, DigitalAssetRecord.model_validate, record)
        tasks.append(task)
    # Yield records in completion order; failures are logged and skipped.
    for completed in asyncio.as_completed(tasks):
        try:
            yield await completed
        except ValidationError as e:
            logger.warning("Validation failure: %s", e.errors())
            continue

Error Routing & Compliance Logging
Structured error reporting enables deterministic compliance routing. Capture ValidationError payloads and map them to institutional remediation queues. Log field-level failures with accession numbers for collections staff review. Route malformed records to a quarantine database table. This prevents pipeline stalls while preserving audit trails. Integrate with Automating OCR Metadata Extraction to reprocess failed text fields automatically.
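The quarantine step can be sketched with the standard library's `sqlite3` module. This is a minimal illustration under assumptions: the `quarantine` table, its columns, and the helpers `init_quarantine` and `quarantine_record` are hypothetical, and a production deployment would more likely target the institution's own database.

```python
import json
import sqlite3
from datetime import datetime, timezone


def init_quarantine(conn: sqlite3.Connection) -> None:
    """Create the (hypothetical) quarantine table if it does not exist."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS quarantine (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            accession_number TEXT,
            errors_json TEXT NOT NULL,
            raw_record_json TEXT NOT NULL,
            rejected_at TEXT NOT NULL
        )
        """
    )


def quarantine_record(
    conn: sqlite3.Connection, raw_record: dict, errors: list[dict]
) -> int:
    """Store a rejected record with its field-level errors; return the row id."""
    accession = raw_record.get("accession_number")
    cur = conn.execute(
        "INSERT INTO quarantine "
        "(accession_number, errors_json, raw_record_json, rejected_at) "
        "VALUES (?, ?, ?, ?)",
        (
            None if accession is None else str(accession),
            # default=str covers values JSON cannot encode, such as
            # exception objects or dates inside the error details.
            json.dumps(errors, default=str),
            json.dumps(raw_record, default=str),
            datetime.now(timezone.utc).isoformat(),
        ),
    )
    conn.commit()
    return cur.lastrowid
```

The `errors` argument is meant to receive the list returned by `ValidationError.errors()`. Keeping the raw record next to its errors lets collections staff correct and resubmit it without going back to the source manifest.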
def format_validation_error(error: ValidationError) -> Dict[str, Any]:
    detail = error.errors(include_url=False)
    return {
        "status": "rejected",
        "errors": detail,
        "input_data": [e.get("input") for e in detail],
        "timestamp": datetime.now(timezone.utc).isoformat()
    }

Production Deployment & Standards Alignment
Deploy validation models as immutable configuration artifacts. Version control schema definitions alongside pipeline code. Map validated outputs to LIDO XML structures using lxml or dicttoxml. Generate IIIF Presentation API manifests from compliant asset_uris and rights_statement fields. Monitor validation throughput using Prometheus metrics. Scale executor pools based on manifest size and network latency. Strict validation guarantees that downstream systems receive only canonical, standards-compliant records. Consult the official Pydantic v2 documentation for advanced serialization strategies.
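As a rough illustration of the LIDO mapping step, the sketch below builds a small LIDO fragment from a validated record's fields. It swaps in the standard library's `xml.etree.ElementTree` for `lxml` so that it runs without extra dependencies. The helper names `q` and `record_to_lido` and the `type="local"` attribute value are assumptions, and the fragment is deliberately partial: it omits elements the LIDO schema requires, so it would not pass schema validation as written.

```python
import xml.etree.ElementTree as ET

LIDO_NS = "http://www.lido-schema.org"
ET.register_namespace("lido", LIDO_NS)


def q(tag: str) -> str:
    """Qualify a tag or attribute name with the LIDO namespace."""
    return f"{{{LIDO_NS}}}{tag}"


def record_to_lido(record: dict) -> str:
    """Serialize accession number, title, and asset URIs as a LIDO fragment."""
    root = ET.Element(q("lido"))
    rec_id = ET.SubElement(root, q("lidoRecID"), {q("type"): "local"})
    rec_id.text = record["accession_number"]

    # Title goes under descriptiveMetadata / objectIdentificationWrap.
    descriptive = ET.SubElement(root, q("descriptiveMetadata"))
    ident = ET.SubElement(descriptive, q("objectIdentificationWrap"))
    title_set = ET.SubElement(ET.SubElement(ident, q("titleWrap")), q("titleSet"))
    ET.SubElement(title_set, q("appellationValue")).text = record["title"]

    # Each asset URI becomes one resourceSet with a linkResource.
    admin = ET.SubElement(root, q("administrativeMetadata"))
    resources = ET.SubElement(admin, q("resourceWrap"))
    for uri in record.get("asset_uris", []):
        resource_set = ET.SubElement(resources, q("resourceSet"))
        representation = ET.SubElement(resource_set, q("resourceRepresentation"))
        ET.SubElement(representation, q("linkResource")).text = uri

    return ET.tostring(root, encoding="unicode")
```

The function takes a plain dict, such as the output of a validated model's `model_dump()`, so the XML layer stays independent of the validation layer.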
Conclusion
Pydantic v2 delivers three properties critical to museum ingestion: fail-fast validation that surfaces schema errors before any I/O, field aliases that bridge legacy CMS column names to canonical identifiers without silent coercion, and structured ValidationError payloads that give curators actionable remediation information rather than cryptic stack traces. Thread-pool offloading keeps the async event loop responsive while CPU-bound validation runs concurrently.