# Source code for oc_ocdm.support.support

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.
from __future__ import annotations

import os
import re
from datetime import datetime
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Optional, List, Tuple, Match, Dict, Set
    from rdflib import URIRef, Graph
    from oc_ocdm.graph.entities.bibliographic.bibliographic_resource import BibliographicResource
    from oc_ocdm.graph.entities.bibliographic.responsible_agent import ResponsibleAgent
    from oc_ocdm.graph.entities.bibliographic.agent_role import AgentRole

from urllib.parse import quote

from rdflib import Literal, RDF, XSD


def create_date(date_list: List[Optional[int]] = None) -> Optional[str]:
    """
    Build a date string from a ``[year, month, day]`` list, using the coarsest
    representation that does not lose information.

    The rules are:

    * no list, an empty list or a missing year -> ``None``;
    * three items where month or day differs from ``1`` -> ``'YYYY-MM-DD'``;
    * three items where month and day are both ``1`` or ``None`` -> ``'YYYY'``
      (a "1st of January" date is treated as a bare year);
    * two items with a month -> ``'YYYY-MM'``;
    * anything else -> ``'YYYY'``.

    A three-item list with a missing component no longer raises ``TypeError``:
    if only the day is missing the result is ``'YYYY-MM'``, and if the month is
    missing the result is ``'YYYY'`` (a day cannot be expressed without a month).

    :param date_list: the ``[year, month, day]`` components; trailing items may
        be omitted and any item may be ``None``
    :return: the formatted date string, or ``None`` when no year is available
    :raises ValueError: if the components do not form a valid calendar date
    """
    if not date_list or date_list[0] is None:
        return None

    year: int = date_list[0]
    n_items: int = len(date_list)

    if n_items == 3:
        month, day = date_list[1], date_list[2]
        month_is_significant = month is not None and month != 1
        day_is_significant = day is not None and day != 1
        if month_is_significant or day_is_significant:
            if month is not None and day is not None:
                return datetime(year, month, day).strftime('%Y-%m-%d')
            if month is not None:
                # The day is unknown: fall back to the year-month form
                # instead of passing None to datetime().
                return datetime(year, month, 1).strftime('%Y-%m')
            # The month is unknown, so the day alone carries no usable
            # information: only the year can be reported.
    elif n_items == 2 and date_list[1] is not None:
        return datetime(year, date_list[1], 1).strftime('%Y-%m')

    return datetime(year, 1, 1).strftime('%Y')
def get_datatype_from_iso_8601(string: str) -> Tuple[URIRef, str]:
    """
    Determine the XSD datatype that fits an ISO-8601 date string and return it
    together with the normalised date.

    Only the first ten characters of the input are considered. Depending on
    how many ``-``-separated numeric parts they contain, the result is
    ``(XSD.date, 'YYYY-MM-DD')``, ``(XSD.gYearMonth, 'YYYY-MM')`` or
    ``(XSD.gYear, 'YYYY')``.

    :param string: the ISO-8601 date (or date-time) string to analyse
    :return: a ``(datatype, normalised_string)`` pair
    :raises ValueError: if a part is not an integer, or if the parts do not
        form a valid calendar date
    """
    # Keep only the "yyyy-mm-dd" part of the string
    date_portion = string[:10]
    try:
        numbers: List[int] = list(map(int, date_portion.split(sep='-', maxsplit=2)))
    except ValueError:
        raise ValueError("The provided date string is not ISO-8601 compliant!")

    how_many: int = len(numbers)
    if how_many == 3:
        year, month, day = numbers
        return XSD.date, datetime(year, month, day).strftime('%Y-%m-%d')
    if how_many == 2:
        year, month = numbers
        return XSD.gYearMonth, datetime(year, month, 1).strftime('%Y-%m')
    return XSD.gYear, datetime(numbers[0], 1, 1).strftime('%Y')
