__author__ = "Mike Stabile, Jeremy Nelson"
import os
import re
import copy
from base64 import b64encode
import datetime
import requests
from flask import current_app, json
from jinja2 import Template, Environment, FileSystemLoader
from rdflib import Namespace, XSD
from dateutil.parser import parse
# RDF namespaces exposed as module-level constants.
DC = Namespace("http://purl.org/dc/elements/1.1/")
DCTERMS = Namespace("http://purl.org/dc/terms/")
DOAP = Namespace("http://usefulinc.com/ns/doap#")
# NOTE(review): the published FOAF vocabulary namespace is
# "http://xmlns.com/foaf/0.1/" -- confirm that ".../spec/" is intentional.
FOAF = Namespace("http://xmlns.com/foaf/spec/")
SKOS = Namespace("http://www.w3.org/2004/02/skos/core#")
# Module-level debug flag.
DEBUG = True
# Parent of the directory that contains this file.
FRAMEWORK_BASE = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
if not os.path.exists(FRAMEWORK_BASE):
    #! Quick hack to get running on Docker container -- jpn 2016-03-08
    FRAMEWORK_BASE = "/opt/intro2libsys/ebadges/rdfframework/rdfframework"
# "json-definitions" directory underneath FRAMEWORK_BASE.
JSON_LOCATION = os.path.join(FRAMEWORK_BASE, "json-definitions")
# Jinja2 environment that loads templates from the "sparql" and "turtle"
# directories underneath FRAMEWORK_BASE.
ENV = Environment(loader=FileSystemLoader(
    [os.path.join(FRAMEWORK_BASE, "sparql"),
     os.path.join(FRAMEWORK_BASE, "turtle")]))
def nz(value, none_value, strict=True):
    """Return a default when the passed in value is None.

    Named after the old VBA ``Nz`` function. With ``strict=False`` an empty
    string is treated as None as well.

    Example:
        x = None
        nz(x, "hello")        --> "hello"
        nz(x, "")             --> ""
        y = ""
        nz(y, "hello")        --> ""
        nz(y, "hello", False) --> "hello"

    Args:
        value -- the value to test
        none_value -- the value returned when `value` counts as null
        strict -- True: only None counts as null; False: anything for which
            is_not_null() is false (None or an empty string) counts as null

    Returns:
        `value`, or `none_value` when `value` counts as null
    """
    if strict:
        return none_value if value is None else value
    return value if is_not_null(value) else none_value
def render_without_request(template_name, **template_vars):
    """Render a template from the module-level Jinja2 environment (ENV).

    Called the same way as flask.render_template:
        render_without_request('my_template.html', var1='foo', var2='bar')

    Args:
        template_name -- name of the template to look up in ENV
        **template_vars -- variables passed to the template

    Returns:
        the rendered template text
    """
    return ENV.get_template(template_name).render(**template_vars)
