# -*- coding: utf-8 -*-
import csv
import os
from collections.abc import Mapping, Sequence
from datetime import datetime

from cerberus import SchemaError
from dwca.read import DwCAReader
from jinja2 import Environment, FileSystemLoader
from pkg_resources import resource_filename

from .reporters import SpecificationErrorHandler
from .validators import DwcaValidator, WhipErrorHandler
def whip_dwca(dwca_zip, specifications, maxentries=None):
    """Whip a Darwin Core Archive

    Validate the core file of a `Darwin Core Archive`_ zipped data set,
    using the :class:`~dwca.read.DwCAReader` reading and iterator
    capabilities.

    .. _Darwin Core Archive: https://en.wikipedia.org/wiki/Darwin_Core_Archive

    Parameters
    ----------
    dwca_zip : str
        Filename of the zipped Darwin Core Archive.
    specifications : dict
        Valid specifications whip dictionary schema.
    maxentries : int
        Define the limit of records to validate from the Archive, useful to
        have a quick scan on the first subset of data.

    Returns
    -------
    whip_it : pywhip.pywhip.Whip
        Whip validator class instance, containing the errors and reporting
        capabilities.
    """
    # Extract data header - only core file support
    with DwCAReader(dwca_zip) as dwca:
        # Field terms are full URIs; keep only the last path segment
        # (the plain Darwin Core term name) as the field name.
        field_names = [field['term'].split('/')[-1] for field in
                       dwca.core_file.file_descriptor.fields]

    # Apply whip
    whip_it = Whip(specifications)
    whip_it._whip(whip_it.generate_dwca(dwca_zip),
                  field_names, maxentries)
    return whip_it
def whip_csv(csv_file, specifications, delimiter, maxentries=None):
    """Whip a CSV-like file

    Validate a CSV file, using the :class:`CSV <python3:csv.DictReader>`
    reading and iterator capabilities of the Python standard library.

    Parameters
    ----------
    csv_file : str
        Filename of the CSV file to whip validate.
    specifications : dict
        Valid specifications whip dictionary schema.
    delimiter : str
        A one-character string used to separate fields, e.g. ``','``.
    maxentries : int
        Define the limit of records to validate from the file, useful to
        have a quick scan on the first subset of data.

    Returns
    -------
    whip_it : pywhip.pywhip.Whip
        Whip validator class instance, containing the errors and reporting
        capabilities.
    """
    # Extract data header; newline='' is required by the csv module so
    # embedded newlines inside quoted fields are handled correctly.
    with open(csv_file, "r", newline='') as dwc:
        reader = csv.DictReader(dwc, delimiter=delimiter)
        field_names = reader.fieldnames

    # Apply whip
    whip_it = Whip(specifications)
    whip_it._whip(whip_it.generate_csv(csv_file, delimiter),
                  field_names, maxentries)
    return whip_it
