Source code for modalysis.core.pileup

"""Core pileup formatting and merge routines."""

import csv
import logging
from pathlib import Path

from modalysis.core.chromosomes import normalize_allowed_chromosomes

logger = logging.getLogger(__name__)


[docs] def format( input_path: str, output_path: str, output_name: str, allowed_chromosomes: list[str], ) -> None: """Format a raw pileup file into canonical `.modalysis` columns.""" output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis") logger.info( "Formatting Pileup. Input: %s, Output: %s", input_path, output_path_with_name ) input_file = open(input_path, newline="") output_file = open(output_path_with_name, "w", newline="") reader = csv.reader(input_file, delimiter="\t") writer = csv.writer(output_file, delimiter="\t") NUM_PILEUP_COLUMNS = 18 num_skipped_rows = 0 num_processed_rows = 0 allowed_chromosomes = normalize_allowed_chromosomes(allowed_chromosomes) writer.writerow( ["CHROMOSOME", "START", "END", "MODIFICATION", "N_VALID_COV", "N_MOD"] ) for row in reader: if len(row) != NUM_PILEUP_COLUMNS: num_skipped_rows += 1 continue chromosome = row[0].upper() if chromosome not in allowed_chromosomes: logger.debug("%s not in allowed chromosomes.", chromosome) num_skipped_rows += 1 continue start = row[1] end = row[2] modification = row[3] n_valid_cov = row[9] n_mod = row[11] writer.writerow([chromosome, start, end, modification, n_valid_cov, n_mod]) num_processed_rows += 1 logger.info( "Formatted Pileup: Processed %s rows. Skipped %s rows.", num_processed_rows, num_skipped_rows, ) input_file.close() output_file.close()
[docs] def merge( pileup_paths: list[str], output_path: str, output_name: str, min_files: int = 2, min_file_coverage: float = 50.0, min_reads: int = 5, ) -> None: """Merge formatted pileup files by genomic key and apply coverage/read filters.""" output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis") logger.info( "Merging Pileups. Inputs: %s, Output: %s", pileup_paths, output_path_with_name ) total_files = len(pileup_paths) # key: (chromosome, start, end, modification) -> [n_valid_cov, n_mod, n_files] merged: dict[tuple[str, str, str, str], list[int]] = {} for pileup_path in pileup_paths: input_file = open(pileup_path, newline="") reader = csv.reader(input_file, delimiter="\t") header = next(reader) logger.debug("Reading pileup %s with header: %s", pileup_path, header) for row in reader: chromosome = row[0] start = row[1] end = row[2] modification = row[3] n_valid_cov = int(row[4]) n_mod = int(row[5]) key = (chromosome, start, end, modification) if key not in merged: merged[key] = [0, 0, 0] merged[key][0] += n_valid_cov merged[key][1] += n_mod merged[key][2] += 1 input_file.close() output_file = open(output_path_with_name, "w", newline="") writer = csv.writer(output_file, delimiter="\t") writer.writerow( [ "CHROMOSOME", "START", "END", "MODIFICATION", "N_VALID_COV", "N_MOD", "TOTAL_FILES", "N_FILES", ] ) num_written_rows = 0 num_filtered_rows = 0 for key, values in merged.items(): n_valid_cov, n_mod, n_files = values file_coverage = (n_files / total_files) * 100 if ( n_files < min_files or file_coverage < min_file_coverage or n_valid_cov < min_reads ): num_filtered_rows += 1 continue chromosome, start, end, modification = key writer.writerow( [ chromosome, start, end, modification, n_valid_cov, n_mod, total_files, n_files, ] ) num_written_rows += 1 logger.info( "Merged Pileups: Wrote %s rows. Filtered %s rows.", num_written_rows, num_filtered_rows, ) output_file.close()