import ast
from importlib.resources import files
import numpy as np
import pandas as pd
from mpralib.exception import MPRAlibException, SequenceDesignException
from mpralib.mpradata import MPRABarcodeData, MPRAData, MPRAOligoData
[docs]
def chromosome_map() -> pd.DataFrame:
with files("mpralib.data").joinpath("hg19.chromAlias.txt").open() as chromAlias_hg19:
df = pd.read_csv(chromAlias_hg19, sep="\t", header=None, comment="#", dtype="category")
df["release"] = "GRCh37"
with files("mpralib.data").joinpath("hg38.chromAlias.txt").open() as chromAlias_hg38:
df_38 = pd.read_csv(chromAlias_hg38, sep="\t", header=None, comment="#", dtype="category")
df_38["release"] = "GRCh38"
df = pd.concat([df, df_38], ignore_index=True)
df.columns = ["ucsc", "assembly", "genbank", "refseq", "release"]
return df
[docs]
def is_compressed_file(filepath: str) -> bool:
"""Check if a file is compressed (gzip or bgz).
Args:
filepath (str): Path to the file to check.
Returns:
True if the file is compressed, False otherwise.
"""
return is_gzip_file(filepath) or is_bgzf(filepath)
[docs]
def is_gzip_file(filepath: str) -> bool:
"""Check if a file is a gzip-compressed file based on its magic number.
Args:
filepath (str or Path): Path to the file to check.
Returns:
True if the file is gzip-compressed, False otherwise.
"""
with open(filepath, "rb") as f:
magic = f.read(2)
return magic == b"\x1f\x8b"
[docs]
def is_bgzf(filepath: str) -> bool:
"""Check if a file is in BGZF (Blocked GNU Zip Format) format.
BGZF is a variant of the standard gzip format with extra fields that allow for random access.
This function reads the file header and checks for the BGZF-specific magic numbers and flags.
Args:
filepath (str): Path to the file to be checked.
Returns:
True if the file is in BGZF format, False otherwise.
"""
with open(filepath, "rb") as f:
header = f.read(18)
return (
len(header) >= 18
and header[0:2] == b"\x1f\x8b" # gzip magic number
and header[3] == 4 # FLG.FEXTRA set
and header[12:14] == b"BC" # BGZF extra subfield
)
[docs]
def export_activity_file(mpradata: MPRAOligoData, output_file_path: str) -> None:
"""Export activity data from an MPRAdata object to a tab-separated values (TSV) file.
The function processes the grouped data from the MPRAdata object, extracts relevant information for each replicate, and writes the data to a TSV file. The output file contains columns for replicate, oligo name, DNA counts, RNA counts, normalized DNA counts, normalized RNA counts, log2 fold change, and the number of barcodes. Barcode filters, count sampling and barcode thresholds are applied.
Args:
mpradata (MPRAdata): An object containing MPRA (Massively Parallel Reporter Assay) data.
output_file_path (str): The file path where the output TSV file will be saved.
""" # noqa: E501
output = pd.DataFrame()
# has to be run to calculate activities
mpradata.activity
for replicate in mpradata.obs_names:
replicate_data = mpradata.data[replicate, :]
replicate_data = replicate_data[
:,
np.asarray(replicate_data.layers["barcode_counts"]) >= np.asarray(mpradata.barcode_threshold),
]
df = {
"replicate": np.repeat(replicate, replicate_data.var_names.size),
"oligo_name": replicate_data.var["oligo"],
"dna_counts": np.asarray(replicate_data.layers["dna"])[0, :],
"rna_counts": np.asarray(replicate_data.layers["rna"])[0, :],
"dna_normalized": np.round(np.asarray(replicate_data.layers["dna_normalized"])[0, :], 4),
"rna_normalized": np.round(np.asarray(replicate_data.layers["rna_normalized"])[0, :], 4),
"log2FoldChange": np.round(np.asarray(replicate_data.layers["activity"])[0, :], 4),
"n_bc": np.asarray(replicate_data.layers["barcode_counts"])[0, :],
}
output = pd.concat([output, pd.DataFrame(df)], axis=0)
output.to_csv(output_file_path, sep="\t", index=False)
[docs]
def export_barcode_file(mpradata: MPRABarcodeData, output_file_path: str) -> None:
"""Export barcode count data to a file.
This function takes an MPRAdata object and exports its barcode count data to a specified file path in tab-separated values (TSV) format. The output file will contain columns for barcodes, oligo names, and DNA/RNA counts for each replicate. Modifides counts (barcode filter/sampling) if applicable will be written.
Args:
mpradata (MPRAdata): An object containing MPRA data, including barcodes, oligos, DNA counts, RNA counts, and replicates.
output_file_path (str): The file path where the output TSV file will be saved.
""" # noqa: E501
output = pd.DataFrame({"barcode": mpradata.var_names, "oligo_name": mpradata.oligos})
dna_counts = mpradata.dna_counts
rna_counts = mpradata.rna_counts
for i, replicate in enumerate(mpradata.obs_names):
output[f"dna_count_{replicate}"] = dna_counts[i]
output[f"rna_count_{replicate}"] = rna_counts[i]
output.replace(0, "", inplace=True)
output.to_csv(output_file_path, sep="\t", index=False)
[docs]
def export_counts_file(
mpradata: MPRAData,
output_file_path: str,
normalized: bool = False,
filter: np.ndarray | None = None,
) -> None:
if isinstance(mpradata, MPRABarcodeData):
df = {"ID": mpradata.var_names}
elif isinstance(mpradata, MPRAOligoData):
df = {"ID": mpradata.oligos}
else:
raise MPRAlibException(f"Invalid MPRA data type {type(mpradata)}. Expected MPRAOligoData or MPRABarcodeData.")
if normalized:
dna_counts = mpradata.normalized_dna_counts.copy()
rna_counts = mpradata.normalized_rna_counts.copy()
else:
dna_counts = mpradata.dna_counts.copy()
rna_counts = mpradata.rna_counts.copy()
not_observed_mask = ~mpradata.observed
lower_bc_mask = mpradata.barcode_counts < mpradata.barcode_threshold
mask = [not_observed_mask, lower_bc_mask]
if filter is not None:
mask.append(filter)
dna_counts = np.ma.masked_array(dna_counts, mask=np.any(mask, axis=0))
rna_counts = np.ma.masked_array(rna_counts, mask=np.any(mask, axis=0))
for idx, replicate in enumerate(mpradata.obs_names):
df["dna_count_" + replicate] = dna_counts[idx, :]
df["rna_count_" + replicate] = rna_counts[idx, :]
df = pd.DataFrame(df).set_index("ID")
df = df.map(lambda x: np.nan if isinstance(x, np.ma.core.MaskedConstant) else x)
nan_columns = df.isna().all(axis=1)
if isinstance(mpradata, MPRABarcodeData):
df.insert(0, "oligo_name", mpradata.oligos)
# remove IDs which are all zero
df = df[~nan_columns]
if normalized:
df.to_csv(output_file_path, sep="\t", index=True, na_rep="", float_format="%.6f")
else:
df.to_csv(output_file_path, sep="\t", index=True, na_rep="", float_format="%.0f")
[docs]
def read_sequence_design_file(file_path: str) -> pd.DataFrame:
"""Read sequence design from a tab-separated values (TSV) file.
This function reads metadata from a TSV file and returns it as a pandas DataFrame. The metadata file should contain columns for sample ID, replicate, and any additional metadata. The sample ID should correspond to the oligo name in the MPRA data object.
Args:
file_path (str): The file path of the metadata TSV file.
Returns:
A DataFrame containing the metadata.
""" # noqa: E501
df = pd.read_csv(
file_path,
sep="\t",
header=0,
na_values=["NA"],
usecols=[
"name",
"sequence",
"category",
"class",
"source",
"ref",
"chr",
"start",
"end",
"strand",
"variant_class",
"variant_pos",
"SPDI",
"allele",
"info",
],
dtype={
"name": "category",
"sequence": str,
"category": "category",
"class": "category",
"source": str,
"ref": str,
"chr": "category",
"start": "Int64",
"end": "Int64",
"strand": "category",
"variant_class": str,
"variant_pos": str,
"SPDI": str,
"allele": str,
"info": str,
},
).drop_duplicates()
# Set specific columns as arrays
df["variant_class"] = df["variant_class"].fillna("[]").apply(ast.literal_eval)
df["variant_pos"] = df["variant_pos"].fillna("[]").apply(ast.literal_eval)
df["SPDI"] = df["SPDI"].fillna("[]").apply(ast.literal_eval)
df["allele"] = df["allele"].fillna("[]").apply(ast.literal_eval)
# Set specific columns as categorical or integer types
df["category"] = pd.Categorical(df["category"])
df["class"] = pd.Categorical(df["class"])
df["chr"] = pd.Categorical(df["chr"])
df["strand"] = pd.Categorical(df["strand"])
df["name"] = pd.Categorical(df["name"].str.replace(r"[\s\[\]]", "_", regex=True))
# oligo name as index
df.set_index("name", inplace=True)
# Validate that the 'sequence' column contains only valid DNA characters
if np.any(~df["sequence"].str.match(r"^[ATGCatgc]+$", na=False)):
raise SequenceDesignException("sequence", file_path)
# Validate that the 'class' column contains only 'variant', 'element', 'synthetic' or 'scrambled'
valid_categories = {"variant", "element", "synthetic", "scrambled"}
if not set(df["category"].cat.categories).issubset(valid_categories):
raise SequenceDesignException("category", file_path)
# Validate that the 'category' column contains only:
valid_classes = {
"test",
"variant positive control",
"variant negative control",
"element active control",
"element inactive control",
}
if not set(df["class"].cat.categories).issubset(valid_classes):
raise SequenceDesignException("class", file_path)
return df