class Whip(object):
    """Whip document validation class

    Validates (multiple row) documents against a whip specification schema
    using the high-level functions ``whip_...`` and creates a validation
    report with the :meth:`~pywhip.pywhip.Whip.get_report` method.

    Attributes
    ----------
    sample_size : int
        Number of value-examples to use in reporting
    schema : dict
        Whip specification schema, consisting of `field : constraint`
        combinations
    validation : pywhip.validators.DwcaValidator
        A :class:`~pywhip.validators.DwcaValidator` class instance.
    _report : dict
        Base report container to collect document errors. Errors are
        collected in the ['results']['specified_fields'] values, having
        a :class:`~pywhip.reporters.SpecificationErrorHandler` for each
        field-specification combination.
    """

    def __init__(self, schema, sample_size=10):
        """
        Parameters
        ----------
        schema : dict
            Whip specification schema, consisting of `field : constraint`
            combinations.
        sample_size : int
            For each of the field-rules combinations, the (top) number of
            data value samples/examples to include in the report.
        """
        if not isinstance(schema, dict):
            raise SchemaError("Input schema needs to be a dictionary")
        self._schema = schema
        self._sample_size = sample_size

        # setup a DwcaValidator instance
        self.validation = DwcaValidator(self.schema,
                                        error_handler=WhipErrorHandler)

        self._report = {'executed_at': None,
                        'errors': [],
                        'results': {
                            'total_rows': 0,
                            'passed_rows': 0,
                            'failed_rows': 0,
                            'passed_row_ids': [],
                            'warnings': [],
                            'unspecified_fields': None,
                            'unknown_fields': None,
                            'specified_fields': {}
                            }
                        }

        self._total_row_count = 0

    @property
    def schema(self):
        return self._schema

    @property
    def sample_size(self):
        return self._sample_size

    def get_report(self, format='json'):
        """Collect errors into reporting format (json/html)

        Converts the logged errors into a json or html style report.

        Parameters
        ----------
        format : json | html
            Define the output format the report is used.

        Returns
        -------
        str

        Raises
        ------
        ValueError
            When an unsupported format is requested (previously this
            silently returned ``None``).
        """
        if format == 'json':
            return self._report
        elif format == 'html':
            return self.create_html()
        raise ValueError(
            "Unsupported report format '{}', use 'json' or 'html'".format(
                format))

    @staticmethod
    def format_if_rule(rule, number):
        """Support to build if-based-rules in report

        Parameters
        ----------
        rule : str
            A whip specification rule.
        number : int
            The counter of the if-statements used in the whip specification,
            counter starts at 1.
        """
        return '{}_if_{}'.format(rule, number)

    @staticmethod
    def format_delimited_rule(rule):
        """Support to build delimitedvalues-based-rules in report

        Parameters
        ----------
        rule : str
            A whip specification rule.
        """
        return '{}_delimitedvalue'.format(rule)

    @staticmethod
    def clean_constraint(constraint):
        """Clean constraint for reports

        List constraints are joined into a comma-separated string; any
        other constraint is simply stringified.

        Parameters
        ----------
        constraint : str
            The constraint as defined in the whip specification.
        """
        if isinstance(constraint, list):
            return ', '.join([str(el) for el in constraint])
        else:
            return str(constraint)

    def _extract_schema_blueprint(self, schema):
        """Extract fields and rules from schema

        For scopes if and delimitedvalues, the function needs to extract the
        inner-rules from the scope and combine it with the
        if/delimitedvalue to have unique specification names. A
        delimitedvalues is provisioned as well for general delimitedvalues
        errors.

        Parameters
        ----------
        schema : dict
            Whip specification schema.
        """
        schema_layout = {}
        for field, rules in schema.items():
            schema_layout[field] = {}
            for rule, conditions in rules.items():
                if rule == 'if':
                    for j, condition in enumerate(conditions):
                        # TODO: exclude to general schema-def on init
                        # validator
                        if 'empty' not in condition.keys():
                            condition['empty'] = False
                        for subrule, constraint in condition.items():
                            if subrule in self.validation.rules:
                                schema_layout[field][self.format_if_rule(
                                    subrule, j+1)] = \
                                    SpecificationErrorHandler(
                                        self.clean_constraint(constraint))
                elif rule == 'delimitedvalues':
                    # container for general delimitedvalues errors
                    schema_layout[field][rule] = SpecificationErrorHandler("")
                    for subrule, constraint in conditions.items():
                        if subrule != 'delimiter':
                            schema_layout[field][self.format_delimited_rule(
                                subrule)] = SpecificationErrorHandler(
                                    self.clean_constraint(constraint))
                else:
                    schema_layout[field][rule] = \
                        SpecificationErrorHandler(
                            self.clean_constraint(conditions))
        return schema_layout

    def _conditional_fields(self, file_fields):
        """Extract the field names mentioned inside if conditions

        When fields are mentioned inside if statements, but not present in
        the data, this should raise a preliminary warning/message as part of
        the reporting.

        Parameters
        ----------
        file_fields : list | set
            List of the field names present in the input data file.
        """
        conditional_fields = []
        for _, specs in self.schema.items():
            if 'if' in specs.keys():
                if_rules = specs['if']
                # single if statement
                if isinstance(if_rules, Mapping):
                    conditional_fields += [key for key in if_rules.keys()
                                           if isinstance(if_rules[key], dict)]
                # multiple ifs combined
                elif isinstance(if_rules, Sequence):
                    for rule in if_rules:
                        conditional_fields += (
                            [key for key in rule.keys()
                             if isinstance(rule[key], dict)])
                else:
                    raise SchemaError

        if not set(conditional_fields).issubset(set(file_fields)):
            missing_if_fields = list(set(conditional_fields).difference(
                set(file_fields)))
            self._report['results']['warnings'].append(
                "Following fields mentioned inside if specifications do not "
                "exist inside the document: '{}'".format(
                    ', '.join(missing_if_fields)))

    def _compare_fields(self, file_fields):
        """Compare data fields and specifications

        Compare the fields mentioned by the specifications schema and the
        data set and update the report attributes on unvalidated and missing
        fields.

        Parameters
        ----------
        file_fields : list | set
            List of the field names present in the input data file
        """
        try:
            file_fields = set(file_fields)
        except TypeError:
            # re-raise with an informative message instead of a bare
            # TypeError without context
            raise TypeError("file_fields must be an iterable of field names")

        self._report['results']['unspecified_fields'] = list(
            file_fields.difference(set(self.schema.keys())))
        self._report['results']['unknown_fields'] = list(set(
            self.schema.keys()).difference(file_fields))

    def _whip(self, input_generator, field_names, maxentries=None):
        """Validate whip specifications on the input

        For each entry of the input generator (which can be limited using
        the ``maxentries`` parameter), the validation is recording the
        errors. At the end, the :attr:`~pywhip.pywhip.Whip._report`
        attribute is updated with the error logs and other relevant
        metadata.

        Parameters
        ----------
        input_generator : iterator
            An iterator, yielding `field : value` combinations of the
            document on each iteration.
        field_names : list | set
            List of the field names present in the input data file.
        maxentries : int
            Define the limit of records to validate from the Archive,
            useful to have a quick scan on the first subset of data.
        """
        # preliminary checks
        self._compare_fields(field_names)
        self._conditional_fields(field_names)

        # prepare object to save errors
        specified_fields = self._extract_schema_blueprint(self.schema)

        passed_row_ids = []
        # row_id stays 0 when the generator is empty (previously the loop
        # variable was referenced after the loop, raising NameError on
        # empty input)
        row_id = 0
        # validate each row and log the errors for each row
        for row_id, row in enumerate(input_generator, start=1):
            self.validation.validate(row)  # apply specification rules
            if len(self.validation.errors) > 0:
                for error in self.validation._errors:
                    field = error.field
                    if error.is_group_error:  # if/delimitedvalues
                        if error.rule == 'if':
                            for child_error in error.child_errors:
                                # recover the 1-based if-counter from the
                                # generated child field name suffix
                                number = str(int(child_error.field.split(
                                    '_')[-1]) + 1)
                                rule = self.format_if_rule(
                                    child_error.rule, number)
                                message = self.validation.schema.validator.\
                                    error_handler._format_message(
                                        field, child_error)
                                specified_fields[field][rule][(
                                    child_error.value, message)].add(row_id)
                        elif error.rule == 'delimitedvalues':
                            for child_error in error.child_errors:
                                rule = self.format_delimited_rule(
                                    child_error.rule)
                                message = self.validation.schema.validator.\
                                    error_handler._format_message(
                                        field, child_error)
                                specified_fields[field][rule][(
                                    child_error.value, message)].add(row_id)
                        else:
                            # previously the exception instance was created
                            # but never raised, silently ignoring the case
                            raise NotImplementedError(
                                "Unsupported group error rule: "
                                "{}".format(error.rule))
                    else:
                        message = self.validation.schema.validator.\
                            error_handler._format_message(field, error)
                        specified_fields[field][error.rule][(
                            error.value, message)].add(row_id)
            else:
                passed_row_ids.append(row_id)

            if maxentries and row_id >= maxentries:
                break

        # fill report
        # TODO: exclude this from in-function adapts to attributes
        self._total_row_count = row_id
        self._report['results']['total_rows'] = self._total_row_count
        self._report['results']['passed_row_ids'] = passed_row_ids
        self._report['results']['passed_rows'] = len(passed_row_ids)
        self._report['results']['failed_rows'] = \
            self._total_row_count - len(passed_row_ids)
        self._report['executed_at'] = datetime.now().strftime("%Y-%m-%d %H:%M")

        self._report['results']['specified_fields'] = \
            self._report_specified_fields(specified_fields,
                                          self._total_row_count,
                                          self.sample_size)
        self._isitgreat()
        # TODO: add generator function and dict-searches to query errors

    def _isitgreat(self):
        """check if there are any errors recorded"""
        if self._report['results']['failed_rows'] == 0:
            print("Hooray, your dataset complies with the specifications!")
        else:
            print('Your dataset does not comply with the specifications, '
                  'use get_report() for more detailed information.')

    @staticmethod
    def generate_dwca(dwca_zip):
        """Darwin core archive generator

        Yields `field : value` combinations of the document on each
        iteration, corresponding to individual rows of the data file.

        Parameters
        ----------
        dwca_zip : str
            Filename of the zipped Darwin Core Archive.

        Yields
        ------
        document : dict
            Provides a single line document values (as dict values) and
            field names (as dict keys).
        """
        with DwCAReader(dwca_zip) as dwca:
            for row in dwca:
                # strip the URI part of each Darwin Core term
                document = {k.split('/')[-1]: v for k, v in row.data.items()}
                yield document

    @staticmethod
    def generate_csv(csv_file, delimiter):
        """CSV File generator

        Yields `field : value` combinations of the document on each
        iteration, corresponding to individual rows of the data file.

        Parameters
        ----------
        csv_file : str
            Filename of the CSV file to whip validate.
        delimiter : str
            A one-character string used to separate fields, e.g. ``','``.

        Yields
        ------
        document : dict
            Provides a single line document values (as dict values) and
            field names (as dict keys).
        """
        # newline='' is required by the csv module for correct handling of
        # embedded newlines inside quoted fields
        with open(csv_file, "r", newline='') as dwc:
            reader = csv.DictReader(dwc, delimiter=delimiter)
            for document in reader:
                yield document

    @staticmethod
    def _report_specified_fields(specified_fields, nrows, nsample):
        """Transform the data objects to report objects

        Parameters
        ----------
        specified_fields : dict
            Dictionary with a `~pywhip.reporters.SpecificationErrorHandler`
            object for each field-specification combination.
        nrows : int
            Total rows of the current document working with, used to
            calculate passed rows as well
        nsample : int
            Number of samples (ordered on the number of rows) to retain for
            reporting purposes
        """
        for field, rules in specified_fields.items():
            for rule, error_report in rules.items():
                specified_fields[field][rule] = \
                    specified_fields[field][rule].build_error_report(nrows,
                                                                     nsample)
        return specified_fields

    def create_html(self):
        """Build html using template

        Returns
        -------
        str
        """
        path = "./static/template.html"
        html_template_path = resource_filename(__name__, path)
        env = Environment(loader=FileSystemLoader(
            os.path.dirname(html_template_path)))
        template = env.get_template(os.path.basename(html_template_path))
        html = template.render(report=self._report)
        return str(html)