Source code for rocrate_validator.models

# Copyright (c) 2024-2026 CRS4
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import bisect
import enum
import importlib
import inspect
import json
import re
from abc import ABC, abstractmethod
from collections.abc import Collection
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from functools import total_ordering
from pathlib import Path
from typing import Optional, Protocol, Tuple, Type, Union
from urllib.error import HTTPError

import enum_tools
from rdflib import RDF, RDFS, Graph, Namespace, URIRef

from rocrate_validator import __version__
from rocrate_validator.constants import (
    DEFAULT_HTTP_CACHE_MAX_AGE,
    DEFAULT_ONTOLOGY_FILE,
    DEFAULT_PROFILE_IDENTIFIER,
    DEFAULT_PROFILE_README_FILE,
    IGNORED_PROFILE_DIRECTORIES,
    JSON_OUTPUT_FORMAT_VERSION,
    PROF_NS,
    PROFILE_FILE_EXTENSIONS,
    PROFILE_SPECIFICATION_FILE,
    ROCRATE_METADATA_FILE,
    SCHEMA_ORG_NS,
)
from rocrate_validator.errors import (
    DuplicateRequirementCheck,
    InvalidProfilePath,
    ProfileNotFound,
    ProfileSpecificationError,
    ProfileSpecificationNotFound,
    ROCrateMetadataNotFoundError,
)
from rocrate_validator.events import Event, EventType, Publisher, Subscriber
from rocrate_validator.rocrate import ROCrate
from rocrate_validator.utils import log as logging
from rocrate_validator.utils.cache_warmup import auto_warm_up_for_settings
from rocrate_validator.utils.collections import MapIndex, MultiIndexMap
from rocrate_validator.utils.document_loader import install_document_loader
from rocrate_validator.utils.http import HttpRequester, find_offline_cache_miss
from rocrate_validator.utils.paths import (
    get_default_http_cache_path,
    get_profiles_path,
)
from rocrate_validator.utils.python_helpers import (
    get_requirement_name_from_file,
)
from rocrate_validator.utils.uri import URI

# set the default profiles path
DEFAULT_PROFILES_PATH = get_profiles_path()

logger = logging.getLogger(__name__)

BaseTypes = Union[str, Path, bool, int, None]


