# Source code for modalysis.core.gff
"""Core GFF formatting and expression annotation routines."""
import csv
import logging
from pathlib import Path
from modalysis.core.chromosomes import normalize_allowed_chromosomes
logger = logging.getLogger(__name__)
def format(
    input_path: str,
    output_path: str,
    output_name: str,
    allowed_chromosomes: list[str],
) -> None:
    """Format raw GFF rows into gene-level ``.modalysis`` output.

    Reads a tab-separated GFF file, keeps only well-formed
    ``protein_coding_gene`` rows on an allowed chromosome, converts the
    start coordinate from GFF's one-based indexing to zero-based, and
    writes ``CHROMOSOME, START, END, GENE_ID, DESCRIPTION`` rows.

    Args:
        input_path: Path to the raw GFF file (9 tab-separated columns).
        output_path: Directory in which the output file is written.
        output_name: Output file name; its suffix is replaced with
            ``.modalysis``.
        allowed_chromosomes: Chromosome names to keep; passed through
            ``normalize_allowed_chromosomes`` before matching.

    NOTE(review): the function name shadows the ``format`` builtin, but it
    is the module's public API so it is kept for backward compatibility.
    """
    NUM_GFF_COLUMNS = 9
    output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis")
    logger.info(
        "Formatting GFF. Input: %s, Output: %s", input_path, output_path_with_name
    )
    # Build a set once: O(1) membership per row instead of a list scan.
    allowed = set(normalize_allowed_chromosomes(allowed_chromosomes))
    num_skipped_rows = 0
    num_processed_rows = 0
    # Context managers guarantee both handles are closed even if parsing
    # fails; the original leaked both files on any exception.
    with open(input_path, newline="") as input_file, open(
        output_path_with_name, "w", newline=""
    ) as output_file:
        reader = csv.reader(input_file, delimiter="\t")
        writer = csv.writer(output_file, delimiter="\t")
        writer.writerow(["CHROMOSOME", "START", "END", "GENE_ID", "DESCRIPTION"])
        for row in reader:
            if len(row) != NUM_GFF_COLUMNS:
                num_skipped_rows += 1
                continue
            if row[2] != "protein_coding_gene":
                num_skipped_rows += 1
                continue
            chromosome = row[0].upper()
            if chromosome not in allowed:
                num_skipped_rows += 1
                continue
            # Modkit uses zero-based indexing whereas GFF uses one-based
            # indexing. We are following zero-based indexing.
            start = int(row[3]) - 1
            end = row[4]
            # Skip (and count) malformed attribute columns instead of
            # crashing: a field without "=" raises ValueError in dict(),
            # and a missing ID/description raised KeyError before.
            try:
                attributes = dict(
                    field.split("=", 1) for field in row[8].split(";")
                )
                gene_id = attributes["ID"]
                description = attributes["description"]
            except (KeyError, ValueError):
                num_skipped_rows += 1
                continue
            writer.writerow([chromosome, start, end, gene_id, description])
            num_processed_rows += 1
    logger.info(
        "Formatted GFF. Processed %s rows. Skipped %s rows.",
        num_processed_rows,
        num_skipped_rows,
    )
def _load_expression_table(path: str) -> dict[str, str]:
    """Parse one tab-separated expression file into ``{GENE_ID: VALUE}``.

    Gene IDs and values are stripped and upper-cased so later lookups are
    case-insensitive; rows with fewer than two columns are ignored.
    """
    table: dict[str, str] = {}
    # `with` guarantees the file is closed even if a row fails to parse.
    with open(path, newline="") as expr_file:
        for row in csv.reader(expr_file, delimiter="\t"):
            if len(row) >= 2:
                table[row[0].strip().upper()] = row[1].strip().upper()
    return table


def annotate(
    gff_path: str,
    expression_paths: list[str],
    expression_labels: list[str],
    output_path: str,
    output_name: str,
) -> None:
    """Annotate formatted GFF genes with one or more expression sources.

    For every gene row in the formatted GFF, looks up the gene ID in each
    expression table and appends an ``EXPRESSION`` column of the form
    ``"label1: value1; label2: value2"`` (empty when no table matched).

    Args:
        gff_path: Path to a formatted GFF file (header row, GENE_ID in
            column 4).
        expression_paths: Tab-separated expression files (gene ID, value).
        expression_labels: One label per expression file, used as the
            prefix in the EXPRESSION column.
        output_path: Directory in which the output file is written.
        output_name: Output file name; its suffix is replaced with
            ``.modalysis``.

    Raises:
        ValueError: If the number of paths and labels differ, or the GFF
            file is empty (no header row).
    """
    if len(expression_paths) != len(expression_labels):
        raise ValueError(
            "Number of expression paths (%d) must match number of expression labels (%d)"
            % (len(expression_paths), len(expression_labels))
        )
    output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis")
    logger.info("Annotating GFF. GFF: %s, Output: %s", gff_path, output_path_with_name)
    # Parse each expression file into a dict: {gene_id_upper -> value_upper}
    expression_data: list[tuple[str, dict[str, str]]] = []
    for path, label in zip(expression_paths, expression_labels):
        expr_dict = _load_expression_table(path)
        expression_data.append((label, expr_dict))
        logger.debug(
            "Loaded %d expression entries from %s (%s)", len(expr_dict), path, label
        )
    num_processed_rows = 0
    num_annotated_rows = 0
    # Context managers close both files even on error; the original leaked
    # both handles on any exception.
    with open(gff_path, newline="") as input_file, open(
        output_path_with_name, "w", newline=""
    ) as output_file:
        reader = csv.reader(input_file, delimiter="\t")
        writer = csv.writer(output_file, delimiter="\t")
        # Fail with a clear error rather than a bare StopIteration when the
        # GFF file has no header row.
        header = next(reader, None)
        if header is None:
            raise ValueError("GFF file is empty: %s" % gff_path)
        writer.writerow(header + ["EXPRESSION"])
        for row in reader:
            gene_id = row[3].strip().upper()
            parts = [
                "%s: %s" % (label, expr_dict[gene_id])
                for label, expr_dict in expression_data
                if gene_id in expr_dict
            ]
            if parts:
                num_annotated_rows += 1
            writer.writerow(row + ["; ".join(parts)])
            num_processed_rows += 1
    logger.info(
        "Annotated GFF. Processed %s rows. %s rows had expression annotations.",
        num_processed_rows,
        num_annotated_rows,
    )