from __future__ import annotations import collections import datetime import itertools import typing as t from collections.abc import Mapping, MutableMapping from typing import ( TYPE_CHECKING, Any, Container, Dict, Generator, Iterable, List, Optional, Tuple, TypeVar, Union, ) import rdflib.plugins.sparql from rdflib.graph import ConjunctiveGraph, Dataset, Graph from rdflib.namespace import NamespaceManager from rdflib.plugins.sparql.parserutils import CompValue from rdflib.term import BNode, Identifier, Literal, Node, URIRef, Variable if TYPE_CHECKING: from rdflib.paths import Path _AnyT = TypeVar("_AnyT") [docs]class SPARQLError(Exception): [docs] def __init__(self, msg: Optional[str] = None): Exception.__init__(self, msg) [docs]class NotBoundError(SPARQLError): [docs] def __init__(self, msg: Optional[str] = None): SPARQLError.__init__(self, msg) [docs]class AlreadyBound(SPARQLError): # noqa: N818 """Raised when trying to bind a variable that is already bound!""" [docs] def __init__(self): SPARQLError.__init__(self) [docs]class SPARQLTypeError(SPARQLError): [docs] def __init__(self, msg: Optional[str]): SPARQLError.__init__(self, msg) [docs]class Bindings(MutableMapping): """ A single level of a stack of variable-value bindings. Each dict keeps a reference to the dict below it, any failed lookup is propegated back In python 3.3 this could be a collections.ChainMap """ [docs] def __init__(self, outer: Optional[Bindings] = None, d=[]): self._d: Dict[str, str] = dict(d) self.outer = outer [docs] def __getitem__(self, key: str) -> str: if key in self._d: return self._d[key] if not self.outer: raise KeyError() return self.outer[key] [docs] def __contains__(self, key: Any) -> bool: try: self[key] return True except KeyError: return False [docs] def __setitem__(self, key: str, value: Any) -> None: self._d[key] = value [docs] def __delitem__(self, key: str) -> None: raise Exception("DelItem is not implemented!") [docs] def __len__(self) -> int: i = 0 d: Optional[Bindings] = self while d is not None: i += len(d._d) d = d.outer return i [docs] def __iter__(self) -> Generator[str, None, None]: d: Optional[Bindings] = self while d is not None: yield from d._d d = d.outer [docs] def __str__(self) -> str: # type error: Generator has incompatible item type "Tuple[Any, str]"; expected "str" return "Bindings({" + ", ".join((k, self[k]) for k in self) + "})" # type: ignore[misc] [docs] def __repr__(self) -> str: return str(self) [docs]class FrozenDict(Mapping): """ An immutable hashable dict Taken from http://stackoverflow.com/a/2704866/81121 """ [docs] def __init__(self, *args: Any, **kwargs: Any): self._d: Dict[Identifier, Identifier] = dict(*args, **kwargs) self._hash: Optional[int] = None [docs] def __iter__(self): return iter(self._d) [docs] def __len__(self) -> int: return len(self._d) [docs] def __getitem__(self, key: Identifier) -> Identifier: return self._d[key] [docs] def __hash__(self) -> int: # It would have been simpler and maybe more obvious to # use hash(tuple(sorted(self._d.items()))) from this discussion # so far, but this solution is O(n). I don't know what kind of # n we are going to run into, but sometimes it's hard to resist the # urge to optimize when it will gain improved algorithmic performance. if self._hash is None: self._hash = 0 for key, value in self.items(): self._hash ^= hash(key) self._hash ^= hash(value) return self._hash [docs] def project(self, vars: Container[Variable]) -> FrozenDict: return FrozenDict(x for x in self.items() if x[0] in vars) [docs] def disjointDomain(self, other: t.Mapping[Identifier, Identifier]) -> bool: return not bool(set(self).intersection(other)) [docs] def compatible(self, other: t.Mapping[Identifier, Identifier]) -> bool: for k in self: try: if self[k] != other[k]: return False except KeyError: pass return True [docs] def merge(self, other: t.Mapping[Identifier, Identifier]) -> FrozenDict: res = FrozenDict(itertools.chain(self.items(), other.items())) return res [docs] def __str__(self) -> str: return str(self._d) [docs] def __repr__(self) -> str: return repr(self._d) [docs]class FrozenBindings(FrozenDict): [docs] def __init__(self, ctx: QueryContext, *args, **kwargs): FrozenDict.__init__(self, *args, **kwargs) self.ctx = ctx [docs] def __getitem__(self, key: Union[Identifier, str]) -> Identifier: if not isinstance(key, Node): key = Variable(key) if not isinstance(key, (BNode, Variable)): return key if key not in self._d: # type error: Value of type "Optional[Dict[Variable, Identifier]]" is not indexable # type error: Invalid index type "Union[BNode, Variable]" for "Optional[Dict[Variable, Identifier]]"; expected type "Variable" return self.ctx.initBindings[key] # type: ignore[index] else: return self._d[key] [docs] def project(self, vars: Container[Variable]) -> FrozenBindings: return FrozenBindings(self.ctx, (x for x in self.items() if x[0] in vars)) [docs] def merge(self, other: t.Mapping[Identifier, Identifier]) -> FrozenBindings: res = FrozenBindings(self.ctx, itertools.chain(self.items(), other.items())) return res @property def now(self) -> datetime.datetime: return self.ctx.now @property def bnodes(self) -> t.Mapping[Identifier, BNode]: return self.ctx.bnodes @property def prologue(self) -> Optional[Prologue]: return self.ctx.prologue [docs] def forget( self, before: QueryContext, _except: Optional[Container[Variable]] = None ) -> FrozenBindings: """ return a frozen dict only of bindings made in self since before """ if not _except: _except = [] # bindings from initBindings are newer forgotten return FrozenBindings( self.ctx, ( x for x in self.items() if ( x[0] in _except # type error: Unsupported right operand type for in ("Optional[Dict[Variable, Identifier]]") or x[0] in self.ctx.initBindings # type: ignore[operator] or before[x[0]] is None ) ), ) [docs] def remember(self, these) -> FrozenBindings: """ return a frozen dict only of bindings in these """ return FrozenBindings(self.ctx, (x for x in self.items() if x[0] in these)) [docs]class QueryContext: """ Query context - passed along when evaluating the query """ [docs] def __init__( self, graph: Optional[Graph] = None, bindings: Optional[Union[Bindings, FrozenBindings, List[Any]]] = None, initBindings: Optional[Mapping[str, Identifier]] = None, datasetClause=None, ): self.initBindings = initBindings self.bindings = Bindings(d=bindings or []) if initBindings: self.bindings.update(initBindings) self.graph: Optional[Graph] self._dataset: Optional[Union[Dataset, ConjunctiveGraph]] if isinstance(graph, (Dataset, ConjunctiveGraph)): if datasetClause: self._dataset = Dataset() self.graph = Graph() for d in datasetClause: if d.default: from_graph = graph.get_context(d.default) self.graph += from_graph if not from_graph: self.load(d.default, default=True) elif d.named: namedGraphs = Graph( store=self.dataset.store, identifier=d.named ) from_named_graphs = graph.get_context(d.named) namedGraphs += from_named_graphs if not from_named_graphs: self.load(d.named, default=False) else: self._dataset = graph if rdflib.plugins.sparql.SPARQL_DEFAULT_GRAPH_UNION: self.graph = self.dataset else: self.graph = self.dataset.default_context else: self._dataset = None self.graph = graph self.prologue: Optional[Prologue] = None self._now: Optional[datetime.datetime] = None self.bnodes: t.MutableMapping[Identifier, BNode] = collections.defaultdict( BNode ) @property def now(self) -> datetime.datetime: if self._now is None: self._now = datetime.datetime.now(datetime.timezone.utc) return self._now [docs] def clone( self, bindings: Optional[Union[FrozenBindings, Bindings, List[Any]]] = None ) -> QueryContext: r = QueryContext( self._dataset if self._dataset is not None else self.graph, bindings or self.bindings, initBindings=self.initBindings, ) r.prologue = self.prologue r.graph = self.graph r.bnodes = self.bnodes return r @property def dataset(self) -> ConjunctiveGraph: """ "current dataset""" if self._dataset is None: raise Exception( "You performed a query operation requiring " + "a dataset (i.e. ConjunctiveGraph), but " + "operating currently on a single graph." ) return self._dataset [docs] def load( self, source: URIRef, default: bool = False, into: Optional[Identifier] = None, **kwargs: Any, ) -> None: """ Load data from the source into the query context's. :param source: The source to load from. :param default: If `True`, triples from the source will be added to the default graph, otherwise it will be loaded into a graph with ``source`` URI as its name. :param into: The name of the graph to load the data into. If `None`, the source URI will be used as as the name of the graph. :param kwargs: Keyword arguments to pass to :meth:`rdflib.graph.Graph.parse`. """ def _load(graph, source): try: return graph.parse(source, format="turtle", **kwargs) except Exception: pass try: return graph.parse(source, format="xml", **kwargs) except Exception: pass try: return graph.parse(source, format="n3", **kwargs) except Exception: pass try: return graph.parse(source, format="nt", **kwargs) except Exception: raise Exception( "Could not load %s as either RDF/XML, N3 or NTriples" % source ) if not rdflib.plugins.sparql.SPARQL_LOAD_GRAPHS: # we are not loading - if we already know the graph # being "loaded", just add it to the default-graph if default: # Unsupported left operand type for + ("None") self.graph += self.dataset.get_context(source) # type: ignore[operator] else: if default: _load(self.graph, source) else: if into is None: into = source _load(self.dataset.get_context(into), source) [docs] def __getitem__(self, key: Union[str, Path]) -> Optional[Union[str, Path]]: # in SPARQL BNodes are just labels if not isinstance(key, (BNode, Variable)): return key try: return self.bindings[key] except KeyError: return None [docs] def get(self, key: str, default: Optional[Any] = None) -> Any: try: return self[key] except KeyError: return default [docs] def solution(self, vars: Optional[Iterable[Variable]] = None) -> FrozenBindings: """ Return a static copy of the current variable bindings as dict """ if vars: return FrozenBindings( self, ((k, v) for k, v in self.bindings.items() if k in vars) ) else: return FrozenBindings(self, self.bindings.items()) [docs] def __setitem__(self, key: str, value: str) -> None: if key in self.bindings and self.bindings[key] != value: raise AlreadyBound() self.bindings[key] = value [docs] def pushGraph(self, graph: Optional[Graph]) -> QueryContext: r = self.clone() r.graph = graph return r [docs] def push(self) -> QueryContext: r = self.clone(Bindings(self.bindings)) return r [docs] def clean(self) -> QueryContext: return self.clone([]) [docs] def thaw(self, frozenbindings: FrozenBindings) -> QueryContext: """ Create a new read/write query context from the given solution """ c = self.clone(frozenbindings) return c [docs]class Prologue: """ A class for holding prefixing bindings and base URI information """ [docs] def __init__(self) -> None: self.base: Optional[str] = None self.namespace_manager = NamespaceManager(Graph()) # ns man needs a store [docs] def resolvePName(self, prefix: Optional[str], localname: Optional[str]) -> URIRef: ns = self.namespace_manager.store.namespace(prefix or "") if ns is None: raise Exception("Unknown namespace prefix : %s" % prefix) return URIRef(ns + (localname or "")) [docs] def bind(self, prefix: Optional[str], uri: Any) -> None: self.namespace_manager.bind(prefix, uri, replace=True) [docs] def absolutize( self, iri: Optional[Union[CompValue, str]] ) -> Optional[Union[CompValue, str]]: """ Apply BASE / PREFIXes to URIs (and to datatypes in Literals) TODO: Move resolving URIs to pre-processing """ if isinstance(iri, CompValue): if iri.name == "pname": return self.resolvePName(iri.prefix, iri.localname) if iri.name == "literal": # type error: Argument "datatype" to "Literal" has incompatible type "Union[CompValue, Identifier, None]"; expected "Optional[str]" return Literal( iri.string, lang=iri.lang, datatype=self.absolutize(iri.datatype) # type: ignore[arg-type] ) elif isinstance(iri, URIRef) and not ":" in iri: # noqa: E713 return URIRef(iri, base=self.base) return iri [docs]class Query: """ A parsed and translated query """ [docs] def __init__(self, prologue: Prologue, algebra: CompValue): self.prologue = prologue self.algebra = algebra self._original_args: Tuple[str, Mapping[str, str], Optional[str]] [docs]class Update: """ A parsed and translated update """ [docs] def __init__(self, prologue: Prologue, algebra: List[CompValue]): self.prologue = prologue self.algebra = algebra self._original_args: Tuple[str, Mapping[str, str], Optional[str]]
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4