import ast
import csv
import gzip
import json
import logging
import re
from enum import Enum
from importlib.resources import files
import jsonschema
import tqdm
from mpralib.utils.io import is_compressed_file
[docs]
class ValidationSchema(Enum):
REPORTER_SEQUENCE_DESIGN = "reporter_sequence_design"
REPORTER_BARCODE_TO_ELEMENT_MAPPING = "reporter_barcode_to_element_mapping"
REPORTER_EXPERIMENT_BARCODE = "reporter_experiment_barcode"
REPORTER_EXPERIMENT = "reporter_experiment"
REPORTER_ELEMENT = "reporter_element"
REPORTER_VARIANT = "reporter_variant"
REPORTER_GENOMIC_ELEMENT = "reporter_genomic_element"
REPORTER_GENOMIC_VARIANT = "reporter_genomic_variant"
[docs]
class SchemaToFileNameMap:
def __init__(self):
self._data = {}
[docs]
def set(self, key: ValidationSchema, file_name: str):
if not isinstance(key, ValidationSchema):
raise KeyError(f"Key must be a FileKey enum value. Got {key}")
if not isinstance(file_name, str):
raise ValueError("File path must be a string")
self._data[key] = file_name
[docs]
def get(self, key: ValidationSchema):
return self._data.get(key, None)
[docs]
def as_dict(self):
return {k.value: v for k, v in self._data.items()}
schemaFilemap = SchemaToFileNameMap()
schemaFilemap.set(ValidationSchema.REPORTER_SEQUENCE_DESIGN, "reporter_sequence_design.json")
schemaFilemap.set(
ValidationSchema.REPORTER_BARCODE_TO_ELEMENT_MAPPING,
"reporter_barcode_to_element_mapping.json",
)
schemaFilemap.set(ValidationSchema.REPORTER_EXPERIMENT_BARCODE, "reporter_experiment_barcode.json")
schemaFilemap.set(ValidationSchema.REPORTER_EXPERIMENT, "reporter_experiment.json")
schemaFilemap.set(ValidationSchema.REPORTER_ELEMENT, "reporter_element.json")
schemaFilemap.set(ValidationSchema.REPORTER_VARIANT, "reporter_variant.json")
schemaFilemap.set(ValidationSchema.REPORTER_GENOMIC_ELEMENT, "reporter_genomic_element.json")
schemaFilemap.set(ValidationSchema.REPORTER_GENOMIC_VARIANT, "reporter_genomic_variant.json")
def _convert_row_value(value: str, prop_schema: dict):
try:
if prop_schema.get("type") == "integer":
converted_value = int(value)
elif prop_schema.get("type") == "number":
converted_value = float(value)
elif prop_schema.get("type") == "array":
converted_value = ast.literal_eval(value)
else:
converted_value = value
except ValueError:
converted_value = value # Let validation catch the error
return converted_value
[docs]
def validate_tsv_with_schema(tsv_file_path: str, schema_type: ValidationSchema) -> bool:
"""Validates a TSV file against a specified JSON schema.
This function reads a TSV file (optionally gzipped), converts each row to a dictionary,
and validates each row against the provided JSON schema. If any row fails validation,
a warning is logged. If an unexpected error occurs during validation, it is logged and raised.
Args:
tsv_file_path (str): Path to the TSV file to validate. The file may be gzipped.
schema_type (ValidationSchema): The type of schema to validate against.
Returns:
True if all rows are valid according to the schema, False otherwise.
Raises:
Exception: If an unexpected error occurs during validation.
Logs:
- Warnings for each row that fails schema validation.
- Errors for unexpected exceptions during validation.
- Info if the file is valid according to the schema.
- Warning if the file is not valid according to the schema.
"""
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.WARNING)
schema = _load_schema(schema_type)
header = _get_header_for_schema(schema_type)
open_func = gzip.open if is_compressed_file(tsv_file_path) else open
correct_file = True
with open_func(tsv_file_path, "rt", encoding="utf-8") as tsvfile:
reader = csv.DictReader(tsvfile, delimiter="\t", fieldnames=header)
i = 0
for i, row in enumerate(tqdm.tqdm(reader, desc="Validating rows", unit="row"), start=1):
_convert_row_types(row, schema)
try:
jsonschema.validate(instance=row, schema=schema)
except jsonschema.ValidationError as e:
LOGGER.warning(f"Row {i} invalid: {e.message}")
correct_file = False
except Exception as e:
LOGGER.error(f"Row {i} error: {e}")
correct_file = False
raise e
if i == 0:
LOGGER.warning("The file is empty.")
correct_file = False
if correct_file:
LOGGER.info(f"File {tsv_file_path} is valid according to schema {schema_type.value}.")
else:
LOGGER.warning(f"File {tsv_file_path} is not valid according to schema {schema_type.value}.")
return correct_file
def _load_schema(schema_type: ValidationSchema):
schema_filename = schemaFilemap.get(schema_type)
if schema_filename is None:
raise ValueError(f"No schema file mapped for schema type: {schema_type}")
schema_path = files("mpralib.schemas").joinpath(schema_filename)
with schema_path.open("r", encoding="utf-8") as f:
return json.load(f)
def _get_header_for_schema(schema_type: ValidationSchema) -> list | None:
if schema_type == ValidationSchema.REPORTER_BARCODE_TO_ELEMENT_MAPPING:
return ["barcode", "oligoName"]
elif schema_type == ValidationSchema.REPORTER_GENOMIC_ELEMENT:
return [
"chrom",
"chromStart",
"chromEnd",
"name",
"score",
"strand",
"log2FoldChange",
"inputCount",
"outputCount",
"minusLog10PValue",
"minusLog10QValue",
]
elif schema_type == ValidationSchema.REPORTER_GENOMIC_VARIANT:
return [
"chrom",
"chromStart",
"chromEnd",
"name",
"score",
"strand",
"log2FoldChange",
"inputCountRef",
"outputCountRef",
"inputCountAlt",
"outputCountAlt",
"minusLog10PValue",
"minusLog10QValue",
"postProbEffect",
"CI_lower_95",
"CI_upper_95",
"variantPos",
"refAllele",
"altAllele",
]
return None
def _convert_row_types(row: dict, schema: dict) -> None:
# Handle patternProperties
for prop_pattern_string, prop_schema in schema.get("patternProperties", {}).items():
prop_pattern = re.compile(prop_pattern_string)
for prop in [p for p in row if prop_pattern.match(p)]:
if row[prop] != "":
if "anyOf" in prop_schema:
for anyOfProp_schema in prop_schema["anyOf"]:
row[prop] = _convert_row_value(row[prop], anyOfProp_schema)
else:
row[prop] = _convert_row_value(row[prop], prop_schema)
# Handle properties
for prop, prop_schema in schema.get("properties", {}).items():
if prop in row and row[prop] != "":
if "anyOf" in prop_schema:
for anyOfProp_schema in prop_schema["anyOf"]:
row[prop] = _convert_row_value(row[prop], anyOfProp_schema)
else:
row[prop] = _convert_row_value(row[prop], prop_schema)