Source code for modalysis.server.pileup

"""FastAPI routes for pileup operations."""

import logging

from fastapi import APIRouter

from modalysis.core import pileup as core_pileup
from modalysis.server.models import PileupFormatRequest, PileupMergeRequest

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/pileup")


[docs] @router.post("/format") def pileup_format(request: PileupFormatRequest) -> dict[str, str]: """Handle POST `/pileup/format` and run core pileup formatting.""" logger.info("Server received pileup format request: %s", request) core_pileup.format( input_path=request.input_path, output_path=request.output_path, output_name=request.output_name, allowed_chromosomes=request.allowed_chromosomes, ) return {"status": "success"}
[docs] @router.post("/merge") def pileup_merge(request: PileupMergeRequest) -> dict[str, str]: """Handle POST `/pileup/merge` and run core pileup merging.""" logger.info("Server received pileup merge request: %s", request) core_pileup.merge( pileup_paths=request.pileup_paths, output_path=request.output_path, output_name=request.output_name, min_files=request.min_files, min_file_coverage=request.min_file_coverage, min_reads=request.min_reads, ) return {"status": "success"}