Source code for modalysis.core.pileup
"""Core pileup formatting and merge routines."""
import csv
import logging
from pathlib import Path
from modalysis.core.chromosomes import normalize_allowed_chromosomes
logger = logging.getLogger(__name__)
[docs]
def format(
input_path: str,
output_path: str,
output_name: str,
allowed_chromosomes: list[str],
) -> None:
"""Format a raw pileup file into canonical `.modalysis` columns."""
output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis")
logger.info(
"Formatting Pileup. Input: %s, Output: %s", input_path, output_path_with_name
)
input_file = open(input_path, newline="")
output_file = open(output_path_with_name, "w", newline="")
reader = csv.reader(input_file, delimiter="\t")
writer = csv.writer(output_file, delimiter="\t")
NUM_PILEUP_COLUMNS = 18
num_skipped_rows = 0
num_processed_rows = 0
allowed_chromosomes = normalize_allowed_chromosomes(allowed_chromosomes)
writer.writerow(
["CHROMOSOME", "START", "END", "MODIFICATION", "N_VALID_COV", "N_MOD"]
)
for row in reader:
if len(row) != NUM_PILEUP_COLUMNS:
num_skipped_rows += 1
continue
chromosome = row[0].upper()
if chromosome not in allowed_chromosomes:
logger.debug("%s not in allowed chromosomes.", chromosome)
num_skipped_rows += 1
continue
start = row[1]
end = row[2]
modification = row[3]
n_valid_cov = row[9]
n_mod = row[11]
writer.writerow([chromosome, start, end, modification, n_valid_cov, n_mod])
num_processed_rows += 1
logger.info(
"Formatted Pileup: Processed %s rows. Skipped %s rows.",
num_processed_rows,
num_skipped_rows,
)
input_file.close()
output_file.close()
[docs]
def merge(
pileup_paths: list[str],
output_path: str,
output_name: str,
min_files: int = 2,
min_file_coverage: float = 50.0,
min_reads: int = 5,
) -> None:
"""Merge formatted pileup files by genomic key and apply coverage/read filters."""
output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis")
logger.info(
"Merging Pileups. Inputs: %s, Output: %s", pileup_paths, output_path_with_name
)
total_files = len(pileup_paths)
# key: (chromosome, start, end, modification) -> [n_valid_cov, n_mod, n_files]
merged: dict[tuple[str, str, str, str], list[int]] = {}
for pileup_path in pileup_paths:
input_file = open(pileup_path, newline="")
reader = csv.reader(input_file, delimiter="\t")
header = next(reader)
logger.debug("Reading pileup %s with header: %s", pileup_path, header)
for row in reader:
chromosome = row[0]
start = row[1]
end = row[2]
modification = row[3]
n_valid_cov = int(row[4])
n_mod = int(row[5])
key = (chromosome, start, end, modification)
if key not in merged:
merged[key] = [0, 0, 0]
merged[key][0] += n_valid_cov
merged[key][1] += n_mod
merged[key][2] += 1
input_file.close()
output_file = open(output_path_with_name, "w", newline="")
writer = csv.writer(output_file, delimiter="\t")
writer.writerow(
[
"CHROMOSOME",
"START",
"END",
"MODIFICATION",
"N_VALID_COV",
"N_MOD",
"TOTAL_FILES",
"N_FILES",
]
)
num_written_rows = 0
num_filtered_rows = 0
for key, values in merged.items():
n_valid_cov, n_mod, n_files = values
file_coverage = (n_files / total_files) * 100
if (
n_files < min_files
or file_coverage < min_file_coverage
or n_valid_cov < min_reads
):
num_filtered_rows += 1
continue
chromosome, start, end, modification = key
writer.writerow(
[
chromosome,
start,
end,
modification,
n_valid_cov,
n_mod,
total_files,
n_files,
]
)
num_written_rows += 1
logger.info(
"Merged Pileups: Wrote %s rows. Filtered %s rows.",
num_written_rows,
num_filtered_rows,
)
output_file.close()