Source code for skgenome.intersect

"""DataFrame-level intersection operations.

Calculate overlapping regions, similar to bedtools intersect.

The functions here operate on pandas DataFrame and Series instances, not
GenomicArray types.

"""

from __future__ import annotations
from typing import TYPE_CHECKING, Any, Optional, Union

import numpy as np
import pandas as pd

from .combiners import first_of, join_strings, make_const

if TYPE_CHECKING:
    from collections.abc import Callable
    from collections.abc import Iterator, Sequence
    from numpy import ndarray
    from pandas.core.indexes.base import Index

Numeric = Union[int, float, np.number]


[docs] def by_ranges( table: pd.DataFrame, other: pd.DataFrame, mode: str, keep_empty: bool ) -> None: """Group rows by another GenomicArray's bin coordinate ranges.""" for _chrom, bin_rows, src_rows in by_shared_chroms(other, table, keep_empty): if src_rows is not None: subranges = iter_ranges( src_rows, None, bin_rows["start"], bin_rows["end"], mode ) for bin_row, subrange in zip(bin_rows.itertuples(index=False), subranges, strict=False): yield bin_row, subrange elif keep_empty: for bin_row in bin_rows.itertuples(index=False): yield bin_row, [] # ENH: empty dframe matching table
[docs] def by_shared_chroms( table: pd.DataFrame, other: pd.DataFrame, keep_empty: bool = True ) -> Iterator[ Union[tuple[str, pd.DataFrame, None], tuple[str, pd.DataFrame, pd.DataFrame]] ]: """Group rows for both `table` and `other` by matching chromosome names.""" # When both `table` and `other` contain only one chromosome each, and it's # the same chromosome, we can just return the original tables. table_chr, other_chr = set(table["chromosome"]), set(other["chromosome"]) if len(table_chr) == 1 and table_chr == other_chr: yield table["chromosome"].iat[0], table, other # yield None, table, other else: other_chroms = {c: o for c, o in other.groupby("chromosome", sort=False)} for chrom, ctable in table.groupby("chromosome", sort=False): if chrom in other_chroms: otable = other_chroms[chrom] yield chrom, ctable, otable elif keep_empty: yield chrom, ctable, None
[docs] def into_ranges( source: pd.DataFrame, dest: pd.DataFrame, src_col: str, default: Any, summary_func: Optional[Callable], ) -> Union[pd.DataFrame, pd.Series]: """Group a column in `source` by regions in `dest` and summarize.""" if not len(source) or not len(dest): return dest if summary_func is None: # Choose a type-appropriate summary function elem = source[src_col].iat[0] if isinstance(elem, str | np.bytes_): summary_func = join_strings elif isinstance(elem, float | np.float64): summary_func = np.nanmedian else: summary_func = first_of elif not callable(summary_func): # Just fill in the given value, I suppose. summary_func = make_const(summary_func) def series2value(ser): if len(ser) == 0: return default if len(ser) == 1: return ser.iat[0] return summary_func(ser) column = source[src_col] result = [ series2value(column[slc]) for slc in iter_slices(source, dest, "outer", True) ] return pd.Series(result)
[docs] def iter_ranges( table: pd.DataFrame, chrom: Optional[str], starts: Optional[Sequence[Numeric]], ends: Optional[Sequence[Numeric]], mode: str, ) -> Iterator[pd.DataFrame]: """Iterate through sub-ranges.""" assert mode in ("inner", "outer", "trim") # Optional if we've already subsetted by chromosome (not checked!) if chrom: assert isinstance(chrom, str) # ENH: accept array? try: table = table[table["chromosome"] == chrom] except KeyError as exc: raise KeyError(f"Chromosome {chrom} is not in this probe set") from exc for region_idx, start_val, end_val in idx_ranges( table, starts, ends, "inner" if mode == "inner" else "outer" ): subtable = table.iloc[region_idx] if mode == "trim": subtable = subtable.copy() # Update 5' endpoints to the boundary if start_val: subtable.start = subtable.start.clip(lower=start_val) # Update 3' endpoints to the boundary if end_val: subtable.end = subtable.end.clip(upper=end_val) yield subtable
[docs] def iter_slices( table: pd.DataFrame, other: pd.DataFrame, mode: str, keep_empty: bool ) -> Iterator[Union[ndarray, Index]]: """Yields indices to extract ranges from `table`. Returns an iterable of integer arrays that can apply to Series objects, i.e. columns of `table`. These indices are of the DataFrame/Series' Index, not array coordinates -- so be sure to use DataFrame.loc, Series.loc, or Series getitem, as opposed to .iloc or indexing directly into Numpy arrays. """ for _c, bin_rows, src_rows in by_shared_chroms(other, table, keep_empty): if src_rows is None: # Emit empty indices since 'table' is missing this chromosome for _ in range(len(bin_rows)): yield pd.Index([], dtype="int64") else: for slc, _s, _e in idx_ranges(src_rows, bin_rows.start, bin_rows.end, mode): indices = src_rows.index[slc].to_numpy() if keep_empty or len(indices): yield indices
[docs] def idx_ranges( table: pd.DataFrame, starts: Union[list[int], pd.Series], ends: Union[list[int], pd.Series], mode: str, ) -> None: """Iterate through sub-ranges.""" assert mode in ("inner", "outer") # Edge cases: when the `table` is either empty, or both `starts` and `ends` # are None, we want to signal the calling function to use the entire table. # To do this, we return slice(None), which, when passed to either .loc or # .iloc, will do just this. We cannot pass table.index to accomplish this # because it will not work with .iloc if the table is already subset by # chromosome. if not len(table) or (starts is None and ends is None): yield slice(None), None, None else: # Don't be fooled by nested bins if ( (ends is not None and len(ends)) and (starts is not None and len(starts)) ) and not table.end.is_monotonic_increasing: # At least one bin is fully nested -- account for it irange_func = _irange_nested else: irange_func = _irange_simple yield from irange_func(table, starts, ends, mode)
def _irange_simple( table: pd.DataFrame, starts: pd.Series, ends: pd.Series, mode: str ) -> Iterator[tuple[slice, int, int]]: """Slice subsets of table when regions are not nested.""" if starts is not None and len(starts): if mode == "inner": # Only rows entirely after the start point start_idxs = table.start.searchsorted(starts) else: # Include all rows overlapping the start point start_idxs = table.end.searchsorted(starts, "right") else: starts = np.zeros(len(ends) if ends is not None else 1, dtype=np.int_) start_idxs = starts.copy() if ends is not None and len(ends): if mode == "inner": end_idxs = table.end.searchsorted(ends, "right") else: end_idxs = table.start.searchsorted(ends) else: end_idxs = np.repeat(len(table), len(starts)) ends = [None] * len(starts) for start_idx, start_val, end_idx, end_val in zip( start_idxs, starts, end_idxs, ends, strict=False ): yield (slice(start_idx, end_idx), start_val, end_val) def _irange_nested( table: pd.DataFrame, starts: Sequence, ends: Sequence, mode: str ) -> Iterator[tuple[ndarray, int, int]]: """Slice subsets of table when regions are nested.""" # ENH: Binary Interval Search (BITS) or Layer&Quinlan(2015) assert len(starts) == len(ends) > 0 for start_val, end_val in zip(starts, ends, strict=False): # Mask of table rows to keep for this query region region_mask = np.ones(len(table), dtype=np.bool_) if start_val: if mode == "inner": # Only rows entirely after the start point start_idx = table.start.searchsorted(start_val) region_mask[: int(start_idx)] = 0 else: # Include all rows overlapping the start point region_mask = table.end.to_numpy() > start_val if end_val is not None: if mode == "inner": # Only rows up to the end point region_mask &= table.end.to_numpy() <= end_val else: # Include all rows overlapping the end point end_idx = table.start.searchsorted(end_val) region_mask[int(end_idx) :] = 0 yield region_mask, start_val, end_val
[docs] def venn(table, other, mode: str): # TODO -- implement 'venn' via fjoin algorithm # 'cut' table at all 'other' boundaries # -> extra column '_venn_':int (0, 1, 2) # 0=self only, 1=both, 2=other only # -> 'cut' just drops the '_venn_' column # -> 'subtract' drops 1 and 2? # (is that faster? probably not) # -> 'jaccard' does math with it... return table