def get_ordered_contributors_from_br(br: BibliographicResource,
                                     contributor_type: URIRef):
    """
    Return the responsible agents of ``br`` that hold a role of type
    ``contributor_type``, ordered by following the "next" links between the
    agent roles.

    The agent roles returned by ``br.get_contributors()`` are visited in the
    order given and are incrementally grouped into partial chains; the partial
    chains are then stitched together, starting from the one that nothing
    follows and walking backwards.

    :param br: the resource whose contributors must be ordered
    :param contributor_type: the role type to select; roles of any other type
        (or without an agent) are ignored
    :return: the ordered list of responsible agents (empty when none is found)
    :raises ValueError: if the roles do not form a single, well-formed chain
    """
    malformed_msg = 'A malformed list of AgentRole entities was given.'

    next_chain_id: int = 0
    # Partial chains indexed by the resource of their first role...
    chain_by_head: Dict[URIRef, Dict] = {}
    # ...and by the resource of the role expected to come right after them.
    chain_by_tail: Dict[URIRef, Dict] = {}
    head_res_by_chain_id: Dict[int, URIRef] = {}

    for role in br.get_contributors():
        role_type = role.get_role_type()
        agent = role.get_is_held_by()
        successor = role.get_next()
        successor_res = successor.res if successor is not None else None

        is_relevant = role_type is not None and role_type == contributor_type and agent is not None
        if not is_relevant:
            continue

        if successor_res is not None and successor_res in chain_by_head:
            # This role precedes the head of a known chain: prepend it.
            chain: Dict = chain_by_head.pop(successor_res)
            chain['list'].insert(0, agent)
            chain_by_head[role.res] = chain
            head_res_by_chain_id[chain['id']] = role.res
        elif role.res is not None and role.res in chain_by_tail:
            # This role is the awaited successor of a known chain: append it.
            chain = chain_by_tail.pop(role.res)
            chain['list'].append(agent)
            if successor_res is not None:
                chain_by_tail[successor_res] = chain
        else:
            # This AR cannot be inserted into any list, so
            # we need to create an entirely new list for it:
            chain = {'id': next_chain_id, 'list': [agent]}
            next_chain_id += 1
            chain_by_head[role.res] = chain
            head_res_by_chain_id[chain['id']] = role.res
            if successor_res is not None:
                chain_by_tail[successor_res] = chain

    head_ids: Set[int] = {chain['id'] for chain in chain_by_head.values()}
    tail_ids: Set[int] = {chain['id'] for chain in chain_by_tail.values()}
    # Chains that are not waiting for any successor: there must be exactly one.
    closing_ids: Set[int] = head_ids - tail_ids
    if not closing_ids:
        # No contributor was found!
        return []
    if len(closing_ids) != 1:
        raise ValueError(malformed_msg)

    ordered: List[ResponsibleAgent] = []
    merged_ids: Set[int] = set()
    current_id: int = closing_ids.pop()
    while True:
        if current_id not in head_res_by_chain_id:
            raise ValueError(malformed_msg)
        head_res = head_res_by_chain_id[current_id]
        chain = chain_by_head[head_res]
        chain_id: int = chain['id']
        if chain_id in merged_ids:
            raise ValueError(malformed_msg)
        merged_ids.add(chain_id)
        ordered = chain['list'] + ordered

        # Now we need to get the next chain to merge, i.e. the one
        # that is waiting for the head of the chain just merged:
        if head_res not in chain_by_tail:
            break
        current_id = chain_by_tail[head_res]['id']

    if head_ids - merged_ids:
        raise ValueError(malformed_msg)

    return ordered
def encode_url(u: str) -> str:
    """
    Percent-encode ``u`` with ``urllib.parse.quote``, leaving the characters
    ``:`` and ``/`` unescaped.

    :param u: the URL to encode
    :return: the encoded URL
    """
    return quote(u, safe="://")
def create_literal(g: Graph, res: URIRef, p: URIRef, s: str, dt: URIRef = None, nor: bool = True) -> None:
    """
    Add the triple ``(res, p, Literal(s))`` to ``g``, unless ``s`` is empty
    according to ``is_string_empty``.

    :param g: the graph receiving the triple
    :param res: the subject of the triple
    :param p: the predicate of the triple
    :param s: the lexical value of the literal
    :param dt: the datatype passed to ``Literal``
    :param nor: the value of the ``normalize`` flag passed to ``Literal``
    """
    if is_string_empty(s):
        return
    literal = Literal(s, datatype=dt, normalize=nor)
    g.add((res, p, literal))
