"""Estimate reasonable bin sizes from BAM read counts or depths."""
from __future__ import annotations
import logging
import os
import tempfile
from typing import Optional, Union, TYPE_CHECKING
import numpy as np
import pandas as pd
from skgenome import tabio, GenomicArray as GA
from . import coverage, samutil
from .antitarget import compare_chrom_names
from .descriptives import weighted_median
if TYPE_CHECKING:
from collections.abc import Iterable
[docs]
def midsize_file(fnames: Iterable[str]) -> list[str]:
"""Select the median-size file from several given filenames.
If an even number of files is given, selects the file just below the median.
"""
assert fnames, "No files provided to calculate the median size."
return sorted(fnames, key=lambda f: os.stat(f).st_size)[(len(fnames) - 1) // 2]
[docs]
def do_autobin(
bam_fname: str,
method: str,
targets: Optional[GA] = None,
access: Optional[GA] = None,
bp_per_bin: float = 100000.0,
target_min_size: int = 20,
target_max_size: int = 50000,
antitarget_min_size: int = 500,
antitarget_max_size: int = 1000000,
fasta: Optional[str] = None,
) -> tuple[tuple[float, int], tuple[float, int]]:
"""Quickly calculate reasonable bin sizes from BAM read counts.
Parameters
----------
bam_fname : str
Path to BAM file.
method : str
Sequencing method: 'wgs' (whole-genome sequencing), 'amplicon' (targeted
amplicon capture), or 'hybrid' (hybridization capture).
targets : GenomicArray, optional
Targeted genomic regions (required for 'hybrid' and 'amplicon').
access : GenomicArray, optional
Sequencing-accessible regions of the reference genome (for 'hybrid' and 'wgs').
bp_per_bin : float, optional
Desired number of sequencing read nucleotide bases mapped to each bin.
Default is 100000.0.
target_min_size : int, optional
Minimum target bin size. Default is 20.
target_max_size : int, optional
Maximum target bin size. Default is 50000.
antitarget_min_size : int, optional
Minimum antitarget bin size. Default is 500.
antitarget_max_size : int, optional
Maximum antitarget bin size. Default is 1000000.
fasta : str, optional
Path to reference genome FASTA file.
Returns
-------
tuple of tuple
Nested tuple: ((target depth, target avg. bin size),
(antitarget depth, antitarget avg. bin size)).
Raises
------
ValueError
If targets are required for the method but not provided or empty.
"""
if method in ("amplicon", "hybrid"):
if targets is None:
raise ValueError(
f"Target regions are required for method {method!r} but were "
"not provided."
)
if not len(targets):
raise ValueError(
f"Target regions are required for method {method!r} but were "
"not provided."
)
# Closes over bp_per_bin
def depth2binsize(depth: float, min_size: int, max_size: int) -> int:
if not depth:
return None
bin_size = round(bp_per_bin / depth)
if bin_size < min_size:
logging.info(
"Limiting est. bin size %d to given min. %d", bin_size, min_size
)
bin_size = min_size
elif bin_size > max_size:
logging.info(
"Limiting est. bin size %d to given max. %d", bin_size, max_size
)
bin_size = max_size
return bin_size
samutil.ensure_bam_index(bam_fname)
rc_table = samutil.idxstats(bam_fname, drop_unmapped=True, fasta=fasta)
read_len = samutil.get_read_length(bam_fname, fasta=fasta)
logging.info("Estimated read length %s", read_len)
# Dispatch
if method == "amplicon":
# From BAM index
# rc_table = update_chrom_length(rc_table, targets)
# tgt_depth = average_depth(rc_table, read_len)
# By sampling
tgt_depth = sample_region_cov(bam_fname, targets, fasta=fasta)
anti_depth = None
elif method == "hybrid":
tgt_depth, anti_depth = hybrid(
rc_table, read_len, bam_fname, targets, access, fasta
)
elif method == "wgs":
if access is not None and len(access):
rc_table = update_chrom_length(rc_table, access)
tgt_depth = average_depth(rc_table, read_len)
anti_depth = None
# Clip bin sizes to specified ranges
tgt_bin_size = depth2binsize(tgt_depth, target_min_size, target_max_size)
anti_bin_size = depth2binsize(anti_depth, antitarget_min_size, antitarget_max_size)
return ((tgt_depth, tgt_bin_size), (anti_depth, anti_bin_size))
[docs]
def hybrid(
rc_table: pd.DataFrame,
read_len: Union[int, float],
bam_fname: str,
targets: GA,
access: Optional[GA] = None,
fasta: Optional[str] = None,
) -> tuple:
"""Hybrid capture sequencing."""
# Identify off-target regions
if access is None:
access = idxstats2ga(rc_table, bam_fname)
# Verify BAM chromosome names match those in target BED
compare_chrom_names(access, targets)
antitargets = access.subtract(targets)
# Only examine chromosomes present in all 2-3 input datasets
rc_table, targets, antitargets = shared_chroms(rc_table, targets, antitargets)
# Deal with targets
target_depth = sample_region_cov(bam_fname, targets, fasta=fasta)
# Antitargets: subtract captured reads from total
target_length = region_size_by_chrom(targets)["length"]
target_reads = (target_length * target_depth / read_len).to_numpy()
anti_table = update_chrom_length(rc_table, antitargets)
anti_table = anti_table.assign(mapped=anti_table.mapped - target_reads)
anti_depth = average_depth(anti_table, read_len)
return target_depth, anti_depth
# ---
[docs]
def average_depth(rc_table: pd.DataFrame, read_length: Union[int, float]) -> float:
"""Estimate the average read depth across the genome.
Returns
-------
float
Median of the per-chromosome mean read depths, weighted by chromosome
size.
"""
mean_depths = read_length * rc_table.mapped / rc_table.length
return weighted_median(mean_depths, rc_table.length)
[docs]
def idxstats2ga(table: pd.DataFrame, bam_fname: str) -> GA:
return GA(
table.assign(start=0, end=table.length).loc[:, ("chromosome", "start", "end")],
meta_dict={"filename": bam_fname},
)
[docs]
def sample_region_cov(
bam_fname: str, regions: GA, max_num: int = 100, fasta: Optional[str] = None
) -> float:
"""Calculate read depth in a randomly sampled subset of regions."""
midsize_regions = sample_midsize_regions(regions, max_num)
with tempfile.NamedTemporaryFile(suffix=".bed", mode="w+t") as f:
tabio.write(regions.as_dataframe(midsize_regions), f, "bed4")
f.flush()
table = coverage.bedcov(f.name, bam_fname, 0, fasta)
# Mean read depth across all sampled regions
return table.basecount.sum() / (table.end - table.start).sum()
[docs]
def sample_midsize_regions(regions: GA, max_num: int) -> pd.DataFrame:
"""Randomly select a subset of up to `max_num` regions."""
sizes = regions.end - regions.start
lo_size, hi_size = np.percentile(sizes[sizes > 0], [25, 75])
midsize_regions = regions.data[(sizes >= lo_size) & (sizes <= hi_size)]
if len(midsize_regions) > max_num:
midsize_regions = midsize_regions.sample(max_num, random_state=0xA5EED)
return midsize_regions
[docs]
def shared_chroms(*tables) -> list:
"""Intersection of DataFrame .chromosome values."""
chroms = tables[0].chromosome.drop_duplicates()
for tab in tables[1:]:
if tab is not None:
new_chroms = tab.chromosome.drop_duplicates()
chroms = chroms[chroms.isin(new_chroms)]
return [None if tab is None else tab[tab.chromosome.isin(chroms)] for tab in tables]
[docs]
def update_chrom_length(rc_table: pd.DataFrame, regions: Optional[GA]) -> pd.DataFrame:
if regions is not None and len(regions):
chrom_sizes = region_size_by_chrom(regions)
rc_table = rc_table.merge(chrom_sizes, on="chromosome", how="inner")
rc_table["length"] = rc_table["length_y"] # ?
rc_table = rc_table.drop(["length_x", "length_y"], axis=1)
return rc_table
[docs]
def region_size_by_chrom(regions: GA) -> pd.DataFrame:
chromgroups = regions.data.groupby("chromosome", sort=False)
# sizes = chromgroups.apply(total_region_size) # XXX
sizes = [total_region_size(g) for _key, g in chromgroups]
return pd.DataFrame(
{"chromosome": regions.chromosome.drop_duplicates(), "length": sizes}
)
[docs]
def total_region_size(regions: GA) -> int:
"""Aggregate area of all genomic ranges in `regions`."""
return (regions.end - regions.start).sum()