Source code for cnvlib.target
"""Transform bait intervals into targets more suitable for CNVkit."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Optional, Union
from skgenome import tabio
from . import antitarget
if TYPE_CHECKING:
from collections.abc import Iterator
from pandas.core.series import Series
from skgenome.gary import GenomicArray
[docs]
def do_target(
bait_arr: GenomicArray,
annotate: Optional[str] = None,
do_short_names: bool = False,
do_split: bool = False,
avg_size: Union[int, float] = 200 / 0.75,
) -> GenomicArray:
"""Transform bait intervals into targets more suitable for CNVkit.
Parameters
----------
bait_arr : GenomicArray
Bait intervals from a BED or interval file.
annotate : str, optional
Path to annotation file (BED, GFF, refFlat, etc.) to assign gene names
to target regions.
do_short_names : bool, optional
Reduce multi-accession bait labels to be short and consistent.
Default is False.
do_split : bool, optional
Split large target intervals into smaller pieces. Default is False.
avg_size : float, optional
Average target size when splitting large intervals.
Default is 200/0.75 (~267 bp).
Returns
-------
GenomicArray
Processed target intervals ready for CNVkit analysis.
"""
tgt_arr = bait_arr.copy()
# Drop zero-width regions
tgt_arr = tgt_arr[tgt_arr.start != tgt_arr.end]
if do_split:
logging.info("Splitting large targets")
tgt_arr = tgt_arr.subdivide(avg_size, 0)
if annotate:
logging.info("Applying annotations as target names")
annotation = tabio.read_auto(annotate)
antitarget.compare_chrom_names(tgt_arr, annotation)
tgt_arr["gene"] = annotation.into_ranges(tgt_arr, "gene", "-")
if do_short_names:
logging.info("Shortening target interval labels")
tgt_arr["gene"] = list(shorten_labels(tgt_arr["gene"]))
return tgt_arr
[docs]
def shorten_labels(gene_labels: Series) -> Iterator[str]:
"""Reduce multi-accession interval labels to the minimum consistent.
So: BED or interval_list files have a label for every region. We want this
to be a short, unique string, like the gene name. But if an interval list is
instead a series of accessions, including additional accessions for
sub-regions of the gene, we can extract a single accession that covers the
maximum number of consecutive regions that share this accession.
e.g.::
...
mRNA|JX093079,ens|ENST00000342066,mRNA|JX093077,ref|SAMD11,mRNA|AF161376,mRNA|JX093104
ens|ENST00000483767,mRNA|AF161376,ccds|CCDS3.1,ref|NOC2L
...
becomes::
...
mRNA|AF161376
mRNA|AF161376
...
"""
longest_name_len = 0
curr_names = set()
curr_gene_count = 0
for label in gene_labels:
next_names = set(label.rstrip().split(","))
assert len(next_names)
overlap = curr_names.intersection(next_names)
if overlap:
# Continuing the same gene; update shared accessions
curr_names = filter_names(overlap)
curr_gene_count += 1
else:
# End of the old gene -- emit shared name(s)
for _i in range(curr_gene_count):
out_name = shortest_name(curr_names)
yield out_name
longest_name_len = max(longest_name_len, len(out_name))
# Start of a new gene
curr_gene_count = 1
curr_names = next_names
# Final emission
for _i in range(curr_gene_count):
out_name = shortest_name(curr_names)
yield out_name
longest_name_len = max(longest_name_len, len(out_name))
logging.info("Longest name length: %d", longest_name_len)
[docs]
def filter_names(names: set[str], exclude: tuple[str] = ("mRNA",)) -> set[str]:
"""Remove less-meaningful accessions from the given set."""
if len(names) > 1:
ok_names = set(n for n in names if not any(n.startswith(ex) for ex in exclude))
if ok_names:
return ok_names
# Names are not filter-worthy; leave them as they are for now
return names
[docs]
def shortest_name(names: set[str]) -> str:
"""Return the shortest trimmed name from the given set."""
name = min(filter_names(names), key=len)
if len(name) > 2 and "|" in name[1:-1]:
# Split 'DB|accession' and extract the accession sans-DB
name = name.split("|")[-1]
return name