Source code for pywhip.validators

# -*- coding: utf-8 -*-

import re
from copy import copy
from datetime import datetime, date
from dateutil.parser import parse
from collections import Mapping, Sequence

import json
# https://pypi.python.org/pypi/rfc3987 provides regex matching for URIs and IRIs
from rfc3987 import match

from cerberus import Validator
from cerberus import errors
from cerberus.errors import ErrorDefinition, BasicErrorHandler
from cerberus.platform import _str_type

"""
For each pywhip custom rule, a :class:`~cerberus.errors.ErrorDefinition`
instance is created to link specifications with unique identifiers.
"""
DELIMITER_SCHEMA = ErrorDefinition(0x85, 'delimitedvalues')
IF_SCHEMA = ErrorDefinition(0x86, 'if')

DELIMITER_DOUBLE = ErrorDefinition(0x107, 'delimitedvalues')
DELIMITER_SPACE = ErrorDefinition(0x108, 'delimitedvalues')

MIN_NON_NUMERIC = ErrorDefinition(0x7, 'min')
MAX_NON_NUMERIC = ErrorDefinition(0x8, 'max')
MINDATE_VALUE = ErrorDefinition(0xA, 'mindate')
MAXDATE_VALUE = ErrorDefinition(0xB, 'maxdate')
MINDATE_NOT_PARSED = ErrorDefinition(0xC, 'mindate')
MAXDATE_NOT_PARSED = ErrorDefinition(0xD, 'maxdate')
DATEFORMAT = ErrorDefinition(0xE, 'dateformat')

NUMBERFORMAT_NON_NUM = ErrorDefinition(0x101, 'numberformat')
NUMBERFORMAT_NON_FLOAT = ErrorDefinition(0x102, 'numberformat')
NUMBERFORMAT_NON_INT = ErrorDefinition(0x103, 'numberformat')
NUMBERFORMAT_VALUE = ErrorDefinition(0x104, 'numberformat')
STRINGFORMAT_JSON = ErrorDefinition(0x105, 'stringformat')
STRINGFORMAT_URL = ErrorDefinition(0x106, 'stringformat')


class WhipErrorHandler(BasicErrorHandler):
    """Error handler with custom error messages for pywhip

    The WhipErrorHandler extends the
    :class:`~cerberus.errors.BasicErrorHandler` with custom messages for
    pywhip-specific specifications. Each message overrides the default
    message of a specification error, using the unique code attributed in
    the :class:`~cerberus.errors.ErrorDefinition` setup. A message is a
    descriptive explanation of the error and can optionally use the
    following variables:

    * value
        Refers to the individual data value of the document,
        use ``{value}``.
    * constraint
        Refers to the constraint provided by the whip specification, i.e.
        the right hand side of the colon, use ``{constraint}``.
    """
    messages = BasicErrorHandler.messages.copy()
    messages[MIN_NON_NUMERIC.code] = "value '{value}' is not numeric"
    messages[MAX_NON_NUMERIC.code] = "value '{value}' is not numeric"
    messages[MINDATE_VALUE.code] = "date '{value}' is before min " \
                                   "limit '{constraint}'"
    messages[MAXDATE_VALUE.code] = "date '{value}' is after max " \
                                   "limit '{constraint}'"
    messages[MINDATE_NOT_PARSED.code] = "value '{value}' could not be " \
                                        "interpreted as date or datetime"
    messages[MAXDATE_NOT_PARSED.code] = "value '{value}' could not be " \
                                        "interpreted as date or datetime"
    messages[DATEFORMAT.code] = "string format of value '{value}' not " \
                                "compliant with '{constraint}'"
    messages[NUMBERFORMAT_NON_NUM.code] = "value '{value}' is not numerical"
    messages[NUMBERFORMAT_NON_FLOAT.code] = "value '{value}' is not a float"
    messages[NUMBERFORMAT_NON_INT.code] = "value '{value}' is not an integer"
    messages[NUMBERFORMAT_VALUE.code] = "numberformat of value '{value}' " \
                                        "not in agreement with '{constraint}'"
    messages[STRINGFORMAT_JSON.code] = "not a valid json format"
    messages[STRINGFORMAT_URL.code] = "not a valid url"
    messages[DELIMITER_DOUBLE.code] = "duplicate values in delimitedvalues"
    messages[DELIMITER_SPACE.code] = "contains empty string inside " \
                                     "delimitedvalues"

    def __iter__(self):
        raise NotImplementedError
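
# Note: the ``{value}`` and ``{constraint}`` placeholders in the messages
# above are filled in by cerberus when an error is rendered; for example, a
# failing ``min`` specification on a non-numeric value would be reported
# through MIN_NON_NUMERIC as "value '...' is not numeric". A runnable usage
# sketch combining this handler with the DwcaValidator defined below is
# included at the end of the module.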


class DwcaValidator(Validator):
    """Validate any mapping against specifications defined in a
    validation schema

    In the context of pywhip, a mapping is generally a single line of data,
    with the keys the fields (data headers) and the values the data values
    for that particular line.

    Notes
    -----
    This class subclasses :class:`~cerberus.Validator` and adds pywhip
    specific ``_validate_<specification>`` methods. The whip specifications
    are a combination of cerberus native specifications and pywhip custom
    ones:

    * directly available by cerberus
        minlength, maxlength, regex
    * cerberus specifications overwritten by pywhip
        allowed, empty, min, max
    * pywhip specific specification functions
        numberformat, dateformat, mindate, maxdate, stringformat
    * pywhip specific specification environments
        delimitedvalues, if

    Each ``_validate_<specification>`` assumes the following input arguments:

    * constraint: The constraint provided in the whip specification, i.e.
      the right hand side of the colon in the whip specifications. In the
      implementation, the input parameter can be named differently to
      clarify the role of the constraint in the validation function.
    * field: The name of the field, i.e. the left hand side of the colon in
      the whip specifications, which corresponds to the field header name
      in the data.
    * value: A single data value for which the whip specification needs to
      be tested using the provided constraint.

    To validate the schema input itself, cerberus validation rules can be
    added to the docstring of each ``_validate_<specification>`` method.
    TODO ADDLINK
    """

    def __init__(self, *args, **kwargs):
        """Extends the handling of the Cerberus :class:`~cerberus.Validator`

        The following alterations are done:

        * ``allow_unknown`` is set to True by default
        * Initiation requires a schema
        * By default, all fields without an ``empty`` specification get an
          ``empty: False`` specification. As such, empty strings are not
          allowed by default, according to the whip specifications.

        Parameters
        ----------
        allow_unknown : boolean
            If False, only terms with specifications are allowed as input.
            As unknown fields are reported by pywhip after validation, the
            default value is True.
        """
        super(DwcaValidator, self).__init__(*args, **kwargs)

        if 'allow_unknown' in kwargs:
            self.allow_unknown = kwargs['allow_unknown']
        else:
            self.allow_unknown = True

        if not self.schema:
            raise Exception('provide a schema to initiate Validator')

        # Extend schema with empty: False by default
        self.schema = self._schema_add_empty(self.schema)

    @staticmethod
    def _schema_add_empty(dict_schema):
        """Add an ``empty: False`` specification for all fields without an
        ``empty`` specification

        Parameters
        ----------
        dict_schema : dict
            Schema of ``field: specification`` items, for which each
            specification is a dict itself.
        """
        for term, rules in dict_schema.items():
            if 'empty' not in rules.keys():
                rules['empty'] = False
        return dict_schema

    def _validate_empty(self, empty, field, value):
        """ {'type': 'boolean'} """
        # Drop all remaining rules except 'if' (instead of a subselection)
        # when empty = True
        from collections import Sized

        if isinstance(value, Sized) and len(value) == 0:
            # ALL rules, except 'if'
            self._drop_remaining_rules(
                'allowed', 'forbidden', 'items', 'minlength', 'maxlength',
                'regex', 'check_with', 'stringformat', 'min', 'max',
                'numberformat', 'mindate', 'maxdate', 'dateformat',
                'delimitedvalues'
            )
            if not empty:
                self._error(field, errors.EMPTY_NOT_ALLOWED)

    def _validate_allowed(self, allowed_values, field, value):
        """ {'type': ['list', 'string']} """
        # support single string values as well (cerberus only supports lists)
        if isinstance(allowed_values, _str_type):
            allowed_values = [allowed_values]

        super(DwcaValidator, self)._validate_allowed(allowed_values,
                                                     field, value)

    def _validate_min(self, min_value, field, value):
        """ {'nullable': False} """
        try:
            if float(min_value) > float(value):
                self._error(field, errors.MIN_VALUE)
        except ValueError:
            self._error(field, MIN_NON_NUMERIC)

    def _validate_max(self, max_value, field, value):
        """ {'nullable': False} """
        try:
            if float(max_value) < float(value):
                self._error(field, errors.MAX_VALUE)
        except ValueError:
            self._error(field, MAX_NON_NUMERIC)

    def _parse_date(self, date_string):
        """Try to parse a string to a Python :class:`~datetime.datetime`

        Parameters
        ----------
        date_string : str

        Returns
        -------
        datetime | None
            If parsing fails, return None, otherwise the parsed
            :class:`~datetime.datetime`
        """
        try:
            event_date = parse(date_string)
            return event_date
        except ValueError:
            return None

    @staticmethod
    def _dateisrange(value):
        """Test if the given date string represents a range

        Parameters
        ----------
        value : str
            date, e.g. 2018-01-01 (no range) or 2010-01-01/2018-05-01

        Returns
        -------
        value : boolean
            True if a date range is given, otherwise False
        """
        if len(re.findall('([0-9])/([0-9])', value)) > 1:
            NotImplemented
        elif len(re.findall('([0-9])/([0-9])', value)) == 1:
            return True
        else:
            return False

    @staticmethod
    def _dateformatisrange(value):
        """Test if the given dateformat represents a range

        Parameters
        ----------
        value : str
            dateformat, e.g. %Y-%m-%d (no range) or %Y-%m-%d/%Y-%m-%d (range)

        Returns
        -------
        value : boolean
            True if a date range is given, otherwise False
        """
        datesymbols = re.sub('[^a-zA-Z]', '', value)
        return len(set(datesymbols)) != len(datesymbols)

    def _validate_mindate(self, min_date, field, value):
        """ {'type': ['date', 'datetime']} """
        # TODO:
        # the yaml-reader prepares datetime.date objects when possible,
        # the dwca-reader is not doing this, so compatibility needs to be
        # better ensured

        if self._dateisrange(value):
            [self._validate_mindate(min_date, field, valdate)
             for valdate in value.split("/")]
        else:
            # convert schema info to datetime to enable comparison
            if isinstance(min_date, date):
                min_date = datetime.combine(min_date, datetime.min.time())

            # try to parse the datetime-format
            event_date = self._parse_date(value)
            if event_date:
                if event_date < min_date:
                    self._error(field, MINDATE_VALUE)
            else:
                self._error(field, MINDATE_NOT_PARSED)

    def _validate_maxdate(self, max_date, field, value):
        """ {'type': ['date', 'datetime']} """
        # TODO:
        # the yaml-reader prepares datetime.date objects when possible,
        # the dwca-reader is not doing this, so compatibility needs to be
        # better ensured

        if self._dateisrange(value):
            for valdate in value.split("/"):
                self._validate_maxdate(max_date, field, valdate)
        else:
            # convert schema info to datetime to enable comparison
            if isinstance(max_date, date):
                max_date = datetime.combine(max_date, datetime.min.time())

            # try to parse the datetime-format
            event_date = self._parse_date(value)
            if event_date:
                if event_date > max_date:
                    self._error(field, MAXDATE_VALUE)
            else:
                self._error(field, MAXDATE_NOT_PARSED)

    def _help_dateformat(self, formatstr, value):
        """Test if a date complies with a given dateformat

        Parameters
        ----------
        formatstr : str
            dateformat string, e.g. %Y-%m-%d or %Y-%m-%d/%Y-%m-%d
        value : str
            date string representation, e.g. 2018-01-01 or
            2015-01-01/2018-01-01

        Returns
        -------
        boolean
            when True, the date string is according to the format
        """
        if self._dateformatisrange(formatstr):
            if self._dateisrange(value):
                # both are ranges -> test each part
                range_test = [self._help_dateformat(dt_format, dt)
                              for dt_format, dt in
                              zip(formatstr.split('/'), value.split('/'))]
                # both must be valid interpretable dates
                return sum(range_test) == 2
            else:
                return False
        else:
            try:
                datetime.strptime(value, formatstr)
                tester = True
            except ValueError:
                tester = False
            return tester

    def _validate_dateformat(self, ref_value, field, value):
        """ {'type': ['string', 'list']} """
        # dateformat : ['%Y-%m-%d', '%Y-%m', '%Y']
        # dateformat : '%Y-%m'
        tester = False

        if isinstance(ref_value, list):
            for formatstr in ref_value:  # check if at least one complies
                current_test = self._help_dateformat(formatstr, value)
                if current_test:
                    tester = True
        else:
            tester = self._help_dateformat(ref_value, value)

        if not tester:
            self._error(field, DATEFORMAT)

    def _validate_numberformat(self, formatter, field, value):
        r""" {'type': ['string'],
              'regex': r'^[1-9]\.[1-9]$|^[1-9]\.$|^\.[1-9]$|^[1-9]$|^\.$|^x$'}
        """
        # ignore the minus sign to handle negative numbers
        value_str = re.sub("^-", "", value)

        # check if value is in number format
        if not re.match(r'^[0-9]*\.[0-9]*$|^[0-9]+$', value_str):
            self._error(field, NUMBERFORMAT_NON_NUM)
        elif re.match(r'^x$', formatter):
            if not re.match(r'^[-+]?\d+$', value_str):
                self._error(field, NUMBERFORMAT_NON_INT)
        else:
            if re.match(r"[1-9]\.[1-9]", formatter):
                value_parsed = [len(side) for side in value_str.split(".")]
            elif re.match(r"\.[1-9]", formatter):
                if "." in value_str:
                    value_parsed = [len(value_str.split(".")[1])]
                else:
                    value_parsed = [0]
            elif re.match(r"[1-9]\.", formatter):
                value_parsed = [len(value_str.split(".")[0])]
            elif re.match(r"[1-9]", formatter):
                if re.match(r"[0-9]+", value_str):
                    value_parsed = [len(value_str)]
                else:
                    value_parsed = [None]
                    self._error(field, NUMBERFORMAT_NON_INT)
            elif re.match(r"^\.$", formatter):
                if "." in value_str:
                    value_parsed = []
                else:
                    value_parsed = [None]
                    self._error(field, NUMBERFORMAT_NON_FLOAT)

            formatter_parsed = [int(length) for length in
                                formatter.split(".") if not length == '']
            if formatter_parsed != value_parsed and value_parsed != [None]:
                self._error(field, NUMBERFORMAT_VALUE)

    def _validate_if(self, ifset, field, value):
        """ {'type': ['dict', 'list']} """
        if isinstance(ifset, Mapping):
            # extract dict values -> conditions
            conditions = {k: v for k, v in ifset.items()
                          if isinstance(v, dict)}
            # extract dict values -> rules
            rules = {k: v for k, v in ifset.items()
                     if not isinstance(v, dict)}

            tempvalidator = DwcaValidator(conditions)
            tempvalidator.allow_unknown = True

            if tempvalidator.validate(copy(self.document), normalize=True):
                validator = self._get_child_validator(
                    document_crumb=(field, 'if'),
                    schema_crumb=(field, 'if'),
                    schema={field: rules}, allow_unknown=True)
                validator.validate(copy(self.document), normalize=False)

                if validator._errors:
                    self._drop_nodes_from_errorpaths(validator._errors,
                                                     [2], [2])
                    self._error(field, IF_SCHEMA, validator._errors)

        elif isinstance(ifset, Sequence) and not isinstance(ifset, _str_type):
            for i, ifsubschema in enumerate(ifset):
                # extract dict values -> conditions
                conditions = {k: v for k, v in ifsubschema.items()
                              if isinstance(v, dict)}
                # extract dict values -> rules
                rules = {k: v for k, v in ifsubschema.items()
                         if not isinstance(v, dict)}

                tempvalidator = DwcaValidator(conditions)
                tempvalidator.allow_unknown = True

                # when the conditional field does not exist in the document,
                # ignore the if-statement
                if not set(conditions.keys()).issubset(
                        set(self.document.keys())):
                    return True

                if tempvalidator.validate(copy(self.document),
                                          normalize=True):
                    validator = self._get_child_validator(
                        document_crumb=(field, ''.join(['if_', str(i)])),
                        schema_crumb=(field, 'if'),
                        schema={field: rules}, allow_unknown=True)
                    validator.validate(copy(self.document), normalize=False)

                    if validator._errors:
                        self._drop_nodes_from_errorpaths(validator._errors,
                                                         [2], [2])
                        self._error(field, IF_SCHEMA, validator._errors)

    def _validate_delimitedvalues(self, ruleset_schema, field, value):
        """ {'type' : 'dict'} """
        # loosely constructed, similar to __validate_schema_sequence
        ruleset = copy(ruleset_schema)

        # convert field string to list of values
        if 'delimiter' not in ruleset.keys():
            raise ValueError('Define delimiter as rule in delimitedvalues')
        value = [el for el in value.split(ruleset['delimiter'])]

        # check for empty string (edge case where we do not want 'male | ')
        if '' in value:
            self._error(field, DELIMITER_SPACE)
            return True

        # check for doubles ('male | female | male' needs error)
        if len(value) != len(set(value)):
            self._error(field, DELIMITER_DOUBLE)
            return True

        # reorganise schema to be used in child_validator
        ruleset.pop('delimiter')
        schema = dict(((i, ruleset) for i in range(len(value))))

        validator = self._get_child_validator(
            document_crumb=field, schema_crumb=(field, 'delimitedvalues'),
            schema=schema, allow_unknown=True)
        document = dict(((i, v) for i, v in enumerate(value)))

        # provide support for if-statements -> add field from root document
        if 'if' in ruleset.keys():
            term = [key for key in ruleset['if'].keys()
                    if isinstance(ruleset['if'][key], dict)]
            if len(term) > 1:
                # multiple if-statements are not supported
                NotImplementedError
            else:
                term = term[0]
                document[term] = validator.root_document[term]

        validator.validate(document, normalize=True)

        if validator._errors:
            self._drop_nodes_from_errorpaths(validator._errors, [], [2])
            self._error(field, DELIMITER_SCHEMA, validator._errors)

    def _validate_stringformat(self, stringtype, field, value):
        """ {'allowed': ['url', 'json']} """
        if stringtype == 'json':
            try:
                json.loads(value)
                return True
            except ValueError:
                self._error(field, STRINGFORMAT_JSON)
        elif stringtype == 'url':
            if match(value, rule='URI'):
                return True
            else:
                self._error(field, STRINGFORMAT_URL)
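

# A minimal, runnable usage sketch of the validator and error handler defined
# above. The schema, field names and data values are hypothetical examples
# (not taken from the pywhip documentation); the ``error_handler`` keyword is
# the standard cerberus hook for plugging in a custom error handler.
if __name__ == '__main__':
    example_schema = {
        'sex': {'delimitedvalues': {'delimiter': ' | ',
                                    'allowed': ['male', 'female']}},
        'age': {'min': 0, 'max': 150}
    }
    validator = DwcaValidator(example_schema, error_handler=WhipErrorHandler)
    # expected: True (both delimited values allowed, age within bounds)
    print(validator.validate({'sex': 'male | female', 'age': '28'}))
    # expected: False (duplicate delimited value, age below min)
    print(validator.validate({'sex': 'male | male', 'age': '-3'}))
    # maps each field to its whip-specific error messages
    print(validator.errors)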