def create_type(g: Graph, res: URIRef, res_type: URIRef) -> None:
    """
    Add the triple ``(res, RDF.type, res_type)`` to ``g``.

    :param g: the graph receiving the triple
    :param res: the resource being typed
    :param res_type: the type assigned to ``res``
    """
    type_triple = (res, RDF.type, res_type)
    g.add(type_triple)
def is_string_empty(string: str) -> bool:
    """
    Tell whether ``string`` is ``None`` or contains nothing but whitespace.

    :param string: the value to inspect
    :return: ``True`` for ``None``, ``""`` and whitespace-only strings
    """
    if string is None:
        return True
    return string.strip() == ""
# Variable used in several functions entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?([1-9][0-9]*)$" prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?([1-9][0-9]*)/prov/([a-z][a-z])/([1-9][0-9]*)$" def _get_match(regex: str, group: int, string: str) -> str: match: Match = re.match(regex, string) if match is not None: return match.group(group) else: return ""
def get_base_iri(res: URIRef) -> str:
    """
    Extract the leading part of the IRI of ``res``, i.e. group 1 of
    ``prov_regex`` (when the IRI contains ``/prov/``) or of ``entity_regex``.

    :param res: the resource whose IRI must be analysed
    :return: the value produced by ``_get_match`` for that group
    """
    iri = str(res)
    pattern = prov_regex if "/prov/" in iri else entity_regex
    return _get_match(pattern, 1, iri)
def get_short_name(res: URIRef) -> str:
    """
    Extract the two-letter short name of ``res`` from its IRI: group 5 of
    ``prov_regex`` when the IRI contains ``/prov/``, group 2 of
    ``entity_regex`` otherwise.

    :param res: the resource whose IRI must be analysed
    :return: the value produced by ``_get_match`` for that group
    """
    iri = str(res)
    if "/prov/" not in iri:
        return _get_match(entity_regex, 2, iri)
    return _get_match(prov_regex, 5, iri)
def get_prov_subject_short_name(prov_res: URIRef) -> str:
    """
    Extract the short name of the subject of a provenance resource, i.e.
    group 2 of ``prov_regex``.

    :param prov_res: the provenance resource whose IRI must be analysed
    :return: the value produced by ``_get_match`` for that group, or ``""``
        when the IRI does not contain ``/prov/``
    """
    iri = str(prov_res)
    if "/prov/" not in iri:
        # non-provenance entities do not have a prov_subject!
        return ""
    return _get_match(prov_regex, 2, iri)
def get_prefix(res: URIRef) -> str:
    """
    Extract the supplier prefix of ``res`` from its IRI, i.e. the optional
    group 3 of ``entity_regex``.

    :param res: the resource whose IRI must be analysed
    :return: the supplier prefix, or ``""`` when the IRI contains ``/prov/``,
        does not match ``entity_regex`` or carries no supplier prefix
    """
    string_iri: str = str(res)
    if "/prov/" in string_iri:
        return ""  # provenance entities cannot have a supplier prefix
    # The prefix group is optional: when it does not take part in the match
    # the underlying Match.group() call yields None, which is normalised to
    # "" here so that the declared `str` return type always holds.
    return _get_match(entity_regex, 3, string_iri) or ""
def get_prov_subject_prefix(prov_res: URIRef) -> str:
    """
    Extract the supplier prefix of the subject of a provenance resource, i.e.
    the optional group 3 of ``prov_regex``.

    :param prov_res: the provenance resource whose IRI must be analysed
    :return: the supplier prefix, or ``""`` when the IRI does not contain
        ``/prov/``, does not match ``prov_regex`` or carries no supplier prefix
    """
    string_iri: str = str(prov_res)
    if "/prov/" not in string_iri:
        return ""  # non-provenance entities do not have a prov_subject!
    # The prefix group is optional: when it does not take part in the match
    # the underlying Match.group() call yields None, which is normalised to
    # "" here so that the declared `str` return type always holds.
    return _get_match(prov_regex, 3, string_iri) or ""
