Source code for modalysis.core.dmr

"""Core DMR formatting, annotation, and aggregation routines."""

import csv
import logging
import zipfile
from pathlib import Path
from xml.sax.saxutils import escape

from modalysis.core.chromosomes import normalize_allowed_chromosomes
from modalysis.core.expression import parse_expression_field
from modalysis.core.gene_regions import (
    build_gene_regions,
    find_genes_overlapping_interval,
    parse_gff,
)

logger = logging.getLogger(__name__)

EXPRESSION_PROFILES = ["UP", "DOWN", "NDE"]
EFFECT_SIGNS = ["NON_NEGATIVE", "NEGATIVE"]
REGIONS = ["PROMOTER", "BODY", "ENHANCER"]


[docs] def _to_excel_column_name(column_index: int) -> str: """Convert 1-based integer column index to Excel letter notation.""" result = "" while column_index > 0: column_index, remainder = divmod(column_index - 1, 26) result = chr(ord("A") + remainder) + result return result
[docs] def _excel_inline_string_cell(row: int, column: int, value: str) -> str: """Build XML for an inline string cell in an XLSX worksheet.""" ref = "%s%s" % (_to_excel_column_name(column), row) return '<c r="%s" t="inlineStr"><is><t>%s</t></is></c>' % (ref, escape(value))
[docs] def _excel_number_cell(row: int, column: int, value: int) -> str: """Build XML for a numeric cell in an XLSX worksheet.""" ref = "%s%s" % (_to_excel_column_name(column), row) return '<c r="%s"><v>%s</v></c>' % (ref, value)
[docs] def _write_gene_counts_excel( output_path_with_name: Path, manifestation_order: list[str], modification_order: list[str], count_lookup: dict[tuple[str, str, str, str, str], int], ) -> None: """Write a compact XLSX workbook for aggregated DMR gene counts.""" output_excel_path = output_path_with_name.with_suffix(".xlsx") logger.info("Writing DMR gene counts Excel workbook: %s", output_excel_path) rows_xml = [] merge_ranges = [] merge_ranges.append("A1:A3") merge_ranges.append("B1:B3") row_1_cells = [ _excel_inline_string_cell(1, 1, "MANIFESTATION"), _excel_inline_string_cell(1, 2, "EXPRESSION_PROFILE"), ] row_2_cells = [] row_3_cells = [] current_col = 3 for modification in modification_order: modification_start_col = current_col modification_span = len(EFFECT_SIGNS) * len(REGIONS) modification_end_col = modification_start_col + modification_span - 1 row_1_cells.append(_excel_inline_string_cell(1, modification_start_col, modification)) merge_ranges.append( "%s1:%s1" % ( _to_excel_column_name(modification_start_col), _to_excel_column_name(modification_end_col), ) ) for effect_sign in EFFECT_SIGNS: sign_start_col = current_col sign_end_col = sign_start_col + len(REGIONS) - 1 row_2_cells.append(_excel_inline_string_cell(2, sign_start_col, effect_sign)) merge_ranges.append( "%s2:%s2" % ( _to_excel_column_name(sign_start_col), _to_excel_column_name(sign_end_col), ) ) for region in REGIONS: row_3_cells.append(_excel_inline_string_cell(3, current_col, region)) current_col += 1 rows_xml.append('<row r="1">%s</row>' % "".join(row_1_cells)) rows_xml.append('<row r="2">%s</row>' % "".join(row_2_cells)) rows_xml.append('<row r="3">%s</row>' % "".join(row_3_cells)) row_num = 4 for manifestation in manifestation_order: for expression_profile in EXPRESSION_PROFILES: row_cells = [ _excel_inline_string_cell(row_num, 1, manifestation), _excel_inline_string_cell(row_num, 2, expression_profile), ] col_num = 3 for modification in modification_order: for effect_sign in EFFECT_SIGNS: for region in REGIONS: key = ( manifestation, expression_profile, effect_sign, modification, region, ) value = count_lookup.get(key, 0) row_cells.append(_excel_number_cell(row_num, col_num, value)) col_num += 1 rows_xml.append('<row r="%s">%s</row>' % (row_num, "".join(row_cells))) row_num += 1 merge_cells_xml = "".join( ['<mergeCell ref="%s"/>' % merge_range for merge_range in merge_ranges] ) sheet_xml = ( '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>' '<worksheet xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main">' '<sheetViews><sheetView workbookViewId="0"/></sheetViews>' '<sheetFormatPr defaultRowHeight="15"/>' '<sheetData>%s</sheetData>' '<mergeCells count="%s">%s</mergeCells>' '<pageMargins left="0.7" right="0.7" top="0.75" bottom="0.75" header="0.3" footer="0.3"/>' "</worksheet>" ) % ("".join(rows_xml), len(merge_ranges), merge_cells_xml) workbook_xml = ( '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>' '<workbook xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main" ' 'xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships">' '<sheets><sheet name="GeneCounts" sheetId="1" r:id="rId1"/></sheets>' "</workbook>" ) workbook_rels_xml = ( '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>' '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">' '<Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/worksheet" Target="worksheets/sheet1.xml"/>' "</Relationships>" ) root_rels_xml = ( '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>' '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">' '<Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" Target="xl/workbook.xml"/>' "</Relationships>" ) content_types_xml = ( '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>' '<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">' '<Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/>' '<Default Extension="xml" ContentType="application/xml"/>' '<Override PartName="/xl/workbook.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet.main+xml"/>' '<Override PartName="/xl/worksheets/sheet1.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.worksheet+xml"/>' "</Types>" ) workbook_zip = zipfile.ZipFile(output_excel_path, "w", compression=zipfile.ZIP_DEFLATED) workbook_zip.writestr("[Content_Types].xml", content_types_xml) workbook_zip.writestr("_rels/.rels", root_rels_xml) workbook_zip.writestr("xl/workbook.xml", workbook_xml) workbook_zip.writestr("xl/_rels/workbook.xml.rels", workbook_rels_xml) workbook_zip.writestr("xl/worksheets/sheet1.xml", sheet_xml) workbook_zip.close()
[docs] def format( input_path: str, output_path: str, output_name: str, allowed_chromosomes: list[str], min_score: float = 5, max_p_value: float = 0.05, min_pct_a_samples: float = 50.0, min_pct_b_samples: float = 50.0, min_reads: int = 5, ) -> None: """Filter and normalize raw DMR rows into the `.modalysis` schema.""" output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis") logger.info( "Formatting DMR. Input: %s, Output: %s", input_path, output_path_with_name ) input_file = open(input_path, newline="") output_file = open(output_path_with_name, "w", newline="") reader = csv.reader(input_file, delimiter="\t") writer = csv.writer(output_file, delimiter="\t") NUM_DMR_COLUMNS = 23 num_skipped_rows = 0 num_processed_rows = 0 failed_score = 0 failed_p_value = 0 failed_pct_a = 0 failed_pct_b = 0 failed_reads = 0 writer.writerow( [ "CHROMOSOME", "START", "END", "SCORE", "MAP_BASED_P_VALUE", "EFFECT_SIZE", "PCT_A_SAMPLES", "PCT_B_SAMPLES", ] ) allowed_chromosomes = normalize_allowed_chromosomes(allowed_chromosomes) for row in reader: if len(row) != NUM_DMR_COLUMNS: num_skipped_rows += 1 continue chromosome = row[0].upper() if chromosome not in allowed_chromosomes: num_skipped_rows += 1 continue start = row[1] end = row[2] score = row[4] a_total = row[7] b_total = row[9] map_based_p_value = row[14] effect_size = row[15] pct_a_samples = row[18] pct_b_samples = row[19] if float(score) < min_score: failed_score += 1 if float(map_based_p_value) > max_p_value: failed_p_value += 1 if float(pct_a_samples) < min_pct_a_samples: failed_pct_a += 1 if float(pct_b_samples) < min_pct_b_samples: failed_pct_b += 1 if int(a_total) < min_reads or int(b_total) < min_reads: failed_reads += 1 passes_all = ( float(score) >= min_score and float(map_based_p_value) <= max_p_value and float(pct_a_samples) >= min_pct_a_samples and float(pct_b_samples) >= min_pct_b_samples and int(a_total) >= min_reads and int(b_total) >= min_reads ) if not passes_all: num_skipped_rows += 1 continue writer.writerow( [ chromosome, start, end, score, map_based_p_value, effect_size, pct_a_samples, pct_b_samples, ] ) num_processed_rows += 1 logger.info( "Formatted DMR. Processed %s rows. Skipped %s rows.", num_processed_rows, num_skipped_rows, ) logger.info( "Filter failure breakdown - score: %s, p_value: %s, pct_a: %s, pct_b: %s, reads: %s", failed_score, failed_p_value, failed_pct_a, failed_pct_b, failed_reads, ) input_file.close() output_file.close()
[docs] def annotate( dmr_path: str, gff_path: str, output_path: str, output_name: str, ) -> None: """Annotate DMR intervals with overlapping promoter/body/enhancer gene IDs.""" output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis") logger.info( "Annotating DMR. DMR: %s, GFF: %s, Output: %s", dmr_path, gff_path, output_path_with_name, ) genes_by_chromosome = parse_gff(gff_path) regions = build_gene_regions(genes_by_chromosome) input_file = open(dmr_path, newline="") output_file = open(output_path_with_name, "w", newline="") reader = csv.reader(input_file, delimiter="\t") writer = csv.writer(output_file, delimiter="\t") header = next(reader) writer.writerow(header + ["PROMOTER", "BODY", "ENHANCER"]) num_processed_rows = 0 num_annotated_rows = 0 for row in reader: chromosome = row[0] interval_start = int(row[1]) interval_end = int(row[2]) promoter_genes = [] body_genes = [] enhancer_genes = [] if chromosome in regions: chrom_regions = regions[chromosome] promoter_genes = find_genes_overlapping_interval( interval_start, interval_end, chrom_regions["promoter"], chrom_regions["promoter_starts"], ) body_genes = find_genes_overlapping_interval( interval_start, interval_end, chrom_regions["body"], chrom_regions["body_starts"], ) enhancer_genes = find_genes_overlapping_interval( interval_start, interval_end, chrom_regions["enhancer"], chrom_regions["enhancer_starts"], ) promoter_str = ",".join(promoter_genes) body_str = ",".join(body_genes) enhancer_str = ",".join(enhancer_genes) if promoter_genes or body_genes or enhancer_genes: num_annotated_rows += 1 writer.writerow(row + [promoter_str, body_str, enhancer_str]) num_processed_rows += 1 logger.info( "Annotated DMR. Processed %s rows. %s rows had gene annotations.", num_processed_rows, num_annotated_rows, ) input_file.close() output_file.close()
[docs] def gene_counts( annotated_dmr_paths: list[str], manifestations: list[str], modifications: list[str], manifestation_labels: list[str], expression_labels: list[str], annotated_gff_path: str, output_path: str, output_name: str, output_excel: bool = False, ) -> None: """Count unique genes by manifestation/expression/effect/modification/region.""" if len(annotated_dmr_paths) != len(manifestations): raise ValueError( "Number of annotated DMR paths (%d) must match number of manifestations (%d)" % (len(annotated_dmr_paths), len(manifestations)) ) if len(annotated_dmr_paths) != len(modifications): raise ValueError( "Number of annotated DMR paths (%d) must match number of modifications (%d)" % (len(annotated_dmr_paths), len(modifications)) ) if len(manifestation_labels) != len(expression_labels): raise ValueError( "Number of manifestation labels (%d) must match number of expression labels (%d)" % (len(manifestation_labels), len(expression_labels)) ) output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis") logger.info( "Building DMR gene counts table. Annotated GFF: %s, Output: %s", annotated_gff_path, output_path_with_name, ) manifestation_to_expression_label = {} for manifestation_label, expression_label in zip( manifestation_labels, expression_labels ): manifestation_to_expression_label[manifestation_label.strip().upper()] = ( expression_label.strip().upper() ) gene_to_expression = {} gff_file = open(annotated_gff_path, newline="") gff_reader = csv.DictReader(gff_file, delimiter="\t") for row in gff_reader: gene_id = row["GENE_ID"].strip().upper() expression_by_label = parse_expression_field(row.get("EXPRESSION", "")) gene_to_expression[gene_id] = expression_by_label gff_file.close() genes_by_combination = {} file_constraints = [] num_processed_rows = 0 for dmr_path, manifestation, modification in zip( annotated_dmr_paths, manifestations, modifications ): normalized_manifestation = manifestation.strip().upper() normalized_modification = modification.strip().upper() file_constraints.append( (dmr_path, normalized_manifestation, normalized_modification) ) if normalized_manifestation not in manifestation_to_expression_label: raise ValueError( "No expression label configured for manifestation '%s'" % normalized_manifestation ) expression_label = manifestation_to_expression_label[normalized_manifestation] dmr_file = open(dmr_path, newline="") dmr_reader = csv.DictReader(dmr_file, delimiter="\t") for row in dmr_reader: effect_size = float(row["EFFECT_SIZE"]) effect_sign = "NON_NEGATIVE" if effect_size >= 0 else "NEGATIVE" for region in REGIONS: genes_field = row[region].strip() if not genes_field: continue genes = [gene.strip().upper() for gene in genes_field.split(",")] for gene in genes: if not gene: continue if gene not in gene_to_expression: continue expression_profile = gene_to_expression[gene].get(expression_label) if expression_profile not in EXPRESSION_PROFILES: continue key = ( dmr_path, normalized_manifestation, expression_profile, effect_sign, normalized_modification, region, ) if key not in genes_by_combination: genes_by_combination[key] = set() genes_by_combination[key].add(gene) num_processed_rows += 1 dmr_file.close() output_file = open(output_path_with_name, "w", newline="") writer = csv.writer(output_file, delimiter="\t") writer.writerow( [ "MANIFESTATION", "EXPRESSION_PROFILE", "EFFECT_SIZE_SIGN", "MODIFICATION", "REGION", "GENE_COUNT", ] ) for dmr_path, manifestation, modification in file_constraints: for expression_profile in EXPRESSION_PROFILES: for effect_sign in EFFECT_SIGNS: for region in REGIONS: key = ( dmr_path, manifestation, expression_profile, effect_sign, modification, region, ) genes = genes_by_combination.get(key, set()) writer.writerow( [ manifestation, expression_profile, effect_sign, modification, region, len(genes), ] ) logger.info( "Built DMR gene counts table. Processed %s DMR rows from %s files.", num_processed_rows, len(annotated_dmr_paths), ) output_file.close() if output_excel: manifestation_order = [] for manifestation_label in manifestation_labels: normalized_manifestation = manifestation_label.strip().upper() if normalized_manifestation not in manifestation_order: manifestation_order.append(normalized_manifestation) modification_order = [] for modification_label in modifications: normalized_modification = modification_label.strip().upper() if normalized_modification not in modification_order: modification_order.append(normalized_modification) combined_gene_sets = {} for key, genes in genes_by_combination.items(): ( _dmr_path, manifestation, expression_profile, effect_sign, modification, region, ) = key aggregate_key = ( manifestation, expression_profile, effect_sign, modification, region, ) if aggregate_key not in combined_gene_sets: combined_gene_sets[aggregate_key] = set() combined_gene_sets[aggregate_key].update(genes) count_lookup = { key: len(genes) for key, genes in combined_gene_sets.items() } _write_gene_counts_excel( output_path_with_name, manifestation_order, modification_order, count_lookup, )
[docs] def common_genes( annotated_dmr_paths: list[str], manifestations: list[str], modifications: list[str], manifestation_labels: list[str], expression_labels: list[str], modification_a: str, modification_b: str, annotated_gff_path: str, output_path: str, output_name: str, ) -> None: """Find common negative-effect genes across two modifications by region.""" if len(annotated_dmr_paths) != len(manifestations): raise ValueError( "Number of annotated DMR paths (%d) must match number of manifestations (%d)" % (len(annotated_dmr_paths), len(manifestations)) ) if len(annotated_dmr_paths) != len(modifications): raise ValueError( "Number of annotated DMR paths (%d) must match number of modifications (%d)" % (len(annotated_dmr_paths), len(modifications)) ) if len(manifestation_labels) != len(expression_labels): raise ValueError( "Number of manifestation labels (%d) must match number of expression labels (%d)" % (len(manifestation_labels), len(expression_labels)) ) normalized_modification_a = modification_a.strip().upper() normalized_modification_b = modification_b.strip().upper() if normalized_modification_a == normalized_modification_b: raise ValueError("Modification A and B must be different") output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis") logger.info( "Building common DMR genes table. Annotated GFF: %s, Output: %s", annotated_gff_path, output_path_with_name, ) manifestation_to_expression_label = {} for manifestation_label, expression_label in zip( manifestation_labels, expression_labels ): manifestation_to_expression_label[manifestation_label.strip().upper()] = ( expression_label.strip().upper() ) gene_to_expression = {} gff_file = open(annotated_gff_path, newline="") gff_reader = csv.DictReader(gff_file, delimiter="\t") for row in gff_reader: gene_id = row["GENE_ID"].strip().upper() expression_by_label = parse_expression_field(row.get("EXPRESSION", "")) gene_to_expression[gene_id] = expression_by_label gff_file.close() genes_by_manifestation_modification_region = {} manifestation_order = [] for manifestation_label in manifestation_labels: normalized_manifestation = manifestation_label.strip().upper() if normalized_manifestation not in manifestation_order: manifestation_order.append(normalized_manifestation) num_processed_rows = 0 for dmr_path, manifestation, modification in zip( annotated_dmr_paths, manifestations, modifications ): normalized_manifestation = manifestation.strip().upper() normalized_modification = modification.strip().upper() dmr_file = open(dmr_path, newline="") dmr_reader = csv.DictReader(dmr_file, delimiter="\t") for row in dmr_reader: effect_size = float(row["EFFECT_SIZE"]) if effect_size >= 0: num_processed_rows += 1 continue for region in REGIONS: genes_field = row[region].strip() if not genes_field: continue key = (normalized_manifestation, normalized_modification, region) if key not in genes_by_manifestation_modification_region: genes_by_manifestation_modification_region[key] = set() genes = [gene.strip().upper() for gene in genes_field.split(",")] for gene in genes: if not gene: continue genes_by_manifestation_modification_region[key].add(gene) num_processed_rows += 1 dmr_file.close() output_file = open(output_path_with_name, "w", newline="") writer = csv.writer(output_file, delimiter="\t") writer.writerow( [ "ROW_TYPE", "MANIFESTATION", "REGION", "MODIFICATION_A", "MODIFICATION_B", "COMMON_GENE_COUNT", "GENE_ID", "EXPRESSION_STATUS", ] ) num_common_genes = 0 num_summary_rows = 0 for manifestation in manifestation_order: if manifestation not in manifestation_to_expression_label: raise ValueError( "No expression label configured for manifestation '%s'" % manifestation ) expression_label = manifestation_to_expression_label[manifestation] for region in REGIONS: genes_mod_a = genes_by_manifestation_modification_region.get( (manifestation, normalized_modification_a, region), set() ) genes_mod_b = genes_by_manifestation_modification_region.get( (manifestation, normalized_modification_b, region), set() ) region_common_genes = sorted(genes_mod_a.intersection(genes_mod_b)) writer.writerow( [ "SUMMARY", manifestation, region, normalized_modification_a, normalized_modification_b, len(region_common_genes), "", "", ] ) num_summary_rows += 1 for gene in region_common_genes: expression_status = gene_to_expression.get(gene, {}).get( expression_label, "" ) writer.writerow( [ "GENE", manifestation, region, normalized_modification_a, normalized_modification_b, "", gene, expression_status, ] ) num_common_genes += 1 logger.info( "Built common DMR genes table. Processed %s DMR rows. Wrote %s summary rows and %s common genes.", num_processed_rows, num_summary_rows, num_common_genes, ) output_file.close()