Source code for rdfframework.utilities.frameworkutilities

__author__ = "Mike Stabile, Jeremy Nelson"

import os
import re
import copy
from base64 import b64encode
import datetime
import requests
from flask import current_app, json
from jinja2 import Template, Environment, FileSystemLoader
from rdflib import Namespace, XSD
from dateutil.parser import parse


# RDF namespaces used by the framework.
DC = Namespace("http://purl.org/dc/elements/1.1/")
DCTERMS = Namespace("http://purl.org/dc/terms/")
DOAP = Namespace("http://usefulinc.com/ns/doap#")
# NOTE(review): the published FOAF vocabulary namespace is
# "http://xmlns.com/foaf/0.1/"; ".../foaf/spec/" looks like the address of
# the specification page -- confirm whether this value is intentional.
FOAF = Namespace("http://xmlns.com/foaf/spec/")
SKOS = Namespace("http://www.w3.org/2004/02/skos/core#")
# Module-wide debug flag.
DEBUG = True

# Absolute path of the directory one level above the one holding this file.
FRAMEWORK_BASE = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
if not os.path.exists(FRAMEWORK_BASE):
    #! Quick hack to get running on Docker container -- jpn 2016-03-08
    FRAMEWORK_BASE = "/opt/intro2libsys/ebadges/rdfframework/rdfframework"
# Directory named "json-definitions" under FRAMEWORK_BASE.
JSON_LOCATION = os.path.join(FRAMEWORK_BASE, "json-definitions")

# Jinja2 environment whose templates are loaded from the "sparql" and
# "turtle" directories under FRAMEWORK_BASE.
ENV = Environment(loader=FileSystemLoader(
    [os.path.join(FRAMEWORK_BASE, "sparql"),
     os.path.join(FRAMEWORK_BASE, "turtle")]))

def nz(value, none_value, strict=True):
    ''' Returns a default in place of a null value. This function is named
    after an old VBA function.

    Args:
        value -- the value to test
        none_value -- the replacement returned when value is null
        strict -- when True (default) only None counts as null; when
            False anything rejected by is_not_null (None or a value whose
            string form is empty) counts as null as well

    Returns value or none_value

    example:
        x = None
        nz(x,"hello") --> "hello"
        nz(x,"") --> ""
        y = ""
        nz(y,"hello") --> ""
        nz(y,"hello", False) --> "hello"
    '''
    if strict:
        # strict mode: an empty string is a legitimate value
        return none_value if value is None else value
    return value if is_not_null(value) else none_value
def render_without_request(template_name, **template_vars):
    """Renders a template from the module level jinja2 environment (ENV),
    so no flask request is needed.

    Usage is the same as flask.render_template:

    render_without_request('my_template.html', var1='foo', var2='bar')
    """
    return ENV.get_template(template_name).render(**template_vars)
def cbool(value, strict=True):
    ''' converts a value to true or false. Python's default bool() function
    does not handle 'true' or 'false' strings

    Args:
        value -- the value to convert
        strict -- when True (default) a null value or an unrecognised
            string is returned as None; when False it is returned as is

    Returns True/False for recognised strings (case-insensitive) and for
    bools. Non-null values that are neither bool nor str are returned
    unchanged.
    '''
    true_words = ('true', '1', 't', 'y', 'yes')
    # 'f' added so that it mirrors the 't' accepted above
    false_words = ('false', '0', 'f', 'n', 'no')
    if not is_not_null(value):
        return None if strict else value
    if isinstance(value, str):
        lowered = value.lower()
        if lowered in true_words:
            return True
        if lowered in false_words:
            return False
        return None if strict else value
    return value
def iri(uri_string):
    '''converts a string to an IRI or returns an IRI if already formatted

    Surrounding whitespace is removed before the string is inspected, so
    that "<uri> " is not turned into "<uri>>". Strings starting with "?"
    or "[" are returned without angle brackets.

    Args:
        uri_string -- the string to wrap in "<" and ">"

    Returns str
    '''
    uri_string = uri_string.strip()
    if uri_string[:1] in ("?", "["):
        return uri_string
    if not uri_string.startswith("<"):
        uri_string = "<{}".format(uri_string)
    if not uri_string.endswith(">"):
        uri_string = "{}>".format(uri_string)
    return uri_string
