#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.
from __future__ import annotations
import json
import os
from importlib import resources
from SPARQLWrapper import SPARQLWrapper, RDFXML
from pyshex import ShExEvaluator
from rdflib import RDF, Namespace, ConjunctiveGraph, Graph
from typing import TYPE_CHECKING
from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.support.reporter import Reporter
if TYPE_CHECKING:
from typing import List, Set, Dict, Any, Optional
from rdflib import URIRef, term
from oc_ocdm.graph.graph_set import GraphSet
[docs]class Reader(object):
def __init__(self, repok: Reporter = None, reperr: Reporter = None, context_map: Dict[str, Any] = None) -> None:
if context_map is not None:
self.context_map: Dict[str, Any] = context_map
else:
self.context_map: Dict[str, Any] = {}
for context_url in self.context_map:
ctx_file_path: Any = self.context_map[context_url]
if type(ctx_file_path) == str and os.path.isfile(ctx_file_path):
# This expensive operation is done only when it's really needed
with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
self.context_map[context_url] = json.load(ctx_f)
if repok is None:
self.repok: Reporter = Reporter(prefix="[Reader: INFO] ")
else:
self.repok: Reporter = repok
if reperr is None:
self.reperr: Reporter = Reporter(prefix="[Reader: ERROR] ")
else:
self.reperr: Reporter = reperr
[docs] def load(self, rdf_file_path: str) -> Optional[ConjunctiveGraph]:
self.repok.new_article()
self.reperr.new_article()
loaded_graph: Optional[ConjunctiveGraph] = None
if os.path.isfile(rdf_file_path):
try:
loaded_graph = self._load_graph(rdf_file_path)
except Exception as e:
self.reperr.add_sentence("[1] "
"It was impossible to handle the format used for "
"storing the file (stored in the temporary path) "
f"'{rdf_file_path}'. Additional details: {e}")
else:
self.reperr.add_sentence("[2] "
f"The file specified ('{rdf_file_path}') doesn't exist.")
return loaded_graph
def _load_graph(self, file_path: str) -> ConjunctiveGraph:
formats: List[str] = ["json-ld", "rdfxml", "turtle", "trig", "nt11", "nquads"]
loaded_graph: ConjunctiveGraph = ConjunctiveGraph()
errors: str = ""
for cur_format in formats:
try:
if cur_format == "json-ld":
with open(file_path, 'rt', encoding='utf-8') as f:
json_ld_file: Any = json.load(f)
if isinstance(json_ld_file, dict):
json_ld_file: List[Any] = [json_ld_file]
for json_ld_resource in json_ld_file:
# Trick to force the use of a pre-loaded context if the format
# specified is JSON-LD
if "@context" in json_ld_resource:
cur_context: str = json_ld_resource["@context"]
if cur_context in self.context_map:
context_json: Any = self.context_map[cur_context]["@context"]
json_ld_resource["@context"] = context_json
loaded_graph.parse(data=json.dumps(json_ld_resource, ensure_ascii=False),
format=cur_format)
else:
loaded_graph.parse(file_path, format=cur_format)
return loaded_graph
except Exception as e:
errors += f" | {e}" # Try another format
raise IOError("1", f"It was impossible to handle the format used for storing the file '{file_path}'{errors}")
[docs] @staticmethod
def get_graph_from_subject(graph: Graph, subject: URIRef) -> Graph:
g: Graph = Graph(identifier=graph.identifier)
for p, o in graph.predicate_objects(subject):
g.add((subject, p, o))
return g
@staticmethod
def _validate(graph: Graph, shex: str, valid_graph: Graph, focus: URIRef, shape: URIRef) -> bool:
node_result = ShExEvaluator().evaluate(rdf=graph, shex=shex, focus=focus, start=shape)[0]
if node_result.result:
for triple in graph.triples((focus, None, None)):
valid_graph.add(triple)
return node_result.result
@staticmethod
def _extract_subjects(graph: Graph) -> Set[URIRef]:
subjects: Set[URIRef] = set()
for s in graph.subjects():
subjects.add(s)
return subjects
[docs] @staticmethod
def graph_validation(graph: Graph, closed: bool = False) -> Graph:
valid_graph: Graph = Graph(identifier=graph.identifier)
if closed:
shex = resources.read_text('oc_ocdm.resources', 'shexc_closed.txt')
else:
shex = resources.read_text('oc_ocdm.resources', 'shexc.txt')
BIRO: Namespace = Namespace("http://purl.org/spar/biro/")
C4O: Namespace = Namespace("http://purl.org/spar/c4o/")
CITO: Namespace = Namespace("http://purl.org/spar/cito/")
DATACITE: Namespace = Namespace("http://purl.org/spar/datacite/")
DEO: Namespace = Namespace("http://purl.org/spar/deo/")
FABIO: Namespace = Namespace("http://purl.org/spar/fabio/")
FOAF: Namespace = Namespace("http://xmlns.com/foaf/0.1/")
OA: Namespace = Namespace("http://www.w3.org/ns/oa#")
PRO: Namespace = Namespace("http://purl.org/spar/pro/")
OC: Namespace = Namespace("https://opencitations.net/shex/")
for subject in Reader._extract_subjects(graph):
# ReferenceAnnotation
if (subject, RDF.type, OA.Annotation) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.ReferenceAnnotationShape)
# AgentRole
elif (subject, RDF.type, PRO.RoleInTime) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.AgentRoleShape)
# BibliographicReference
elif (subject, RDF.type, BIRO.BibliographicReference) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.BibliographicReferenceShape)
# BibliographicResource
elif (subject, RDF.type, FABIO.Expression) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.BibliographicResourceShape)
# Citation
elif (subject, RDF.type, CITO.Citation) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.CitationShape)
# DiscourseElement
elif (subject, RDF.type, DEO.DiscourseElement) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.DiscourseElementShape)
# Identifier
elif (subject, RDF.type, DATACITE.Identifier) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.IdentifierShape)
# PointerList
elif (subject, RDF.type, C4O.SingleLocationPointerList) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.PointerListShape)
# ResponsibleAgent
elif (subject, RDF.type, FOAF.Agent) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.ResponsibleAgentShape)
# ResourceEmbodiment
elif (subject, RDF.type, FABIO.Manifestation) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.ResourceEmbodimentShape)
# ReferencePointer
elif (subject, RDF.type, C4O.InTextReferencePointer) in graph:
Reader._validate(graph, shex, valid_graph, subject, OC.ReferencePointerShape)
return valid_graph
[docs] @staticmethod
def import_entities_from_graph(g_set: GraphSet, graph: Graph, resp_agent: str,
enable_validation: bool = False, closed: bool = False) -> List[GraphEntity]:
if enable_validation:
graph = Reader.graph_validation(graph, closed)
imported_entities: List[GraphEntity] = []
for subject in Reader._extract_subjects(graph):
types: List[term] = []
for o in graph.objects(subject, RDF.type):
types.append(o)
# ReferenceAnnotation
if GraphEntity.iri_note in types:
imported_entities.append(g_set.add_an(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
# AgentRole
elif GraphEntity.iri_role_in_time in types:
imported_entities.append(g_set.add_ar(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
# BibliographicReference
elif GraphEntity.iri_bibliographic_reference in types:
imported_entities.append(g_set.add_be(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
# BibliographicResource
elif GraphEntity.iri_expression in types:
imported_entities.append(g_set.add_br(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
# Citation
elif GraphEntity.iri_citation in types:
imported_entities.append(g_set.add_ci(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
# DiscourseElement
elif GraphEntity.iri_discourse_element in types:
imported_entities.append(g_set.add_de(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
# Identifier
elif GraphEntity.iri_identifier in types:
imported_entities.append(g_set.add_id(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
# PointerList
elif GraphEntity.iri_singleloc_pointer_list in types:
imported_entities.append(g_set.add_pl(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
# ResponsibleAgent
elif GraphEntity.iri_agent in types:
imported_entities.append(g_set.add_ra(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
# ResourceEmbodiment
elif GraphEntity.iri_manifestation in types:
imported_entities.append(g_set.add_re(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
# ReferencePointer
elif GraphEntity.iri_intextref_pointer in types:
imported_entities.append(g_set.add_rp(resp_agent=resp_agent, res=subject,
preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
return imported_entities
[docs] @staticmethod
def import_entity_from_triplestore(g_set: GraphSet, ts_url: str, res: URIRef, resp_agent: str,
enable_validation: bool = False) -> GraphEntity:
sparql: SPARQLWrapper = SPARQLWrapper(ts_url)
query: str = f"CONSTRUCT {{<{res}> ?p ?o}} WHERE {{<{res}> ?p ?o}}"
sparql.setQuery(query)
sparql.setMethod('GET')
sparql.setReturnFormat(RDFXML)
result: ConjunctiveGraph = sparql.query().convert()
if result is not None:
imported_entities: List[GraphEntity] = Reader.import_entities_from_graph(g_set, result,
resp_agent, enable_validation)
if len(imported_entities) <= 0:
raise ValueError("The requested entity was not found or was not recognized as a proper OCDM entity.")
else:
return imported_entities[0]