# Source code for modalysis.core.gff

"""Core GFF formatting and expression annotation routines."""

import csv
import logging
from pathlib import Path

from modalysis.core.chromosomes import normalize_allowed_chromosomes

logger = logging.getLogger(__name__)


def format(
    input_path: str,
    output_path: str,
    output_name: str,
    allowed_chromosomes: list[str],
) -> None:
    """Format raw GFF rows into gene-level `.modalysis` output.

    Reads a tab-separated GFF file, keeps only ``protein_coding_gene``
    features on an allowed chromosome, and writes a 5-column TSV
    (CHROMOSOME, START, END, GENE_ID, DESCRIPTION) to
    ``<output_path>/<output_name>.modalysis``.

    Args:
        input_path: Path to the raw GFF file (9 tab-separated columns).
        output_path: Directory for the formatted output file.
        output_name: Base name of the output file; the ``.modalysis``
            suffix is applied automatically.
        allowed_chromosomes: Chromosome names to keep; normalized via
            ``normalize_allowed_chromosomes`` before comparison.
    """
    NUM_GFF_COLUMNS = 9

    output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis")
    logger.info(
        "Formatting GFF. Input: %s, Output: %s", input_path, output_path_with_name
    )

    allowed_chromosomes = normalize_allowed_chromosomes(allowed_chromosomes)
    num_skipped_rows = 0
    num_processed_rows = 0

    # Context managers guarantee both handles are closed even if a
    # malformed row raises mid-loop (the original leaked them on error).
    with open(input_path, newline="") as input_file, open(
        output_path_with_name, "w", newline=""
    ) as output_file:
        reader = csv.reader(input_file, delimiter="\t")
        writer = csv.writer(output_file, delimiter="\t")
        writer.writerow(["CHROMOSOME", "START", "END", "GENE_ID", "DESCRIPTION"])

        for row in reader:
            if len(row) != NUM_GFF_COLUMNS:
                num_skipped_rows += 1
                continue

            feature_type = row[2]
            if feature_type != "protein_coding_gene":
                num_skipped_rows += 1
                continue

            chromosome = row[0].upper()
            if chromosome not in allowed_chromosomes:
                num_skipped_rows += 1
                continue

            # Modkit uses zero-based indexing whereas GFF uses one-based
            # indexing. We are following zero-based indexing.
            start = int(row[3]) - 1
            end = row[4]

            # Skip attribute fields without "=" instead of crashing on
            # malformed input; likewise treat rows missing the ID or
            # description attribute as skipped, consistent with the other
            # malformed-row branches above.
            attributes = dict(
                field.split("=", 1) for field in row[8].split(";") if "=" in field
            )
            gene_id = attributes.get("ID")
            description = attributes.get("description")
            if gene_id is None or description is None:
                num_skipped_rows += 1
                continue

            writer.writerow([chromosome, start, end, gene_id, description])
            num_processed_rows += 1

    logger.info(
        "Formatted GFF. Processed %s rows. Skipped %s rows.",
        num_processed_rows,
        num_skipped_rows,
    )
def annotate(
    gff_path: str,
    expression_paths: list[str],
    expression_labels: list[str],
    output_path: str,
    output_name: str,
) -> None:
    """Annotate formatted GFF genes with one or more expression sources.

    Each expression file is a TSV of ``gene_id<TAB>value`` rows. Every
    gene row of the formatted GFF gains an EXPRESSION column containing
    ``"label: value"`` entries (joined by ``"; "``) for each source that
    lists the gene; genes absent from all sources get an empty string.

    Args:
        gff_path: Path to a formatted ``.modalysis`` GFF file (gene ID in
            column 4, header row present).
        expression_paths: One TSV file per expression source.
        expression_labels: Display label for each source, parallel to
            ``expression_paths``.
        output_path: Directory for the annotated output file.
        output_name: Base name of the output file; the ``.modalysis``
            suffix is applied automatically.

    Raises:
        ValueError: If the number of paths and labels differ, or if the
            GFF file is empty (no header row).
    """
    if len(expression_paths) != len(expression_labels):
        raise ValueError(
            f"Number of expression paths ({len(expression_paths)}) must match "
            f"number of expression labels ({len(expression_labels)})"
        )

    output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis")
    logger.info("Annotating GFF. GFF: %s, Output: %s", gff_path, output_path_with_name)

    # Parse each expression file into a dict: {gene_id_upper -> value_upper}
    expression_data: list[tuple[str, dict[str, str]]] = []
    for path, label in zip(expression_paths, expression_labels):
        expr_dict: dict[str, str] = {}
        with open(path, newline="") as expr_file:
            for row in csv.reader(expr_file, delimiter="\t"):
                # Rows narrower than two columns carry no usable mapping.
                if len(row) >= 2:
                    gene_id = row[0].strip().upper()
                    value = row[1].strip().upper()
                    expr_dict[gene_id] = value
        expression_data.append((label, expr_dict))
        logger.debug(
            "Loaded %d expression entries from %s (%s)", len(expr_dict), path, label
        )

    num_processed_rows = 0
    num_annotated_rows = 0

    # Context managers guarantee both handles are closed even if a row
    # raises mid-loop (the original leaked them on error).
    with open(gff_path, newline="") as input_file, open(
        output_path_with_name, "w", newline=""
    ) as output_file:
        reader = csv.reader(input_file, delimiter="\t")
        writer = csv.writer(output_file, delimiter="\t")

        # Guard against an empty input: a bare next(reader) would let
        # StopIteration escape the function.
        header = next(reader, None)
        if header is None:
            raise ValueError(f"GFF file is empty: {gff_path}")
        writer.writerow(header + ["EXPRESSION"])

        for row in reader:
            gene_id = row[3].strip().upper()
            parts = [
                "%s: %s" % (label, expr_dict[gene_id])
                for label, expr_dict in expression_data
                if gene_id in expr_dict
            ]
            if parts:
                num_annotated_rows += 1
            writer.writerow(row + ["; ".join(parts)])
            num_processed_rows += 1

    logger.info(
        "Annotated GFF. Processed %s rows. %s rows had expression annotations.",
        num_processed_rows,
        num_annotated_rows,
    )