Source code for pyldapi_client.py

# -*- coding: utf-8 -*-
#
import asyncio
import json
import threading
from abc import ABCMeta, abstractmethod

import aiohttp
import requests

from pyldapi_client.functions import *
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass


class LoadedRegister(object):
    """
    A register retrieved from a pyLDAPI endpoint, holding its URI, the
    RDF classes of its items, and its most recently fetched JSON-LD payload.
    """
    __slots__ = ('uri', 'item_classes', 'payload')

    def __init__(self, uri, item_classes=None, payload=None):
        """
        :param uri: The URI of the register.
        :type uri: str

        :param item_classes: The RDF classes of items contained in this register.
        :type item_classes: list or None

        :param payload: The parsed JSON-LD payload of the register, if already fetched.
        :type payload: list or None
        """
        self.uri = uri
        self.item_classes = item_classes or []
        self.payload = payload

    def get_current_page_details(self):
        """
        Extracts pagination details from the register's JSON-LD payload.

        :return: A tuple of (page, per_page, first, last) page numbers.
        :rtype: tuple
        """
        page = 1
        per_page = 100
        first = 1
        last = 1
        if not self.payload:
            return page, per_page, first, last
        for p in self.payload:
            if "@type" in p and ldp_Page in p['@type']:
                page_string = str(p['@id'])
                ex_page = extract_page_from_string(page_string)
                ex_per_page = extract_per_page_from_string(page_string)
                page = ex_page or page
                per_page = ex_per_page or per_page
                if vocab_first in p and len(p[vocab_first]) > 0:
                    page_string = p[vocab_first][0]['@id']
                    ex_first_page = extract_page_from_string(page_string)
                    first = ex_first_page or first
                if vocab_last in p and len(p[vocab_last]) > 0:
                    page_string = p[vocab_last][0]['@id']
                    ex_last_page = extract_page_from_string(page_string)
                    last = ex_last_page or last
                break
        return page, per_page, first, last

    def make_instance_uri(self, identifier):
        """
        Resolves an item identifier to an absolute instance URI, prefixing
        it with the register URI unless it is already an http(s) URI.

        :param identifier: An instance identifier or absolute URI.
        :type identifier: str

        :return: The absolute URI of the instance.
        :rtype: str
        """
        if not identifier.startswith(("http:", "https:")):
            identifier = "/".join([self.uri.rstrip('/'), identifier])
        return identifier

    def filter_index(self, payload):
        """
        Filters a JSON-LD payload down to the nodes whose ``@type`` matches
        one of this register's item classes.

        :param payload: The parsed JSON-LD payload of a register page.
        :type payload: list

        :return: A mapping of instance URI to JSON-LD node.
        :rtype: dict
        """
        index = {}
        for p in payload:
            if "@id" not in p or "@type" not in p:
                continue
            if not any(t in self.item_classes for t in p['@type']):
                continue
            index[p["@id"]] = p
        return index
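
    # Note: a minimal illustration (not from the library) of the flattened
    # JSON-LD shape the two methods above expect. The real type/property
    # URIs come from pyldapi_client.functions (ldp_Page, vocab_first,
    # vocab_last); the placeholders and query-string layout below are
    # assumptions for illustration only:
    #
    #   [
    #       {"@id": "http://example.org/reg?page=2&per_page=100",
    #        "@type": ["<ldp_Page URI>"],
    #        "<vocab_first URI>": [{"@id": ".../reg?page=1&per_page=100"}],
    #        "<vocab_last URI>":  [{"@id": ".../reg?page=9&per_page=100"}]},
    #       {"@id": "http://example.org/reg/item-1",
    #        "@type": ["http://example.org/ItemClass"]}
    #   ]
    #
    # Under those assumptions, get_current_page_details() would report
    # (2, 100, 1, 9) from the first node, and filter_index() would keep
    # only the second node when ItemClass is in item_classes.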


class AbstractBoundIndexPage(object, metaclass=ABCMeta):
    """
    A single page of a register index, bound to the register it was
    retrieved from so that adjacent pages can be fetched.
    """
    __slots__ = ('register', 'index', 'page', 'per_page', 'first', 'last')

    def __init__(self, register, index, page, per_page, first, last):
        """
        :param register: The bound register this page belongs to.
        :type register: AbstractBoundRegister

        :param index: A mapping of instance URI to JSON-LD node for this page.
        :type index: dict

        :param page: This page's number.
        :type page: int

        :param per_page: The number of items per page.
        :type per_page: int

        :param first: The first page number of the register.
        :type first: int

        :param last: The last page number of the register.
        :type last: int
        """
        self.register = register
        self.index = index
        self.page = page
        self.per_page = per_page
        self.first = first
        self.last = last

    def items(self):
        """
        Iterates over the (URI, node) pairs on this page.

        :return: A generator of (uri, item) tuples.
        :rtype: generator
        """
        yield from self.index.items()


class BoundIndexPage(AbstractBoundIndexPage):
    """
    A synchronous register index page.
    """
    __slots__ = tuple()

    def prev_page(self):
        """
        Fetches the previous page of the index.

        :return: The previous page, or None if this is the first page.
        :rtype: BoundIndexPage or None
        """
        if self.page == 1 or self.page <= self.first:
            return None
        return self.register.index_page(self.page - 1, self.per_page)

    def next_page(self):
        """
        Fetches the next page of the index.

        :return: The next page, or None if this is the last page.
        :rtype: BoundIndexPage or None
        """
        if self.page >= self.last:
            return None
        return self.register.index_page(self.page + 1, self.per_page)


class AsyncBoundIndexPage(AbstractBoundIndexPage):
    """
    An asynchronous register index page.
    """
    __slots__ = tuple()

    async def prev_page(self):
        """
        Fetches the previous page of the index.

        :return: The previous page, or None if this is the first page.
        :rtype: AsyncBoundIndexPage or None
        """
        if self.page == 1 or self.page <= self.first:
            return None
        return await self.register.index_page(self.page - 1, self.per_page)

    async def next_page(self):
        """
        Fetches the next page of the index.

        :return: The next page, or None if this is the last page.
        :rtype: AsyncBoundIndexPage or None
        """
        if self.page >= self.last:
            return None
        return await self.register.index_page(self.page + 1, self.per_page)


class AbstractBoundRegister(object, metaclass=ABCMeta):
    """
    A LoadedRegister bound to the client used to query it.
    """
    __slots__ = ('register', 'client')

    def __init__(self, register, client):
        """
        :param register: The loaded register to bind.
        :type register: LoadedRegister

        :param client: The client used to query the register.
        :type client: AbstractLDAPIClient
        """
        self.register = register
        self.client = client


class AsyncBoundRegister(AbstractBoundRegister):
    """
    An asynchronous bound register, fetching pages and instances
    concurrently with asyncio.
    """
    __slots__ = tuple()

    async def index(self, offset=None, min_count=None):
        """
        Gets all of the IDs of instances on this register.

        Note: this can take a long time for a large dataset.

        :return: A mapping of instance URI to JSON-LD node.
        :rtype: dict
        """
        current_page, current_per_page, first, last =\
            self.register.get_current_page_details()
        if last < first:
            last = first
        if last == first and offset is None:
            page = await self.index_page(first)
            return page.index
        if offset and offset > 0:
            offset_pages = offset // current_per_page
            offset_entries = offset - (offset_pages * current_per_page)
            first = first+offset_pages
            last = last+offset_pages
        else:
            offset_entries = 0
        # plus 1 because first and last are inclusive.
        total_pages = (last - first) + 1
        if total_pages < 1:
            total_pages = 1
        concurrency = 8  # number of pages fetched concurrently
        chunks = total_pages // concurrency
        if chunks < 1:
            chunks = 1
        elif (total_pages % concurrency) > 0:
            chunks += 1
        index = {}
        for c in range(chunks):
            page_offset = c * concurrency
            # Clamp to `last` so pages past the end are not requested.
            jobs = [self.index_page(page=p)
                    for p in range(page_offset + first,
                                   min(page_offset + first + concurrency,
                                       last + 1))]
            pages = await asyncio.gather(*jobs, return_exceptions=True)
            for p in pages:
                if p is None:
                    continue
                elif isinstance(p, Exception):
                    print(p)
                    continue
                if offset_entries:
                    skip_entries = sorted(p.index.keys())[:offset_entries]
                    for s in skip_entries:
                        _ = p.index.pop(s)
                    offset_entries = 0
                try:
                    index.update(p.index)
                except Exception as e:
                    print(e)
                    continue
            if min_count is not None and len(index) >= min_count:
                break
        return index

    async def instances(self, index=None, min_count=None):
        """
        Gets all of the instances on this register.

        Note: this can take a *very* long time for a large dataset.

        :return: A mapping of instance URI to instance payload.
        :rtype: dict
        """
        if index is None:
            index = await self.index()
        if isinstance(index, dict):
            index = tuple(index.keys())
        instance_count = len(index)
        chunks = instance_count // 8
        if chunks < 1:
            chunks = 1
        elif (instance_count % 8) > 0:
            chunks += 1
        ret_dict = {}

        async def _get_instance_for_key(_instance_uri):
            # fetch one instance and return it paired with its URI for keying
            _instance = await self.instance(_instance_uri)
            return _instance_uri, _instance

        for c in range(chunks):
            jobs = []
            for p in range(0, 8):
                _offset = (c*8)+p
                if _offset >= instance_count:
                    break
                instance_uri = index[_offset]
                instance_job = _get_instance_for_key(instance_uri)
                jobs.append(instance_job)
            awaitable = asyncio.gather(*jobs, return_exceptions=True)
            completed_jobs = await awaitable
            for completed_job in completed_jobs:
                # gather(..., return_exceptions=True) yields Exception
                # objects directly; check those before unpacking the tuple.
                if isinstance(completed_job, Exception):
                    print(completed_job)
                    continue
                identifier, instance = completed_job
                if instance is None:
                    continue
                try:
                    ret_dict[identifier] = instance
                except Exception as e:
                    print(e)
                    continue
            if min_count is not None and len(ret_dict) >= min_count:
                break
        return ret_dict

    async def index_page(self, page=None, per_page=None):
        """
        Fetches a single page of the register index.

        :param page: The page number to fetch; defaults to the current page.
        :type page: int or None

        :param per_page: The number of items per page; defaults to the current setting.
        :type per_page: int or None

        :return: The requested page, or None if it could not be fetched.
        :rtype: AsyncBoundIndexPage or None
        """
        current_page, current_per_page, first, last = \
            self.register.get_current_page_details()
        if page is None:
            page = current_page or 1
        if per_page is None:
            per_page = current_per_page or 100
        first = first or 1
        last = last or 1
        if page < 1:
            raise RuntimeError("Cannot execute an index call to register page less-than 1.")
        if per_page < 1:
            raise RuntimeError("Cannot execute an index call to register with items-per-page less-than 1.")
        if page == current_page and per_page == current_per_page:
            payload = self.register.payload
        else:
            payload = await self.client._get_register_index(self.register.uri, page, per_page)
            if not payload:
                return None
            self.register.payload = payload
            current_page, current_per_page, first, last = self.register.get_current_page_details()
        index = self.register.filter_index(payload)
        return AsyncBoundIndexPage(self, index, current_page, current_per_page, first, last)

    async def instance(self, identifier):
        """
        Fetches a single instance from the register.

        :param identifier: An instance identifier or absolute URI.
        :type identifier: str

        :return: The parsed JSON-LD payload of the instance, or the HTTP
            status code on a non-200 response.
        :rtype: dict or list or int
        """
        id_uri = self.register.make_instance_uri(identifier)
        resp = await self.client._get_register_instance(self.register.uri, id_uri)
        return resp


class BoundRegister(AbstractBoundRegister):
    """
    A synchronous bound register, optionally using worker threads for
    bulk operations.
    """
    __slots__ = tuple()

    def _index_threaded(self, first, last, offset_entries, min_count):
        """
        Gets all of the IDs of instances on this register, fetching pages
        on multiple worker threads.

        Note: this can take a long time for a large dataset.

        :return: A mapping of instance URI to JSON-LD node.
        :rtype: dict
        """
        num_threads = int(self.client.threads)
        # plus 1 because first and last are inclusive.
        total_pages = (last - first) + 1
        if total_pages < 1:
            total_pages = 1
        chunks = total_pages // num_threads
        if chunks < 1:
            chunks = 1
        elif (total_pages % num_threads) > 0:
            chunks += 1
        index = {}
        pages = {}

        def _thread_job(i, p):
            # fetch one index page on a worker thread, recording either
            # the page or the raised exception
            try:
                pages[i] = self.index_page(page=p)
            except Exception as e:
                pages[i] = e

        for c in range(chunks):
            jobs = []
            c_page_offset = c * num_threads
            pages = {}
            for i, p in enumerate(range(c_page_offset+first, c_page_offset+first+num_threads)):
                page_job = threading.Thread(target=_thread_job, args=(i, p))
                page_job.start()
                jobs.append(page_job)
            for j in jobs:
                try:
                    j.join()
                except Exception:
                    pass
            for i, p in pages.items():
                if p is None:
                    continue
                elif isinstance(p, Exception):
                    print(p)
                    continue
                if offset_entries:
                    skip_entries = sorted(p.index.keys())[:offset_entries]
                    for s in skip_entries:
                        _ = p.index.pop(s)
                    offset_entries = 0
                try:
                    index.update(p.index)
                except Exception as e:
                    print(e)
                    continue
            if min_count is not None and len(index) >= min_count:
                break
        return index

    def index(self, offset=None, min_count=None):
        """
        Gets all of the IDs of instances on this register.

        Note: this can take a long time for a large dataset.

        :return: A mapping of instance URI to JSON-LD node.
        :rtype: dict
        """

        current_page, current_per_page, first, last =\
            self.register.get_current_page_details()
        if last < first:
            last = first
        if last == first and offset is None:
            return self.index_page(first).index
        if offset and offset > 0:
            offset_pages = offset // current_per_page
            offset_entries = offset - (offset_pages * current_per_page)
            first = first+offset_pages
            last = last+offset_pages
        else:
            offset_entries = 0
        if self.client.threads and self.client.threads > 1:
            return self._index_threaded(first, last, offset_entries, min_count)
        index = {}
        for p in range(first, last+1):
            page = self.index_page(page=p)
            if page is None:
                continue
            elif isinstance(page, Exception):
                print(page)
                continue
            if offset_entries:
                skip_entries = sorted(page.index.keys())[:offset_entries]
                for s in skip_entries:
                    _ = page.index.pop(s)
                offset_entries = 0
            try:
                index.update(page.index)
            except Exception as e:
                print(e)
                continue
            if min_count is not None and len(index) >= min_count:
                break
        return index

    def instances(self, index=None, min_count=None):
        """
        Gets all of the instances on this register.

        Note: this can take a *very* long time for a large dataset.

        :return: A mapping of instance URI to instance payload.
        :rtype: dict
        """
        if index is None:
            index = self.index()
        if isinstance(index, dict):
            index = tuple(index.keys())
        if self.client.threads and self.client.threads > 1:
            return self._instances_threaded(index, min_count)
        instance_count = len(index)
        ret_dict = {}

        for p in range(0, instance_count):
            instance_uri = index[p]
            instance = self.instance(instance_uri)
            if instance is None:
                continue
            elif isinstance(instance, Exception):
                print(instance)
                continue
            try:
                ret_dict[instance_uri] = instance
            except Exception as e:
                print(e)
                continue
            if min_count is not None and len(ret_dict) >= min_count:
                break
        return ret_dict

    def _instances_threaded(self, index, min_count):
        """
        Gets all of the instances on this register, fetching on multiple
        worker threads.

        Note: this can take a *very* long time for a large dataset.

        :return: A mapping of instance URI to instance payload.
        :rtype: dict
        """
        num_threads = int(self.client.threads)
        if isinstance(index, dict):
            index = tuple(index.keys())
        instance_count = len(index)
        chunks = instance_count // num_threads
        if chunks < 1:
            chunks = 1
        elif (instance_count % num_threads) > 0:
            chunks += 1

        ret_dict = {}
        instances = {}

        def _get_instance_for_key(_instance_uri):
            # fetch one instance on a worker thread, recording either the
            # instance or the raised exception under its URI
            try:
                instances[_instance_uri] = self.instance(_instance_uri)
            except Exception as e:
                instances[_instance_uri] = e

        for c in range(chunks):
            jobs = []
            instances = {}
            for p in range(num_threads):
                _offset = (c * num_threads) + p
                if _offset >= instance_count:
                    break
                instance_uri = index[_offset]
                instance_job = threading.Thread(
                    target=_get_instance_for_key, args=(instance_uri,))
                instance_job.start()
                jobs.append(instance_job)
            for j in jobs:
                try:
                    j.join()
                except Exception:
                    pass
            for identifier, instance in instances.items():
                if instance is None:
                    continue
                elif isinstance(instance, Exception):
                    print(instance)
                    continue
                try:
                    ret_dict[identifier] = instance
                except Exception as e:
                    print(e)
                    continue
            if min_count is not None and len(ret_dict) >= min_count:
                break
        return ret_dict

    def index_page(self, page=None, per_page=None):
        """
        Fetches a single page of the register index.

        :param page: The page number to fetch; defaults to the current page.
        :type page: int or None

        :param per_page: The number of items per page; defaults to the current setting.
        :type per_page: int or None

        :return: The requested page, or None if it could not be fetched.
        :rtype: BoundIndexPage or None
        """
        current_page, current_per_page, first, last = \
            self.register.get_current_page_details()
        if page is None:
            page = current_page or 1
        if per_page is None:
            per_page = current_per_page or 100
        first = first or 1
        last = last or 1
        if page < 1:
            raise RuntimeError("Cannot execute an index call to register page less-than 1.")
        if per_page < 1:
            raise RuntimeError("Cannot execute an index call to register with items-per-page less-than 1.")
        if page == current_page and per_page == current_per_page:
            payload = self.register.payload
        else:
            payload = self.client._get_register_index(self.register.uri, page, per_page)
            if not payload:
                return None
            self.register.payload = payload
            current_page, current_per_page, first, last = self.register.get_current_page_details()
        index = self.register.filter_index(payload)
        return BoundIndexPage(self, index, current_page, current_per_page, first, last)

    def instance(self, identifier):
        """
        Fetches a single instance from the register.

        :param identifier: An instance identifier or absolute URI.
        :type identifier: str

        :return: The parsed JSON-LD payload of the instance, or the HTTP
            status code on an unexpected response.
        :rtype: dict or list or int
        """
        uri = self.register.make_instance_uri(identifier)
        resp = self.client._get_register_instance(self.register.uri, uri)
        return resp


class AbstractLDAPIClient(object, metaclass=ABCMeta):
    """
    Base class for pyLDAPI clients, holding the API base URI, an optional
    URL remapper, and the registers discovered from the endpoint.
    """
    __slots__ = ('base_uri', 'url_remapper', '_registers', 'session')

    def __init__(self, base_uri, *args, url_remapper=None, **kwargs):
        """
        :param base_uri: The base URI of the Linked Data API.
        :type base_uri: str

        :param url_remapper: Optional mapping of URL prefixes to rewrite
            before making requests.
        :type url_remapper: dict or None
        """
        self.base_uri = base_uri
        self.url_remapper = url_remapper
        self._registers = {}
        self.session = requests.Session()
        self._populate_registers()

    @abstractmethod
    def _populate_registers(self):
        """
        Discovers the registers available from the base URI.

        :return:
        """
        pass

    def _remap_url(self, url):
        """
        Rewrites a URL according to the configured URL remapper, replacing
        the first matching prefix.

        :param url: The URL to remap.
        :type url: str

        :return: The remapped URL.
        """
        if self.url_remapper is None:
            return url
        for u, v in self.url_remapper.items():
            if url.startswith(u):
                url = url.replace(u, v)
                break
        return url

    @abstractmethod
    def register(self, reg_uri):
        """
        Returns the register with the given URI, bound to this client.

        :param reg_uri: The URI of a register discovered on this API.
        :type reg_uri: str

        :return: The bound register.
        :rtype: AbstractBoundRegister
        """
        pass


class LDAPIClient(AbstractLDAPIClient):
    """
    A synchronous pyLDAPI client. Constructing it with ``asynchronous=True``
    returns an AsyncLDAPIClient instead.
    """
    __slots__ = ('threads',)

    def __new__(cls, *args, asynchronous=False, **kwargs):
        """
        :param asynchronous: If True, construct and return an
            AsyncLDAPIClient instead of a synchronous client.
        :type asynchronous: bool

        :return: The new client.
        :rtype: LDAPIClient or coroutine
        """
        if asynchronous:
            return AsyncLDAPIClient(*args, **kwargs)
        self = super(LDAPIClient, cls).__new__(cls)
        return self

    def __init__(self, base_uri, *args, url_remapper=None, threads=1, **kwargs):
        """
        :param base_uri: The base URI of the Linked Data API.
        :type base_uri: str

        :param url_remapper: Optional mapping of URL prefixes to rewrite
            before making requests.
        :type url_remapper: dict or None

        :param threads: The number of worker threads to use for bulk operations.
        :type threads: int
        """
        super(LDAPIClient, self).__init__(
            base_uri, *args, url_remapper=url_remapper, **kwargs)
        self.threads = threads

    def register(self, reg_uri):
        """
        Returns the register with the given URI, bound to this client.

        :param reg_uri: The URI of a register discovered on this API.
        :type reg_uri: str

        :return: The bound register.
        :rtype: BoundRegister
        """
        try:
            register = self._registers[reg_uri]
        except KeyError as k:
            raise ValueError(*k.args)
        return BoundRegister(register, client=self)

    def _get_register_instance(self, register_uri, instance_uri):
        """
        Fetches a single instance as JSON-LD.

        :param register_uri: The URI of the register containing the instance.
        :type register_uri: str

        :param instance_uri: The URI of the instance to fetch.
        :type instance_uri: str

        :return: The parsed JSON-LD payload, or the HTTP status code on
            an unexpected response.
        :rtype: dict or list or int
        """
        headers = {
            "Accept": "application/ld+json",
            "Accept-Profile": "http://purl.org/linked-data/registry"
        }
        url = self._remap_url(instance_uri)
        resp = self.session.get(url, headers=headers)
        if resp.status_code == 404:
            raise RuntimeError((404, instance_uri))
        elif resp.status_code == 500:
            raise RuntimeError((500, instance_uri))
        if resp.status_code != 200:
            return resp.status_code
        text = resp.text
        payload = json.loads(text)
        return payload

    def _get_register_index(self, register_uri, page=1, per_page=100):
        """
        Fetches one page of a register as JSON-LD.

        :param register_uri: The URI of the register.
        :type register_uri: str

        :param page: The page number to fetch.
        :type page: int

        :param per_page: The number of items per page.
        :type per_page: int

        :return: The parsed JSON-LD payload, or None on a non-200 response.
        :rtype: list or None
        """
        headers = {
            "Accept": "application/ld+json",
            "Accept-Profile": "http://purl.org/linked-data/registry"
        }
        url = self._remap_url(register_uri)
        resp = self.session.get(
            url,
            headers=headers,
            params={'page': page, 'per_page': per_page},
            timeout=900
        )
        if resp.status_code != 200:
            return None
        text = resp.text
        payload = json.loads(text)
        return payload

    def _populate_registers(self):
        """
        Discovers the registers available from the base URI, following
        linked registers one level deep.

        :return:
        :rtype:
        """
        headers = {
            "Accept": "application/ld+json",
            "Accept-Profile": "http://purl.org/linked-data/registry"
        }
        url = self._remap_url(self.base_uri)
        response = self.session.get(url, headers=headers)
        if response.status_code != 200:
            raise RuntimeError("Cannot get base register.")
        text = response.text
        json_struct = json.loads(text)
        registers = find_registers_from_ld_payload(
            self.base_uri, json_struct, LoadedRegister)
        first_registers = list(registers.keys())
        for uri in first_registers:
            r = registers[uri]
            if not r.payload:
                url = self._remap_url(uri)
                response = self.session.get(url, headers=headers)
                if response.status_code != 200:
                    raise RuntimeError(
                        "Cannot get linked register: {}".format(uri))
                text = response.text
                json_struct = json.loads(text)
                new_registers = find_registers_from_ld_payload(
                    uri, json_struct, LoadedRegister)
                registers.update(new_registers)
        self._registers = registers

    def close(self):
        """
        Closes the underlying HTTP session.
        """
        self.session.close()


class AsyncLDAPIClient(AbstractLDAPIClient):
    """
    An asynchronous pyLDAPI client backed by aiohttp. Note that calling
    this class returns a coroutine that must be awaited to obtain the
    initialised client.
    """
    __slots__ = ('_loop',)

    def __new__(cls, *args, loop=None, **kwargs):
        """
        :param loop: The asyncio event loop to use; defaults to the
            current event loop.
        :type loop: asyncio.AbstractEventLoop or None

        :return: A coroutine resolving to the initialised client.
        :rtype: coroutine
        """
        if loop is None:
            loop = asyncio.get_event_loop()
        self = super(AsyncLDAPIClient, cls).__new__(cls)
        self._loop = loop
        asyncio.set_event_loop(loop)
        return self.__async_init__(*args, **kwargs)

    def __init__(self, *args, **kwargs):
        # deliberately don't call super init here, because we use an async init
        object.__init__(self)

    def register(self, reg_uri):
        """
        Returns the register with the given URI, bound to this client.

        :param reg_uri: The URI of a register discovered on this API.
        :type reg_uri: str

        :return: The bound register.
        :rtype: AsyncBoundRegister
        """
        try:
            register = self._registers[reg_uri]
        except KeyError as k:
            raise ValueError(*k.args)
        return AsyncBoundRegister(register, client=self)

    async def _get_register_instance(self, register_uri, instance_uri):
        """
        Fetches a single instance as JSON-LD.

        :param register_uri: The URI of the register containing the instance.
        :type register_uri: str

        :param instance_uri: The URI of the instance to fetch.
        :type instance_uri: str

        :return: The parsed JSON-LD payload, or the HTTP status code on
            a non-200 response.
        :rtype: dict or list or int
        """
        headers = {
            "Accept": "application/ld+json",
            "Accept-Profile": "http://purl.org/linked-data/registry"
        }
        url = self._remap_url(instance_uri)
        resp = await self.session.get(url, headers=headers)
        if resp.status != 200:
            return resp.status
        text = await resp.text()
        payload = json.loads(text)
        return payload

    async def _get_register_index(self, register_uri, page=1, per_page=100):
        """
        Fetches one page of a register as JSON-LD.

        :param register_uri: The URI of the register.
        :type register_uri: str

        :param page: The page number to fetch.
        :type page: int

        :param per_page: The number of items per page.
        :type per_page: int

        :return: The parsed JSON-LD payload, or None on a non-200 response.
        :rtype: list or None
        """
        headers = {
            "Accept": "application/ld+json",
            "Accept-Profile": "http://purl.org/linked-data/registry"
        }
        url = self._remap_url(register_uri)
        resp = await self.session.get(
            url,
            headers=headers,
            params={'page': page, 'per_page': per_page},
            timeout=900
        )
        if resp.status != 200:
            return None
        text = await resp.text()
        payload = json.loads(text)
        return payload

    async def _populate_registers(self):
        """
        Discovers the registers available from the base URI, following
        linked registers one level deep.

        :return:
        :rtype:
        """
        headers = {
            "Accept": "application/ld+json",
            "Accept-Profile": "http://purl.org/linked-data/registry"
        }
        url = self._remap_url(self.base_uri)
        response = await self.session.get(url, headers=headers)
        if response.status != 200:
            raise RuntimeError("Cannot get base register.")
        text = await response.text()
        json_struct = json.loads(text)
        registers = find_registers_from_ld_payload(
            self.base_uri, json_struct, LoadedRegister)
        first_registers = list(registers.keys())
        for uri in first_registers:
            r = registers[uri]
            if not r.payload:
                url = self._remap_url(uri)
                response = await self.session.get(
                    url, headers=headers, params={"per_page": 1})
                if response.status != 200:
                    raise RuntimeError(
                        "Cannot get linked register: {}".format(uri))
                text = await response.text()
                json_struct = json.loads(text)
                new_registers = find_registers_from_ld_payload(
                    uri, json_struct, LoadedRegister)
                registers.update(new_registers)
        self._registers = registers

    async def __async_init__(self, base_uri, *args, url_remapper=None, **kwargs):
        """
        The asynchronous part of client initialisation: creates the
        aiohttp session and populates the registers.

        :param base_uri: The base URI of the Linked Data API.
        :type base_uri: str

        :param url_remapper: Optional mapping of URL prefixes to rewrite
            before making requests.
        :type url_remapper: dict or None

        :return: The initialised client.
        :rtype: AsyncLDAPIClient
        """
        self.base_uri = base_uri
        self.url_remapper = url_remapper
        self._registers = {}
        self.session = aiohttp.ClientSession()
        await self._populate_registers()
        return self

    async def close(self):
        """
        Closes the underlying aiohttp session.

        :return:
        :rtype:
        """
        r = await self.session.close()
        return r

    @property
    def loop(self):
        """
        The asyncio event loop this client was created with.

        :return: The event loop.
        :rtype: asyncio.AbstractEventLoop
        """
        return self._loop


__all__ = ['LDAPIClient']
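

# A usage sketch, not part of the module: "https://example.org/api" is a
# hypothetical pyLDAPI endpoint, and `_registers` is touched here only to
# pick an arbitrary register URI for the demonstration.
if __name__ == "__main__":
    client = LDAPIClient("https://example.org/api", threads=4)
    try:
        reg_uri = next(iter(client._registers))  # any discovered register
        reg = client.register(reg_uri)
        page = reg.index_page()
        if page is not None:
            for uri, node in page.items():
                print(uri)
            nxt = page.next_page()  # None when already on the last page
            if nxt is not None:
                for uri, node in nxt.items():
                    print(uri)
        # stop once at least 10 instances have been collected
        instances = reg.instances(min_count=10)
    finally:
        client.close()

    async def _async_demo():
        # asynchronous=True makes the constructor return a coroutine that
        # resolves to an AsyncLDAPIClient
        aclient = await LDAPIClient("https://example.org/api",
                                    asynchronous=True)
        try:
            areg = aclient.register(next(iter(aclient._registers)))
            index = await areg.index(min_count=100)
            print(len(index))
        finally:
            await aclient.close()

    # Uncomment to run the async demonstration:
    # asyncio.get_event_loop().run_until_complete(_async_demo())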