def is_not_null(value):
    ''' returns False for None and for any value whose string form is
    empty, True otherwise '''
    if value is None:
        return False
    return len(str(value)) > 0
def is_valid_object(uri_string):
    '''Test to see if the string is an object store.

    Currently a stub: the argument is not inspected and the result is
    always True.
    '''
    _ = uri_string
    return True
def make_list(value):
    ''' Wraps a value in a list unless it already is a list.

    !!!!! This matters because list(value) applied to a dictionary returns
    the keys of the dictionary and not the dictionary as a single element.

    i.e.
        x = {"first":1, "second":2}
        list(x) = ["first", "second"]
        make_list(x) = [{"first":1, "second":2}]     (same as [x,])
    '''
    return value if isinstance(value, list) else [value]
def make_set(value):
    ''' Turns a value into a set.

    !!!! This matters because set(string) splits a string into its
    individual characters instead of adding the string as one element.

    i.e.
        x = 'setvalue'
        set(x) = {'t', 'a', 'e', 'v', 'u', 's', 'l'}
        make_set(x) = {'setvalue'}     (same as set([x,]))

    A set is returned as is (same object), a list is converted element by
    element and anything else becomes the only member of a new set.
    '''
    if isinstance(value, set):
        return value
    if isinstance(value, list):
        return set(value)
    return {value}
def uid_to_repo_uri(id_value):
    ''' Builds a repository uri from an id: the configured REPOSITORY_URL
    followed by the first four two-character slices of the id and then the
    id itself, all joined with "/". Returns None for an empty id. '''
    if not id_value:
        return None
    segments = (fw_config().get('REPOSITORY_URL'),
                id_value[:2],
                id_value[2:4],
                id_value[4:6],
                id_value[6:8],
                id_value)
    return "{}/{}/{}/{}/{}/{}".format(*segments)
def fw_config(**kwargs):
    ''' function returns the application configuration information

    The configuration is cached in the module global FRAMEWORK_CONFIG the
    first time one is found.

    Keyword Args:
        config -- configuration to use (and cache) when none is cached yet

    Returns the cached configuration, or the string
    "framework not initialized" when no configuration can be found.
    '''
    global FRAMEWORK_CONFIG
    try:
        FRAMEWORK_CONFIG
    except NameError:
        # first call: the global has not been created yet
        FRAMEWORK_CONFIG = None
    if FRAMEWORK_CONFIG is not None:
        return FRAMEWORK_CONFIG
    config = kwargs.get("config")
    if not config:
        try:
            config = current_app.config
        except Exception:
            # best effort: flask is not usable here (for example no
            # application context is active)
            config = None
    if config is None:
        print("framework not initialized")
        return "framework not initialized"
    FRAMEWORK_CONFIG = config
    return FRAMEWORK_CONFIG
def make_triple(sub, pred, obj):
    """Joins a subject, predicate and object with single spaces and ends
    the statement with " ."

    Args:
        sub -- Subject
        pred -- Predicate
        obj -- Object
    Returns
        str
    """
    statement = "{s} {p} {o} ."
    return statement.format(s=sub, p=pred, o=obj)
# xsd datatypes (in their "xsd_" prefixed form) grouped by the python
# conversion that is applied to them in xsd_to_python
_XSD_DECODE_TYPES = frozenset([
    "xsd_base64Binary", "xsd_byte", "xsd_unsignedByte"])
_XSD_FLOAT_TYPES = frozenset(["xsd_decimal", "xsd_double", "xsd_float"])
_XSD_INT_TYPES = frozenset([
    "xsd_integer", "xsd_long", "xsd_unsignedInt", "xsd_unsignedLong",
    "xsd_unsignedShort"])
_XSD_NEGATIVE_INT_TYPES = frozenset([
    "xsd_negativeInteger", "xsd_nonPositiveInteger"])
