Source code for modalysis.client.pileup

"""HTTP client calls for pileup endpoints."""

import logging

import requests
from modalysis.constants import DEFAULT_BASE_URL

logger = logging.getLogger(__name__)


def pileup_merge(
    pileup_paths: list[str],
    output_path: str,
    output_name: str,
    min_files: int = 2,
    min_file_coverage: float = 50.0,
    min_reads: int = 5,
    base_url: str = DEFAULT_BASE_URL,
    *,
    timeout: "float | None" = None,
) -> dict[str, str]:
    """POST a pileup merge request to the server.

    All merge arguments are forwarded unchanged as a JSON body to
    ``{base_url}/pileup/merge``; their interpretation is up to the server.

    Args:
        pileup_paths: Sent as ``pileup_paths``.
        output_path: Sent as ``output_path``.
        output_name: Sent as ``output_name``.
        min_files: Sent as ``min_files``.
        min_file_coverage: Sent as ``min_file_coverage``.
        min_reads: Sent as ``min_reads``.
        base_url: Root URL of the server. Trailing slashes are ignored.
        timeout: Seconds to wait for the server before giving up, passed
            straight to ``requests.post``. ``None`` (the default) waits
            indefinitely, which matches the previous behaviour.

    Returns:
        The JSON body of the server's response, decoded.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If ``timeout`` is set and expires.
    """
    # Strip trailing slashes so "http://host/" does not yield "//pileup/merge".
    url = f"{base_url.rstrip('/')}/pileup/merge"
    payload = {
        "pileup_paths": pileup_paths,
        "output_path": output_path,
        "output_name": output_name,
        "min_files": min_files,
        "min_file_coverage": min_file_coverage,
        "min_reads": min_reads,
    }
    logger.info("Client sending pileup merge request to %s", url)
    response = requests.post(url, json=payload, timeout=timeout)
    # Surface HTTP error statuses as exceptions instead of returning the body.
    response.raise_for_status()
    return response.json()
def pileup_format(
    input_path: str,
    output_path: str,
    output_name: str,
    allowed_chromosomes: list[str],
    base_url: str = DEFAULT_BASE_URL,
    *,
    timeout: "float | None" = None,
) -> dict[str, str]:
    """POST a pileup format request to the server.

    All format arguments are forwarded unchanged as a JSON body to
    ``{base_url}/pileup/format``; their interpretation is up to the server.

    Args:
        input_path: Sent as ``input_path``.
        output_path: Sent as ``output_path``.
        output_name: Sent as ``output_name``.
        allowed_chromosomes: Sent as ``allowed_chromosomes``.
        base_url: Root URL of the server. Trailing slashes are ignored.
        timeout: Seconds to wait for the server before giving up, passed
            straight to ``requests.post``. ``None`` (the default) waits
            indefinitely, which matches the previous behaviour.

    Returns:
        The JSON body of the server's response, decoded.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If ``timeout`` is set and expires.
    """
    # Strip trailing slashes so "http://host/" does not yield "//pileup/format".
    url = f"{base_url.rstrip('/')}/pileup/format"
    payload = {
        "input_path": input_path,
        "output_path": output_path,
        "output_name": output_name,
        "allowed_chromosomes": allowed_chromosomes,
    }
    logger.info("Client sending pileup format request to %s", url)
    response = requests.post(url, json=payload, timeout=timeout)
    # Surface HTTP error statuses as exceptions instead of returning the body.
    response.raise_for_status()
    return response.json()