Source code for cnvlib.access

#!/usr/bin/env python
"""List the locations of accessible sequence regions in a FASTA file.

Inaccessible regions, e.g. telomeres and centromeres, are masked out with N in
the reference genome sequence; this script scans those to identify the
coordinates of the accessible regions (those between the long spans of N's).
"""
import logging

import numpy as np
from skgenome import tabio, GenomicArray as GA


[docs]def do_access(fa_fname, exclude_fnames=(), min_gap_size=5000, skip_noncanonical=True): """List the locations of accessible sequence regions in a FASTA file.""" fa_regions = get_regions(fa_fname) if skip_noncanonical: fa_regions = drop_noncanonical_contigs(fa_regions) access_regions = GA.from_rows(fa_regions) for ex_fname in exclude_fnames: excluded = tabio.read(ex_fname, 'bed3') access_regions = access_regions.subtract(excluded) return GA.from_rows(join_regions(access_regions, min_gap_size))
[docs]def drop_noncanonical_contigs(region_tups): """Drop contigs with noncanonical names. `region_tups` is an iterable of (chrom, start, end) tuples. Yield the same, but dropping noncanonical chrom. """ from .antitarget import is_canonical_contig_name return (tup for tup in region_tups if is_canonical_contig_name(tup[0]))
[docs]def get_regions(fasta_fname): """Find accessible sequence regions (those not masked out with 'N').""" with open(fasta_fname) as infile: chrom = cursor = run_start = None for line in infile: if line.startswith('>'): # Emit the last chromosome's last run, if any if run_start is not None: yield log_this(chrom, run_start, cursor) # Start new chromosome chrom = line.split(None, 1)[0][1:] run_start = None cursor = 0 logging.info("%s: Scanning for accessible regions", chrom) else: line = line.rstrip() if 'N' in line: if all(c == 'N' for c in line): # Shortcut if the line is all N chars if run_start is not None: yield log_this(chrom, run_start, cursor) run_start = None else: # Slow route: line is a mix of N and non-N chars line_chars = np.array(line, dtype='c') n_indices = np.where(line_chars == b'N')[0] # Emit the first block of non-N chars, if any if run_start is not None: yield log_this(chrom, run_start, cursor + n_indices[0]) elif n_indices[0] != 0: yield log_this(chrom, cursor, cursor + n_indices[0]) # Emit any short intermediate blocks gap_mask = np.diff(n_indices) > 1 if gap_mask.any(): ok_starts = n_indices[:-1][gap_mask] + 1 + cursor ok_ends = n_indices[1:][gap_mask] + cursor for start, end in zip(ok_starts, ok_ends): yield log_this(chrom, start, end) # Account for any tailing non-N chars if n_indices[-1] + 1 < len(line_chars): run_start = cursor + n_indices[-1] + 1 else: run_start = None else: if run_start is None: # Start of a new run of non-N characters run_start = cursor cursor += len(line) # Emit the last run if it's accessible (i.e. not a telomere) if run_start is not None: yield log_this(chrom, run_start, cursor)
[docs]def log_this(chrom, run_start, run_end): """Log a coordinate range, then return it as a tuple.""" logging.info("\tAccessible region %s:%d-%d (size %d)", chrom, run_start, run_end, run_end - run_start) return (chrom, run_start, run_end)
[docs]def join_regions(regions, min_gap_size): """Filter regions, joining those separated by small gaps.""" min_gap_size = min_gap_size or 0 for chrom, rows in regions.by_chromosome(): logging.info("%s: Joining over small gaps", chrom) coords = iter(zip(rows['start'], rows['end'])) prev_start, prev_end = next(coords) for start, end in coords: gap = start - prev_end assert gap > 0, ("Impossible gap between %s %d-%d and %d-%d (=%d)" % (chrom, prev_start, prev_end, start, end, gap)) if gap < min_gap_size: # Join with the previous region logging.info("\tJoining %s %d-%d and %d-%d (gap size %d)", chrom, prev_start, prev_end, start, end, gap) prev_end = end else: # Keep the gap; emit the previous region as-is logging.info("\tKeeping gap %s:%d-%d (size %d)", chrom, prev_end, start, gap) yield (chrom, prev_start, prev_end) prev_start, prev_end = start, end yield (chrom, prev_start, prev_end)