_XSD_POSITIVE_INT_TYPES = frozenset([
    "xsd_nonNegativeInteger", "xsd_positiveInteger"])


def _xsd_date_output(parsed, output, format_key):
    ''' returns a parsed date as a python object or as a string formatted
    with the framework date format stored under format_key '''
    if output == "string":
        # imported here so the framework is only needed when formatting
        from rdfframework import get_framework as rdfw
        _date_format = rdfw().app['kds_dataFormats'].get(format_key, '')
        return parsed.strftime(_date_format)
    if output == "python":
        return parsed
    return None


def xsd_to_python(value, data_type, rdf_type="literal", output="python"):
    ''' This will take a value and xsd data_type and convert it to a python
    variable

    Args:
        value -- the value to convert
        data_type -- the xsd datatype, either as a full XSD namespace uri
            or in the "xsd_<name>" form
        rdf_type -- "uri" returns the value formatted with iri()
        output -- "python" returns date/dateTime values as parsed objects,
            "string" returns them formatted with the framework date
            formats; any other setting returns None for those two types

    Returns the converted value. Falsy values and values with a datatype
    that has no conversion are returned unchanged.
    '''
    if not value:
        return value
    if rdf_type == "uri":
        return iri(value)
    if data_type:
        # the comparisons below use the "xsd_" prefixed names, so the
        # namespace has to be swapped for that prefix, not just removed
        data_type = data_type.replace(str(XSD), "xsd_")
    if data_type in _XSD_DECODE_TYPES:
        # only bytes can be decoded; str values pass through unchanged
        if isinstance(value, (bytes, bytearray)):
            return value.decode()
        return value
    if data_type == "xsd_boolean":
        return cbool(value)
    if data_type == "xsd_date":
        return _xsd_date_output(parse(value), output,
                                'kds_pythonDateFormat')
    if data_type == "xsd_dateTime":
        return _xsd_date_output(parse(value), output,
                                'kds_pythonDateTimeFormat')
    if data_type in _XSD_FLOAT_TYPES:
        return float(value)
    if data_type == "xsd_duration":
        # the value is treated as a number of milliseconds
        return datetime.timedelta(milliseconds=float(value))
    if data_type in _XSD_INT_TYPES:
        return int(value)
    if data_type in _XSD_NEGATIVE_INT_TYPES:
        return -abs(int(value))
    if data_type in _XSD_POSITIVE_INT_TYPES:
        return abs(int(value))
    if data_type == "xsd_time":
        return parse(value)
    # NOTE(review): xsd_int and xsd_short land here and are returned
    # unconverted, unlike xsd_integer and xsd_long -- confirm whether that
    # is intended. All remaining datatypes are returned as is.
    return value
def convert_spo_to_dict(data, mode="subject", option="string"):
    '''Takes the SPARQL query results and converts them to a python Dict

    Args:
        data -- list of result bindings, each with 's', 'p' and 'o'
            entries and optionally an 'itemID' entry
        mode -- subject --> groups based on subject; any other mode
            leaves the result empty
        option -- passed to xsd_to_python as the output setting

    Returns None when data is None. Without 'itemID' bindings a dict of
    {subject: {predicate: value}}. With 'itemID' bindings a list with one
    {subject: {predicate: value}} dict per itemID, in which the entry of
    the subject equal to the itemID gets a "subjectUri" key. A predicate
    that occurs more than once for a subject holds a list of its values.
    '''
    if data is None:
        return None
    _return_obj = {}
    _list_obj = False
    if mode == "subject":
        for item in data:
            _sv = item['s']['value']
            _pv = item['p']['value']
            _obj = item['o']
            # determine data is list of objects
            if item.get('itemID'):
                _list_obj = True
                _item_group = _return_obj.setdefault(
                    item['itemID']['value'], {})
                _target = _item_group.setdefault(_sv, {})
            else:
                _target = _return_obj.setdefault(_sv, {})
            _new_value = xsd_to_python(_obj['value'],
                                       _obj.get("datatype"),
                                       _obj['type'],
                                       option)
            # membership test (not truthiness) so that an earlier falsy
            # value such as "" or False is kept and not overwritten
            if _pv in _target:
                _obj_list = make_list(_target[_pv])
                _obj_list.append(_new_value)
                _target[_pv] = _obj_list
            else:
                _target[_pv] = _new_value
    if _list_obj:
        _return_list = []
        for _key, _value in _return_obj.items():
            _value[_key]["subjectUri"] = _key
            _return_list.append(_value)
        return _return_list
    return _return_obj