[docs] @enum.unique @enum_tools.documentation.document_enum @total_ordering class Severity(enum.Enum): """ Enum ordering "strength" of conditions to be verified """ #: the condition is not mandatory OPTIONAL = 0 #: the condition is recommended RECOMMENDED = 2 #: the condition is mandatory REQUIRED = 4 def __lt__(self, other: object) -> bool: if isinstance(other, Severity): return self.value < other.value else: raise TypeError(f"Comparison not supported between instances of {type(self)} and {type(other)}") @staticmethod def get(name: str) -> Severity: return getattr(Severity, name.upper())
[docs] @total_ordering @dataclass class RequirementLevel: """ Represents a requirement level. A requirement has a name and a severity level of type :class:`.Severity`. It implements the comparison operators to allow ordering of the requirement levels. """ name: str severity: Severity def __eq__(self, other: object) -> bool: if not isinstance(other, RequirementLevel): return False return self.name == other.name and self.severity == other.severity def __lt__(self, other: object) -> bool: # TODO: this ordering is not totally coherent, since for two objects a and b # with equal Severity but different names you would have # not a < b, which implies a >= b # and also a != b and not a > b, which is incoherent with a >= b if not isinstance(other, RequirementLevel): raise TypeError(f"Cannot compare {type(self)} with {type(other)}") return self.severity < other.severity def __hash__(self) -> int: return hash((self.name, self.severity)) def __repr__(self) -> str: return f"RequirementLevel(name={self.name}, severity={self.severity})" def __str__(self) -> str: return self.name def __int__(self) -> int: return self.severity.value def __index__(self) -> int: return self.severity.value
[docs] class LevelCollection: """ Collection of :class:`.RequirementLevel` instances. Provides a set of predefined RequirementLevel instances that can be used to define the severity of a requirement. They map the keywords defined in **RFC 2119** to the corresponding severity levels. .. note:: The keywords **MUST**, **MUST NOT**, **REQUIRED**, **SHALL**, **SHALL NOT**, **SHOULD**, **SHOULD NOT**, **RECOMMENDED**, **MAY**, and **OPTIONAL** in this document are to be interpreted as described in **RFC 2119**. """ #: The requirement level OPTIONAL is mapped to the OPTIONAL severity level OPTIONAL = RequirementLevel("OPTIONAL", Severity.OPTIONAL) #: The requirement level MAY is mapped to the OPTIONAL severity level MAY = RequirementLevel("MAY", Severity.OPTIONAL) #: The requirement level REQUIRED is mapped to the REQUIRED severity level REQUIRED = RequirementLevel("REQUIRED", Severity.REQUIRED) #: The requirement level SHOULD is mapped to the RECOMMENDED severity level SHOULD = RequirementLevel("SHOULD", Severity.RECOMMENDED) #: The requirement level SHOULD NOT is mapped to the RECOMMENDED severity level SHOULD_NOT = RequirementLevel("SHOULD_NOT", Severity.RECOMMENDED) #: The requirement level RECOMMENDED is mapped to the RECOMMENDED severity level RECOMMENDED = RequirementLevel("RECOMMENDED", Severity.RECOMMENDED) #: The requirement level MUST is mapped to the REQUIRED severity level MUST = RequirementLevel("MUST", Severity.REQUIRED) #: The requirement level MUST_NOT is mapped to the REQUIRED severity level MUST_NOT = RequirementLevel("MUST_NOT", Severity.REQUIRED) #: The requirement level SHALL is mapped to the REQUIRED severity level SHALL = RequirementLevel("SHALL", Severity.REQUIRED) #: The requirement level SHALL_NOT is mapped to the REQUIRED severity level SHALL_NOT = RequirementLevel("SHALL_NOT", Severity.REQUIRED) def __init__(self): raise NotImplementedError(f"{type(self)} can't be instantiated") @staticmethod def all() -> list[RequirementLevel]: return [ level for name, level in inspect.getmembers(LevelCollection) if not inspect.isroutine(level) and not inspect.isdatadescriptor(level) and not name.startswith("__") ] @staticmethod def get(name: str) -> RequirementLevel: try: return getattr(LevelCollection, name.upper()) except AttributeError: raise ValueError(f"Invalid RequirementLevel: {name}")
[docs] @total_ordering class Profile: """ RO-Crate Validator profile. A profile is a named set of requirements that can be used to validate an RO-Crate. """ # store the map of profiles: profile URI -> Profile instance __profiles_map: MultiIndexMap = MultiIndexMap( "uri", indexes=[ MapIndex("name"), MapIndex("token", unique=False), MapIndex("identifier", unique=True), MapIndex("token_path", unique=False), ], ) def __init__( self, profiles_base_path: Path, profile_path: Path, requirements: Optional[list[Requirement]] = None, identifier: str = None, publicID: Optional[str] = None, severity: Severity = Severity.REQUIRED, ): """ Initialize the Profile instance :param profile_path: the path of the profile :type profile_path: Path :param requirements: the list of requirements of the profile :type requirements: list[Requirement] :param identifier: the identifier of the profile :type identifier: str :param publicID: the public identifier of the profile :type publicID: str :meta private: :param severity: the severity of the profile :type severity: Severity : raises ProfileSpecificationNotFound: if the profile specification file is not found : raises ProfileSpecificationError: if the profile specification file contains more than one profile : raises InvalidProfilePath: if the profile path is not a directory :meta private: """ self._identifier: Optional[str] = identifier self._profiles_base_path = profiles_base_path self._profile_path = profile_path self._name: Optional[str] = None self._description: Optional[str] = None self._requirements: list[Requirement] = requirements if requirements is not None else [] self._publicID = publicID self._severity = severity self._overrides: list[Profile] = [] self._overridden_by: list[Profile] = [] # init property to store the RDF node which is the root of the profile specification graph self._profile_node = None # init property to store the RDF graph of the profile specification self._profile_specification_graph: Optional[Graph] = None # check if the profile specification file exists spec_file = self.profile_specification_file_path if not spec_file or not spec_file.exists(): raise ProfileSpecificationNotFound(spec_file) # load the profile specification expressed using the Profiles Vocabulary profile = Graph() profile.parse(str(spec_file), format="turtle") # check that the specification Graph hosts only one profile profiles = list(profile.subjects(predicate=RDF.type, object=PROF_NS.Profile)) if len(profiles) == 1: self._profile_node = profiles[0] self._profile_specification_graph = profile # initialize the token and version self._token, self._version = self.__init_token_version__() # Check if the profile is overriding an existing profile existing_profile = self.__profiles_map.get_by_key(self._profile_node.toPython()) if existing_profile: # Check if the existing profile is different from the current one if existing_profile.path != profile_path: # if the profile already exists, log a warning logger.warning( "Profile with identifier %s at %s is being overridden by the profile loaded from %s.", existing_profile.identifier, existing_profile.path, profile_path, ) # add the existing profile as an override self.__add_override__(existing_profile) # add the profile to the profiles map self.__profiles_map.add( self._profile_node.toPython(), self, token=self.token, name=self.name, identifier=self.identifier, token_path=self.__extract_token_from_path__(), ) # add the profile to the profiles map else: raise ProfileSpecificationError( message=f"Profile specification file {spec_file} must contain exactly one profile" ) def __get_specification_property__( self, property: str, namespace: Namespace, pop_first: bool = True, as_Python_object: bool = True, ) -> Union[str, list[Union[str, URIRef]]]: assert self._profile_specification_graph is not None, "Profile specification graph not loaded" values = list(self._profile_specification_graph.objects(self._profile_node, namespace[property])) if values and as_Python_object: values = [v.toPython() for v in values] if pop_first: return values[0] if values and len(values) >= 1 else None return values def __add_override__(self, profile: Profile): """ Add an override profile to this profile. :param profile: the profile that overrides this profile :type profile: Profile """ if not isinstance(profile, Profile): raise TypeError(f"Expected a Profile instance, got {type(profile)}") if profile not in self._overrides: self._overrides.append(profile) profile._overridden_by.append(self) @property def overrides(self) -> list[Profile]: """ The list of profiles that override this profile. """ return self._overrides @property def overridden_by(self) -> list[Profile]: """ The list of profiles that are overridden by this profile. """ return self._overridden_by @property def path(self): """ The path of the profile directory """ return self._profile_path @property def identifier(self) -> str: """ The identifier of the profile. """ if not self._identifier: version = self.version self._identifier = f"{self.token}-{version}" if version else self.token return self._identifier @property def name(self): """ The name of the profile as specified in the profile specification file (i.e., the value of the rdfs: label property in the `profile.ttl` file) or a default name if the label is not specified. """ return self.label or f"Profile {self.uri}" @property def profile_specification_graph(self) -> Graph: """ The RDF graph of the profile specification. """ return self._profile_specification_graph # type: ignore @property def profile_node(self): return self._profile_node @property def token(self): """ A token that uniquely identifies the profile as specified in the profile specification file (i.e., the value of the prof: hasToken property in the `profile.ttl` file). """ return self._token @property def uri(self): """ The URI of the profile. """ return self._profile_node.toPython() @property def label(self): return self.__get_specification_property__("label", RDFS) @property def comment(self): """ The comment added to the profile in the profile specification file (i.e., the value of the rdfs: comment property in the `profile.ttl` file). """ return self.__get_specification_property__("comment", RDFS) @property def version(self): """ The version of the profile as specified in the profile specification file (i.e., the value of the prof: version property in the `profile.ttl` file). """ return self._version @property def is_profile_of(self) -> list[str]: """ The list of profiles that this profile is a profile of as specified in the profile specification file (i.e., the value of the prof: isProfileOf property in the `profile.ttl` file). """ return self.__get_specification_property__("isProfileOf", PROF_NS, pop_first=False) @property def is_transitive_profile_of(self) -> list[str]: """ The list of profiles that this profile is a transitive profile of as specified in the profile specification file (i.e., the value of the prof: isTransitiveProfileOf property in the `profile.ttl` file). """ return self.__get_specification_property__("isTransitiveProfileOf", PROF_NS, pop_first=False) @property def parents(self) -> list[Profile]: """ The list of profiles that this profile is a profile of as specified in the profile specification file. """ return [self.__profiles_map.get_by_key(_) for _ in self.is_profile_of] @property def siblings(self) -> list[Profile]: """ The list of profiles that are siblings of this profile (i.e., profiles that share the same parent profile). """ return self.get_sibling_profiles(self) @property def descendants(self) -> list[Profile]: """ The list of profiles that are descendants of this profile (i.e., profiles that have this profile among their inherited profiles). """ return self.get_descendants(self) @property def readme_file_path(self) -> Path: """ The path of the README file of the profile. """ return self.path / DEFAULT_PROFILE_README_FILE @property def profile_specification_file_path(self) -> Path: """ The path of the profile specification file. """ return self.path / PROFILE_SPECIFICATION_FILE @property def publicID(self) -> Optional[str]: """ The public identifier of the RO-Crate which is validated by the profile. :meta private: """ return self._publicID @property def severity(self) -> Severity: """ The severity of the profile which the profile is loaded with, i.e., the minimum severity level of the requirements of the profile. """ return self._severity @property def description(self) -> str: """ The description of the profile as specified in the profile specification file (i.e., the value of the rdfs: comment property in the `profile.ttl` file). """ if not self._description: if self.path and self.readme_file_path.exists(): with open(self.readme_file_path, "r") as f: self._description = f.read() else: self._description = self.comment return self._description @property def requirements(self) -> list[Requirement]: """ The list of requirements of the profile. """ if not self._requirements: self._requirements = RequirementLoader.load_requirements(self, severity=self.severity) return self._requirements
[docs] def get_requirements(self, severity: Severity = Severity.REQUIRED, exact_match: bool = False) -> list[Requirement]: """ Get the requirements of the profile with the given severity level. If the exact_match flag is set to `True`, only the requirements with the exact severity level are returned; otherwise, the requirements with severity level greater than or equal to the given severity level are returned. """ return [ requirement for requirement in self.requirements if (not exact_match and (not requirement.severity_from_path or requirement.severity_from_path >= severity)) or (exact_match and requirement.severity_from_path == severity) ]
[docs] def get_requirement(self, name: str) -> Optional[Requirement]: """ Get the requirement with the given name """ for requirement in self.requirements: if requirement.name == name: return requirement return None
[docs] def get_requirement_check(self, check_name: str) -> Optional[RequirementCheck]: """ Get the requirement check with the given name """ for requirement in self.requirements: check = requirement.get_check(check_name) if check: return check return None
@classmethod def __get_nested_profiles__(cls, source: str) -> list[str]: result = [] visited = [] queue = [source] while len(queue) > 0: p = queue.pop() if p not in visited: visited.append(p) profile = cls.__profiles_map.get_by_key(p) inherited_profiles = profile.is_profile_of if inherited_profiles: for p in sorted(inherited_profiles, reverse=True): if p not in visited: queue.append(p) if p not in result: result.insert(0, p) return result @property def inherited_profiles(self) -> list[Profile]: inherited_profiles = self.is_transitive_profile_of if not inherited_profiles or len(inherited_profiles) == 0: inherited_profiles = Profile.__get_nested_profiles__(self.uri) profile_keys = self.__profiles_map.keys return [self.__profiles_map.get_by_key(_) for _ in inherited_profiles if _ in profile_keys] def add_requirement(self, requirement: Requirement): self._requirements.append(requirement) def remove_requirement(self, requirement: Requirement): self._requirements.remove(requirement) def __eq__(self, other: object) -> bool: return isinstance(other, Profile) and self.identifier == other.identifier and self.path == other.path def __lt__(self, other: object) -> bool: if not isinstance(other, Profile): raise TypeError(f"Cannot compare {type(self)} with {type(other)}") # If one profile is a parent of the other, the parent is greater if other in self.parents: return False # If the number of inherited profiles is the same, compare based on identifier return self.identifier < other.identifier def __hash__(self) -> int: return hash((self.identifier, self.path)) def __repr__(self) -> str: return ( f"Profile(identifier={self.identifier}, name={self.name}, path={self.path}, " if self.path else f"requirements={self.requirements})" ) def __str__(self) -> str: return f"{self.name} ({self.identifier})" def to_dict(self) -> dict: return { "identifier": self.identifier, "uri": self.uri, "name": self.name, "description": self.description, } @staticmethod def __extract_version_from_token__(token: str) -> Optional[str]: if not token: return None pattern = r"\Wv?(\d+(\.\d+(\.\d+)?)?)" matches = re.findall(pattern, token) if matches: return matches[-1][0] return None def __get_consistent_version__(self, candidate_token: str) -> str: candidates = { _ for _ in [ self.__get_specification_property__("version", SCHEMA_ORG_NS), self.__extract_version_from_token__(candidate_token), self.__extract_version_from_token__(str(self.path.relative_to(self._profiles_base_path))), self.__extract_version_from_token__(str(self.uri)), ] if _ is not None } if len(candidates) > 1: raise ProfileSpecificationError(f"Inconsistent versions found: {candidates}") logger.debug("Candidate versions: %s", candidates) return candidates.pop() if len(candidates) == 1 else None def __extract_token_from_path__(self) -> str: base_path = str(self._profiles_base_path.absolute()) identifier = str(self.path.absolute()) # Check if the path starts with the base path if not identifier.startswith(base_path): raise ValueError("Path does not start with the base path") # Remove the base path from the identifier identifier = identifier.replace(f"{base_path}/", "") # Replace slashes with hyphens identifier = identifier.replace("/", "-") return identifier def __init_token_version__(self) -> Tuple[str, str, str]: # try to extract the token from the specs or the path candidate_token = self.__get_specification_property__("hasToken", PROF_NS) if not candidate_token: candidate_token = self.__extract_token_from_path__() logger.debug("Candidate token: %s", candidate_token) # try to extract the version from the specs or the token or the path or the URI version = self.__get_consistent_version__(candidate_token) logger.debug("Extracted version: %s", version) # remove the version from the token if it is present if version: candidate_token = re.sub(r"[\W|_]+" + re.escape(version) + r"$", "", candidate_token) # return the candidate token and version return candidate_token, version @classmethod def __load_profile_path__( cls, profiles_base_path: str, profile_path: Union[str, Path], publicID: Optional[str] = None, severity: Severity = Severity.REQUIRED, ) -> Profile: # if the path is a string, convert it to a Path if isinstance(profile_path, str): profile_path = Path(profile_path) # check if the path is a directory if not profile_path.is_dir(): raise InvalidProfilePath(profile_path) # create a new profile profile = Profile( profiles_base_path=profiles_base_path, profile_path=profile_path, publicID=publicID, severity=severity, ) logger.debug("Loaded profile: %s", profile) return profile @classmethod def __load_profiles_paths__( cls, profiles_path: Union[str, Path] = None, extra_profiles_path: Union[str, Path] = None, ) -> list[Tuple[Path, Path]]: """ Load the paths of the profiles from the given profiles path and extra profiles path. :param profiles_path: the path to the profiles directory :type profiles_path: Union[str, Path] :param extra_profiles_path: an additional path to search for profiles :type extra_profiles_path: Union[str, Path] :return: a list of tuples containing the root profile directory and the profile directory :rtype: list[Tuple[Path, Path]] :raises InvalidProfilePath: if the profiles path is not a directory """ result = [] # set the list of root profile directories root_profile_directories = [profiles_path] if profiles_path else [] if extra_profiles_path is not None and extra_profiles_path != profiles_path: root_profile_directories.append(extra_profiles_path) # collect profiles nested in the root profile directories for root_profile_directory in root_profile_directories: # if the path is a string, convert it to a Path if isinstance(root_profile_directory, str): root_profile_directory = Path(root_profile_directory) # check if the path is a directory and raise an error if not if not root_profile_directory.is_dir(): raise InvalidProfilePath(root_profile_directory) # if the path is a directory, get the profile directories result.extend( [ (root_profile_directory, p.parent) for p in root_profile_directory.rglob("*.*") if p.name == PROFILE_SPECIFICATION_FILE ] ) # return the list of profile directories return result @classmethod def load_profiles( cls, profiles_path: Union[str, Path], extra_profiles_path: Union[str, Path] = None, publicID: Optional[str] = None, severity: Severity = Severity.REQUIRED, allow_requirement_check_override: bool = True, ) -> list[Profile]: # initialize the profiles list profiles = [] # calculate the list of profiles path as the subdirectories of the profiles path # where the profile specification file is present profiles_paths = cls.__load_profiles_paths__(profiles_path, extra_profiles_path) # iterate through the directories and load the profiles for root_profile_path, profile_path in profiles_paths: logger.debug( "Checking profile path: %s %s %r", profile_path, profile_path.is_dir(), IGNORED_PROFILE_DIRECTORIES, ) # check if the profile path is a directory and not in the ignored directories if profile_path.is_dir() and profile_path not in IGNORED_PROFILE_DIRECTORIES: profile = Profile.__load_profile_path__( root_profile_path, profile_path, publicID=publicID, severity=severity, ) # if the profile overrides another profile, # remove the overridden profiles from the list of profiles # to avoid duplicates and ensure that the most specific profile is used if profile.overrides: for overridden_profile in profile.overrides: if overridden_profile in profiles: profiles.remove(overridden_profile) # add the profile to the list of profiles profiles.append(profile) logger.debug("Loaded profile: %s (%s)", profile.identifier, profile.path) # order profiles based on the inheritance hierarchy, # from the most specific to the most general # (i.e., from the leaves of the graph to the root) profiles = sorted(profiles, reverse=True) # Check for overridden checks if not allow_requirement_check_override: # Navigate the profiles to check for overridden checks. # If the override is not enabled in the settings raise an error. profiles_checks = set() # Search for duplicated checks in the profiles for profile in profiles: profile_checks = [_ for r in profile.get_requirements() for _ in r.get_checks()] for check in profile_checks: # If the check is already present in the list of checks, # raise an error if the override is not enabled. if check in profiles_checks: raise DuplicateRequirementCheck(check.name, profile.identifier) # Add the check to the list of checks profiles_checks.add(check) # order profiles according to the number of profiles they depend on: # i.e, first the profiles that do not depend on any other profile # then the profiles that depend on the previous ones, and so on return sorted( profiles, key=lambda x: f"{len(x.inherited_profiles)}_{x.identifier}", )
[docs] @classmethod def get_by_identifier(cls, identifier: str) -> Profile: """ Get the profile with the given identifier :param identifier: the identifier :type identifier: str :return: the profile :rtype: Profile """ return cls.__profiles_map.get_by_index("identifier", identifier)
[docs] @classmethod def get_by_uri(cls, uri: str) -> Profile: """ Get the profile with the given URI :param uri: the URI :type uri: str :return: the profile :rtype: Profile """ return cls.__profiles_map.get_by_key(uri)
[docs] @classmethod def get_by_name(cls, name: str) -> list[Profile]: """ Get the profile with the given name :param name: the name :type name: str :return: the profile :rtype: Profile """ return cls.__profiles_map.get_by_index("name", name)
[docs] @classmethod def get_by_token(cls, token: str) -> Profile: """ Get the profile with the given token :param token: the token :type token: str :return: the profile :rtype: Profile """ return cls.__profiles_map.get_by_index("token", token)
[docs] @classmethod def get_sibling_profiles(cls, profile: Profile) -> list[Profile]: """ Get the sibling profiles of the given profile :param profile: the profile :type profile: Profile :return: the list of sibling profiles :rtype: list[Profile] """ return [p for p in cls.__profiles_map.values() if profile in p.parents]
[docs] @classmethod def get_descendants(cls, profile: Profile) -> list[Profile]: """ Get the transitive descendants of the given profile (any profile that has `profile` among its `inherited_profiles`). :param profile: the profile :type profile: Profile :return: the list of descendant profiles :rtype: list[Profile] """ return [p for p in cls.__profiles_map.values() if profile in p.inherited_profiles]
[docs] @classmethod def all(cls) -> list[Profile]: """ Get all the profiles :return: the list of profiles :rtype: list[Profile] """ return cls.__profiles_map.values()
[docs] @classmethod def find_in_list(cls, profiles: Collection[Profile], profile_identifier: str) -> Optional[Profile]: """ Find a profile with the given identifier in the given list of profiles :param profiles: the list of profiles :type profiles: Collection[Profile] :param identifier: the identifier :type identifier: str :return: the profile if found, None otherwise :rtype: Optional[Profile] """ profile = next((p for p in profiles if p.identifier == profile_identifier), None) or next( (p for p in profiles if str(p.identifier).replace(f"-{p.version}", "") == profile_identifier), None, ) if not profile: raise ProfileNotFound(profile_identifier) return profile
class SkipRequirementCheck(Exception): def __init__(self, check: RequirementCheck, message: str = ""): self.check = check self.message = message def __str__(self): return f"SkipRequirementCheck(check={self.check})"
[docs] @total_ordering class Requirement(ABC): """ Abstract class representing a requirement of a profile. A requirement is a named set of checks that can be used to validate an RO-Crate. """
[docs] def __init__( self, profile: Profile, name: str = "", description: Optional[str] = None, path: Optional[Path] = None, initialize_checks: bool = True, ): """ Initialize the Requirement instance :meta private: """ self._order_number: Optional[int] = None self._profile = profile self._description = description self._path = path # path of code implementing the requirement self._level_from_path = None self._checks: list[RequirementCheck] = [] self._overridden = None if not name and path: self._name = get_requirement_name_from_file(path) else: self._name = name # set flag to indicate if the checks have been initialized self._checks_initialized = False # initialize the checks if the flag is set if initialize_checks: _ = self.__init_checks__() # assign order numbers to checks self.__reorder_checks__() # update the checks initialized flag self._checks_initialized = True
@property def order_number(self) -> int: """ The order number of the requirement in the profile :return: the order number :rtype: int """ assert self._order_number is not None return self._order_number @property def identifier(self) -> str: """ The identifier of the requirement :return: the identifier :rtype: str """ return f"{self.profile.identifier}_{self.relative_identifier}" @property def relative_identifier(self) -> str: """ The relative identifier of the requirement :return: the relative identifier :rtype: str :meta private: """ return f"{self.order_number}" @property def name(self) -> str: return self._name @property def severity_from_path(self) -> Severity: return self.requirement_level_from_path.severity if self.requirement_level_from_path else None @property def requirement_level_from_path(self) -> RequirementLevel: if not self._level_from_path: try: self._level_from_path = LevelCollection.get(self._path.parent.name) except ValueError: logger.debug( "The requirement level could not be determined from the path: %s", self._path, ) return self._level_from_path @property def profile(self) -> Profile: return self._profile @property def description(self) -> str: if not self._description: self._description = ( self.__class__.__doc__.strip() if self.__class__.__doc__ else f"Profile Requirement {self.name}" ) return self._description @property def overridden(self) -> bool: # Check if the requirement has been overridden. # The requirement can be considered overridden if all its checks have been overridden if self._overridden is None: self._overridden = len([_ for _ in self._checks if not _.overridden]) == 0 return self._overridden @property @abstractmethod def hidden(self) -> bool: pass @property def path(self) -> Optional[Path]: return self._path @abstractmethod def __init_checks__(self) -> list[RequirementCheck]: pass def get_checks(self) -> list[RequirementCheck]: return self._checks.copy() def get_check(self, name: str) -> Optional[RequirementCheck]: for check in self._checks: if check.name == name: return check return None def get_checks_by_level(self, level: RequirementLevel) -> list[RequirementCheck]: return list({check for check in self._checks if check.level.severity == level.severity}) def __reorder_checks__(self) -> None: for i, check in enumerate(self._checks): check.order_number = i + 1 def _do_validate_(self, context: ValidationContext) -> bool: """ Internal method to perform the validation Returns whether all checks in this requirement passed. :meta private: """ logger.debug( "Validating Requirement %s with %s checks", self.name, len(self._checks), ) logger.debug( "Running %s checks for Requirement '%s'", len(self._checks), self.name, ) all_passed = True checks_to_perform = [ _ for _ in self._checks if not context.settings.skip_checks or _.identifier not in context.settings.skip_checks ] for check in checks_to_perform: try: if check.overridden and not check.requirement.profile.identifier == context.profile_identifier: logger.debug( "Skipping check '%s' because overridden by '%r'", check.identifier, [_.identifier for _ in check.overridden_by], ) continue if check.deactivated: logger.debug("Skipping check '%s' because deactivated", check.identifier) context.result._add_skipped_check(check) continue # Determine whether to skip event notification for inherited profiles skip_event_notify = False if ( check.requirement.profile.identifier != context.profile_identifier and context.settings.disable_inherited_profiles_issue_reporting ): logger.debug( "Inherited profiles reporting disabled. " "Skipping requirement %s as it belongs to an inherited profile %s", check.requirement.identifier, check.requirement.profile.identifier, ) skip_event_notify = True # Notify the start of the check execution if not skip_event_notify is set to True if not skip_event_notify: context.validator.notify( RequirementCheckValidationEvent(EventType.REQUIREMENT_CHECK_VALIDATION_START, check) ) # Execute the check check_result = check.execute_check(context) logger.debug("Result of check %s: %s", check.identifier, check_result) context.result._add_executed_check(check, check_result) # Notify the end of the check execution if not skip_event_notify is set to True if not skip_event_notify: context.validator.notify( RequirementCheckValidationEvent( EventType.REQUIREMENT_CHECK_VALIDATION_END, check, validation_result=check_result, ) ) logger.debug( "Ran check '%s'. Got result %s", check.identifier, check_result, ) # Ensure the check result is a boolean if not isinstance(check_result, bool): logger.warning( "Ignoring the check %s as it returned the value %r instead of a boolean", check.name, ) raise RuntimeError(f"Ignoring invalid result from check {check.name}") # Aggregate the check result all_passed = all_passed and check_result if not all_passed and context.fail_fast: break except SkipRequirementCheck as e: logger.debug("Skipping check '%s' because: %s", check.name, e) context.result._add_skipped_check(check) continue except Exception as e: # Ignore the fact that the check failed as far as the validation result is concerned. if context.maybe_warn_offline_cache_miss(e): logger.debug("Offline cache miss during check %s: %s", check, e) else: logger.warning("Unexpected error during check %s. Exception: %s", check, e) logger.warning("Consider reporting this as a bug.") if logger.isEnabledFor(logging.DEBUG): logger.exception(e) skipped_checks = set(self._checks) - set(checks_to_perform) context.result.skipped_checks.update(skipped_checks) logger.debug( "Checks for Requirement '%s' completed. Checks passed? %s", self.name, all_passed, ) return all_passed
[docs] def __eq__(self, other: object) -> bool: if not isinstance(other, Requirement): raise TypeError(f"Cannot compare {type(self)} with {type(other)}") return self.name == other.name and self.description == other.description and self.path == other.path
[docs] def __ne__(self, other: object) -> bool: return not self.__eq__(other)
[docs] def __hash__(self): return hash((self.name, self.description, self.path))
[docs] def __lt__(self, other: object) -> bool: if not isinstance(other, Requirement): raise ValueError(f"Cannot compare Requirement with {type(other)}") return (self._order_number, self.name) < ( other._order_number, other.name, )
[docs] def __repr__(self): return ( f"ProfileRequirement(" f"_order_number={self._order_number}, " f"name={self.name}, " f"description={self.description}" f", path={self.path}, " if self.path else ")" )
[docs] def __str__(self) -> str: return self.name
def to_dict(self, with_profile: bool = True, with_checks: bool = True) -> dict: result = { "identifier": self.identifier, "name": self.name, "description": self.description, "order": self.order_number, } if with_profile: result["profile"] = self.profile.to_dict() if with_checks: result["checks"] = [_.to_dict(with_requirement=False, with_profile=False) for _ in self._checks] return result @classmethod def initialize(cls, context: ValidationContext) -> None: logger.debug( "Starting %s requirement initialization for context %s", cls.__name__, context, ) # do initialization logic here (empty for now) logger.debug( "Completed %s requirement initialization for context %s", cls.__name__, context, ) @classmethod def finalize(cls, context: ValidationContext) -> None: logger.debug( "Starting %s requirement finalization for context %s", cls.__name__, context, ) # do finalization logic here (empty for now) logger.debug( "Completed %s requirement finalization for context %s", cls.__name__, context, )
class RequirementLoader: def __init__(self, profile: Profile): self._profile = profile @property def profile(self) -> Profile: return self._profile @staticmethod def __get_requirement_type__(requirement_path: Path) -> str: if requirement_path.suffix == ".py": return "python" elif requirement_path.suffix == ".ttl": return "shacl" else: raise ValueError(f"Unsupported requirement type: {requirement_path.suffix}") @classmethod def __get_requirement_loader__(cls, profile: Profile, requirement_path: Path) -> RequirementLoader: requirement_type = cls.__get_requirement_type__(requirement_path) loader_instance_name = f"_{requirement_type}_loader_instance" loader_instance = getattr(profile, loader_instance_name, None) if loader_instance is None: module_name = f"rocrate_validator.requirements.{requirement_type}" logger.debug("Loading module: %s", module_name) module = importlib.import_module(module_name) loader_class_name = f"{'Py' if requirement_type == 'python' else 'SHACL'}RequirementLoader" loader_class = getattr(module, loader_class_name) loader_instance = loader_class(profile) setattr(profile, loader_instance_name, loader_instance) return loader_instance @staticmethod def __get_requirement_classes__() -> list[Type[Requirement]]: # Ensure known requirement modules are imported so subclasses are registered. for requirement_type in ("python", "shacl"): module_name = f"rocrate_validator.requirements.{requirement_type}" try: importlib.import_module(module_name) except Exception: logger.debug( "Unable to import requirement module: %s", module_name, exc_info=True, ) def all_subclasses( base_class: Type[Requirement], ) -> list[Type[Requirement]]: result: list[Type[Requirement]] = [] for subcls in base_class.__subclasses__(): result.append(subcls) result.extend(all_subclasses(subcls)) return result return all_subclasses(Requirement) @staticmethod def load_requirements(profile: Profile, severity: Severity = None) -> list[Requirement]: """ Load the requirements related to the profile """ def ok_file(p: Path) -> bool: return ( p.is_file() and p.suffix in PROFILE_FILE_EXTENSIONS and not p.name == DEFAULT_ONTOLOGY_FILE and not p.name == PROFILE_SPECIFICATION_FILE and not p.name.startswith(".") and not p.name.startswith("_") ) files = sorted( (p for p in profile.path.rglob("*.*") if ok_file(p)), key=lambda x: (not x.suffix == ".py", x), ) # set the requirement level corresponding to the severity requirement_level = LevelCollection.get(severity.name) requirements = [] for requirement_path in files: try: requirement_level_from_path = LevelCollection.get(requirement_path.parent.name) if requirement_level_from_path < requirement_level: continue except ValueError: logger.debug( "The requirement level could not be determined from the path: %s", requirement_path, ) requirement_loader = RequirementLoader.__get_requirement_loader__(profile, requirement_path) for requirement in requirement_loader.load( profile, requirement_level, requirement_path, publicID=profile.publicID, ): requirements.append(requirement) # sort the requirements by severity requirements = sorted( requirements, key=lambda x: ( (-x.severity_from_path.value, x.path.name, x.name) if x.severity_from_path is not None else (0, x.path.name, x.name) ), reverse=False, ) # assign order numbers to requirements for i, requirement in enumerate(requirements): requirement._order_number = i + 1 # log and return the requirements logger.debug("Profile %s loaded %s requirements: %s", profile.identifier, len(requirements), requirements) return requirements @dataclass(frozen=True) class SourceSnippet: """ A snippet of source code backing a :class:`RequirementCheck`. :ivar language: language tag for syntax highlighting (e.g. ``"python"``, ``"turtle"``). :ivar code: the source code as text. :ivar source_path: path to the file the snippet was extracted from, when available. """ language: str code: str source_path: Optional[Path] = None
[docs] @total_ordering class RequirementCheck(ABC): def __init__( self, requirement: Requirement, name: str, level: Optional[RequirementLevel] = LevelCollection.REQUIRED, description: Optional[str] = None, hidden: Optional[bool] = None, deactivated: bool = False, ): self._requirement: Requirement = requirement self._order_number = 0 self._name = name self._level = level self._description = description self._hidden = hidden self._deactivated = deactivated @property def order_number(self) -> int: return self._order_number @order_number.setter def order_number(self, value: int) -> None: if value < 0: raise ValueError("order_number can't be < 0") self._order_number = value @property def identifier(self) -> str: return f"{self.requirement.identifier}.{self.order_number}" @property def relative_identifier(self) -> str: return f"{self.level.name} {self.requirement.relative_identifier}.{self.order_number}" @property def name(self) -> str: if not self._name: return self.__class__.__name__.replace("Check", "") return self._name @property def description(self) -> str: if not self._description: return self.__class__.__doc__.strip() if self.__class__.__doc__ else f"Check {self.name}" return self._description @property def requirement(self) -> Requirement: return self._requirement @property def level(self) -> RequirementLevel: return self._level or self.requirement.requirement_level_from_path or LevelCollection.REQUIRED @property def severity(self) -> Severity: return self.level.severity @property def overridden_by(self) -> list[RequirementCheck]: overridden_by = [] for sibling_profile in self.requirement.profile.siblings: check = sibling_profile.get_requirement_check(self.name) if check: overridden_by.append(check) return overridden_by @property def overrides(self) -> list[RequirementCheck]: overrides = [] for parent in self.requirement.profile.parents: check = parent.get_requirement_check(self.name) if check: overrides.append(check) return overrides @property def overridden(self) -> bool: return len(self.overridden_by) > 0 @property def deactivated(self) -> bool: return self._deactivated @property def hidden(self) -> bool: if self._hidden is not None: return self._hidden return self.requirement.hidden @abstractmethod def execute_check(self, context: ValidationContext) -> bool: raise NotImplementedError()
[docs] def get_source_snippet(self) -> Optional[SourceSnippet]: """ Return the source code that implements this check, or ``None`` if the backing source cannot be extracted for this check kind. Concrete subclasses should override this method. """ return None
def to_dict(self, with_requirement: bool = True, with_profile: bool = True) -> dict: result = { "identifier": self.identifier, "label": self.relative_identifier, "order": self.order_number, "name": self.name, "description": self.description, "severity": self.severity.name, } if with_requirement: result["requirement"] = self.requirement.to_dict(with_profile=with_profile, with_checks=False) return result def __eq__(self, other: object) -> bool: if not isinstance(other, RequirementCheck): raise ValueError(f"Cannot compare RequirementCheck with {type(other)}") return self.requirement == other.requirement and self.name == other.name def __lt__(self, other: object) -> bool: if not isinstance(other, RequirementCheck): raise ValueError(f"Cannot compare RequirementCheck with {type(other)}") return (self.requirement, self.identifier) < ( other.requirement, other.identifier, ) def __ne__(self, other: object) -> bool: return not self.__eq__(other) def __hash__(self) -> int: return hash((self.requirement, self.name or ""))
[docs] @total_ordering class CheckIssue: """ Represents an issue with a check that has been executed during the validation process. """ def __init__( self, check: RequirementCheck, message: Optional[str] = None, violatingProperty: Optional[str] = None, violatingEntity: Optional[str] = None, value: Optional[str] = None, ): self._message = message self._check: RequirementCheck = check self._violatingProperty = violatingProperty self._violatingEntity = violatingEntity self._propertyValue = value @property def message(self) -> Optional[str]: """The message associated with the issue""" return self._message @property def level(self) -> RequirementLevel: """The level of the issue""" return self._check.level @property def severity(self) -> Severity: """Severity of the RequirementLevel associated with this check.""" return self._check.severity @property def level_name(self) -> str: return self.level.name @property def check(self) -> RequirementCheck: """The check that generated the issue""" return self._check @property def violatingEntity(self) -> Optional[str]: """ It represents the specific element being evaluated that fails to meet the defined rules or constraints within a validation process. Also referred to as `focusNode` in SHACL terminology in the context of an RDF graph, it is the subject of a triple that violates a given constraint on the subject’s property/predicate, represented by the violatingProperty. """ return self._violatingEntity @property def violatingProperty(self) -> Optional[str]: """ It refers to the specific property or relationship within an item that leads to a validation failure. It identifies the part of the data structure that is causing the issue. Also referred to as `resultPath` in SHACL terminology, in the context of an RDF graph, it is the predicate of a triple that violates a given constraint on the subject’s property/predicate, represented by the violatingProperty. """ return self._violatingProperty @property def violatingPropertyValue(self) -> Optional[str]: """ It represents the value of the violatingProperty that leads to a validation failure. """ return self._propertyValue def __eq__(self, other: object) -> bool: return isinstance(other, CheckIssue) and self._check == other._check and self._message == other._message def __lt__(self, other: object) -> bool: if not isinstance(other, CheckIssue): raise TypeError(f"Cannot compare {type(self)} with {type(other)}") return (self._check, self._message) < (other._check, other._message) def __hash__(self) -> int: return hash((self._check, self._message)) def __repr__(self) -> str: return f"CheckIssue(severity={self.severity}, check={self.check}, message={self.message})" def __str__(self) -> str: return f'Issue of severity {self.severity.name} with check "{self.check.identifier}": {self.message}' def to_dict( self, with_check: bool = True, with_requirement: bool = True, with_profile: bool = True, ) -> dict: result = { "severity": self.severity.name, "message": self.message, "violatingEntity": self.violatingEntity, "violatingProperty": self.violatingProperty, "violatingPropertyValue": self.violatingPropertyValue, } if with_check: result["check"] = self.check.to_dict(with_requirement=with_requirement, with_profile=with_profile) return result def to_json( self, with_checks: bool = True, with_requirements: bool = True, with_profile: bool = True, ) -> str: return json.dumps( self.to_dict( with_check=with_checks, with_requirement=with_requirements, with_profile=with_profile, ), indent=4, cls=CustomEncoder, )
class ValidationStatisticsListener(Protocol): """ Protocol for listeners interested in validation statistics updates. """ def on_statistics_updated(self, statistics: ValidationStatistics): logger.debug("Statistics updated: %r", statistics.statistics) class ValidationStatistics(Subscriber): """ Computes and stores statistical metrics about the RO-Crate validation process. """ def __init__( self, settings: Union[dict, ValidationSettings], context: Optional[ValidationContext] = None, skip_initialization: bool = False, ): if isinstance(settings, dict): settings = ValidationSettings.parse(settings) self._settings = settings self._context = context self._stats = self.__initialise__(settings) if not skip_initialization else {} self._result: Optional[ValidationResult] = None self._listeners = [] # self._target_profile: Optional[Profile] = None @property def validation_settings(self) -> ValidationSettings: """ Get the validation settings used for statistics computation """ return self._settings @property def validation_result(self) -> Optional[ValidationResult]: """ Get the validation result """ return self._result def add_listener(self, listener: ValidationStatisticsListener): """ Add a listener to be notified on statistics updates """ self._listeners.append(listener) logger.debug("Listener added: %r", listener) def notify_listeners(self): """ Notify all registered listeners about statistics updates """ for listener in self._listeners: listener.on_statistics_updated(self) logger.debug("Notified listener: %r", listener) @property def statistics(self) -> dict: """ Get the computed validation statistics """ return self._stats.copy() @property def profile(self) -> Profile: """ Get the profile being validated """ return self._stats.get("profile") @property def profiles(self) -> list[Profile]: """ Get all profiles involved in validation """ return self._stats.get("profiles", []) @property def severity(self) -> Severity: """ Get the validation severity level """ return self._stats.get("severity") @property def checks_by_severity(self) -> dict: """ Get the checks grouped by severity """ return self._stats.get("checks_by_severity", {}) @property def check_count_by_severity(self) -> dict: """ Get the count of checks grouped by severity """ return {k: len(v) for k, v in self._stats.get("checks_by_severity", {}).items()} @property def requirements(self) -> list[Requirement]: """ Get all requirements being validated """ return self._stats.get("requirements", []) @property def passed_requirements(self) -> list[Requirement]: """ Get the list of passed requirements """ return self._stats.get("passed_requirements", []) @property def failed_requirements(self) -> list[Requirement]: """ Get the list of failed requirements """ return self._stats.get("failed_requirements", []) @property def total_requirements(self) -> int: """ Get the total number of requirements """ return len(self._stats.get("requirements", [])) @property def checks(self) -> list[RequirementCheck]: """ Get all checks being validated """ return self._stats.get("checks", []) @property def passed_checks(self) -> list[RequirementCheck]: """ Get the list of passed checks """ return self._stats.get("passed_checks", []) @property def failed_checks(self) -> list[RequirementCheck]: """ Get the list of failed checks """ return self._stats.get("failed_checks", []) @property def total_checks(self) -> int: """ Get the total number of checks """ return len(self._stats.get("checks", [])) @property def validated_profiles(self) -> list[Profile]: """ Get the list of validated profiles """ return self._stats.get("validated_profiles", []) @property def validated_requirements(self) -> list[Requirement]: """ Get the list of validated requirements """ return self._stats.get("validated_requirements", []) @property def validated_checks(self) -> list[RequirementCheck]: """ Get the list of validated checks """ return self._stats.get("validated_checks", []) @property def started_at(self) -> Optional[datetime]: """ Get the timestamp when validation started """ return self._stats.get("started_at") @property def finished_at(self) -> Optional[datetime]: """ Get the timestamp when validation finished """ return self._stats.get("finished_at") @property def duration(self) -> Optional[float]: """ Get the duration of the validation process in seconds """ started_at = self.started_at finished_at = self.finished_at if started_at and finished_at: return (finished_at - started_at).total_seconds() return None @classmethod def __initialise__(cls, validation_settings: ValidationSettings): """ Compute the statistics of the profile """ # extract the validation settings severity_validation = validation_settings.requirement_severity profiles: list[Profile] = Profile.load_profiles( validation_settings.profiles_path, extra_profiles_path=validation_settings.extra_profiles_path, severity=severity_validation, allow_requirement_check_override=validation_settings.allow_requirement_check_override, ) profile: Profile = Profile.find_in_list(profiles, validation_settings.profile_identifier) target_profile_identifier = profile.identifier # initialize the profiles list profiles = [profile] # add inherited profiles if enabled if not validation_settings.disable_inherited_profiles_issue_reporting: profiles.extend(profile.inherited_profiles) logger.debug("Inherited profiles: %r", profile.inherited_profiles) # Initialize the counters checks_by_severity = {} checks: set[RequirementCheck] = set() requirements: set[Requirement] = set() # Initialize the counters for severity in ( Severity.REQUIRED, Severity.RECOMMENDED, Severity.OPTIONAL, ): checks_by_severity[severity] = set() # Process the requirements and checks processed_requirements = [] for profile in profiles: for requirement in profile.requirements: if requirement in processed_requirements: continue processed_requirements.append(requirement) if requirement.hidden: continue requirement_checks_count = 0 for severity in ( Severity.REQUIRED, Severity.RECOMMENDED, Severity.OPTIONAL, ): logger.debug( f"Checking requirement: {requirement} severity: {severity} {severity < severity_validation}" ) # skip requirements with lower severity if severity < severity_validation: continue # count the checks requirement_checks = [ _ for _ in requirement.get_checks_by_level(LevelCollection.get(severity.name)) if (not validation_settings.skip_checks or _.identifier not in validation_settings.skip_checks) and (not _.overridden or _.requirement.profile.identifier == target_profile_identifier) ] num_checks = len(requirement_checks) requirement_checks_count += num_checks if num_checks > 0: logger.debug(f"Requirement: {requirement} has {num_checks} checks of severity: {severity}") checks.update(requirement_checks) checks_by_severity[severity].update(requirement_checks) # count the requirements and checks if requirement_checks_count == 0: logger.debug(f"No checks for requirement: {requirement}") else: # Only if there are checks for the requirement count it logger.debug(f"Requirement: {requirement} checks count: {requirement_checks_count}") assert not requirement.hidden, "Hidden requirements should not be counted" # add the requirement to the list requirements.add(requirement) # log processed requirements logger.debug( "Processed requirements %r: %r", len(processed_requirements), processed_requirements, ) # Prepare the result result = { "profile": profile, "profiles": profiles, "requirements": requirements, "checks": checks, "severity": severity_validation, "checks_by_severity": checks_by_severity, "failed_requirements": [], "failed_checks": [], "passed_requirements": [], "passed_checks": [], "started_at": None, "finished_at": None, "validated_profiles": [], "validated_requirements": [], "validated_checks": [], } logger.debug(result) return result def update(self, event: Event, ctx: Optional[ValidationContext] = None) -> None: # logger.debug("Event: %s", event.event_type) if event.event_type == EventType.VALIDATION_START: logger.debug("Validation started") self._stats["started_at"] = datetime.now(timezone.utc) if event.event_type == EventType.PROFILE_VALIDATION_START: logger.debug("Profile validation start: %s", event.profile.identifier) elif event.event_type == EventType.REQUIREMENT_VALIDATION_START: logger.debug("Requirement validation start") elif event.event_type == EventType.REQUIREMENT_CHECK_VALIDATION_START: logger.debug("Requirement check validation start") elif event.event_type == EventType.REQUIREMENT_CHECK_VALIDATION_END: target_profile = ctx.target_validation_profile if not event.requirement_check.requirement.hidden and ( not event.requirement_check.overridden or target_profile.identifier == event.requirement_check.requirement.profile.identifier ): if event.validation_result is not None: if event.validation_result: self._stats["passed_checks"].append(event.requirement_check) else: self._stats["failed_checks"].append(event.requirement_check) self._stats["validated_checks"].append(event.requirement_check) self.notify_listeners() else: logger.debug( "Requirement check validation result is None: %s", event.requirement_check.identifier, ) else: logger.debug( "Skipping requirement check validation: %s", event.requirement_check.identifier, ) elif event.event_type == EventType.REQUIREMENT_VALIDATION_END: if not event.requirement.hidden: if event.validation_result: self._stats["passed_requirements"].append(event.requirement) else: self._stats["failed_requirements"].append(event.requirement) self._stats["validated_requirements"].append(event.requirement) self.notify_listeners() elif event.event_type == EventType.PROFILE_VALIDATION_END: self._stats["validated_profiles"].append(event.profile) logger.debug("Profile validation ended: %s", event.profile.identifier) elif event.event_type == EventType.VALIDATION_END: self._result = event.validation_result self._stats["finished_at"] = datetime.now(timezone.utc) logger.debug("Validation ended with result: %s", event.validation_result) def to_dict(self) -> dict: """ Get the computed validation statistics as a dictionary """ return { # Execution time details "started_at": self.started_at.isoformat() if self.started_at else None, "finished_at": self.finished_at.isoformat() if self.finished_at else None, "duration": self.duration, # Profile details "profile": self.profile.identifier if self.profile else None, "profiles": [p.identifier for p in self.profiles], "severity": self.severity.name if self.severity else None, # Computed totals "total_requirements": self.total_requirements, "total_passed_requirements": len(self.passed_requirements), "total_failed_requirements": len(self.failed_requirements), "total_checks": self.total_checks, "total_passed_checks": len(self.passed_checks), "total_failed_checks": len(self.failed_checks), "total_checks_by_severity": {k.name: len(v) for k, v in self.checks_by_severity.items()}, # Requirements involved "requirements": { "count": self.total_requirements, "passed": { "count": len(self.passed_requirements), "percentage": ( (len(self.passed_requirements) / self.total_requirements * 100) if self.total_requirements > 0 else 0.0 ), "identifiers": sorted([r.identifier for r in self.passed_requirements]), }, "failed": { "count": len(self.failed_requirements), "percentage": ( (len(self.failed_requirements) / self.total_requirements * 100) if self.total_requirements > 0 else 0.0 ), "identifiers": sorted([r.identifier for r in self.failed_requirements]), }, "identifiers": sorted([r.identifier for r in self.requirements]), }, # Checks involved "checks": { "count": self.total_checks, "passed": { "count": len(self.passed_checks), "percentage": (len(self.passed_checks) / self.total_checks * 100) if self.total_checks > 0 else 0.0, "identifiers": sorted([c.identifier for c in self.passed_checks]), }, "failed": { "count": len(self.failed_checks), "percentage": (len(self.failed_checks) / self.total_checks * 100) if self.total_checks > 0 else 0.0, "identifiers": sorted([c.identifier for c in self.failed_checks]), }, "identifiers": sorted([c.identifier for c in self.checks]), "by_severity": {k.name: len(v) for k, v in self._stats.get("checks_by_severity", {}).items()}, }, } def to_json(self) -> str: """ Get the computed validation statistics as a JSON string """ return json.dumps(self.to_dict(), indent=4, cls=CustomEncoder) class AggregatedValidationStatistics: """ Represents aggregated validation statistics from multiple validation runs. """ def __init__(self, statistics_list: list[ValidationStatistics]): if not statistics_list: raise ValueError("statistics_list cannot be empty") # Store the individual statistics self._statistics_list = statistics_list # Aggregate the statistics self._overall_stats = self.__compute_averall_stats__() @property def individual_statistics(self) -> list[ValidationStatistics]: """ Get the individual validation statistics """ return self._statistics_list def to_dict(self) -> dict: """ Get the overall aggregated statistics as a dictionary """ return { # Execution time details "started_at": self.started_at.isoformat() if self.started_at else None, "finished_at": self.finished_at.isoformat() if self.finished_at else None, "duration": self.duration, # Profiles involved "profiles": [p.identifier for p in self.profiles], # Computed totals "total_requirements": self.total_requirements, "total_passed_requirements": len(self.passed_requirements), "total_failed_requirements": len(self.failed_requirements), "total_checks": self.total_checks, "total_passed_checks": len(self.passed_checks), "total_failed_checks": len(self.failed_checks), "total_checks_by_severity": {k.name: len(v) for k, v in self.checks_by_severity.items()}, # Requirements involved "requirements": { "count": self.total_requirements, "passed": { "count": len(self.passed_requirements), "percentage": ( (len(self.passed_requirements) / self.total_requirements * 100) if self.total_requirements > 0 else 0.0 ), "identifiers": [r.identifier for r in self.passed_requirements], }, "failed": { "count": len(self.failed_requirements), "percentage": ( (len(self.failed_requirements) / self.total_requirements * 100) if self.total_requirements > 0 else 0.0 ), "identifiers": [r.identifier for r in self.failed_requirements], }, "identifiers": [r.identifier for r in self.requirements], }, # Checks involved "checks": { "count": self.total_checks, "passed": { "count": len(self.passed_checks), "percentage": (len(self.passed_checks) / self.total_checks * 100) if self.total_checks > 0 else 0.0, "identifiers": [c.identifier for c in self.passed_checks], }, "failed": { "count": len(self.failed_checks), "percentage": (len(self.failed_checks) / self.total_checks * 100) if self.total_checks > 0 else 0.0, "identifiers": [c.identifier for c in self.failed_checks], }, "identifiers": [c.identifier for c in self.checks], }, } @property def profiles(self) -> set[Profile]: """ Get the set of profiles involved in the aggregated validation """ return self._overall_stats.get("profiles", set()) @property def total_profiles(self) -> int: """ Get the total number of profiles involved in the aggregated validation """ return len(self._overall_stats.get("profiles", set())) @property def requirements(self) -> set[Requirement]: """ Get the set of requirements in the aggregated validation """ return self._overall_stats.get("requirements", set()) @property def passed_requirements(self) -> set[Requirement]: """ Get the set of passed requirements in the aggregated validation """ return self._overall_stats.get("passed_requirements", set()) @property def failed_requirements(self) -> set[Requirement]: """ Get the set of failed requirements in the aggregated validation """ return self._overall_stats.get("failed_requirements", set()) @property def total_requirements(self) -> int: """ Get the total number of requirements in the aggregated validation """ return len(self._overall_stats.get("requirements", set())) @property def checks(self) -> set[RequirementCheck]: """ Get the set of checks in the aggregated validation """ return self._overall_stats.get("checks", set()) @property def checks_by_severity(self) -> dict: """ Get the checks grouped by severity in the aggregated validation """ return self._overall_stats.get("checks_by_severity", {}) @property def total_checks(self) -> int: """ Get the total number of checks in the aggregated validation """ return len(self._overall_stats.get("checks", set())) @property def passed_checks(self) -> set[RequirementCheck]: """ Get the set of passed checks in the aggregated validation """ return self._overall_stats.get("passed_checks", set()) @property def failed_checks(self) -> set[RequirementCheck]: """ Get the set of failed checks in the aggregated validation """ return self._overall_stats.get("failed_checks", set()) @property def started_at(self) -> Optional[datetime]: """ Get the timestamp when the aggregated validation started """ return self._overall_stats.get("started_at") @property def finished_at(self) -> Optional[datetime]: """ Get the timestamp when the aggregated validation finished """ return self._overall_stats.get("finished_at") @property def duration(self) -> float: """ Get the total duration of the aggregated validation in seconds """ return self._overall_stats.get("duration", 0.0) def __compute_averall_stats__(self): """ Compute the overall aggregated statistics """ # Initialize the overall statistics result = { "profiles": set(), "requirements": set(), "checks": set(), "checks_by_severity": {}, "failed_requirements": set(), "failed_checks": set(), "passed_requirements": set(), "passed_checks": set(), "started_at": None, "finished_at": None, "duration": 0.0, } # Aggregate statistics from each ValidationStatistics instance for stats in self._statistics_list: # Aggregate profiles for profile in stats.profiles: result["profiles"].add(profile) # Aggregate total requirements and checks result["requirements"].update(stats.requirements) result["checks"].update(stats.checks) result["checks_by_severity"].update(stats.checks_by_severity) # Aggregate failed and passed requirements and checks result["failed_requirements"].update(stats.failed_requirements) result["failed_checks"].update(stats.failed_checks) result["passed_requirements"].update(stats.passed_requirements) result["passed_checks"].update(stats.passed_checks) # Aggregate started_at and finished_at result["started_at"] = ( min(result["started_at"], stats.started_at) if result["started_at"] else stats.started_at ) result["finished_at"] = ( max(result["finished_at"], stats.finished_at) if result["finished_at"] else stats.finished_at ) # Aggregate duration result["duration"] += stats.duration or 0.0 # Sort the sets to have consistent order result["profiles"] = sorted(result["profiles"], key=lambda p: p.identifier) result["requirements"] = sorted(result["requirements"], key=lambda r: r.identifier) result["checks"] = sorted(result["checks"], key=lambda c: c.identifier) result["checks_by_severity"] = { k: sorted(v, key=lambda c: c.identifier) for k, v in result["checks_by_severity"].items() } result["failed_requirements"] = sorted(result["failed_requirements"], key=lambda r: r.identifier) result["failed_checks"] = sorted(result["failed_checks"], key=lambda c: c.identifier) result["passed_requirements"] = sorted(result["passed_requirements"], key=lambda r: r.identifier) result["passed_checks"] = sorted(result["passed_checks"], key=lambda c: c.identifier) # return the aggregated statistics return result
[docs] class ValidationResult: """ Represents the result of a validation. :param context: The validation context :type context: ValidationContext :param rocrate_uri: The URI of the RO-Crate :type rocrate_uri: str :param validation_settings: The validation settings :type validation_settings: ValidationSettings :param issues: The issues found during the validation :type issues: list[CheckIssue] """ def __init__(self, context: ValidationContext): # reference to the validation context self._context = context # reference to the ro-crate URI self._rocrate_uri = context.rocrate_uri # reference to the validation settings self._validation_settings: ValidationSettings = context.settings # keep track of the issues found during the validation self._issues: list[CheckIssue] = [] # keep track of the checks that have been executed self._executed_checks: set[RequirementCheck] = set() self._executed_checks_results: dict[str, bool] = {} # keep track of the checks that have been skipped self._skipped_checks: set[RequirementCheck] = set() # initialize the statistics self._statistics = ValidationStatistics(context.settings) @property def context(self) -> ValidationContext: """ The validation context """ return self._context @property def rocrate_uri(self): """ The URI of the RO-Crate """ return self._rocrate_uri @property def validation_settings(self): """ The validation settings """ return self._validation_settings @property def statistics(self) -> ValidationStatistics: """ The validation statistics """ return self._statistics # --- Checks --- @property def executed_checks(self) -> set[RequirementCheck]: """ The checks that have been executed """ return self._executed_checks def _add_executed_check(self, check: RequirementCheck, result: bool): """ Internal method to add a check to the executed checks """ self._executed_checks.add(check) self._executed_checks_results[check.identifier] = result # remove the check from the skipped checks if it was skipped if check in self._skipped_checks: self._skipped_checks.remove(check) logger.debug("Removing check '%s' from skipped checks", check.name)
[docs] def get_executed_check_result(self, check: RequirementCheck) -> Optional[bool]: """ Get the result of an executed check """ return self._executed_checks_results.get(check.identifier)
@property def skipped_checks(self) -> set[RequirementCheck]: """ The checks that have been skipped """ return self._skipped_checks def _add_skipped_check(self, check: RequirementCheck): """ Internal method to add a check to the skipped checks """ self._skipped_checks.add(check) def _remove_skipped_check(self, check: RequirementCheck): """ Internal method to remove a check from the skipped checks """ self._skipped_checks.remove(check) # --- Issues --- @property def issues(self) -> list[CheckIssue]: """ The issues found during the validation """ return self._issues.copy()
[docs] def get_issues(self, min_severity: Optional[Severity] = None) -> list[CheckIssue]: """ Get the issues found during the validation with a severity greater than or equal to `min_severity` """ min_severity = min_severity or self.context.requirement_severity return [issue for issue in self._issues if issue.severity >= min_severity]
[docs] def get_issues_by_check(self, check: RequirementCheck, min_severity: Severity = None) -> list[CheckIssue]: """ Get the issues found during the validation for a specific check with a severity greater than or equal to `min_severity` """ min_severity = min_severity or self.context.requirement_severity return [issue for issue in self._issues if issue.check == check and issue.severity >= min_severity]
# def get_issues_by_check_and_severity(self, check: RequirementCheck, severity: Severity) -> list[CheckIssue]: # return [issue for issue in self.issues if issue.check == check and issue.severity == severity]
[docs] def has_issues(self, min_severity: Optional[Severity] = None) -> bool: """ Check if there are issues with a severity greater than or equal to the given `severity` """ min_severity = min_severity or self.context.requirement_severity return any(issue.severity >= min_severity for issue in self._issues)
[docs] def passed(self, min_severity: Optional[Severity] = None) -> bool: """ Check if all checks passed with a severity greater than or equal to the given `severity` """ min_severity = min_severity or self.context.requirement_severity return not any(issue.severity >= min_severity for issue in self._issues)
[docs] def add_issue( self, message: str, check: RequirementCheck, violatingEntity: Optional[str] = None, violatingProperty: Optional[str] = None, violatingPropertyValue: Optional[str] = None, ) -> CheckIssue: """ Add an issue to the validation result Parameters: message(str): The message of the issue check(RequirementCheck): The check that generated the issue violatingEntity(Optional[str]): The entity that caused the issue (if any) violatingProperty(Optional[str]): The property that caused the issue (if any) violatingPropertyValue(Optional[str]): The value of the violatingProperty (if any) """ c = CheckIssue( check, message, violatingProperty=violatingProperty, violatingEntity=violatingEntity, value=violatingPropertyValue, ) bisect.insort(self._issues, c) return c
# --- Requirements --- @property def failed_requirements(self) -> Collection[Requirement]: """ Get the requirements that failed at or above the configured `requirement_severity`. """ min_severity = self.context.requirement_severity return set(issue.check.requirement for issue in self._issues if issue.severity >= min_severity) # --- Checks --- @property def failed_checks(self) -> Collection[RequirementCheck]: """ Get the checks that failed at or above the configured `requirement_severity`. """ min_severity = self.context.requirement_severity return set(issue.check for issue in self._issues if issue.severity >= min_severity)
[docs] def get_failed_checks_by_requirement(self, requirement: Requirement) -> Collection[RequirementCheck]: """ Get the checks that failed for a specific requirement """ return [check for check in self.failed_checks if check.requirement == requirement]
[docs] def get_failed_checks_by_requirement_and_severity( self, requirement: Requirement, severity: Severity ) -> Collection[RequirementCheck]: """ Get the checks that failed for a specific requirement and severity """ return [ check for check in self.failed_checks if check.requirement == requirement and check.severity == severity ]
def __str__(self) -> str: return f"Validation result: passed={len(self.failed_checks) == 0}, {len(self._issues)} issues" def __repr__(self): return f"ValidationResult(passed={len(self.failed_checks) == 0},issues={self._issues})" def __eq__(self, other: object) -> bool: if not isinstance(other, ValidationResult): raise TypeError(f"Cannot compare ValidationResult with {type(other)}") return self._issues == other._issues
[docs] def to_dict(self) -> dict: """ Convert the ValidationResult to a dictionary """ allowed_properties = [ "profile_identifier", "enable_profile_inheritance", "requirement_severity", "abort_on_first", ] validation_settings = { key: value for key, value in self.validation_settings.to_dict().items() if key in allowed_properties } result = { "meta": {"version": JSON_OUTPUT_FORMAT_VERSION}, "validation_settings": validation_settings, "passed": self.passed(self.context.settings.requirement_severity), "issues": [issue.to_dict() for issue in self.issues], } # add validator version to the settings result["validation_settings"]["rocrate_validator_version"] = __version__ return result
[docs] def to_json(self, path: Optional[Path] = None) -> str: """ Convert the ValidationResult to a JSON string """ result = json.dumps(self.to_dict(), indent=4, cls=CustomEncoder) if path: with open(path, "w") as f: f.write(result) return result
class CustomEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, CheckIssue): return obj.__dict__ if isinstance(obj, Path): return str(obj) if isinstance(obj, Severity): return obj.name if isinstance(obj, RequirementCheck): return obj.identifier if isinstance(obj, Requirement): return obj.identifier if isinstance(obj, RequirementLevel): return obj.name return super().default(obj)
[docs] @dataclass class ValidationSettings: """ Represents the settings for RO-Crate validation. It includes the following attributes: """ #: The URI of the RO-Crate rocrate_uri: URI #: The relative root path of the RO-Crate rocrate_relative_root_path: Optional[Path] = None # Profile settings #: The path to the profiles profiles_path: Path = DEFAULT_PROFILES_PATH #: The path to the extra profiles extra_profiles_path: Optional[Path] = None #: The profile identifier to validate against profile_identifier: str = DEFAULT_PROFILE_IDENTIFIER #: Flag to enable profile inheritance # Use the `enable_profile_inheritance` flag with caution: disable inheritance only if the # target validation profile is fully self-contained and does not rely on definitions # from inherited profiles (e.g., entities defined upstream). For modularization # purposes, some base entities and properties are defined in the base RO-Crate # profile and are intentionally not redefined in specialized profiles; they are # required for validations targeting those specializations and therefore cannot be skipped. # Nevertheless, the validator can still suppress issue reporting for checks defined # in inherited profiles by setting disable_inherited_profiles_issue_reporting to `True`. enable_profile_inheritance: bool = True # Validation settings #: Flag to abort on first error abort_on_first: Optional[bool] = False #: Flag to disable reporting of issues related to inherited profiles disable_inherited_profiles_issue_reporting: bool = False #: Flag to disable remote crate download disable_remote_crate_download: bool = True # Requirement settings #: The requirement severity requirement_severity: Union[str, Severity] = Severity.REQUIRED #: Flag to validate requirement severity only skipping check with lower or higher severity requirement_severity_only: bool = False # Requirement check settings #: Flag to allow requirement check override allow_requirement_check_override: bool = True #: Flag to disable the check for duplicates disable_check_for_duplicates: bool = False #: Checks to skip skip_checks: list[str] = None #: Flag to validate only the metadata of the RO-Crate metadata_only: bool = False #: RO-Crate metadata as dictionary metadata_dict: dict = None #: Verbose output verbose: bool = False #: Cache max age in seconds (negative values mean "never expire") cache_max_age: Optional[int] = DEFAULT_HTTP_CACHE_MAX_AGE #: Cache path cache_path: Optional[Path] = None #: Flag to enable offline mode: HTTP requests are served only from the cache offline: bool = False #: Flag to disable the HTTP cache entirely: every request hits the network no_cache: bool = False def __post_init__(self): # if requirement_severity is a str, convert to Severity if isinstance(self.requirement_severity, str): self.requirement_severity = Severity[self.requirement_severity] # Offline mode needs the cache to serve responses, so it cannot be # combined with an explicit cache disable. if self.offline and self.no_cache: raise ValueError( "Offline mode requires the HTTP cache to be enabled; " "no_cache=True is incompatible with offline=True." ) # Default to the persistent user cache whenever caching is enabled so that # consecutive runs (online then offline) share the same HTTP cache: this # is what lets the offline mode find the resources fetched online. if self.cache_path is None and not self.no_cache: default_path = get_default_http_cache_path() default_path.parent.mkdir(parents=True, exist_ok=True) self.cache_path = default_path logger.debug("Cache path not set: defaulting to persistent user cache %s", self.cache_path) if self.offline and self.cache_path is None: logger.warning( "Offline mode enabled without a persistent cache path: " "all HTTP-backed resources will fail unless pre-populated." ) # Reset any previously initialized singleton so new settings take effect. HttpRequester.reset() # initialize the HTTP cache HttpRequester.initialize_cache( cache_path=str(self.cache_path) if self.cache_path is not None else None, cache_max_age=self.cache_max_age, offline=self.offline, no_cache=self.no_cache, ) logger.debug( "HTTP cache initialized at %s with max age %s seconds (offline=%s, no_cache=%s)", self.cache_path, self.cache_max_age, self.offline, self.no_cache, ) # Install the JSON-LD document loader so context resolution goes through the cache. try: install_document_loader() except Exception as e: logger.debug("Could not install JSON-LD document loader: %s", e) # Best-effort synchronous warm-up of profile-declared URLs. if not self.offline: try: auto_warm_up_for_settings(self) except Exception as e: logger.debug("Auto warm-up skipped: %s", e)
[docs] def to_dict(self): """ Convert the ValidationSettings to a dictionary """ result = asdict(self) result["rocrate_uri"] = str(self.rocrate_uri) result.pop("metadata_dict", None) # exclude metadata_dict from the dict representation # Remove disable_crate_download from the dict representation result.pop("disable_remote_crate_download", None) # Remove requirement_severity_only from the dict representation result.pop("requirement_severity_only", None) return result
@property def rocrate_uri(self) -> Optional[URI]: """ Get the RO-Crate URI :return: The RO-Crate URI :rtype: URI """ return self._rocrate_uri @rocrate_uri.setter def rocrate_uri(self, value: URI): """ Set the RO-Crate URI. :param value: The RO-Crate URI. :type value: URI """ if not value: raise ValueError("Invalid RO-Crate URI") self._rocrate_uri: URI = URI(value)
[docs] @classmethod def parse(cls, settings: Union[dict, ValidationSettings]) -> ValidationSettings: """ Parse the settings to a ValidationSettings object. :param settings: The settings to parse. :type settings: Union[dict, ValidationSettings] :return: The parsed settings. :rtype: ValidationSettings :raises ValueError: If the settings type is invalid. """ if isinstance(settings, dict): return cls(**settings) elif isinstance(settings, ValidationSettings): return settings else: raise ValueError(f"Invalid settings type: {type(settings)}")
class ValidationEvent(Event): def __init__( self, event_type: EventType, validation_result: Optional[ValidationResult] = None, message: Optional[str] = None, ): super().__init__(event_type, message) self._validation_result = validation_result @property def validation_result(self) -> Optional[ValidationResult]: return self._validation_result class ProfileValidationEvent(Event): def __init__( self, event_type: EventType, profile: Profile, message: Optional[str] = None, ): assert event_type in ( EventType.PROFILE_VALIDATION_START, EventType.PROFILE_VALIDATION_END, ) super().__init__(event_type, message) self._profile = profile @property def profile(self) -> Profile: return self._profile def __str__(self) -> str: return f"ProfileValidationEvent({self.event_type}, {self.profile})" def __repr__(self) -> str: return f"ProfileValidationEvent(event_type={self.event_type}, profile={self.profile})" def __eq__(self, other: object) -> bool: if not isinstance(other, ProfileValidationEvent): raise TypeError(f"Cannot compare {type(self)} with {type(other)}") return self.event_type == other.event_type and self.profile == other.profile def __ne__(self, other: object) -> bool: return not self.__eq__(other) def __hash__(self) -> int: return hash((self.event_type, self.profile)) class RequirementValidationEvent(Event): def __init__( self, event_type: EventType, requirement: Requirement, validation_result: Optional[bool] = None, message: Optional[str] = None, ): assert event_type in ( EventType.REQUIREMENT_VALIDATION_START, EventType.REQUIREMENT_VALIDATION_END, ) super().__init__(event_type, message) self._requirement = requirement self._validation_result = validation_result @property def requirement(self) -> Requirement: return self._requirement @property def validation_result(self) -> Optional[bool]: return self._validation_result def __str__(self) -> str: return f"RequirementValidationEvent({self.event_type}, {self.requirement})" def __repr__(self) -> str: return f"RequirementValidationEvent(event_type={self.event_type}, requirement={self.requirement})" def __eq__(self, other: object) -> bool: if not isinstance(other, RequirementValidationEvent): raise TypeError(f"Cannot compare {type(self)} with {type(other)}") return self.event_type == other.event_type and self.requirement == other.requirement def __ne__(self, other: object) -> bool: return not self.__eq__(other) def __hash__(self) -> int: return hash((self.event_type, self.requirement)) class RequirementCheckValidationEvent(Event): def __init__( self, event_type: EventType, requirement_check: RequirementCheck, validation_result: Optional[bool] = None, message: Optional[str] = None, ): assert event_type in ( EventType.REQUIREMENT_CHECK_VALIDATION_START, EventType.REQUIREMENT_CHECK_VALIDATION_END, ) super().__init__(event_type, message) self._requirement_check = requirement_check self._validation_result = validation_result @property def requirement_check(self) -> RequirementCheck: return self._requirement_check @property def validation_result(self) -> Optional[bool]: return self._validation_result def __str__(self) -> str: return f"RequirementCheckValidationEvent({self.event_type}, {self.requirement_check})" def __repr__(self) -> str: return ( f"RequirementCheckValidationEvent(event_type={self.event_type}, requirement_check={self.requirement_check})" ) def __eq__(self, other: object) -> bool: if not isinstance(other, RequirementCheckValidationEvent): raise TypeError(f"Cannot compare {type(self)} with {type(other)}") return self.event_type == other.event_type and self.requirement_check == other.requirement_check def __ne__(self, other: object) -> bool: return not self.__eq__(other) def __hash__(self) -> int: return hash((self.event_type, self.requirement_check)) class Validator(Publisher): """ Validator class for validating Research Object Crates(RO-Crate) against specified profiles according to the validation settings. Attributes: validation_settings(ValidationSettings): The settings used for validation. Methods: __init__(settings: Union[str, ValidationSettings]): Initializes the Validator with the given settings. validation_settings() -> ValidationSettings: Returns the validation settings. detect_rocrate_profiles() -> list[Profile]: Detects the profiles to validate against. validate() -> ValidationResult: Validate the RO-Crate against the detected profiles according to the validation settings validate_requirements(requirements: list[Requirement]) -> ValidationResult: Validates the RO-Crate against the specified subset of the profile requirements. """ def __init__(self, settings: Union[str, ValidationSettings]): self._validation_settings = ValidationSettings.parse(settings) super().__init__() # initialize the current context self.__current_context__ = None @property def validation_settings(self) -> ValidationSettings: return self._validation_settings def detect_rocrate_profiles(self) -> list[Profile]: """ Detect the profiles to validate against """ try: # initialize the validation context context = ValidationContext(self, self.validation_settings) candidate_profiles_uris = set() try: candidate_profiles_uris.update(context.ro_crate.metadata.get_conforms_to()) except Exception as e: logger.debug("Error while getting candidate profiles URIs: %s", e) try: candidate_profiles_uris.update(context.ro_crate.metadata.get_root_data_entity_conforms_to()) except Exception as e: logger.debug("Error while getting candidate profiles URIs: %s", e) logger.debug("Candidate profiles: %s", candidate_profiles_uris) if not candidate_profiles_uris: logger.debug("Unable to determine the profile to validate against") return None # load the profiles profiles = [] candidate_profiles = [] available_profiles = Profile.load_profiles( context.profiles_path, extra_profiles_path=context.extra_profiles_path, publicID=context.publicID, severity=context.requirement_severity, ) profiles = [p for p in available_profiles if p.uri in candidate_profiles_uris] # get the candidate profiles for profile in profiles: candidate_profiles.append(profile) inherited_profiles = profile.inherited_profiles for inherited_profile in inherited_profiles: if inherited_profile in candidate_profiles: candidate_profiles.remove(inherited_profile) logger.debug( "%d Candidate Profiles found: %s", len(candidate_profiles), candidate_profiles, ) # unmatched candidate profiles unmatched_profiles = candidate_profiles_uris.difference(set(p.uri for p in profiles)) logger.debug("Unmatched Candidate Profiles URIs: %s", unmatched_profiles) if len(unmatched_profiles) > 0: logger.warning( "The conformance to the following profiles could not be verified: %s", ", ".join(unmatched_profiles), ) return candidate_profiles except Exception as e: if logger.isEnabledFor(logging.DEBUG): logger.exception(e) return None def validate(self) -> ValidationResult: """ Validate the RO-Crate against the detected profiles according to the validation settings """ return self.__do_validate__() def validate_requirements(self, requirements: list[Requirement]) -> ValidationResult: """ Validates the RO-Crate against the specified subset of the profile requirements """ assert all(isinstance(requirement, Requirement) for requirement in requirements), "Invalid requirement type" # perform the requirements validation return self.__do_validate__(requirements) def __do_validate__(self, requirements: Optional[list[Requirement]] = None) -> ValidationResult: # initialize the validation context context = ValidationContext(self, self.validation_settings) # register the current context self.__current_context__ = context # initialize the requirement types self.__invoke_pre_validation_hooks__(context) try: # set the profiles to validate against profiles = context.profiles assert len(profiles) > 0, "No profiles to validate" # Pre-load every profile's requirements so all shape graphs are # populated before the validation loop runs. This lets a check # see `sh:deactivated true` triples declared by descendant # profiles that have not yet been visited. for p in profiles: _ = p.requirements self.notify(EventType.VALIDATION_START) for profile in profiles: logger.debug( "Validating profile %s (id: %s)", profile.name, profile.identifier, ) # set the target profile in the context context._target_validation_profile = profile self.notify(ProfileValidationEvent(EventType.PROFILE_VALIDATION_START, profile=profile)) # perform the requirements validation requirements = profile.get_requirements( context.requirement_severity, exact_match=context.requirement_severity_only, ) logger.debug( "Validating profile %s with %s requirements", profile.identifier, len(requirements), ) logger.debug( "For profile %s, validating these %s requirements: %s", profile.identifier, len(requirements), requirements, ) terminate = False for requirement in requirements: if not requirement.overridden: self.notify( RequirementValidationEvent( EventType.REQUIREMENT_VALIDATION_START, requirement=requirement, ) ) passed = requirement._do_validate_(context) logger.debug( "Requirement %s passed: %s", requirement.identifier, passed, ) if not requirement.overridden: self.notify( RequirementValidationEvent( EventType.REQUIREMENT_VALIDATION_END, requirement=requirement, validation_result=passed, ) ) if passed: logger.debug("Validation Requirement passed") else: logger.debug(f"Validation Requirement {requirement} failed (profile: {profile.identifier})") if context.fail_fast: logger.debug("Aborting on first requirement failure") terminate = True break self.notify(ProfileValidationEvent(EventType.PROFILE_VALIDATION_END, profile=profile)) if terminate: break # finalize the requirement types self.__invoke_post_validation_hooks__(context) # notify the end of the validation self.notify(ValidationEvent(EventType.VALIDATION_END, validation_result=context.result)) # return the validation result return context.result finally: # clear the current context self.__current_context__ = None def __invoke_pre_validation_hooks__(self, context: ValidationContext): logger.debug("Initializing requirement types: starting...") requirements_types = RequirementLoader.__get_requirement_classes__() for requirement_type in requirements_types: requirement_type.initialize(context) logger.debug("Initializing requirement types: completed") def __invoke_post_validation_hooks__(self, context: ValidationContext): logger.debug("Finalizing requirement types: starting...") requirements_types = RequirementLoader.__get_requirement_classes__() for requirement_type in requirements_types: requirement_type.finalize(context) logger.debug("Finalizing requirement types: completed") def notify(self, event: Union[Event, EventType]): """Override notify to update statistics""" assert self.__current_context__ is not None, "No current validation context" result: ValidationResult = self.__current_context__.result if isinstance(event, EventType): event = Event(event) result.statistics.update(event, ctx=self.__current_context__) return super().notify(event, ctx=self.__current_context__)
[docs] class ValidationContext: """ Class that represents the context for the validation process. """ def __init__(self, validator: Validator, settings: ValidationSettings): # reference to the validator self._validator = validator # reference to the settings self._settings = settings # reference to the data graph self._data_graph = None # reference to the profiles self._profiles = None # reference to the target profile self._target_validation_profile = None # reference to the validation result self._result = None # additional properties for the context self._properties = {} # URLs already reported as missing from the HTTP cache during this run self._offline_cache_misses_warned: set[str] = set() # initialize the ROCrate object if settings.metadata_dict: self._rocrate = ROCrate.from_metadata_dict(settings.metadata_dict) else: self._rocrate = ROCrate.new_instance( settings.rocrate_uri, relative_root_path=settings.rocrate_relative_root_path, ) assert isinstance(self._rocrate, ROCrate), "Invalid RO-Crate instance" @property def ro_crate(self) -> ROCrate: """ The RO-Crate instance :return: The RO-Crate instance :rtype: ROCrate """ return self._rocrate @property def validator(self) -> Validator: """ The validator instance which this context belongs to :return: The validator instance :rtype: Validator """ return self._validator @property def result(self) -> ValidationResult: """ The validation result :return: The validation result :rtype: ValidationResult """ if self._result is None: self._result = ValidationResult(self) return self._result @property def settings(self) -> ValidationSettings: """ The validation settings :return: The validation settings :rtype: ValidationSettings """ return self._settings @property def publicID(self) -> str: """ The root URI of the RO-Crate """ path = str(self.ro_crate.uri.base_uri) if not path.endswith("/"): path = f"{path}/" return path @property def profiles_path(self) -> Path: """ The path to the profiles :return: The path to the profiles :rtype: Path """ profiles_path = self.settings.profiles_path if isinstance(profiles_path, str): profiles_path = Path(profiles_path) return profiles_path @property def extra_profiles_path(self) -> Optional[Path]: """ The path to the extra profiles :return: The path to the extra profiles :rtype: Optional[Path] """ extra_profiles_path = self.settings.extra_profiles_path if isinstance(extra_profiles_path, str): extra_profiles_path = Path(extra_profiles_path) return extra_profiles_path if extra_profiles_path else None @property def requirement_severity(self) -> Severity: """ The requirement severity to validate against :return: The requirement severity :rtype: Severity """ severity = self.settings.requirement_severity if isinstance(severity, str): severity = Severity[severity] elif not isinstance(severity, Severity): raise ValueError(f"Invalid severity type: {type(severity)}") return severity @property def requirement_severity_only(self) -> bool: """ Flag to validate requirement severity only skipping check with lower or higher severity :return: The flag to validate requirement severity only :rtype: bool """ return self.settings.requirement_severity_only @property def rocrate_uri(self) -> URI: """ The URI of the RO-Crate :return: The URI of the RO-Crate :rtype: Path """ return self.settings.rocrate_uri @property def fail_fast(self) -> bool: """ Flag to abort on first error :return: The flag to abort on first error :rtype: bool """ return self.settings.abort_on_first @property def rel_fd_path(self) -> Path: """ The relative path to the file descriptor :return: The relative path to the file descriptor :rtype: Path """ return Path(ROCRATE_METADATA_FILE) def __load_data_graph__(self) -> Graph: data_graph = Graph() logger.debug("Loading RO-Crate metadata of: %s", self.ro_crate.uri) _ = data_graph.parse( data=self.ro_crate.metadata.as_dict(), format="json-ld", publicID=self.publicID, ) logger.debug("RO-Crate metadata loaded: %s", data_graph) return data_graph
[docs] def get_data_graph(self, refresh: bool = False) -> Graph: """ Utility method to get the data graph of the RO-Crate, i.e., the metadata of the RO-Crate as an RDF graph. :param refresh: Flag to refresh the data graph :type refresh: bool :return: The data graph of the RO-Crate :rtype: :py:class:rdflib.Graph :raises ROCrateMetadataNotFoundError: If the RO-Crate metadata is not found """ # load the data graph try: if not self._data_graph or refresh: self._data_graph = self.__load_data_graph__() return self._data_graph except (HTTPError, FileNotFoundError) as e: logger.debug("Error loading data graph: %s", e) raise ROCrateMetadataNotFoundError(self.rocrate_uri)
@property def data_graph(self) -> Graph: """ The data graph of the RO-Crate, i.e., the metadata of the RO-Crate as an RDF graph. :return: The data graph of the RO-Crate :rtype: Graph """ return self.get_data_graph() @property def inheritance_enabled(self) -> bool: """ Flag which indicates if profile inheritance is enabled :return: The flag to enable profile inheritance :rtype: bool """ return self.settings.enable_profile_inheritance @property def profile_identifier(self) -> str: """ The profile identifier to validate against :return: The profile identifier :rtype: str """ return self.settings.profile_identifier @property def allow_requirement_check_override(self) -> bool: """ Flag that indicates if requirement check override is allowed :return: The flag to allow requirement check override :rtype: bool """ return self.settings.allow_requirement_check_override @property def disable_check_for_duplicates(self) -> bool: """ Flag that indicates if the check for duplicates is disabled :return: The flag to disable the check for duplicates :rtype: bool """ return self.settings.disable_check_for_duplicates def __load_profiles__(self) -> list[Profile]: # load all profiles profiles = Profile.load_profiles( self.profiles_path, extra_profiles_path=self.settings.extra_profiles_path, publicID=self.publicID, severity=self.requirement_severity, allow_requirement_check_override=self.allow_requirement_check_override, ) # Check if the target profile is in the list of profiles profile = Profile.get_by_identifier(self.profile_identifier) if not profile: try: candidate_profiles = Profile.get_by_token(self.profile_identifier) logger.debug("Candidate profiles found by token: %s", profile) if candidate_profiles: # Find the profile with the highest version number profile = max(candidate_profiles, key=lambda p: p.version) self.settings.profile_identifier = profile.identifier logger.debug("Profile with the highest version number: %s", profile) # if the profile is found by token, set the profile name to the identifier self.settings.profile_identifier = profile.identifier except AttributeError as e: # raised when the profile is not found if logger.isEnabledFor(logging.DEBUG): logger.exception(e) raise ProfileNotFound( self.profile_identifier, message=f"Profile '{self.profile_identifier}' not found in '{self.profiles_path}'", ) from e # if the inheritance is enabled, return only the target profile if not self.inheritance_enabled: return [profile] # Set the profiles to validate against as the target profile and its inherited profiles profiles = profile.inherited_profiles + [profile] # if the check for duplicates is disabled, return the profiles if self.disable_check_for_duplicates: return profiles return profiles @property def profiles(self) -> list[Profile]: """ The profiles to validate against, i.e., the target profile and its inherited profiles :return: The profiles to validate against :rtype: list[Profile] """ if not self._profiles: self._profiles = self.__load_profiles__() return self._profiles.copy() @property def target_validation_profile(self) -> Profile: """ The target validation profile to validate against :return: The target validation profile :rtype: Profile """ return self._target_validation_profile @property def target_profile(self) -> Profile: """ The target profile to validate against :return: The target profile :rtype: Profile """ profiles = self.profiles assert len(profiles) > 0, "No profiles to validate" return self.profiles[-1]
[docs] def get_profile_by_token(self, token: str) -> list[Profile]: """ Get the profile by token from the profiles to validate against :param token: The token of the profile :type token: str :return: The profile with the given token :rtype: Profile """ return [p for p in self.profiles if p.token == token]
[docs] def get_profile_by_identifier(self, identifier: str) -> list[Profile]: """ Get the profile by identifier from the profiles to validate against :param identifier: The identifier of the profile :type identifier: str :return: The profile with the given identifier :rtype: Profile """ for p in self.profiles: if p.identifier == identifier: return p raise ProfileNotFound(identifier)
[docs] def maybe_warn_offline_cache_miss(self, exc: BaseException) -> bool: """ If ``exc`` (or any cause/context in its chain) is an :class:`OfflineCacheMissError`, emit a single user-facing warning for the missing URL — but only the first time that URL is seen during this validation run — and return ``True``. Returns ``False`` when the exception is unrelated to offline cache misses, so callers can fall back to their generic handling. """ miss = find_offline_cache_miss(exc) if miss is None: return False if miss.url not in self._offline_cache_misses_warned: self._offline_cache_misses_warned.add(miss.url) logger.warning("%s", miss) return True