Validators API

Validation engine and result types for DPP validation.

ValidationEngine

The main validation engine supporting seven-layer validation.

dppvalidator.validators.ValidationEngine

Seven-layer validation engine for Digital Product Passports.

Provides configurable validation through seven layers:

1. Schema validation (JSON Schema Draft 2020-12)
2. Model validation (Pydantic v2)
3. Semantic validation (Business rules)
4. JSON-LD validation (Context expansion and term resolution)
5. Vocabulary validation (External code lists and ontologies)
6. Plugin validation (Custom validator plugins)
7. Signature verification (Verifiable Credential proofs)

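Following the Result pattern, validate() reports problems through the returned result instead of raising exceptions for invalid input. Check result.valid and inspect result.errors for details. The constructor can still raise ImportError when an enabled optional feature lacks its dependency.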
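The layered, result-returning flow described above can be sketched with the standard library alone. This is a minimal illustration, not the engine's implementation: SketchResult, run_layers, and the two toy layers are hypothetical names, and the stop condition is only a guess at what fail_fast and max_errors imply.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class SketchResult:
    """Hypothetical stand-in; not the library's ValidationResult."""

    errors: List[str] = field(default_factory=list)

    @property
    def valid(self) -> bool:
        return not self.errors


# A layer takes the parsed document and returns a list of error messages.
Layer = Callable[[Dict[str, Any]], List[str]]


def run_layers(
    doc: Dict[str, Any],
    layers: List[Layer],
    *,
    fail_fast: bool = False,
    max_errors: int = 100,
) -> SketchResult:
    """Run layers in order, collecting errors instead of raising."""
    result = SketchResult()
    for layer in layers:
        result.errors.extend(layer(doc))
        # Assumed stop rule: first error with fail_fast, or error budget used up.
        if (fail_fast and result.errors) or len(result.errors) >= max_errors:
            break
    return result


def require_id(doc: Dict[str, Any]) -> List[str]:
    return [] if "id" in doc else ["missing 'id'"]


def require_issuer(doc: Dict[str, Any]) -> List[str]:
    return [] if "issuer" in doc else ["missing 'issuer'"]
```

Callers branch on the returned object, for example `if not run_layers(doc, [require_id, require_issuer]).valid: ...`, rather than wrapping the call in try/except.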

Source code in src/dppvalidator/validators/engine.py
class ValidationEngine:
    """Seven-layer validation engine for Digital Product Passports.

    Provides configurable validation through seven layers:
    1. Schema validation (JSON Schema Draft 2020-12)
    2. Model validation (Pydantic v2)
    3. Semantic validation (Business rules)
    4. JSON-LD validation (Context expansion and term resolution)
    5. Vocabulary validation (External code lists and ontologies)
    6. Plugin validation (Custom validator plugins)
    7. Signature verification (Verifiable Credential proofs)

    Following the Result pattern, validation never raises exceptions.
    Check `result.valid` and inspect `result.errors` for details.
    """

    # Default max input size: 10 MB (protects against DoS via huge payloads)
    DEFAULT_MAX_INPUT_SIZE = 10 * 1024 * 1024  # 10 MB

    def __init__(
        self,
        schema_version: str = "auto",
        strict_mode: bool = False,
        validate_vocabularies: bool = False,
        layers: list[Literal["schema", "model", "semantic", "jsonld"]] | None = None,
        validate_jsonld: bool = False,
        verify_signatures: bool = False,
        load_plugins: bool = True,
        max_input_size: int | None = None,
        enable_shacl: bool = False,
    ) -> None:
        """Initialize the validation engine.

        Args:
            schema_version: UNTP DPP schema version to validate against.
                Use "auto" to detect version from input data (default).
            strict_mode: If True, enables strict JSON Schema validation
            validate_vocabularies: If True, validates external vocabulary values
            layers: Specific layers to run. None runs the defaults: schema, model, semantic.
            validate_jsonld: If True, enables JSON-LD semantic validation (requires pyld)
            verify_signatures: If True, verifies VC signatures (requires cryptography)
            load_plugins: If True, discovers and loads plugin validators
            max_input_size: Maximum input size in bytes. None uses default (10MB).
                Set to 0 to disable size limits.
            enable_shacl: If True, enables SHACL validation against official
                CIRPASS-2 shapes. Requires: uv add dppvalidator[rdf] or
                pip install dppvalidator[rdf]

        Raises:
            ImportError: If optional features are enabled but dependencies not installed:
                - enable_shacl=True requires [rdf] extra
                - validate_jsonld=True requires pyld (included in base install)
                - verify_signatures=True requires cryptography (included in base install)
        """
        # Explicit feature detection for optional dependencies
        if enable_shacl and not is_shacl_available():
            raise ImportError(
                "SHACL validation requires the [rdf] extra. "
                "Install with: uv add dppvalidator[rdf] or pip install dppvalidator[rdf]"
            )

        if validate_jsonld and not _is_jsonld_available():
            raise ImportError(
                "JSON-LD validation requires pyld. Install with: uv add pyld or pip install pyld"
            )

        if verify_signatures and not _is_crypto_available():
            raise ImportError(
                "Signature verification requires cryptography. "
                "Install with: uv add cryptography or pip install cryptography"
            )
        self._auto_detect = schema_version == "auto"
        self.schema_version = schema_version
        self.strict_mode = strict_mode
        self.validate_vocabularies = validate_vocabularies
        self.validate_jsonld = validate_jsonld
        self.verify_signatures = verify_signatures
        self.enable_shacl = enable_shacl
        self.layers = layers or ["schema", "model", "semantic"]
        self._load_plugins = load_plugins
        self.max_input_size = (
            max_input_size if max_input_size is not None else self.DEFAULT_MAX_INPUT_SIZE
        )

        # Defer validator initialization if auto-detecting
        self._schema_validator: SchemaValidator | None = None
        self._model_validator: ModelValidator | None = None
        self._semantic_validator: SemanticValidator | None = None
        self._jsonld_validator: Any = None  # JSONLDValidator (optional)
        self._credential_verifier: Any = None  # CredentialVerifier (optional)

        if not self._auto_detect:
            self._init_validators(schema_version)

        # Initialize vocabulary loader if needed
        self._vocab_loader = None
        if validate_vocabularies:
            self._init_vocabulary_loader()

        # Initialize credential verifier if needed
        if verify_signatures:
            self._init_credential_verifier()

        # Initialize plugin registry if needed
        self._plugin_registry = None
        if load_plugins:
            self._init_plugin_registry()

    def _init_validators(self, version: str) -> None:
        """Initialize validators for a specific schema version.

        Args:
            version: Schema version string

        """
        self._schema_validator = SchemaValidator(version, strict=self.strict_mode)
        self._model_validator = ModelValidator(version)
        self._semantic_validator = SemanticValidator(version)

        # Initialize JSON-LD validator if enabled
        if self.validate_jsonld or "jsonld" in self.layers:
            self._init_jsonld_validator(version)

    def _init_jsonld_validator(self, version: str) -> None:
        """Initialize JSON-LD semantic validator.

        Args:
            version: Schema version string

        """
        try:
            from dppvalidator.validators.jsonld_semantic import JSONLDValidator

            self._jsonld_validator = JSONLDValidator(
                schema_version=version,
                strict=self.strict_mode,
            )
            logger.debug("JSON-LD validator initialized")
        except ImportError:
            logger.warning(
                "pyld import failed - JSON-LD validation disabled. "
                "Try: pip install --force-reinstall dppvalidator"
            )

    def _init_vocabulary_loader(self) -> None:
        """Initialize the vocabulary loader for external vocabulary validation."""
        try:
            from dppvalidator.vocabularies.loader import VocabularyLoader

            self._vocab_loader = VocabularyLoader(offline_mode=False)
            logger.debug("Vocabulary loader initialized")
        except ImportError:
            logger.warning("Vocabulary loader not available")

    def _init_credential_verifier(self) -> None:
        """Initialize the credential verifier for VC signature verification."""
        try:
            from dppvalidator.verifier.verifier import CredentialVerifier

            self._credential_verifier = CredentialVerifier()
            logger.debug("Credential verifier initialized")
        except ImportError:
            logger.warning(
                "cryptography import failed - signature verification disabled. "
                "Try: pip install --force-reinstall dppvalidator"
            )

    def _init_plugin_registry(self) -> None:
        """Initialize the plugin registry for plugin validators.

        Uses the singleton registry via get_default_registry() to avoid
        redundant plugin discovery when multiple ValidationEngine instances
        are created.
        """
        try:
            from dppvalidator.plugins.registry import get_default_registry

            self._plugin_registry = get_default_registry()
            logger.debug(
                "Plugin registry initialized with %d validators",
                self._plugin_registry.validator_count,
            )
        except (ImportError, AttributeError, TypeError) as e:
            logger.warning("Plugin registry initialization failed: %s", e)

    def validate(
        self,
        data: dict[str, Any] | str | Path,
        *,
        fail_fast: bool = False,
        max_errors: int = 100,
    ) -> ValidationResult:
        """Validate DPP data through configured layers.

        Args:
            data: Raw JSON dict, JSON string, or path to JSON file
            fail_fast: Stop on first error if True
            max_errors: Maximum errors to collect before stopping

        Returns:
            ValidationResult with parsed passport if valid

        """
        start_time = time.perf_counter()

        parsed_data = self._parse_input(data)
        if isinstance(parsed_data, ValidationResult):
            return parsed_data

        # Auto-detect schema version if enabled
        effective_version = self.schema_version
        if self._auto_detect:
            effective_version = detect_schema_version(parsed_data)
            self._init_validators(effective_version)
            logger.debug("Auto-detected schema version: %s", effective_version)

        parse_time = (time.perf_counter() - start_time) * 1000
        context = ValidationContext(
            parsed_data=parsed_data,
            schema_version=effective_version,
            strict_mode=self.strict_mode,
            fail_fast=fail_fast,
            max_errors=max_errors,
        )
        context.result.parse_time_ms = parse_time

        # Build and execute validation layers
        validation_layers = self._build_layers(effective_version)
        for layer in validation_layers:
            if layer.should_run(context):
                layer_result = layer.execute(context)
                context.merge_result(layer_result)
                self._apply_signature_fields(context.result, layer_result, layer)
                if context.should_stop():
                    break

        context.result.passport = context.passport
        return context.result

    def _build_layers(self, schema_version: str) -> list[ValidationLayer]:
        """Build the ordered list of validation layers based on configuration."""
        layers: list[ValidationLayer] = []

        if "schema" in self.layers:
            layers.append(SchemaLayer(self._schema_validator))

        if "model" in self.layers:
            layers.append(ModelLayer(self._model_validator))

        if "semantic" in self.layers:
            layers.append(SemanticLayer(self._semantic_validator))

        if "jsonld" in self.layers or self.validate_jsonld:
            layers.append(JsonLdLayer(self._jsonld_validator))

        if self.validate_vocabularies:
            layers.append(VocabularyLayer(self._vocab_loader, schema_version))

        if self._load_plugins:
            layers.append(PluginLayer(self._plugin_registry, schema_version))

        if self.verify_signatures:
            layers.append(SignatureLayer(self._credential_verifier, schema_version))

        return layers

    def _apply_signature_fields(
        self,
        result: ValidationResult,
        layer_result: ValidationResult,
        layer: ValidationLayer,
    ) -> None:
        """Copy signature verification fields from SignatureLayer result."""
        if layer.name == "signature":
            result.signature_valid = layer_result.signature_valid
            result.issuer_did = layer_result.issuer_did
            result.verification_method = layer_result.verification_method

    def validate_file(self, path: Path | str) -> ValidationResult:
        """Validate a JSON file.

        Args:
            path: Path to JSON file

        Returns:
            ValidationResult

        """
        return self.validate(Path(path))

    async def validate_async(self, data: dict[str, Any]) -> ValidationResult:
        """Validate data asynchronously (thread-offloaded).

        This method wraps the synchronous `validate()` method using
        `asyncio.to_thread()` to avoid blocking the event loop. Use this
        for integrating with async frameworks like FastAPI or aiohttp.

        Note:
            For natively async operations (network I/O), use `validate_deep()`
            which uses httpx.AsyncClient for non-blocking HTTP requests.

        Args:
            data: Raw JSON dict

        Returns:
            ValidationResult

        Example:
            >>> async def handler(request):
            ...     result = await engine.validate_async(await request.json())
            ...     return {"valid": result.valid}

        """
        return await asyncio.to_thread(self.validate, data)

    async def validate_batch(
        self,
        items: list[dict[str, Any]],
        *,
        concurrency: int = 10,
    ) -> list[ValidationResult]:
        """Validate multiple items concurrently.

        Args:
            items: List of raw JSON dicts
            concurrency: Maximum concurrent validations

        Returns:
            List of ValidationResults in same order as input

        """
        semaphore = asyncio.Semaphore(concurrency)

        async def validate_with_semaphore(item: dict[str, Any]) -> ValidationResult:
            async with semaphore:
                return await self.validate_async(item)

        return await asyncio.gather(*[validate_with_semaphore(item) for item in items])

    async def validate_deep(
        self,
        data: dict[str, Any],
        *,
        max_depth: int = 3,
        follow_links: list[str] | None = None,
        timeout: float = 30.0,
        auth_header: dict[str, str] | None = None,
    ) -> DeepValidationResult:
        """Perform deep/recursive validation following linked documents.

        Crawls the supply chain by following links in the DPP and validates
        each linked document, building a complete validation graph.

        Args:
            data: Root DPP document data
            max_depth: Maximum depth to traverse (0 = root only)
            follow_links: JSON paths to follow for links (uses defaults if None)
            timeout: HTTP request timeout in seconds
            auth_header: Authorization headers for authenticated requests

        Returns:
            DeepValidationResult with all validation results and link graph

        """
        from dppvalidator.validators.deep import DeepValidator

        def validator_factory() -> ValidationEngine:
            return ValidationEngine(
                schema_version=self.schema_version,
                strict_mode=self.strict_mode,
                validate_vocabularies=self.validate_vocabularies,
                validate_jsonld=self.validate_jsonld,
                verify_signatures=self.verify_signatures,
                load_plugins=self._load_plugins,
            )

        deep_validator = DeepValidator(
            validator_factory=validator_factory,
            max_depth=max_depth,
            follow_links=follow_links,
            timeout=timeout,
            auth_header=auth_header,
        )

        return await deep_validator.validate(data)

    def _parse_input(self, data: dict[str, Any] | str | Path) -> dict[str, Any] | ValidationResult:
        """Parse input data to dict.

        Returns:
            Parsed dict or ValidationResult with parse error

        """
        # Check input size for string inputs (DoS protection)
        if (
            isinstance(data, str)
            and self.max_input_size > 0
            and len(data.encode("utf-8")) > self.max_input_size
        ):
            return ValidationResult(
                valid=False,
                errors=[
                    ValidationError(
                        path="$",
                        message=(
                            f"Input size exceeds maximum allowed "
                            f"({self.max_input_size:,} bytes). "
                            "Consider splitting into smaller documents."
                        ),
                        code="PRS004",
                        layer="model",
                        severity="error",
                    )
                ],
                schema_version=self.schema_version,
            )

        if isinstance(data, dict):
            return data

        if isinstance(data, Path):
            try:
                # Check file size before reading (DoS protection)
                if self.max_input_size > 0:
                    try:
                        file_size = data.stat().st_size
                        if file_size > self.max_input_size:
                            return ValidationResult(
                                valid=False,
                                errors=[
                                    ValidationError(
                                        path="$",
                                        message=(
                                            f"File size ({file_size:,} bytes) exceeds maximum "
                                            f"allowed ({self.max_input_size:,} bytes)."
                                        ),
                                        code="PRS005",
                                        layer="model",
                                        severity="error",
                                    )
                                ],
                                schema_version=self.schema_version,
                            )
                    except OSError:
                        pass  # File doesn't exist yet, will be caught below

                return json.loads(data.read_text(encoding="utf-8"))
            except FileNotFoundError:
                return ValidationResult(
                    valid=False,
                    errors=[
                        ValidationError(
                            path="$",
                            message=f"File not found: {data}",
                            code="PRS001",
                            layer="model",
                        )
                    ],
                    schema_version=self.schema_version,
                )
            except json.JSONDecodeError as e:
                return ValidationResult(
                    valid=False,
                    errors=[
                        ValidationError(
                            path="$",
                            message=f"Invalid JSON: {e}",
                            code="PRS002",
                            layer="model",
                        )
                    ],
                    schema_version=self.schema_version,
                )

        if isinstance(data, str):
            try:
                return json.loads(data)
            except json.JSONDecodeError as e:
                return ValidationResult(
                    valid=False,
                    errors=[
                        ValidationError(
                            path="$",
                            message=f"Invalid JSON: {e}",
                            code="PRS002",
                            layer="model",
                        )
                    ],
                    schema_version=self.schema_version,
                )

        return ValidationResult(
            valid=False,
            errors=[
                ValidationError(
                    path="$",
                    message=f"Unsupported input type: {type(data).__name__}",
                    code="PRS003",
                    layer="model",
                )
            ],
            schema_version=self.schema_version,
        )

__init__(schema_version='auto', strict_mode=False, validate_vocabularies=False, layers=None, validate_jsonld=False, verify_signatures=False, load_plugins=True, max_input_size=None, enable_shacl=False)

Initialize the validation engine.

Parameters:

- schema_version (str, default 'auto'): UNTP DPP schema version to validate against. Use "auto" to detect the version from the input data.
- strict_mode (bool, default False): If True, enables strict JSON Schema validation.
- validate_vocabularies (bool, default False): If True, validates external vocabulary values.
- layers (list[Literal['schema', 'model', 'semantic', 'jsonld']] | None, default None): Specific layers to run. None runs the default layers: schema, model, and semantic.
- validate_jsonld (bool, default False): If True, enables JSON-LD semantic validation (requires pyld).
- verify_signatures (bool, default False): If True, verifies VC signatures (requires cryptography).
- load_plugins (bool, default True): If True, discovers and loads plugin validators.
- max_input_size (int | None, default None): Maximum input size in bytes. None uses the default (10 MB). Set to 0 to disable size limits.
- enable_shacl (bool, default False): If True, enables SHACL validation against the official CIRPASS-2 shapes. Requires the rdf extra: uv add dppvalidator[rdf] or pip install dppvalidator[rdf].

Raises:

- ImportError: If an optional feature is enabled but its dependency is not installed:
  - enable_shacl=True requires the [rdf] extra
  - validate_jsonld=True requires pyld (included in base install)
  - verify_signatures=True requires cryptography (included in base install)
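The max_input_size rules (None falls back to 10 MB, 0 disables the check, sizes are measured in UTF-8 bytes) can be sketched as two small helpers. The logic mirrors the constructor and string size check in the source listing; the function names are hypothetical and exist only for this illustration.

```python
from typing import Optional

DEFAULT_MAX_INPUT_SIZE = 10 * 1024 * 1024  # 10 MB, same value as the class constant


def resolve_max_input_size(max_input_size: Optional[int]) -> int:
    """None means 'use the default'; any integer, including 0, is kept as given."""
    return max_input_size if max_input_size is not None else DEFAULT_MAX_INPUT_SIZE


def exceeds_limit(text: str, limit: int) -> bool:
    """A limit of 0 disables the check; size is counted in encoded bytes, not characters."""
    return limit > 0 and len(text.encode("utf-8")) > limit
```

Counting encoded bytes matters for non-ASCII input: a three-character string such as "ééé" occupies six bytes in UTF-8.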

Source code in src/dppvalidator/validators/engine.py
def __init__(
    self,
    schema_version: str = "auto",
    strict_mode: bool = False,
    validate_vocabularies: bool = False,
    layers: list[Literal["schema", "model", "semantic", "jsonld"]] | None = None,
    validate_jsonld: bool = False,
    verify_signatures: bool = False,
    load_plugins: bool = True,
    max_input_size: int | None = None,
    enable_shacl: bool = False,
) -> None:
    """Initialize the validation engine.

    Args:
        schema_version: UNTP DPP schema version to validate against.
            Use "auto" to detect version from input data (default).
        strict_mode: If True, enables strict JSON Schema validation
        validate_vocabularies: If True, validates external vocabulary values
        layers: Specific layers to run. None runs the defaults: schema, model, semantic.
        validate_jsonld: If True, enables JSON-LD semantic validation (requires pyld)
        verify_signatures: If True, verifies VC signatures (requires cryptography)
        load_plugins: If True, discovers and loads plugin validators
        max_input_size: Maximum input size in bytes. None uses default (10MB).
            Set to 0 to disable size limits.
        enable_shacl: If True, enables SHACL validation against official
            CIRPASS-2 shapes. Requires: uv add dppvalidator[rdf] or
            pip install dppvalidator[rdf]

    Raises:
        ImportError: If optional features are enabled but dependencies not installed:
            - enable_shacl=True requires [rdf] extra
            - validate_jsonld=True requires pyld (included in base install)
            - verify_signatures=True requires cryptography (included in base install)
    """
    # Explicit feature detection for optional dependencies
    if enable_shacl and not is_shacl_available():
        raise ImportError(
            "SHACL validation requires the [rdf] extra. "
            "Install with: uv add dppvalidator[rdf] or pip install dppvalidator[rdf]"
        )

    if validate_jsonld and not _is_jsonld_available():
        raise ImportError(
            "JSON-LD validation requires pyld. Install with: uv add pyld or pip install pyld"
        )

    if verify_signatures and not _is_crypto_available():
        raise ImportError(
            "Signature verification requires cryptography. "
            "Install with: uv add cryptography or pip install cryptography"
        )
    self._auto_detect = schema_version == "auto"
    self.schema_version = schema_version
    self.strict_mode = strict_mode
    self.validate_vocabularies = validate_vocabularies
    self.validate_jsonld = validate_jsonld
    self.verify_signatures = verify_signatures
    self.enable_shacl = enable_shacl
    self.layers = layers or ["schema", "model", "semantic"]
    self._load_plugins = load_plugins
    self.max_input_size = (
        max_input_size if max_input_size is not None else self.DEFAULT_MAX_INPUT_SIZE
    )

    # Defer validator initialization if auto-detecting
    self._schema_validator: SchemaValidator | None = None
    self._model_validator: ModelValidator | None = None
    self._semantic_validator: SemanticValidator | None = None
    self._jsonld_validator: Any = None  # JSONLDValidator (optional)
    self._credential_verifier: Any = None  # CredentialVerifier (optional)

    if not self._auto_detect:
        self._init_validators(schema_version)

    # Initialize vocabulary loader if needed
    self._vocab_loader = None
    if validate_vocabularies:
        self._init_vocabulary_loader()

    # Initialize credential verifier if needed
    if verify_signatures:
        self._init_credential_verifier()

    # Initialize plugin registry if needed
    self._plugin_registry = None
    if load_plugins:
        self._init_plugin_registry()

validate(data, *, fail_fast=False, max_errors=100)

Validate DPP data through configured layers.

Parameters:

- data (dict[str, Any] | str | Path, required): Raw JSON dict, JSON string, or path to a JSON file.
- fail_fast (bool, default False): If True, stop on the first error.
- max_errors (int, default 100): Maximum number of errors to collect before stopping.

Returns:

- ValidationResult: ValidationResult, including the parsed passport if the data is valid.
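How the three accepted input types are told apart can be sketched as follows. The dispatch order and the PRS001 to PRS003 codes follow the _parse_input source listing; the parse_input helper and its tuple return shape are hypothetical simplifications (the engine returns a ValidationResult on failure, and also applies size limits that are omitted here).

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional, Tuple


def parse_input(data: Any) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
    """Return (parsed, None) on success or (None, error_code) on failure."""
    if isinstance(data, dict):
        return data, None  # already parsed
    if isinstance(data, Path):
        try:
            data = data.read_text(encoding="utf-8")
        except FileNotFoundError:
            return None, "PRS001"  # file not found
    if isinstance(data, str):
        try:
            return json.loads(data), None
        except json.JSONDecodeError:
            return None, "PRS002"  # invalid JSON
    return None, "PRS003"  # unsupported input type
```

Note that a plain str is always treated as JSON text, never as a file name, so a file must be passed as a Path (or through validate_file, which wraps its argument in Path).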

Source code in src/dppvalidator/validators/engine.py
def validate(
    self,
    data: dict[str, Any] | str | Path,
    *,
    fail_fast: bool = False,
    max_errors: int = 100,
) -> ValidationResult:
    """Validate DPP data through configured layers.

    Args:
        data: Raw JSON dict, JSON string, or path to JSON file
        fail_fast: Stop on first error if True
        max_errors: Maximum errors to collect before stopping

    Returns:
        ValidationResult with parsed passport if valid

    """
    start_time = time.perf_counter()

    parsed_data = self._parse_input(data)
    if isinstance(parsed_data, ValidationResult):
        return parsed_data

    # Auto-detect schema version if enabled
    effective_version = self.schema_version
    if self._auto_detect:
        effective_version = detect_schema_version(parsed_data)
        self._init_validators(effective_version)
        logger.debug("Auto-detected schema version: %s", effective_version)

    parse_time = (time.perf_counter() - start_time) * 1000
    context = ValidationContext(
        parsed_data=parsed_data,
        schema_version=effective_version,
        strict_mode=self.strict_mode,
        fail_fast=fail_fast,
        max_errors=max_errors,
    )
    context.result.parse_time_ms = parse_time

    # Build and execute validation layers
    validation_layers = self._build_layers(effective_version)
    for layer in validation_layers:
        if layer.should_run(context):
            layer_result = layer.execute(context)
            context.merge_result(layer_result)
            self._apply_signature_fields(context.result, layer_result, layer)
            if context.should_stop():
                break

    context.result.passport = context.passport
    return context.result

validate_async(data) async

Validate data asynchronously (thread-offloaded).

This method wraps the synchronous validate() method using asyncio.to_thread() to avoid blocking the event loop. Use this for integrating with async frameworks like FastAPI or aiohttp.

Note

For natively async operations (network I/O), use validate_deep() which uses httpx.AsyncClient for non-blocking HTTP requests.

Parameters:

- data (dict[str, Any], required): Raw JSON dict.

Returns:

- ValidationResult: The result of the wrapped validate() call.

Example

    async def handler(request):
        result = await engine.validate_async(await request.json())
        return {"valid": result.valid}
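The thread-offloading idea can be tried without the library. In this sketch slow_validate is a hypothetical blocking stand-in for validate(), and the ticker task only exists to show that the event loop keeps running while the worker thread is busy.

```python
import asyncio
import contextlib
import time


def slow_validate(doc: dict) -> bool:
    """Hypothetical blocking call standing in for a synchronous validate()."""
    time.sleep(0.1)
    return "id" in doc


async def validate_in_thread(doc: dict) -> bool:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_validate, doc)


async def main() -> tuple:
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    valid = await validate_in_thread({"id": "urn:example:1"})
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return valid, ticks
```

Running `asyncio.run(main())` returns the validation outcome together with a non-zero tick count, which indicates the loop was not blocked during the call. asyncio.to_thread requires Python 3.9 or later.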

Source code in src/dppvalidator/validators/engine.py
async def validate_async(self, data: dict[str, Any]) -> ValidationResult:
    """Validate data asynchronously (thread-offloaded).

    This method wraps the synchronous `validate()` method using
    `asyncio.to_thread()` to avoid blocking the event loop. Use this
    for integrating with async frameworks like FastAPI or aiohttp.

    Note:
        For natively async operations (network I/O), use `validate_deep()`
        which uses httpx.AsyncClient for non-blocking HTTP requests.

    Args:
        data: Raw JSON dict

    Returns:
        ValidationResult

    Example:
        >>> async def handler(request):
        ...     result = await engine.validate_async(await request.json())
        ...     return {"valid": result.valid}

    """
    return await asyncio.to_thread(self.validate, data)

validate_batch(items, *, concurrency=10) async

Validate multiple items concurrently.

Parameters:

- items (list[dict[str, Any]], required): List of raw JSON dicts.
- concurrency (int, default 10): Maximum number of concurrent validations.

Returns:

- list[ValidationResult]: ValidationResults in the same order as the input.
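The bounded-concurrency pattern behind validate_batch (a semaphore around each task, then asyncio.gather to keep input order) can be sketched generically. bounded_map is a hypothetical helper that accepts any blocking function; it is not part of dppvalidator.

```python
import asyncio
from typing import Any, Callable, List


async def bounded_map(
    func: Callable[[Any], Any],
    items: List[Any],
    *,
    concurrency: int = 10,
) -> List[Any]:
    """Apply a blocking function to each item with at most `concurrency` in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run_one(item: Any) -> Any:
        async with semaphore:
            return await asyncio.to_thread(func, item)

    # gather returns results in the order the awaitables were passed in,
    # regardless of which one finishes first.
    return await asyncio.gather(*(run_one(item) for item in items))
```

Because ordering comes from gather rather than from completion time, results can be zipped back onto the input list safely.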

Source code in src/dppvalidator/validators/engine.py
async def validate_batch(
    self,
    items: list[dict[str, Any]],
    *,
    concurrency: int = 10,
) -> list[ValidationResult]:
    """Validate multiple items concurrently.

    Args:
        items: List of raw JSON dicts
        concurrency: Maximum concurrent validations

    Returns:
        List of ValidationResults in same order as input

    """
    semaphore = asyncio.Semaphore(concurrency)

    async def validate_with_semaphore(item: dict[str, Any]) -> ValidationResult:
        async with semaphore:
            return await self.validate_async(item)

    return await asyncio.gather(*[validate_with_semaphore(item) for item in items])
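
The semaphore-plus-`gather` pattern in `validate_batch()` can be illustrated in isolation. In this sketch, `bounded_gather` and `fake_validate` are hypothetical names; `fake_validate` stands in for `engine.validate_async()` and deliberately lets later items finish first to show that result order still follows input order.

```python
import asyncio


async def bounded_gather(items, worker, *, concurrency=10):
    """Run worker(item) for every item, at most `concurrency` at a time."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run_one(item):
        async with semaphore:
            return await worker(item)

    # gather() returns results in the order the awaitables were passed in,
    # regardless of which one completes first.
    return await asyncio.gather(*(run_one(item) for item in items))


async def demo():
    running = 0
    peak = 0

    async def fake_validate(item):
        # Hypothetical stand-in for engine.validate_async(item).
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01 * (5 - item))  # later items finish sooner
        running -= 1
        return item * 2

    results = await bounded_gather(range(5), fake_validate, concurrency=2)
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```

The semaphore caps how many validations are in flight, while all tasks are still created up front; for very large batches that memory cost may matter.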

validate_deep(data, *, max_depth=3, follow_links=None, timeout=30.0, auth_header=None) async

Perform deep/recursive validation following linked documents.

Crawls the supply chain by following links in the DPP and validates each linked document, building a complete validation graph.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | dict[str, Any] | Root DPP document data | required |
| max_depth | int | Maximum depth to traverse (0 = root only) | 3 |
| follow_links | list[str] \| None | JSON paths to follow for links (uses defaults if None) | None |
| timeout | float | HTTP request timeout in seconds | 30.0 |
| auth_header | dict[str, str] \| None | Authorization headers for authenticated requests | None |

Returns:

| Type | Description |
| --- | --- |
| DeepValidationResult | DeepValidationResult with all validation results and link graph |

Source code in src/dppvalidator/validators/engine.py
async def validate_deep(
    self,
    data: dict[str, Any],
    *,
    max_depth: int = 3,
    follow_links: list[str] | None = None,
    timeout: float = 30.0,
    auth_header: dict[str, str] | None = None,
) -> DeepValidationResult:
    """Perform deep/recursive validation following linked documents.

    Crawls the supply chain by following links in the DPP and validates
    each linked document, building a complete validation graph.

    Args:
        data: Root DPP document data
        max_depth: Maximum depth to traverse (0 = root only)
        follow_links: JSON paths to follow for links (uses defaults if None)
        timeout: HTTP request timeout in seconds
        auth_header: Authorization headers for authenticated requests

    Returns:
        DeepValidationResult with all validation results and link graph

    """
    from dppvalidator.validators.deep import DeepValidator

    def validator_factory() -> ValidationEngine:
        return ValidationEngine(
            schema_version=self.schema_version,
            strict_mode=self.strict_mode,
            validate_vocabularies=self.validate_vocabularies,
            validate_jsonld=self.validate_jsonld,
            verify_signatures=self.verify_signatures,
            load_plugins=self._load_plugins,
        )

    deep_validator = DeepValidator(
        validator_factory=validator_factory,
        max_depth=max_depth,
        follow_links=follow_links,
        timeout=timeout,
        auth_header=auth_header,
    )

    return await deep_validator.validate(data)
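
The internals of `DeepValidator` are not shown on this page, so the following is only an illustration of what a depth-limited traversal with `max_depth` semantics (0 = root only) could look like. The `crawl` function, the `fetch` and `links_of` callables, and the in-memory `DOCS` mapping are all hypothetical and replace real HTTP requests.

```python
from collections import deque


def crawl(root_url, fetch, links_of, *, max_depth=3):
    """Breadth-first walk over linked documents, bounded by max_depth.

    Illustrative only; this is not the library's DeepValidator.
    """
    seen = {root_url}
    order = []
    queue = deque([(root_url, 0)])
    while queue:
        url, depth = queue.popleft()
        doc = fetch(url)
        order.append((url, depth))
        if depth >= max_depth:
            continue  # do not follow links beyond the depth limit
        for link in links_of(doc):
            if link not in seen:  # guard against cycles in the link graph
                seen.add(link)
                queue.append((link, depth + 1))
    return order


# Hypothetical in-memory "supply chain" standing in for HTTP fetches.
DOCS = {
    "root": {"links": ["a", "b"]},
    "a": {"links": ["c"]},
    "b": {"links": ["root"]},  # cycle back to the root document
    "c": {"links": []},
}

root_only = crawl("root", DOCS.__getitem__, lambda d: d["links"], max_depth=0)
one_hop = crawl("root", DOCS.__getitem__, lambda d: d["links"], max_depth=1)
print(root_only, one_hop)
```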

validate_file(path)

Validate a JSON file.

Parameters:

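| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Path \| str | Path to JSON file | required |

Returns:

| Type | Description |
| --- | --- |
| ValidationResult | Result of the validation; check `valid` and inspect `errors` |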

Source code in src/dppvalidator/validators/engine.py
def validate_file(self, path: Path | str) -> ValidationResult:
    """Validate a JSON file.

    Args:
        path: Path to JSON file

    Returns:
        ValidationResult

    """
    return self.validate(Path(path))


ValidationResult

Result of a validation operation.

dppvalidator.validators.ValidationResult dataclass

Result of DPP validation following the Result pattern.

Never raises exceptions for validation failures. Instead, check the valid property and inspect errors for details.

Attributes:

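| Name | Type | Description |
| --- | --- | --- |
| valid | bool | Whether the passport passed all validation layers |
| errors | list[ValidationError] | List of validation errors (severity="error") |
| warnings | list[ValidationError] | List of validation warnings (severity="warning") |
| info | list[ValidationError] | List of informational messages (severity="info") |
| schema_version | str | UNTP DPP schema version used |
| validated_at | datetime | Timestamp of validation |
| passport | DigitalProductPassport \| None | Parsed DigitalProductPassport if valid, None otherwise |
| parse_time_ms | float | Time spent parsing input, in milliseconds |
| validation_time_ms | float | Time spent on validation layers, in milliseconds |
| signature_valid | bool \| None | Signature verification field; defaults to None |
| issuer_did | str \| None | Signature verification field; defaults to None |
| verification_method | str \| None | Signature verification field; defaults to None |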

Source code in src/dppvalidator/validators/results.py
@dataclass
class ValidationResult:
    """Result of DPP validation following the Result pattern.

    Never raises exceptions for validation failures. Instead, check
    the `valid` property and inspect `errors` for details.

    Attributes:
        valid: Whether the passport passed all validation layers
        errors: List of validation errors (severity="error")
        warnings: List of validation warnings (severity="warning")
        info: List of informational messages (severity="info")
        schema_version: UNTP DPP schema version used
        validated_at: Timestamp of validation
        passport: Parsed DigitalProductPassport if valid, None otherwise
        parse_time_ms: Time spent parsing input
        validation_time_ms: Time spent on validation layers
    """

    valid: bool
    errors: list[ValidationError] = field(default_factory=list)
    warnings: list[ValidationError] = field(default_factory=list)
    info: list[ValidationError] = field(default_factory=list)
    schema_version: str = "0.6.1"
    validated_at: datetime = field(default_factory=datetime.now)
    passport: DigitalProductPassport | None = None
    parse_time_ms: float = 0.0
    validation_time_ms: float = 0.0
    # Signature verification fields
    signature_valid: bool | None = None
    issuer_did: str | None = None
    verification_method: str | None = None

    @property
    def error_count(self) -> int:
        """Total number of errors."""
        return len(self.errors)

    @property
    def warning_count(self) -> int:
        """Total number of warnings."""
        return len(self.warnings)

    @property
    def all_issues(self) -> list[ValidationError]:
        """All errors, warnings, and info messages combined."""
        return self.errors + self.warnings + self.info

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        result = {
            "valid": self.valid,
            "errors": [e.to_dict() for e in self.errors],
            "warnings": [w.to_dict() for w in self.warnings],
            "info": [i.to_dict() for i in self.info],
            "schema_version": self.schema_version,
            "validated_at": self.validated_at.isoformat(),
            "parse_time_ms": self.parse_time_ms,
            "validation_time_ms": self.validation_time_ms,
        }
        if self.signature_valid is not None:
            result["signature_valid"] = self.signature_valid
        if self.issuer_did:
            result["issuer_did"] = self.issuer_did
        if self.verification_method:
            result["verification_method"] = self.verification_method
        return result

    def to_json(self, *, indent: int | None = 2) -> str:
        """Serialize result to JSON string."""
        return json.dumps(self.to_dict(), indent=indent)

    def raise_for_errors(self) -> None:
        """Raise ValidationException if there are errors.

        This is an opt-in method for users who prefer exception-based flow.
        """
        if not self.valid:
            raise ValidationException(self)

    def __repr__(self) -> str:
        """Return a concise representation for debugging."""
        return (
            f"ValidationResult(valid={self.valid}, "
            f"errors={len(self.errors)}, "
            f"warnings={len(self.warnings)}, "
            f"info={len(self.info)})"
        )

    def merge(self, other: ValidationResult) -> ValidationResult:
        """Merge another result into this one."""
        return ValidationResult(
            valid=self.valid and other.valid,
            errors=self.errors + other.errors,
            warnings=self.warnings + other.warnings,
            info=self.info + other.info,
            schema_version=self.schema_version,
            validated_at=self.validated_at,
            passport=self.passport if self.valid else other.passport,
            parse_time_ms=self.parse_time_ms + other.parse_time_ms,
            validation_time_ms=self.validation_time_ms + other.validation_time_ms,
        )

all_issues property

All errors, warnings, and info messages combined.

error_count property

Total number of errors.

warning_count property

Total number of warnings.

__repr__()

Return a concise representation for debugging.

Source code in src/dppvalidator/validators/results.py
def __repr__(self) -> str:
    """Return a concise representation for debugging."""
    return (
        f"ValidationResult(valid={self.valid}, "
        f"errors={len(self.errors)}, "
        f"warnings={len(self.warnings)}, "
        f"info={len(self.info)})"
    )

merge(other)

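Return a new ValidationResult that combines this result with other; neither input is modified. The merged result is valid only if both inputs are valid. Errors, warnings, and info messages are concatenated, and the two timings are summed. schema_version and validated_at are taken from this result, and passport is taken from this result if it is valid, otherwise from other. The signature fields (signature_valid, issuer_did, verification_method) are not carried over.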

Source code in src/dppvalidator/validators/results.py
def merge(self, other: ValidationResult) -> ValidationResult:
    """Merge another result into this one."""
    return ValidationResult(
        valid=self.valid and other.valid,
        errors=self.errors + other.errors,
        warnings=self.warnings + other.warnings,
        info=self.info + other.info,
        schema_version=self.schema_version,
        validated_at=self.validated_at,
        passport=self.passport if self.valid else other.passport,
        parse_time_ms=self.parse_time_ms + other.parse_time_ms,
        validation_time_ms=self.validation_time_ms + other.validation_time_ms,
    )
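
The merge rules can be checked on a reduced model. `MiniResult` below is a hypothetical stand-in with only three of the real fields; it copies the `valid`, list-concatenation, and `passport` rules from the source above and nothing else.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MiniResult:
    """Simplified stand-in for ValidationResult (hypothetical, fewer fields)."""

    valid: bool
    errors: list = field(default_factory=list)
    passport: Optional[str] = None

    def merge(self, other: "MiniResult") -> "MiniResult":
        # Valid only if both are valid; lists are concatenated; the passport
        # comes from self when self is valid, otherwise from other.
        return MiniResult(
            valid=self.valid and other.valid,
            errors=self.errors + other.errors,
            passport=self.passport if self.valid else other.passport,
        )


schema = MiniResult(valid=True, passport="parsed-passport")
semantic = MiniResult(valid=False, errors=["SEM002"])
combined = schema.merge(semantic)
print(combined.valid, combined.errors, combined.passport)
print(schema.errors)  # the inputs are left untouched
```

One consequence worth noting: when the first result is valid and the second is not, the merged result is invalid but still carries the first result's passport.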

raise_for_errors()

Raise ValidationException if there are errors.

This is an opt-in method for users who prefer exception-based flow.

Source code in src/dppvalidator/validators/results.py
def raise_for_errors(self) -> None:
    """Raise ValidationException if there are errors.

    This is an opt-in method for users who prefer exception-based flow.
    """
    if not self.valid:
        raise ValidationException(self)
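
For callers who prefer exceptions, the opt-in flow might look like the sketch below. Both classes are hypothetical stand-ins: the real `ValidationException` is only known here to accept the result as its argument, so the `result` attribute and the message format are assumptions.

```python
class ValidationException(Exception):
    """Hypothetical stand-in; the real class's attributes are not shown here."""

    def __init__(self, result):
        super().__init__(f"validation failed with {len(result.errors)} error(s)")
        self.result = result  # assumption: keep the result for later inspection


class MiniResult:
    """Simplified stand-in for ValidationResult."""

    def __init__(self, valid, errors=()):
        self.valid = valid
        self.errors = list(errors)

    def raise_for_errors(self):
        # Same rule as the source: raise only when the result is invalid.
        if not self.valid:
            raise ValidationException(self)


def accept_or_reject(result):
    """Convert the Result-style value into exception-based control flow."""
    try:
        result.raise_for_errors()
    except ValidationException as exc:
        return f"rejected: {exc}"
    return "accepted"


ok = accept_or_reject(MiniResult(True))
bad = accept_or_reject(MiniResult(False, ["SCH001", "MOD001"]))
print(ok)
print(bad)
```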

to_dict()

Convert to dictionary for JSON serialization.

Source code in src/dppvalidator/validators/results.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for JSON serialization."""
    result = {
        "valid": self.valid,
        "errors": [e.to_dict() for e in self.errors],
        "warnings": [w.to_dict() for w in self.warnings],
        "info": [i.to_dict() for i in self.info],
        "schema_version": self.schema_version,
        "validated_at": self.validated_at.isoformat(),
        "parse_time_ms": self.parse_time_ms,
        "validation_time_ms": self.validation_time_ms,
    }
    if self.signature_valid is not None:
        result["signature_valid"] = self.signature_valid
    if self.issuer_did:
        result["issuer_did"] = self.issuer_did
    if self.verification_method:
        result["verification_method"] = self.verification_method
    return result

to_json(*, indent=2)

Serialize result to JSON string.

Source code in src/dppvalidator/validators/results.py
def to_json(self, *, indent: int | None = 2) -> str:
    """Serialize result to JSON string."""
    return json.dumps(self.to_dict(), indent=indent)


ValidationError

A single validation error or warning.

dppvalidator.validators.ValidationError dataclass

Represents a single validation error with full context.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| path | str | JSON path to the error location (e.g., "$.credentialSubject.product.id") |
| message | str | Human-readable error description |
| code | str | Machine-readable error code (e.g., "SEM001") |
| layer | Literal['schema', 'model', 'semantic', 'jsonld', 'plugin', 'vocabulary'] | Validation layer that produced this error |
| severity | Literal['error', 'warning', 'info'] | Error severity level |
| suggestion | str \| None | Suggested fix for the error |
| docs_url | str \| None | Link to detailed error documentation |
| did_you_mean | tuple[str, ...] | Similar valid values (for typo correction) |
| context | dict[str, Any] | Additional context for debugging |

Source code in src/dppvalidator/validators/results.py
@dataclass(frozen=True, slots=True)
class ValidationError:
    """Represents a single validation error with full context.

    Attributes:
        path: JSON path to the error location (e.g., "$.credentialSubject.product.id")
        message: Human-readable error description
        code: Machine-readable error code (e.g., "SEM001")
        layer: Validation layer that produced this error
        severity: Error severity level
        suggestion: Suggested fix for the error
        docs_url: Link to detailed error documentation
        did_you_mean: Similar valid values (for typo correction)
        context: Additional context for debugging
    """

    path: str
    message: str
    code: str
    layer: Literal["schema", "model", "semantic", "jsonld", "plugin", "vocabulary"]
    severity: Literal["error", "warning", "info"] = "error"
    suggestion: str | None = None
    docs_url: str | None = None
    did_you_mean: tuple[str, ...] = ()
    context: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        result = {
            "path": self.path,
            "message": self.message,
            "code": self.code,
            "layer": self.layer,
            "severity": self.severity,
            "context": self.context,
        }
        if self.suggestion:
            result["suggestion"] = self.suggestion
        if self.docs_url:
            result["docs_url"] = self.docs_url
        if self.did_you_mean:
            result["did_you_mean"] = list(self.did_you_mean)
        return result

to_dict()

Convert to dictionary for JSON serialization.

Source code in src/dppvalidator/validators/results.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for JSON serialization."""
    result = {
        "path": self.path,
        "message": self.message,
        "code": self.code,
        "layer": self.layer,
        "severity": self.severity,
        "context": self.context,
    }
    if self.suggestion:
        result["suggestion"] = self.suggestion
    if self.docs_url:
        result["docs_url"] = self.docs_url
    if self.did_you_mean:
        result["did_you_mean"] = list(self.did_you_mean)
    return result
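
Two properties of this dataclass are easy to miss: instances are frozen, and `to_dict()` omits optional fields that are empty. The sketch below reproduces both on `MiniError`, a hypothetical reduced copy of the class (it drops `layer`, `severity`, `docs_url`, and `slots=True`).

```python
from dataclasses import FrozenInstanceError, dataclass, field
from typing import Any, Optional


@dataclass(frozen=True)
class MiniError:
    """Simplified stand-in for ValidationError (hypothetical, fewer fields)."""

    path: str
    message: str
    code: str
    suggestion: Optional[str] = None
    did_you_mean: tuple = ()
    context: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        result: dict[str, Any] = {
            "path": self.path,
            "message": self.message,
            "code": self.code,
            "context": self.context,
        }
        # Optional fields are emitted only when they carry a value.
        if self.suggestion:
            result["suggestion"] = self.suggestion
        if self.did_you_mean:
            result["did_you_mean"] = list(self.did_you_mean)
        return result


err = MiniError(
    path="$.issuer.id",
    message="Unknown value",
    code="SEM001",
    did_you_mean=("https://example.com/issuer",),
)
payload = err.to_dict()
print(sorted(payload))

frozen = False
try:
    err.code = "SEM002"  # frozen dataclasses reject attribute assignment
except FrozenInstanceError:
    frozen = True
print(frozen)
```

Consumers of the serialized form should therefore read `suggestion`, `docs_url`, and `did_you_mean` with a default rather than assume the keys exist.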


Usage Example

from dppvalidator.validators import ValidationEngine, ValidationResult

# Create engine with specific layers
engine = ValidationEngine(layers=["model", "semantic"])

# Validate data
result = engine.validate(
    {
        "id": "https://example.com/dpp/001",
        "issuer": {"id": "https://example.com/issuer", "name": "Acme Corp"},
    }
)

# Check result
if result.valid:
    print("Passport is valid!")
else:
    for error in result.errors:
        print(f"[{error.code}] {error.path}: {error.message}")

# Access validation metadata
print(f"Schema version: {result.schema_version}")
print(f"Validation time: {result.validation_time_ms:.2f}ms")
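
When a result contains many issues, grouping them by layer can make reports easier to scan. The helper below is a hypothetical sketch, not part of the library; it relies only on the `layer` and `code` attributes documented for `ValidationError`, and uses `SimpleNamespace` objects as stand-ins so it runs without the package installed.

```python
from collections import defaultdict
from types import SimpleNamespace


def group_by_layer(issues):
    """Map each layer name to the list of error codes it produced."""
    grouped = defaultdict(list)
    for issue in issues:
        grouped[issue.layer].append(issue.code)
    return dict(grouped)


# Stand-ins exposing the same attribute names as ValidationError.
issues = [
    SimpleNamespace(layer="schema", code="SCH001"),
    SimpleNamespace(layer="semantic", code="SEM002"),
    SimpleNamespace(layer="schema", code="SCH002"),
]
summary = group_by_layer(issues)
print(summary)
```

With a real result, the same helper could be called as `group_by_layer(result.all_issues)`.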

Validation Layers

The engine supports seven validation layers:

| Layer | Description |
| --- | --- |
| schema | JSON Schema validation (Draft 2020-12) |
| model | Pydantic model validation |
| semantic | Business rule validation |
| jsonld | JSON-LD context expansion and validation |
| vocabulary | External vocabulary validation |
| plugin | Custom plugin validators |
| signature | Verifiable Credential signatures |

Performance Features

Module-Level Schema Caching

Schemas are cached at the module level for better performance:

from dppvalidator.schemas.loader import clear_schema_cache

# Clear cache if needed (e.g., after updating schema files)
clear_schema_cache()

Plugin Registry Singleton

The plugin registry uses a singleton pattern:

from dppvalidator.plugins.registry import get_default_registry, reset_default_registry

# Get the shared registry
registry = get_default_registry()

# Reset for testing
reset_default_registry()
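
The get/reset pair suggests a lazily created module-level instance. The sketch below shows that general pattern under that assumption; the `PluginRegistry` class and its `validators` list are hypothetical and say nothing about the real registry's interface.

```python
class PluginRegistry:
    """Hypothetical registry holding registered validator names."""

    def __init__(self):
        self.validators = []


_default_registry = None  # created lazily on first access


def get_default_registry() -> PluginRegistry:
    global _default_registry
    if _default_registry is None:
        _default_registry = PluginRegistry()
    return _default_registry


def reset_default_registry() -> None:
    # Discard the shared instance; the next get creates a fresh one.
    global _default_registry
    _default_registry = None


first = get_default_registry()
first.validators.append("my-plugin")
same = get_default_registry()
reset_default_registry()
fresh = get_default_registry()
print(first is same, fresh is first, fresh.validators)
```

Resetting between tests keeps plugins registered in one test from leaking into the next.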

Error Codes

| Code | Layer | Description |
| --- | --- | --- |
| SCH001 | schema | Required field missing |
| SCH002 | schema | Invalid type |
| MOD001 | model | Model validation error |
| JLD001 | jsonld | Invalid context |
| SEM001 | semantic | Invalid vocabulary value |
| SEM002 | semantic | Invalid date range |
| SIG001 | crypto | Invalid signature |

Note: This table shows common examples. See Error Reference for the complete list of 70+ error codes.