def remove_null(obj):
    ''' reads through a list or set and strips any null values

    A set has None removed. A list has every item rejected by is_not_null
    removed (None and values whose string form is empty). Both are
    changed in place. Any other object is returned untouched.

    Returns the object that was passed in
    '''
    if isinstance(obj, set):
        obj.discard(None)
    elif isinstance(obj, list):
        # rebuilt through a slice assignment: removing items while looping
        # over the same list skips the item after each removal
        obj[:] = [item for item in obj if is_not_null(item)]
    return obj
class DeleteProperty(object):
    ''' marker class used to tag items that are to be deleted. Using a
    dedicated class means passed in data can never be mistaken for a
    request to delete a property. '''

    def __init__(self):
        self.delete = True
class NotInFormClass(object):
    ''' marker class used to tag properties that were never in a form.
    Using a dedicated class means passed in data can never be mistaken for
    a property that was never in the form. '''

    def __init__(self):
        self.notInForm = True
def slugify(value):
    """Builds a slug using the Django format: characters other than word
    characters, whitespace and hyphens are dropped, the result is stripped
    and lowercased, and every run of hyphens/whitespace becomes a single
    hyphen.

    Args:
        value -- the string to convert
    Returns
        str
    """
    cleaned = re.sub(r'[^\w\s-]', '', value)
    cleaned = cleaned.strip().lower()
    return re.sub(r'[-\s]+', '-', cleaned)
def get_app_ns_uri(value):
    ''' looks in the framework for the namespace uri

    Args:
        value -- the namespace prefix to look up

    Returns the 'nameSpaceUri' of the first application namespace whose
    'prefix' equals value, or None when there is no match
    '''
    # imported locally, the same way xsd_to_python imports it; the name is
    # not imported at module level
    from rdfframework import get_framework
    _namespaces = get_framework().rdf_app_dict['application'].get(
        "appNameSpace", [])
    for _ns in _namespaces:
        if _ns.get('prefix') == value:
            return _ns.get('nameSpaceUri')
    return None
def clean_iri(uri_string):
    '''strips one enclosing "<" ... ">" pair from a string. Strings without
    both brackets and values that are not strings are returned as is'''
    if not isinstance(uri_string, str):
        return uri_string
    if uri_string.startswith("<") and uri_string.endswith(">"):
        return uri_string[1:-1]
    return uri_string
def copy_obj(obj):
    ''' copies nested dictionaries and lists but keeps every other item
    they contain as the same object, i.e. class instances are not copied

    i.e.
        x = {"key":[<classInstance1>,<classInstance2>,<classInstance3>]}
        y = copy_obj(x)
        y --> {"key":[<classInstance1>,<classInstance2>,<classInstance3>]}
        del y['key'][0]
        y --> {"key":[<classInstance2>,<classInstance3>]}
        x --> {"key":[<classInstance1>,<classInstance2>,<classInstance3>]}

    *** this is for dictionaries holding lists that have class instances
    as their items. An object that is neither a dict nor a list is
    returned as a shallow copy (copy.copy).
    '''
    def _duplicate(item):
        # only containers are copied, everything else is shared
        if isinstance(item, (dict, list)):
            return copy_obj(item)
        return item

    if isinstance(obj, dict):
        return {key: _duplicate(item) for key, item in obj.items()}
    if isinstance(obj, list):
        return [_duplicate(item) for item in obj]
    return copy.copy(obj)
def get_attr(item, name, default=None):
    ''' reads name from item: as a key when item is a dictionary and as an
    attribute otherwise. Returns default when the key/attribute is
    missing '''
    if isinstance(item, dict):
        return item.get(name, default)
    return getattr(item, name) if hasattr(item, name) else default