def get_count(res: URIRef) -> str:
    """
    Extract, as a string, the trailing sequential number of the IRI of
    ``res``: group 6 of ``prov_regex`` when the IRI contains ``/prov/``,
    group 4 of ``entity_regex`` otherwise.

    :param res: the resource whose IRI must be analysed
    :return: the value produced by ``_get_match`` for that group
    """
    iri = str(res)
    pattern, group = (prov_regex, 6) if "/prov/" in iri else (entity_regex, 4)
    return _get_match(pattern, group, iri)
def get_prov_subject_count(prov_res: URIRef) -> str:
    """
    Extract, as a string, the sequential number of the subject of a
    provenance resource, i.e. group 4 of ``prov_regex``.

    :param prov_res: the provenance resource whose IRI must be analysed
    :return: the value produced by ``_get_match`` for that group, or ``""``
        when the IRI does not contain ``/prov/``
    """
    iri = str(prov_res)
    if "/prov/" not in iri:
        # non-provenance entities do not have a prov_subject!
        return ""
    return _get_match(prov_regex, 4, iri)
def get_resource_number(res: URIRef) -> int:
    """
    Return, as an integer, group 4 of the regular expression matching the IRI
    of ``res`` (``prov_regex`` when the IRI contains ``/prov/``,
    ``entity_regex`` otherwise). For provenance IRIs this is the number of
    the subject entity, not the one of the provenance entity.

    :param res: the resource whose IRI must be analysed
    :return: the extracted number
    :raises ValueError: if the extracted text cannot be converted by ``int``
    """
    iri = str(res)
    pattern = prov_regex if "/prov/" in iri else entity_regex
    return int(_get_match(pattern, 4, iri))
def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    """
    Return the 1-based position of ``res`` inside the group of ``n_file_item``
    consecutive resource numbers it belongs to.

    For instance, with ``n_file_item = 3`` the resources numbered
    1, 2, 3, 4, 5, 6 are mapped to 1, 2, 3, 1, 2, 3.

    The result is computed arithmetically rather than by counting upwards
    one group at a time, so the cost no longer grows with the resource
    number and a non-positive ``n_file_item`` is reported instead of
    looping forever.

    :param res: the resource whose number (see ``get_resource_number``) is used
    :param n_file_item: how many resources each group contains
    :return: the position of the resource within its group
    :raises ValueError: if ``n_file_item`` is not positive, or if no number
        can be extracted from the IRI of ``res``
    """
    cur_number: int = get_resource_number(res)
    if cur_number <= 0:
        # Kept for fidelity with the former loop, which exited at once for
        # non-positive numbers and yielded this value.
        return cur_number + n_file_item
    if n_file_item <= 0:
        raise ValueError("'n_file_item' must be a positive integer, got %r" % (n_file_item,))
    return (cur_number - 1) % n_file_item + 1
