"""The 'batch' command."""
from __future__ import annotations
import logging
import os
from matplotlib import pyplot
from skgenome import tabio, GenomicArray as GA
from . import (
access,
antitarget,
autobin,
bintest,
call,
core,
coverage,
diagram,
fix,
parallel,
reference,
scatter,
segmentation,
segmetrics,
target,
)
from .cmdutil import read_cna
from typing import TYPE_CHECKING, Optional
[docs]
def batch_make_reference(
normal_bams: list[str],
target_bed: Optional[str],
antitarget_bed: None,
is_haploid_x_reference: bool,
diploid_parx_genome: Optional[str],
fasta: str,
annotate: Optional[str],
short_names: bool,
target_avg_size: int,
access_bed: None,
antitarget_avg_size: Optional[int],
antitarget_min_size: Optional[int],
output_reference: None,
output_dir: str,
processes: int,
by_count: bool,
method: str,
do_cluster: bool,
) -> tuple[str, str, str]:
"""Build a complete copy number reference from normal samples.
This is the high-level workflow function used by the `batch` command to
create a reference. It coordinates target/antitarget preparation, coverage
calculation across normal samples, and reference construction.
The function handles three sequencing methods: hybrid capture (default),
whole genome sequencing (WGS), and targeted amplicon sequencing.
Parameters
----------
normal_bams : list of str
Paths to BAM files from normal/control samples. If empty, creates a
"flat" reference with uniform expected coverage.
target_bed : str, optional
Path to BED file defining target/baited regions. Required for hybrid
capture and amplicon. Optional for WGS (can be auto-generated).
antitarget_bed : None
Path to pre-computed antitarget BED file. If None (typical), antitargets
are automatically generated for hybrid capture.
is_haploid_x_reference : bool
True if reference samples are male (haploid X chromosome).
diploid_parx_genome : str, optional
Reference genome name (e.g., 'hg19', 'hg38') for pseudo-autosomal region
handling on X/Y chromosomes.
fasta : str
Path to reference genome FASTA file. Required for GC/RepeatMasker
calculations and for auto-generating WGS targets.
annotate : str, optional
Path to gene annotation file (e.g., refFlat.txt) to add gene names
to targets. Recommended for WGS.
short_names : bool
Shorten target names to gene symbols when possible.
target_avg_size : int
Target average bin size in base pairs. If 0 or None, automatically
determined. For WGS, defaults to 5000 bp bins.
access_bed : None
Path to sequencing-accessible regions BED file (e.g., from `access`
command). Used for WGS to exclude low-mappability regions.
antitarget_avg_size : int, optional
Average size for antitarget bins in bp (hybrid capture only).
antitarget_min_size : int, optional
Minimum size for antitarget bins in bp (hybrid capture only).
output_reference : None
Path for output reference file. If None, writes to
"{output_dir}/reference.cnn".
output_dir : str
Directory for output files (intermediate coverage files, reference).
processes : int
Number of parallel processes for coverage calculation.
0 = use all available CPUs.
by_count : bool
Calculate coverage by read count instead of read depth.
method : str
Sequencing protocol: 'hybrid' (default), 'wgs', or 'amplicon'.
Determines target/antitarget handling and default parameters.
do_cluster : bool
Apply hierarchical clustering to identify and separate reference
sample subgroups (useful for heterogeneous normal cohorts).
Returns
-------
tuple of (str, str, str)
Paths to: (reference .cnn file, targets .bed file, antitargets .bed file)
Notes
-----
The reference creation workflow:
1. **Target preparation** (protocol-dependent):
- **Hybrid capture**: Uses provided target_bed, generates antitargets
- **WGS**: Auto-generates genome-wide bins, no antitargets
- **Amplicon**: Uses provided target_bed, no antitargets
2. **Coverage calculation** (parallel across normal samples):
- Runs `coverage` command on each normal BAM
- Creates .targetcoverage.cnn and .antitargetcoverage.cnn files
- Parallelized across samples and within each coverage calculation
3. **Reference construction**:
- Pools coverage across normal samples
- Calculates GC content, RepeatMasker, edge effects from FASTA
- Optionally clusters samples and creates sub-references
- Outputs reference.cnn file
**Flat reference**: If no normal BAMs provided, creates a uniform reference
assuming equal coverage in all bins. Less accurate but useful when no
normals are available.
**Protocol differences**:
- **hybrid**: Uses both targets and antitargets, edge correction enabled
- **wgs**: Large genome-wide bins, no antitargets, no edge correction
- **amplicon**: Uses only targets, no antitargets, no edge correction
Raises
------
ValueError
If method='wgs' or 'amplicon' but antitargets are specified.
If method='wgs' and neither targets, access, nor fasta provided.
If method='wgs' and targets != access (when both given).
See Also
--------
do_reference : Creates reference from coverage files
do_reference_flat : Creates flat reference without normal samples
batch_write_coverage : Calculates coverage for a single BAM
autobin : Automatically determines optimal bin size
Examples
--------
Hybrid capture with normal samples:
>>> ref, tgt, anti = batch_make_reference(
... normal_bams=['N1.bam', 'N2.bam', 'N3.bam'],
... target_bed='baits.bed',
... antitarget_bed=None, # Auto-generate
... is_haploid_x_reference=False,
... diploid_parx_genome='hg38',
... fasta='hg38.fa',
... annotate='refFlat.txt',
... short_names=True,
... target_avg_size=0, # Auto-determine
... access_bed=None,
... antitarget_avg_size=None,
... antitarget_min_size=None,
... output_reference=None,
... output_dir='output/',
... processes=8,
... by_count=False,
... method='hybrid',
... do_cluster=False
... )
WGS without normal samples (flat reference):
>>> ref, tgt, anti = batch_make_reference(
... normal_bams=[],
... target_bed=None, # Auto-generate from FASTA
... antitarget_bed=None,
... is_haploid_x_reference=True,
... diploid_parx_genome='hg19',
... fasta='hg19.fa',
... annotate='refFlat.txt',
... short_names=False,
... target_avg_size=5000,
... access_bed='access-5k.hg19.bed',
... antitarget_avg_size=None,
... antitarget_min_size=None,
... output_reference='flat_reference.cnn',
... output_dir='wgs_ref/',
... processes=1,
... by_count=False,
... method='wgs',
... do_cluster=False
... )
"""
if method in ("wgs", "amplicon"):
if antitarget_bed:
raise ValueError(
r"{method!r} protocol: antitargets should not be given/specified."
)
if access_bed and target_bed and access_bed != target_bed:
raise ValueError(
f"{method!r} protocol: targets and access should not be different."
)
bait_arr = None
if method == "wgs":
if not annotate:
# TODO check if target_bed has gene names
logging.warning(
"WGS protocol: recommend '--annotate' option "
"(e.g. refFlat.txt) to help locate genes "
"in output files."
)
access_arr = None
if not target_bed:
# TODO - drop weird contigs before writing, see antitargets.py
if access_bed:
target_bed = access_bed
elif fasta:
# Run 'access' on the fly
access_arr = access.do_access(fasta)
# Take filename base from FASTA, lacking any other clue
target_bed = os.path.splitext(os.path.basename(fasta))[0] + ".bed"
if output_dir:
target_bed = os.path.join(output_dir, target_bed)
tabio.write(access_arr, target_bed, "bed3")
else:
raise ValueError(
"WGS protocol: need to provide --targets, --access, or "
"--fasta options."
)
# Tweak default parameters
if not target_avg_size:
if normal_bams:
# Calculate bin size from .bai & access
if fasta and not access_arr:
# Calculate wgs depth from all
# sequencing-accessible area (it doesn't take that long
# compared to WGS coverage); user-provided access might be
# something else that excludes a significant number of
# mapped reads.
access_arr = access.do_access(fasta)
if access_arr:
autobin_args = ["wgs", None, access_arr]
else:
# Don't assume the given targets/access covers the whole
# genome; use autobin sampling to estimate bin size, as we
# do for amplicon
bait_arr = tabio.read_auto(target_bed)
autobin_args = ["amplicon", bait_arr]
# Choose median-size normal bam or tumor bam
bam_fname = autobin.midsize_file(normal_bams)
(wgs_depth, target_avg_size), _ = autobin.do_autobin(
bam_fname, *autobin_args, bp_per_bin=50000.0, fasta=fasta
)
logging.info(
"WGS average depth %.2f --> using bin size %d",
wgs_depth,
target_avg_size,
)
else:
# This bin size is OK down to 10x
target_avg_size = 5000
# To make temporary filenames for processed targets or antitargets
tgt_name_base, _tgt_ext = os.path.splitext(os.path.basename(target_bed))
if output_dir:
tgt_name_base = os.path.join(output_dir, tgt_name_base)
# Pre-process baits/targets
new_target_fname = tgt_name_base + ".target.bed"
if bait_arr is None:
bait_arr = tabio.read_auto(target_bed)
target_arr = target.do_target(
bait_arr,
annotate,
short_names,
True,
**({"avg_size": target_avg_size} if target_avg_size else {}),
)
tabio.write(target_arr, new_target_fname, "bed4")
target_bed = new_target_fname
if not antitarget_bed:
# Devise a temporary antitarget filename
antitarget_bed = tgt_name_base + ".antitarget.bed"
if method == "hybrid":
# Build antitarget BED from the given targets
anti_kwargs = {}
if access_bed:
anti_kwargs["access"] = tabio.read_auto(access_bed)
if antitarget_avg_size:
anti_kwargs["avg_bin_size"] = antitarget_avg_size
if antitarget_min_size:
anti_kwargs["min_bin_size"] = antitarget_min_size
anti_arr = antitarget.do_antitarget(target_arr, **anti_kwargs)
else:
# No antitargets for wgs, amplicon
anti_arr = GA([])
tabio.write(anti_arr, antitarget_bed, "bed4")
if len(normal_bams) == 0:
logging.info("Building a flat reference...")
ref_arr = reference.do_reference_flat(
target_bed,
antitarget_bed,
fasta,
is_haploid_x_reference,
diploid_parx_genome,
)
else:
logging.info("Building a copy number reference from normal samples...")
# Run coverage on all normals
with parallel.pick_pool(processes) as pool:
tgt_futures = []
anti_futures = []
procs_per_cnn = max(1, processes // (2 * len(normal_bams)))
for nbam in normal_bams:
sample_id = core.fbase(nbam)
sample_pfx = os.path.join(output_dir, sample_id)
tgt_futures.append(
pool.submit(
batch_write_coverage,
target_bed,
nbam,
sample_pfx + ".targetcoverage.cnn",
by_count,
procs_per_cnn,
fasta,
)
)
anti_futures.append(
pool.submit(
batch_write_coverage,
antitarget_bed,
nbam,
sample_pfx + ".antitargetcoverage.cnn",
by_count,
procs_per_cnn,
fasta,
)
)
target_fnames = [tf.result() for tf in tgt_futures]
antitarget_fnames = [af.result() for af in anti_futures]
# Build reference from *.cnn
ref_arr = reference.do_reference(
target_fnames,
antitarget_fnames,
fasta,
is_haploid_x_reference,
diploid_parx_genome,
None,
do_gc=True,
do_edge=(method == "hybrid"),
do_rmask=True,
do_cluster=do_cluster,
)
if not output_reference:
output_reference = os.path.join(output_dir, "reference.cnn")
core.ensure_path(output_reference)
tabio.write(ref_arr, output_reference)
return output_reference, target_bed, antitarget_bed
[docs]
def batch_write_coverage(
bed_fname: str,
bam_fname: str,
out_fname: str,
by_count: bool,
processes: int,
fasta: str,
) -> str:
"""Run coverage on one sample, write to file."""
cnarr = coverage.do_coverage(bed_fname, bam_fname, by_count, 0, processes, fasta)
tabio.write(cnarr, out_fname)
return out_fname
[docs]
def batch_run_sample(
bam_fname: str,
target_bed: str,
antitarget_bed: str,
ref_fname: str,
output_dir: str,
is_haploid_x_reference: bool,
diploid_parx_genome: Optional[str],
plot_scatter: bool,
plot_diagram: bool,
rscript_path: str,
by_count: bool,
skip_low: bool,
seq_method: str,
segment_method: str,
processes: int,
do_cluster: bool,
fasta: None = None,
) -> None:
"""Run the pipeline on one BAM file."""
# ENH - return probes, segments (cnarr, segarr)
logging.info("Running the CNVkit pipeline on %s ...", bam_fname)
sample_id = core.fbase(bam_fname)
sample_pfx = os.path.join(output_dir, sample_id)
raw_tgt = coverage.do_coverage(target_bed, bam_fname, by_count, 0, processes, fasta)
tabio.write(raw_tgt, sample_pfx + ".targetcoverage.cnn")
raw_anti = coverage.do_coverage(
antitarget_bed, bam_fname, by_count, 0, processes, fasta
)
tabio.write(raw_anti, sample_pfx + ".antitargetcoverage.cnn")
cnarr = fix.do_fix(
raw_tgt,
raw_anti,
read_cna(ref_fname),
diploid_parx_genome,
do_gc=True,
do_edge=(seq_method == "hybrid"),
do_rmask=True,
do_cluster=do_cluster,
)
tabio.write(cnarr, sample_pfx + ".cnr")
logging.info("Segmenting %s.cnr ...", sample_pfx)
segments = segmentation.do_segmentation(
cnarr,
segment_method,
diploid_parx_genome,
rscript_path=rscript_path,
skip_low=skip_low,
processes=processes,
**({"threshold": 1e-6} if seq_method == "wgs" else {}),
)
logging.info("Post-processing %s.cns ...", sample_pfx)
# TODO/ENH take centering shift & apply to .cnr for use in segmetrics
seg_metrics = segmetrics.do_segmetrics(
cnarr,
segments,
interval_stats=["ci"],
alpha=0.5,
smoothed=True,
skip_low=skip_low,
)
tabio.write(seg_metrics, sample_pfx + ".cns")
# Remove likely false-positive breakpoints
seg_call = call.do_call(seg_metrics, method="none", filters=["ci"])
# Calculate another segment-level test p-value
seg_alltest = segmetrics.do_segmetrics(
cnarr, seg_call, location_stats=["p_ttest"], skip_low=skip_low
)
# Finally, assign absolute copy number values to each segment
seg_alltest.center_all("median", diploid_parx_genome=diploid_parx_genome)
seg_final = call.do_call(seg_alltest, method="threshold")
tabio.write(seg_final, sample_pfx + ".call.cns")
# Test for single-bin CNVs separately
seg_bintest = bintest.do_bintest(cnarr, seg_call, target_only=True)
tabio.write(seg_bintest, sample_pfx + ".bintest.cns")
if plot_scatter:
scatter.do_scatter(cnarr, seg_final)
pyplot.savefig(sample_pfx + "-scatter.png", format="png", bbox_inches="tight")
logging.info("Wrote %s-scatter.png", sample_pfx)
if plot_diagram:
is_xx = cnarr.guess_xx(is_haploid_x_reference, diploid_parx_genome)
outfname = sample_pfx + "-diagram.pdf"
diagram.create_diagram(
cnarr.shift_xx(is_haploid_x_reference, is_xx, diploid_parx_genome),
seg_final.shift_xx(is_haploid_x_reference, is_xx, diploid_parx_genome),
0.5,
3,
outfname,
)
logging.info("Wrote %s", outfname)