Source code for rocrate_validator.services

# Copyright (c) 2024-2026 CRS4
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import Optional, Union

from rocrate_validator.utils import log as logging
from rocrate_validator.events import Subscriber
from rocrate_validator.models import Profile, Severity, ValidationResult, ValidationSettings, Validator
from rocrate_validator.utils.uri import URI
from rocrate_validator.utils.paths import get_profiles_path
from rocrate_validator.utils.http import HttpRequester

# set the default profiles path
DEFAULT_PROFILES_PATH = get_profiles_path()

# set up logging
logger = logging.getLogger(__name__)


def detect_profiles(settings: Union[dict, ValidationSettings]) -> list[Profile]:
    # initialize the validator
    validator = __initialise_validator__(settings)
    # detect the profiles
    profiles = validator.detect_rocrate_profiles()
    logger.debug("Profiles detected: %s", profiles)
    return profiles


def validate_metadata_as_dict(
    metadata_dict: dict, settings: Union[dict, ValidationSettings], subscribers: Optional[list[Subscriber]] = None
) -> ValidationResult:
    """
    Validate the RO-Crate metadata only against a profile and return the validation result.
    """
    assert metadata_dict is not None, "Metadata dictionary cannot be None"
    assert isinstance(metadata_dict, dict), "Metadata must be a dictionary"
    # set the RO-Crate metadata dictionary in the settings
    if isinstance(settings, dict):
        settings["metadata_dict"] = metadata_dict
        settings["metadata_only"] = True
    else:
        settings.metadata_dict = metadata_dict
        settings.metadata_only = True
    # validate the RO-Crate metadata
    return validate(settings, subscribers)