def _round_up_to_multiple(number: int, step: int, step_name: str) -> int:
    """Return the smallest multiple of ``step`` that is >= ``number`` (0 when ``number`` <= 0)."""
    if number <= 0:
        return 0
    if step <= 0:
        # The former counting loop never terminated in this situation.
        raise ValueError("'%s' must be a positive integer, got %r" % (step_name, step))
    return -(-number // step) * step


def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True) -> Tuple[str, str]:
    """
    This function is responsible for looking for the correct file that
    contains the data related to the resource identified by 'res'. This
    search takes into account the organisation in directories and files, as
    well as the particular supplier prefix for bibliographic entities, if
    specified. In case no supplier prefix is specified, the 'default_dir'
    (usually set to "_") is used instead.

    :param res: the resource to locate
    :param base_dir: the root directory of the data; it is concatenated as-is
        with the first path component, so it should end with a separator
    :param base_iri: the base IRI of the dataset; it is matched literally
    :param default_dir: the folder used when the IRI has no supplier prefix
        ("_" is enforced when this is empty too)
    :param dir_split: how many resources each numbered directory spans;
        0 disables the split in directories
    :param n_file_item: how many resources each file contains
    :param is_json: if True files end with '.json', otherwise with '.nt'
        (entities) or '.nq' (provenance)
    :return: the pair (directory path, file path)
    :raises ValueError: if 'n_file_item' (or a used 'dir_split') is not
        positive, or if no number can be extracted from the IRI of 'res'
    """
    string_iri: str = str(res)

    if is_dataset(res):
        # base_iri is escaped so that characters such as '.' or '+' are
        # matched literally instead of being read as regex operators.
        relative_path: str = re.sub(r"^%s(.*)$" % re.escape(base_iri), r"\1", string_iri)
        cur_dir_path: str = (base_dir + relative_path)[:-1]
        # In case of dataset, the file path is different from regular files, e.g.
        # /corpus/br/index.json
        return cur_dir_path, cur_dir_path + os.sep + "index.json"

    cur_number: int = get_resource_number(res)
    # Find the correct file number where to save the resources
    cur_file_split: int = _round_up_to_multiple(cur_number, n_file_item, "n_file_item")

    is_prov: bool = "/prov/" in string_iri
    # The data have been split in multiple directories and it is not something related
    # with the provenance data of the whole corpus (e.g. provenance agents)
    split_in_dirs: bool = bool(dir_split) and not string_iri.startswith(base_iri + "prov/")

    if not split_in_dirs and dir_split != 0:
        # Enter here if the data is about a provenance agent, e.g. /corpus/prov/
        short_name: str = get_short_name(res)
        prefix: str = get_prefix(res) or ""
        count: str = get_count(res)
        file_extension: str = '.json' if is_json else '.nq'
        cur_dir_path = base_dir + short_name
        return cur_dir_path, cur_dir_path + os.sep + prefix + count + file_extension

    # Find the correct directory number where to save the file (only when
    # the split in directories is active).
    dir_number: List[str] = []
    if split_in_dirs:
        dir_number.append(str(_round_up_to_multiple(cur_number, dir_split, "dir_split")))

    if is_prov:
        # provenance file of a bibliographic entity
        # A missing prefix may be "" or None: fall back to default_dir and
        # then to "_" (enforced default value).
        sub_folder: str = get_prov_subject_prefix(res) or default_dir or "_"
        file_extension = '.json' if is_json else '.nq'
        components: List[str] = [get_prov_subject_short_name(res), sub_folder]
        components += dir_number
        components += [str(cur_file_split), "prov"]
        cur_dir_path = base_dir + os.sep.join(components)
        cur_file_path: str = cur_dir_path + os.sep + get_short_name(res) + file_extension
    else:
        # regular bibliographic entity
        sub_folder = get_prefix(res) or default_dir or "_"
        file_extension = '.json' if is_json else '.nt'
        components = [get_short_name(res), sub_folder] + dir_number
        cur_dir_path = base_dir + os.sep.join(components)
        cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + file_extension

    return cur_dir_path, cur_file_path
def has_supplier_prefix(res: URIRef, base_iri: str) -> bool:
    """
    Tell whether the IRI of ``res`` starts with ``base_iri`` followed by a
    two-letter short name, a slash and a ``0``, which is how a supplier
    prefix begins.

    ``base_iri`` is matched literally: it is escaped before being embedded in
    the regular expression, so characters such as ``.``, ``+`` or ``(`` can
    neither widen the match nor make the pattern invalid.

    :param res: the resource whose IRI must be analysed
    :param base_iri: the base IRI the resource is expected to start with
    :return: ``True`` if the IRI has that form, ``False`` otherwise
    """
    string_iri: str = str(res)
    pattern: str = r"^%s[a-z][a-z]/0" % re.escape(base_iri)
    return re.search(pattern, string_iri) is not None
def is_dataset(res: URIRef) -> bool:
    """
    Tell whether ``res`` should be treated as a dataset, i.e. whether its IRI
    does *not* end with a numeric segment (optionally followed by ``-`` and
    more digits, and optionally by one further numeric segment).

    :param res: the resource whose IRI must be analysed
    :return: ``True`` if the IRI does not have that numbered form
    """
    iri = str(res)
    numbered_form = re.search(r"^.+/[0-9]+(-[0-9]+)?(/[0-9]+)?$", iri)
    return numbered_form is None