def cbool(value, strict=True):
    """Convert a value to True or False.

    Python's built-in bool() does not understand 'true' / 'false' strings;
    this function does (case-insensitively).

    Args:
        value -- the value to convert
        strict -- when True, a null value (None / empty string) or an
            unrecognised string yields None; when False the value is
            returned unchanged in those cases

    Returns:
        True / False for booleans and recognised strings; None or the
        original value otherwise (see `strict`). Non-string, non-bool,
        non-null values are always returned unchanged.
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        lowered = value.lower()
        if lowered in ('true', '1', 't', 'y', 'yes'):
            return True
        # 'f' is accepted to mirror the 't' shorthand in the true list.
        if lowered in ('false', '0', 'f', 'n', 'no'):
            return False
        # Empty or unrecognised string.
        return None if strict else value
    if strict and not is_not_null(value):
        return None
    return value
def iri(uri_string):
    """Convert a string to an IRI (wrapped in "<" ">"), or return it as is
    when it is already formatted.

    Strings starting with "?" or "[" are returned untouched (presumably
    SPARQL variables and blank-node expressions -- confirm with callers).

    Args:
        uri_string -- the string to convert

    Returns:
        str
    """
    if uri_string[:1] in ("?", "["):
        return uri_string
    # Strip before testing for the brackets; otherwise surrounding
    # whitespace hides an existing bracket and a second one gets added
    # (e.g. "<uri> " used to become "<uri>>").
    uri_string = uri_string.strip()
    if not uri_string.startswith("<"):
        uri_string = "<{}".format(uri_string)
    if not uri_string.endswith(">"):
        uri_string = "{}>".format(uri_string)
    return uri_string
def is_not_null(value):
    """Return True unless the value is None or its string form is empty."""
    if value is None:
        return False
    return str(value) != ""
def is_valid_object(uri_string):
    """Test to see if the string is an object store.

    No validation is implemented: the argument is not inspected and the
    function always returns True.
    """
    return True
def make_list(value):
    """Wrap a value in a list unless it already is a list.

    This matters because list(value) on a dictionary returns the keys of
    the dictionary rather than a list holding the dictionary, i.e.
        x = {"first": 1, "second": 2}
        list(x)      --> ["first", "second"]
        make_list(x) --> [{"first": 1, "second": 2}]   (same as [x,])

    Returns:
        `value` itself when it is a list, otherwise [value]
    """
    return value if isinstance(value, list) else [value]
def make_set(value):
    """Turn a value into a set.

    This matters because set(string) splits a string into its individual
    characters instead of adding the string as one element, i.e.
        x = 'setvalue'
        set(x)      --> {'t', 'a', 'e', 'v', 'u', 's', 'l'}
        make_set(x) --> {'setvalue'}   (same as set([x,]))

    Returns:
        the same set object when `value` is a set, a new set of the items
        when `value` is a list, otherwise a one-element set holding `value`
    """
    if isinstance(value, set):
        return value
    if isinstance(value, list):
        return set(value)
    return {value}
def uid_to_repo_uri(id_value):
    """Build a repository URI for an id.

    The URI is the configured 'REPOSITORY_URL' followed by four path
    segments made of the first eight characters of the id (two characters
    each) and finally the full id.

    Args:
        id_value -- the id to convert

    Returns:
        the URI string, or None when `id_value` is empty / None
    """
    if not id_value:
        return None
    base_url = fw_config().get('REPOSITORY_URL')
    # Two-character slices at offsets 0, 2, 4 and 6.
    segments = [id_value[start:start + 2] for start in range(0, 8, 2)]
    return "{}/{}/{}/{}/{}/{}".format(base_url, *(segments + [id_value]))
def fw_config(**kwargs):
    """Return the application configuration, caching it after first use.

    The configuration is stored in the module-level FRAMEWORK_CONFIG. While
    that is unset, the configuration is taken from the `config` keyword
    argument when one is supplied (and truthy), otherwise from
    flask.current_app.config.

    Args:
        config -- optional keyword argument holding the configuration

    Returns:
        the cached configuration object, or the string
        "framework not initialized" when no configuration is available
    """
    global FRAMEWORK_CONFIG
    try:
        FRAMEWORK_CONFIG
    except NameError:
        # First call: the module-level cache has not been created yet.
        FRAMEWORK_CONFIG = None
    if FRAMEWORK_CONFIG is not None:
        return FRAMEWORK_CONFIG
    config = kwargs.get("config")
    if not config:
        try:
            config = current_app.config
        except RuntimeError:
            # Flask raises RuntimeError when current_app is used outside of
            # an application context.
            config = None
    if config is None:
        print("framework not initialized")
        return "framework not initialized"
    FRAMEWORK_CONFIG = config
    return FRAMEWORK_CONFIG
def make_triple(sub, pred, obj):
    """Join a subject, predicate and object into one triple statement.

    The three parts are separated by single spaces and the statement is
    terminated with " .".

    Args:
        sub -- Subject
        pred -- Predicate
        obj -- Object

    Returns:
        str
    """
    template = "{s} {p} {o} ."
    parts = {"s": sub, "p": pred, "o": obj}
    return template.format(**parts)
# XSD datatypes (normalised to the "xsd_" prefix) grouped by the Python
# conversion xsd_to_python applies to them. Every datatype not listed here
# is returned unchanged.
_XSD_FLOAT_TYPES = frozenset(["xsd_decimal", "xsd_double", "xsd_float"])
_XSD_INT_TYPES = frozenset([
    "xsd_int", "xsd_integer", "xsd_long", "xsd_short",
    "xsd_unsignedInt", "xsd_unsignedLong", "xsd_unsignedShort",
])
_XSD_NEGATIVE_INT_TYPES = frozenset([
    "xsd_negativeInteger", "xsd_nonPositiveInteger",
])
_XSD_POSITIVE_INT_TYPES = frozenset([
    "xsd_nonNegativeInteger", "xsd_positiveInteger",
])
_XSD_BYTES_TYPES = frozenset([
    "xsd_base64Binary", "xsd_byte", "xsd_unsignedByte",
])
# Datatype -> key of the strftime format in the framework's
# 'kds_dataFormats' settings.
_XSD_DATE_FORMAT_KEYS = {
    "xsd_date": "kds_pythonDateFormat",
    "xsd_dateTime": "kds_pythonDateTimeFormat",
}


def xsd_to_python(value, data_type, rdf_type="literal", output="python"):
    """Convert a value with an xsd data_type to a python variable.

    Args:
        value -- the raw value
        data_type -- the xsd datatype, either as a full XML Schema URI or
            already in the "xsd_<name>" form used by this function
        rdf_type -- "uri" makes the value come back wrapped by iri()
        output -- for xsd_date / xsd_dateTime only: "python" returns the
            parsed datetime, "string" returns it formatted with the
            framework's configured format; any other setting returns None

    Returns:
        the converted value; falsy values and values with an unrecognised
        datatype are returned unchanged

    Raises:
        ValueError -- when the value cannot be parsed as the requested
            numeric type
    """
    # Falsy values (None, "", 0, ...) carry nothing to convert.
    if not value:
        return value
    if rdf_type == "uri":
        return iri(value)
    if not is_not_null(value):
        return value
    if data_type:
        # Map the XML Schema namespace to the "xsd_" prefix so that a full
        # datatype URI matches the names tested below.
        data_type = data_type.replace(str(XSD), "xsd_")

    if data_type == "xsd_boolean":
        return cbool(value)
    if data_type in _XSD_DATE_FORMAT_KEYS:
        parsed = parse(value)
        if output == "python":
            return parsed
        if output == "string":
            # Imported only where needed, so other conversions do not
            # require the framework to be importable.
            from rdfframework import get_framework as rdfw
            date_format = rdfw().app['kds_dataFormats'].get(
                _XSD_DATE_FORMAT_KEYS[data_type], '')
            return parsed.strftime(date_format)
        # Unknown output setting: nothing is returned for date types.
        return None
    if data_type == "xsd_time":
        return parse(value)
    if data_type == "xsd_duration":
        # The value is interpreted as a number of milliseconds.
        return datetime.timedelta(milliseconds=float(value))
    if data_type in _XSD_FLOAT_TYPES:
        return float(value)
    if data_type in _XSD_INT_TYPES:
        return int(value)
    if data_type in _XSD_NEGATIVE_INT_TYPES:
        return -abs(int(value))
    if data_type in _XSD_POSITIVE_INT_TYPES:
        return abs(int(value))
    if data_type in _XSD_BYTES_TYPES:
        # Only bytes-like values have .decode(); text passes through.
        if isinstance(value, (bytes, bytearray)):
            return value.decode()
        return value
    return value
def convert_spo_to_dict(data, mode="subject", option="string"):
    """Convert SPARQL query results to a python dict grouped by subject.

    Every result row is expected to have 's', 'p' and 'o' bindings and may
    have an 'itemID' binding. Object values are converted with
    xsd_to_python(); a predicate that occurs more than once for a subject
    gets a list of its values.

    Args:
        data -- iterable of result rows (dicts of bindings), or None
        mode -- only "subject" (group by subject) is implemented; any other
            mode leaves the result empty
        option -- passed to xsd_to_python() as its `output` argument

    Returns:
        None when `data` is None.
        Without 'itemID' bindings: {subject: {predicate: value(s)}}.
        With 'itemID' bindings: a list with one
        {subject: {predicate: value(s)}} dict per itemID, in which the
        entry whose subject equals the itemID gets a "subjectUri" key.
    """
    if data is None:
        return None
    grouped = {}
    has_item_ids = False
    if mode == "subject":
        for item in data:
            subject = item['s']['value']
            predicate = item['p']['value']
            binding = item['o']
            converted = xsd_to_python(binding['value'],
                                      binding.get("datatype"),
                                      binding['type'],
                                      option)
            if item.get('itemID'):
                # Rows carrying an itemID are grouped one level deeper.
                has_item_ids = True
                subjects = grouped.setdefault(item['itemID']['value'], {})
            else:
                subjects = grouped
            properties = subjects.setdefault(subject, {})
            # Membership test rather than truthiness, so an existing falsy
            # value (0, False) is kept when a second value arrives.
            if predicate in properties:
                values = make_list(properties[predicate])
                values.append(converted)
                properties[predicate] = values
            else:
                properties[predicate] = converted
    if has_item_ids:
        item_list = []
        for item_id, subjects in grouped.items():
            subjects[item_id]["subjectUri"] = item_id
            item_list.append(subjects)
        return item_list
    return grouped
def remove_null(obj):
    """Strip null values from a list or set, in place.

    For a set only None is removed; for a list every item for which
    is_not_null() is false (None or an empty string) is removed. Any other
    type is returned untouched.

    Args:
        obj -- the list or set to clean

    Returns:
        the same object that was passed in
    """
    if isinstance(obj, set):
        obj.discard(None)
    elif isinstance(obj, list):
        # Rebuild through slice assignment: removing items while iterating
        # over the same list skips the element after each removal.
        obj[:] = [item for item in obj if is_not_null(item)]
    return obj
class DeleteProperty(object):
    """Marker class for tagging items to be deleted.

    Using a dedicated class prevents passed in data from ever being
    confused with a request to delete a property.
    """

    def __init__(self):
        # Every instance carries the flag `delete = True`.
        self.delete = True
def slugify(value):
    """Turn a string into a slug, following the Django format.

    Characters that are not word characters (alphanumerics and
    underscores), whitespace or hyphens are removed; the result is stripped
    of leading and trailing whitespace and lowercased; every run of
    whitespace and/or hyphens becomes a single hyphen.

    Args:
        value -- the string to convert

    Returns:
        str
    """
    cleaned = re.sub(r'[^\w\s-]', '', value)
    cleaned = cleaned.strip().lower()
    return re.sub(r'[-\s]+', '-', cleaned)
def get_app_ns_uri(value):
    """Look in the framework for the namespace uri of a prefix.

    Args:
        value -- the namespace prefix to look up

    Returns:
        the 'nameSpaceUri' of the first application namespace whose
        'prefix' equals `value`, or None when there is no match
    """
    # get_framework is not imported at module level, so it is imported
    # here before use.
    from rdfframework import get_framework
    namespaces = get_framework().rdf_app_dict['application'].get(
        "appNameSpace", [])
    for _ns in namespaces:
        if _ns.get('prefix') == value:
            return _ns.get('nameSpaceUri')
    return None
def clean_iri(uri_string):
    """Remove the enclosing "<" and ">" from a string.

    The brackets are removed only when the string has both of them;
    anything else, including non-string values, is returned unchanged.
    """
    if not isinstance(uri_string, str):
        return uri_string
    if uri_string.startswith("<") and uri_string.endswith(">"):
        return uri_string[1:-1]
    return uri_string
def copy_obj(obj):
    """Copy nested dicts and lists without copying the objects they hold.

    Dictionaries and lists are rebuilt recursively; every other value found
    inside them is shared with the original, i.e.
        x = {"key": [<classInstance1>, <classInstance2>, <classInstance3>]}
        y = copy_obj(x)
        del y['key'][0]
        y --> {"key": [<classInstance2>, <classInstance3>]}
        x --> {"key": [<classInstance1>, <classInstance2>, <classInstance3>]}

    An argument that is neither a dict nor a list is copied with
    copy.copy().
    """
    def _duplicate(item):
        # Only containers are rebuilt; anything else is passed through.
        if isinstance(item, (dict, list)):
            return copy_obj(item)
        return item

    if isinstance(obj, dict):
        return {key: _duplicate(item) for key, item in obj.items()}
    if isinstance(obj, list):
        return [_duplicate(item) for item in obj]
    return copy.copy(obj)
def get_attr(item, name, default=None):
    """Fetch `name` from a dict (by key) or from any other object (by
    attribute), returning `default` when it is missing.
    """
    if isinstance(item, dict):
        return item.get(name, default)
    return getattr(item, name, default)