Source code for cnvlib.scatter

"""Command-line interface and corresponding API for CNVkit."""
# NB: argparse CLI definitions and API functions are interwoven:
#   "_cmd_*" handles I/O and arguments processing for the command
#   "do_*" runs the command's functionality as an API
from __future__ import absolute_import, division, print_function

import collections
import logging

import numpy as np
from matplotlib import pyplot

from . import core, plots, smoothing
from .plots import MB
from .cnary import CopyNumArray as CNA
from .genome.rangelabel import unpack_range

HIGHLIGHT_COLOR = 'gold'
POINT_COLOR = '#606060'
SEG_COLOR = 'darkorange'
TREND_COLOR = '#C0C0C0'


[docs]def do_scatter(cnarr, segments=None, variants=None, show_range=None, show_gene=None, background_marker=None, do_trend=False, window_width=1e6, y_min=None, y_max=None, title=None, segment_color=SEG_COLOR): """Plot probe log2 coverages and segmentation calls together.""" if not show_gene and not show_range: genome_scatter(cnarr, segments, variants, do_trend, y_min, y_max, title, segment_color) else: chromosome_scatter(cnarr, segments, variants, show_range, show_gene, background_marker, do_trend, window_width, y_min, y_max, title, segment_color)
# === Genome-level scatter plots === def genome_scatter(cnarr, segments=None, variants=None, do_trend=False, y_min=None, y_max=None, title=None, segment_color=SEG_COLOR): """Plot all chromosomes, concatenated on one plot.""" if (cnarr or segments) and variants: # Lay out top 3/5 for the CN scatter, bottom 2/5 for SNP plot axgrid = pyplot.GridSpec(5, 1, hspace=.85) axis = pyplot.subplot(axgrid[:3]) axis2 = pyplot.subplot(axgrid[3:], sharex=axis) # Place chromosome labels between the CNR and SNP plots axis2.tick_params(labelbottom=False) chrom_sizes = plots.chromosome_sizes(cnarr or segments) snv_on_genome(axis2, variants, chrom_sizes, segments, do_trend) else: _fig, axis = pyplot.subplots() if title is None: title = (cnarr or segments or variants).sample_id if cnarr or segments: axis.set_title(title) cnv_on_genome(axis, cnarr, segments, do_trend, y_min, y_max, segment_color=segment_color) else: axis.set_title("Variant allele frequencies: %s" % title) chrom_sizes = collections.OrderedDict( (chrom, subarr["end"].max()) for chrom, subarr in variants.by_chromosome()) snv_on_genome(axis, variants, chrom_sizes, segments, do_trend) def cnv_on_genome(axis, probes, segments, do_trend=False, y_min=None, y_max=None, segment_color=SEG_COLOR): """Plot coverages and CBS calls for all chromosomes on one plot.""" # Group probes by chromosome (to calculate plotting coordinates) if probes: chrom_probe_centers = {chrom: 0.5 * (rows['start'] + rows['end']) for chrom, rows in probes.by_chromosome()} chrom_sizes = plots.chromosome_sizes(probes) else: chrom_sizes = plots.chromosome_sizes(segments) # Same for segment calls chrom_seg_coords = {chrom: list(zip(rows['log2'], rows['start'], rows['end'])) for chrom, rows in segments.by_chromosome() } if segments else {} x_starts = plots.plot_x_dividers(axis, chrom_sizes) x = [] seg_lines = [] # y-val, x-start, x-end for chrom, curr_offset in list(x_starts.items()): if probes: x.extend(chrom_probe_centers[chrom] + curr_offset) if chrom in chrom_seg_coords: seg_lines.extend((c[0], c[1] + curr_offset, c[2] + curr_offset) for c in chrom_seg_coords[chrom]) # Configure axes etc. axis.axhline(color='k') axis.set_ylabel("Copy ratio (log2)") if not (y_min and y_max): if segments: # Auto-scale y-axis according to segment mean-coverage values seg_auto_vals = segments[(segments.chromosome != 'chr6') & (segments.chromosome != 'chrY')]['log2'] if not y_min: y_min = min(seg_auto_vals.min() - .2, -1.5) if not y_max: y_max = max(seg_auto_vals.max() + .2, 1.5) else: if not y_min: y_min = -2.5 if not y_max: y_max = 2.5 axis.set_ylim(y_min, y_max) # Plot points if probes: axis.scatter(x, probes['log2'], color=POINT_COLOR, edgecolor='none', alpha=0.2, marker='.') # Add a local trend line if do_trend: axis.plot(x, _smooth_genome_log2(probes, smoothing.smoothed, 150), color=POINT_COLOR, linewidth=2, zorder=-1) # Plot segments for seg_line in seg_lines: y1, x1, x2 = seg_line axis.plot((x1, x2), (y1, y1), color=segment_color, linewidth=3, solid_capstyle='round') def _smooth_genome_log2(cnarr, smooth_func, width): """Fit a trendline through bin log2 ratios, handling chromosome boundaries. Returns ------- np.array Smoothed log2 values, calculated with `smooth_func` and `width`, equal in length to `cnarr`. """ # ENH: also split by centromeres (long internal gaps -- see PSCBS) # ENH: use pandas groupby out = [smooth_func(subcna['log2'], width) for _chrom, subcna in cnarr.by_chromosome()] return np.concatenate(out) def snv_on_genome(axis, variants, chrom_sizes, segments, do_trend): """Plot a scatter-plot of SNP chromosomal positions and shifts.""" axis.set_ylim(0.0, 1.0) axis.set_ylabel("VAF") x_starts = plots.plot_x_dividers(axis, chrom_sizes) # Calculate the coordinates of plot components chrom_snvs = dict(variants.by_chromosome()) x_posns_chrom = {} y_posns_chrom = {} trends = [] for chrom, curr_offset in x_starts.items(): snvs = chrom_snvs.get(chrom, []) if not len(snvs): x_posns_chrom[chrom] = [] y_posns_chrom[chrom] = [] continue posns = snvs['start'].values x_posns = posns + curr_offset vafs = snvs['alt_freq'].values x_posns_chrom[chrom] = x_posns y_posns_chrom[chrom] = vafs # Trend bars: always calculated, only shown on request if segments: # Draw average VAF within each segment for v_start, v_end, v_freq in group_snvs_by_segments(posns, vafs, segments, chrom): trends.append((v_start + curr_offset, v_end + curr_offset, v_freq)) else: # Draw chromosome-wide average VAF for mask_vaf in ((vafs > 0.5), (vafs <= 0.5)): if sum(mask_vaf) > 1: these_posns = x_posns[mask_vaf] trends.append((these_posns[0], these_posns[-1], np.median(vafs[mask_vaf]))) # Test for significant shifts in VAF # ENH - use segments if provided # if significant, colorize those points / that median line sig_chroms = [] # test_loh(partition_by_chrom(chrom_snvs)) # Render significantly shifted heterozygous regions separately x_posns = [] y_posns = [] x_posns_sig = [] y_posns_sig = [] for chrom in chrom_sizes: posns = x_posns_chrom[chrom] vafs = y_posns_chrom[chrom] if chrom in sig_chroms: x_posns_sig.extend(posns) y_posns_sig.extend(vafs) else: x_posns.extend(posns) y_posns.extend(vafs) # Plot the points axis.scatter(x_posns, y_posns, color=POINT_COLOR, edgecolor='none', alpha=0.2, marker='.') axis.scatter(x_posns_sig, y_posns_sig, color='salmon', edgecolor='none', alpha=0.3) # Add trend lines to each chromosome if do_trend or segments: # Draw a line across each chromosome at the median shift level for x_start, x_end, y_trend in trends: # ENH: color by segment gain/loss axis.plot([x_start, x_end], [y_trend, y_trend], color=TREND_COLOR, linewidth=2, zorder=-1, solid_capstyle='round') # === Chromosome-level scatter plots === def chromosome_scatter(cnarr, segments, variants, show_range, show_gene, background_marker, do_trend, window_width, y_min, y_max, title, segment_color): """Plot a specified region on one chromosome. Possibilities:: Options | Shown ------------ | -------- -c | -g | Genes | Region ------- | -- | ----- | ------ - | + | given | auto: gene(s) + margin chr | - | none | whole chrom chr | + | given | whole chrom chr:s-e | - | all | given chr:s-e | + | given | given """ chrom, start, end = unpack_range(show_range) window_coords = () genes = [] if show_gene: gene_names = show_gene.split(',') # Scan for probes matching the specified gene gene_coords = plots.gene_coords_by_name(cnarr or segments, gene_names) if len(gene_coords) != 1: raise ValueError("Genes %s are split across chromosomes %s" % (show_gene, list(gene_coords.keys()))) g_chrom, genes = gene_coords.popitem() if chrom: # Confirm that the selected chromosomes match core.assert_equal("Chromosome also selected by region (-c) " "does not match", **{"chromosome": chrom, "gene(s)": g_chrom}) else: chrom = g_chrom # Set the display window to the selected genes +/- a margin genes.sort() window_coords = (max(0, genes[0][0] - window_width), genes[-1][1] + window_width) if start is not None or end is not None: # Default selection endpoint to the maximum chromosome position if not end: end = (cnarr or segments or variants ).filter(chromosome=chrom).end.iat[-1] if window_coords: # Genes were specified, & window was set around them if start > window_coords[0] or end < window_coords[1]: raise ValueError("Selected gene region " + chrom + (":%d-%d" % window_coords) + " is outside specified region " + show_range) window_coords = (max(0, start - window_width), end + window_width) if cnarr and not genes: genes = plots.gene_coords_by_range(cnarr, chrom, start, end)[chrom] if not genes and window_width > (end - start) / 10.0: # No genes in the selected region, so highlight the region # itself (unless the selection is ~entire displayed window) logging.info("No genes found in selection; will show the " "selected region itself instead") genes = [(start, end, "Selection")] elif show_range and window_coords: # Specified range is only chrom, no start-end # Reset window around selected genes to show the whole chromosome window_coords = () # Prune plotted elements to the selected region sel_probes = (cnarr.in_range(chrom, *window_coords) if cnarr else CNA([])) sel_seg = (segments.in_range(chrom, *window_coords, mode='trim') if segments else CNA([])) sel_snvs = (variants.in_range(chrom, *window_coords) if variants else None) logging.info("Showing %d probes and %d selected genes in region %s", len(sel_probes), len(genes), (chrom + ":%d-%d" % window_coords if window_coords else chrom)) # Create plots if cnarr or segments: # Plot CNVs at chromosome level if variants: # Lay out top 3/5 for the CN scatter, bottom 2/5 for SNP plot axgrid = pyplot.GridSpec(5, 1, hspace=.5) axis = pyplot.subplot(axgrid[:3]) axis2 = pyplot.subplot(axgrid[3:], sharex=axis) # Plot allele freqs for only the selected region snv_on_chromosome(axis2, sel_snvs, sel_seg, genes, do_trend) else: _fig, axis = pyplot.subplots() axis.set_xlabel("Position (Mb)") cnv_on_chromosome(axis, sel_probes, sel_seg, genes, background_marker=background_marker, do_trend=do_trend, x_limits=window_coords, y_min=y_min, y_max=y_max, segment_color=segment_color) elif variants: # Only plot SNVs in a single-panel layout _fig, axis = pyplot.subplots() snv_on_chromosome(axis, sel_snvs, sel_seg, genes, do_trend) if title is None: title = "%s %s" % ((cnarr or segments or variants).sample_id, chrom) axis.set_title(title) def cnv_on_chromosome(axis, probes, segments, genes, background_marker=None, do_trend=False, x_limits=None, y_min=None, y_max=None, segment_color=SEG_COLOR): """Draw a scatter plot of probe values with CBS calls overlaid. Parameters ---------- genes : list Of tuples: (start, end, gene name) """ # Get scatter plot coordinates x = 0.5 * (probes['start'] + probes['end']) * MB # bin midpoints y = probes['log2'] if 'weight' in probes: w = 46 * probes['weight'] ** 2 + 2 else: w = np.repeat(30, len(x)) is_bg = (probes['gene'] == 'Background') # Configure axes # TODO - use segment y-values if probes not given if not y_min: y_min = max(-5.0, min(y.min() - .1, -.3)) if len(y) else -1.1 if not y_max: y_max = max(.3, y.max() + (.25 if genes else .1)) if len(y) else 1.1 if x_limits: x_min, x_max = x_limits axis.set_xlim(x_min * MB, x_max * MB) else: set_xlim_from(axis, probes, segments) setup_chromosome(axis, y_min, y_max, "Copy ratio (log2)") if genes: # Rotate text in proportion to gene density ngenes = len(genes) text_size = ('small' if ngenes <= 6 else 'x-small') if ngenes <= 3: text_rot = 'horizontal' elif ngenes <= 6: text_rot = 30 elif ngenes <= 10: text_rot = 45 elif ngenes <= 20: text_rot = 60 else: text_rot = 'vertical' for gene in genes: gene_start, gene_end, gene_name = gene # Highlight and label gene region # (rescale positions from bases to megabases) axis.axvspan(gene_start * MB, gene_end * MB, alpha=0.5, color=HIGHLIGHT_COLOR, zorder=-1) axis.text(0.5 * (gene_start + gene_end) * MB, min(2.4, y.max() + .1) if len(y) else .1, gene_name, horizontalalignment='center', rotation=text_rot, size=text_size) # size='small') if background_marker in (None, 'o'): # Plot targets and antitargets with the same marker axis.scatter(x, y, w, color=POINT_COLOR, alpha=0.4, marker='o') else: # Use the given marker to plot antitargets separately x_fg = [] y_fg = [] w_fg = [] x_bg = [] y_bg = [] # w_bg = [] for x_pt, y_pt, w_pt, is_bg_pt in zip(x, y, w, is_bg): if is_bg_pt: x_bg.append(x_pt) y_bg.append(y_pt) # w_bg.append(w_pt) else: x_fg.append(x_pt) y_fg.append(y_pt) w_fg.append(w_pt) axis.scatter(x_fg, y_fg, w_fg, color=POINT_COLOR, alpha=0.4, marker='o') axis.scatter(x_bg, y_bg, color=POINT_COLOR, alpha=0.5, marker=background_marker) # Add a local trend line if do_trend: axis.plot(x, smoothing.smoothed(y, 50), color=POINT_COLOR, linewidth=2, zorder=-1) # Get coordinates for CBS lines & draw them if segments: for row in segments: axis.plot((row.start * MB, row.end * MB), (row.log2, row.log2), color=segment_color, linewidth=4, solid_capstyle='round') def snv_on_chromosome(axis, variants, segments, genes, do_trend): # TODO set x-limits if not already done for probes/segments # set_xlim_from(axis, None, segments, variants) # setup_chromosome(axis, 0.0, 1.0, "VAF") axis.set_ylim(0.0, 1.0) axis.set_ylabel("VAF") axis.set_xlabel("Position (Mb)") axis.get_yaxis().tick_left() axis.get_xaxis().tick_top() axis.tick_params(which='both', direction='out', labelbottom=False, labeltop=False) x_mb = variants['start'] * MB y = variants['alt_freq'].values axis.scatter(x_mb, y, color=POINT_COLOR, alpha=0.3) # TODO - highlight genes/selection if segments: # Draw average VAF within each segment # TODO coordinate this w/ do_trend posns = variants['start'].values # * MB for v_start, v_end, v_freq in group_snvs_by_segments(posns, y, segments): # ENH: color by segment gain/loss axis.plot([v_start * MB, v_end * MB], [v_freq, v_freq], color='#C0C0C0', linewidth=2, #zorder=1, solid_capstyle='round') def set_xlim_from(axis, probes=None, segments=None, variants=None): """Configure axes for plotting a single chromosome's data. Parameters ---------- probes : CopyNumArray segments : CopyNumArray variants : VariantArray All should already be subsetted to the region that will be plotted. """ min_x = np.inf max_x = 0 for arr in (probes, segments, variants): if arr and len(arr): max_x = max(max_x, arr.end.iat[-1]) min_x = min(min_x, arr.start.iat[0]) if max_x <= min_x: if min_x != np.inf: logging.warn("*WARNING* selection start %s > end %s; did you " "correctly sort the input file by genomic location?", min_x, max_x) raise ValueError("No usable data points to plot out of " "%d probes, %d segments, %d variants" % (len(probes) if probes else 0, len(segments) if segments else 0, len(variants) if variants else 0)) axis.set_xlim(min_x * MB, max_x * MB) def setup_chromosome(axis, y_min=None, y_max=None, y_label=None): """Configure axes for plotting a single chromosome's data.""" if y_min and y_max: axis.set_ylim(y_min, y_max) if y_min < 0 < y_max: axis.axhline(color='k') if y_label: axis.set_ylabel(y_label) axis.tick_params(which='both', direction='out') axis.get_xaxis().tick_bottom() axis.get_yaxis().tick_left() # === Shared === # XXX use by_ranges def group_snvs_by_segments(snv_posns, snv_freqs, segments, chrom=None): """Group SNP allele frequencies by segment. Yields ------ tuple (start, end, value) """ if chrom: segments = segments.filter(chromosome=chrom) seg_starts = segments.start # Assign a segment number to each variant, basically indices = np.maximum(seg_starts.searchsorted(snv_posns), 1) - 1 for i in sorted(set(indices)): mask_in_seg = (indices == i) freqs = snv_freqs[mask_in_seg] posns = snv_posns[mask_in_seg] # Separately emit VAFs above and below .5 for plotting mask_above_mid = (freqs > 0.5) for mask_vaf in (mask_above_mid, ~mask_above_mid): if sum(mask_vaf) > 1: these_posns = posns[mask_vaf] yield (these_posns[0], these_posns[-1], np.median(freqs[mask_vaf]))