[docs] def validate( settings: Union[dict, ValidationSettings], subscribers: Optional[list[Subscriber]] = None ) -> ValidationResult: """ Validate a RO-Crate against a profile and return the validation result :param settings: the validation settings :type settings: Union[dict, ValidationSettings] :param subscribers: the list of subscribers :type subscribers: Optional[list[Subscriber]] :return: the validation result :rtype: ValidationResult """ # initialize the validator validator = __initialise_validator__(settings, subscribers) # validate the RO-Crate result = validator.validate() logger.debug("Validation completed: %s", result) return result
def __initialise_validator__( settings: Union[dict, ValidationSettings], subscribers: Optional[list[Subscriber]] = None ) -> Validator: """ Validate a RO-Crate against a profile """ # if settings is a dict, convert to ValidationSettings settings = ValidationSettings.parse(settings) # parse the rocrate path rocrate_path: URI = URI(settings.rocrate_uri) logger.debug("Validating RO-Crate: %s", rocrate_path) # check if the RO-Crate exists if not getattr(settings, "metadata_only", False) and getattr(settings, "metadata_dict", None) is None: if not rocrate_path.is_available(): raise FileNotFoundError(f"RO-Crate not found: {rocrate_path}") # check if remote validation is enabled disable_remote_crate_download = settings.disable_remote_crate_download logger.debug("Remote validation: %s", disable_remote_crate_download) if disable_remote_crate_download: # create a validator validator = Validator(settings) logger.debug("Validator created. Starting validation...") if subscribers: for subscriber in subscribers: validator.add_subscriber(subscriber) return validator def __init_validator__(settings: ValidationSettings) -> Validator: # create a validator validator = Validator(settings) logger.debug("Validator created. Starting validation...") if subscribers: for subscriber in subscribers: validator.add_subscriber(subscriber) return validator def __extract_and_validate_rocrate__(rocrate_path: Path): # store the original data path original_data_path = settings.rocrate_uri with tempfile.TemporaryDirectory() as tmp_dir: try: # extract the RO-Crate to the temporary directory with zipfile.ZipFile(rocrate_path, "r") as zip_ref: zip_ref.extractall(tmp_dir) logger.debug("RO-Crate extracted to temporary directory: %s", tmp_dir) # update the data path to point to the temporary directory settings.rocrate_uri = Path(tmp_dir) # continue with the validation process return __init_validator__(settings) finally: # restore the original data path settings.rocrate_uri = original_data_path logger.debug("Original data path restored: %s", original_data_path) # check if the RO-Crate is a remote RO-Crate, # i.e., if the RO-Crate is a URL. If so, download the RO-Crate # and extract it to a temporary directory. We support either http or https # or ftp protocols to download the remote RO-Crate. if rocrate_path.scheme in ("http", "https", "ftp"): logger.debug("RO-Crate is a remote RO-Crate") # create a temp folder to store the downloaded RO-Crate with tempfile.NamedTemporaryFile(delete=False) as tmp_file: requester = HttpRequester() offline = bool(getattr(settings, "offline", False)) # In offline mode, the cache is the only source of truth. Otherwise, # bypass the cache to refresh the stored copy so that subsequent # offline runs validate against the latest known remote state. if offline: response = requester.get(rocrate_path.uri, stream=True, allow_redirects=True) else: response = requester.fetch_fresh(rocrate_path.uri, stream=True, allow_redirects=True) with response as r: if r.status_code >= 400: if offline and r.status_code == 504: raise FileNotFoundError( f"Remote RO-Crate '{rocrate_path.uri}' is not available in the HTTP cache. " f"Validate it online first, or run " f"`rocrate-validator cache warm --crate '{rocrate_path.uri}'`." ) raise FileNotFoundError( f"Failed to download remote RO-Crate '{rocrate_path.uri}' (status {r.status_code})." ) with open(tmp_file.name, "wb") as f: shutil.copyfileobj(r.raw, f) logger.debug("RO-Crate downloaded to temporary file: %s", tmp_file.name) # continue with the validation process by extracting the RO-Crate and validating it return __extract_and_validate_rocrate__(Path(tmp_file.name)) # check if the RO-Crate is a ZIP file elif rocrate_path.as_path().suffix == ".zip": logger.debug("RO-Crate is a local ZIP file") # continue with the validation process by extracting the RO-Crate and validating it return __extract_and_validate_rocrate__(rocrate_path.as_path()) # if the RO-Crate is not a ZIP file, directly validate the RO-Crate elif rocrate_path.is_local_directory(): logger.debug("RO-Crate is a local directory") settings.rocrate_uri = rocrate_path.as_path() return __init_validator__(settings) else: raise ValueError( f"Invalid RO-Crate URI: {rocrate_path}. It MUST be a local directory or a ZIP file (local or remote)." )
[docs] def get_profiles( profiles_path: Path = DEFAULT_PROFILES_PATH, extra_profiles_path: Optional[Path] = None, severity=Severity.OPTIONAL, allow_requirement_check_override: bool = ValidationSettings.allow_requirement_check_override, ) -> list[Profile]: """ Get the list of profiles supported by the package. The profile source path can be overridden by specifying ``profiles_path``. :param profiles_path: the path to the profiles directory :type profiles_path: Path :param severity: the severity level :type severity: Severity :param allow_requirement_check_override: a flag to enable or disable the requirement check override (default: ``True``). If ``True``, the requirement check of a profile ``A`` can be overridden by the requirement check of a profile extension ``B`` (i.e., when ``B extends A``) if they share the same name. If ``False``, a profile extension ``B`` can only add new requirements to the profile ``A`` (i.e., checks with name not present in ``A``) and an error is raised if a check with the same name is found in both profiles. :type allow_requirement_check_override: bool :return: the list of profiles :rtype: list[Profile] """ profiles = Profile.load_profiles( profiles_path, extra_profiles_path=extra_profiles_path, severity=severity, allow_requirement_check_override=allow_requirement_check_override, ) logger.debug("Profiles loaded: %s", profiles) return profiles
[docs] def get_profile( profile_identifier: str, profiles_path: Path = DEFAULT_PROFILES_PATH, extra_profiles_path: Optional[Path] = None, severity=Severity.OPTIONAL, allow_requirement_check_override: bool = ValidationSettings.allow_requirement_check_override, ) -> Profile: """ Get the profile with the given identifier. The profile source path can be overridden through ``profiles_path``. The profile is loaded based on the given severity level and the requirement check override flag. :param profile_identifier: the profile identifier :type profile_identifier: str :param profiles_path: the path to the profiles directory :type profiles_path: Path :param severity: the severity level :type severity: Severity :param allow_requirement_check_override: a flag to enable or disable the requirement check override (default: ``True``). If ``True``, the requirement check of a profile ``A`` can be overridden by the requirement check of a profile extension ``B`` (i.e., when ``B extends A``) if they share the same name. If ``False``, a profile extension ``B`` can only add new requirements to the profile ``A`` (i.e., checks with name not present in ``A``) and an error is raised if a check with the same name is found in both profiles. :type allow_requirement_check_override: bool :return: the profile :rtype: Profile """ profiles = get_profiles( profiles_path, extra_profiles_path=extra_profiles_path, severity=severity, allow_requirement_check_override=allow_requirement_check_override, ) return Profile.find_in_list(profiles, profile_identifier)