Source code for skgenome.tabio

"""I/O for tabular formats of genomic data (regions or features)."""

from __future__ import annotations
import collections
import contextlib
import logging
import os
import re
import sys
from typing import TYPE_CHECKING, Optional, Union

import pandas as pd
from Bio.File import as_handle

from ..gary import GenomicArray as GA
from . import (
    bedio,
    genepred,
    gff,
    picard,
    seg,
    seqdict,
    tab,
    textcoord,
    vcfio,
    vcfsimple,
)

if TYPE_CHECKING:
    from collections.abc import Iterator
    from cnvlib.cnary import CopyNumArray
    from cnvlib.vary import VariantArray
    from io import TextIOWrapper
    from tempfile import _TemporaryFileWrapper


[docs] def read( infile: str, fmt: str = "tab", into: Optional[Union[type[GA], type[CopyNumArray]]] = None, sample_id: Optional[str] = None, meta: None = None, **kwargs, ) -> Union[CopyNumArray, VariantArray, GA]: """Read tabular data from a file or stream into a genome object. Supported formats: see `READERS` If a format supports multiple samples, return the sample specified by `sample_id`, or if unspecified, return the first sample and warn if there were other samples present in the file. Parameters ---------- infile : handle or string Filename or opened file-like object to read. fmt : string File format. into : class GenomicArray class or subclass to instantiate, overriding the default for the target file format. sample_id : string Sample identifier. meta : dict Metadata, as arbitrary key-value pairs. **kwargs : Additional keyword arguments to the format-specific reader function. Returns ------- GenomicArray or subclass The data from the given file instantiated as `into`, if specified, or the default base class for the given file format (usually GenomicArray). """ from cnvlib.core import fbase if fmt == "auto": return read_auto(infile) if fmt in READERS: reader, suggest_into = READERS[fmt] else: raise ValueError(f"Unknown format: {fmt}") if meta is None: meta = {} if "sample_id" not in meta: if sample_id: meta["sample_id"] = sample_id else: fname = get_filename(infile) if fname: meta["sample_id"] = fbase(fname) if "filename" not in meta: fname = get_filename(infile) if fname: meta["filename"] = infile if fmt in ("seg", "vcf") and sample_id is not None: # Multi-sample formats: choose one sample kwargs["sample_id"] = sample_id try: dframe = reader(infile, **kwargs) except pd.errors.EmptyDataError: # File is blank/empty, most likely logging.info("Blank %s file?: %s", fmt, infile) dframe = [] if fmt == "vcf": from cnvlib.vary import VariantArray as VA suggest_into = VA result = (into or suggest_into)(dframe, meta) result.sort_columns() result.sort() return result
# ENH CategoricalIndex --- # if dframe: # dframe['chromosome'] = pd.Categorical(dframe['chromosome'], # dframe.chromosome.drop_duplicates(), # ordered=True) # Create a multi-index of genomic coordinates (like GRanges) # dframe.set_index(['chromosome', 'start'], inplace=True)
[docs] def read_auto(infile: str) -> GA: """Auto-detect a file's format and use an appropriate parser to read it.""" if not isinstance(infile, str) and not hasattr(infile, "seek"): raise ValueError( "Can only auto-detect format from filename or " + f"seekable (local, on-disk) files, which {infile} is not" ) fmt = sniff_region_format(infile) if hasattr(infile, "seek"): infile.seek(0) if fmt: logging.info("Detected file format: %s", fmt) else: # File is blank -- simple BED will handle this OK fmt = "bed3" return read(infile, fmt or "tab")
READERS = { # Format name, formatter, default target class "auto": (read_auto, GA), "bed": (bedio.read_bed, GA), "bed3": (bedio.read_bed3, GA), "bed4": (bedio.read_bed4, GA), "bed6": (bedio.read_bed6, GA), "dict": (seqdict.read_dict, GA), "gff": (gff.read_gff, GA), "interval": (picard.read_interval, GA), "genepred": (genepred.read_genepred, GA), "genepredext": (genepred.read_genepred_ext, GA), "refflat": (genepred.read_refflat, GA), "refgene": (genepred.read_refgene, GA), "picardhs": (picard.read_picard_hs, GA), "seg": (seg.read_seg, GA), "tab": (tab.read_tab, GA), "text": (textcoord.read_text, GA), "vcf": (vcfio.read_vcf, GA), "vcf-simple": (vcfsimple.read_vcf_simple, GA), "vcf-sites": (vcfsimple.read_vcf_sites, GA), } # _____________________________________________________________________
[docs] def write( garr: Union[CopyNumArray, GA], outfile: Optional[Union[_TemporaryFileWrapper, str]] = None, fmt: str = "tab", verbose: bool = True, **kwargs, ) -> None: """Write a genome object to a file or stream.""" formatter, show_header = WRITERS[fmt] if fmt in ("seg", "vcf"): kwargs["sample_id"] = garr.sample_id dframe = formatter(garr.data, **kwargs) with safe_write(outfile or sys.stdout, verbose=False) as handle: dframe.to_csv( handle, header=show_header, index=False, sep="\t", float_format="%.6g" ) if verbose: # Log the output path, if possible outfname = get_filename(outfile) if outfname: logging.info("Wrote %s with %d regions", outfname, len(dframe))
WRITERS = { # Format name, formatter, show header "bed": (bedio.write_bed, False), "bed3": (bedio.write_bed3, False), "bed4": (bedio.write_bed4, False), # "gff": (gff.write_gff, False), "interval": (picard.write_interval, False), "picardhs": (picard.write_picard_hs, True), "seg": (seg.write_seg, True), "tab": (tab.write_tab, True), "text": (textcoord.write_text, False), "vcf": (vcfio.write_vcf, True), } # _____________________________________________________________________
[docs] @contextlib.contextmanager def safe_write( outfile: Union[_TemporaryFileWrapper, str], verbose: bool = True ) -> Iterator[Union[TextIOWrapper, _TemporaryFileWrapper]]: """Write to a filename or file-like object with error handling. If given a file name, open it. If the path includes directories that don't exist yet, create them. If given a file-like object, just pass it through. """ if isinstance(outfile, str): dirname = os.path.dirname(outfile) if dirname and not os.path.isdir(dirname): os.mkdir(dirname) logging.info("Created directory %s", dirname) with open(outfile, "w") as handle: yield handle else: yield outfile # Log the output path, if possible (but don't contaminate stdout) if verbose: outfname = get_filename(outfile) if outfname: logging.info("Wrote %s", outfname)
[docs] def get_filename(infile: Union[_TemporaryFileWrapper, str]) -> str: if isinstance(infile, str): return infile if hasattr(infile, "name") and infile not in (sys.stdout, sys.stderr): # File(-like) handle return infile.name
[docs] def sniff_region_format(infile: str) -> Optional[str]: """Guess the format of the given file by reading the first line. Returns ------- str or None The detected format name, or None if the file is empty. """ # If the filename extension indicates the format, try that first fname_fmt = None fname = get_filename(infile) if fname: _base, ext = os.path.splitext(fname) ext = ext.lstrip(".") # if ext in known_extensions: if ext in format_patterns: fname_fmt = ext # Fallback: regex detection # has_track = False with as_handle(infile, "r") as handle: for line in handle: if not line.strip(): # Skip blank lines continue if line.startswith("track") or line.startswith("browser "): # NB: Could be UCSC BED or Ensembl GFF # has_track = True continue if fname_fmt and format_patterns[fname_fmt].match(line): return fname_fmt # Formats that (may) declare themselves in an initial '#' comment if line.startswith("##gff-version") or format_patterns["gff"].match(line): return "gff" if line.startswith(("##fileformat=VCF", "#CHROM\tPOS\tID")): return "vcf" if line.startswith("#"): continue # Formats that need to be guessed solely by regex if format_patterns["text"].match(line): return "text" if format_patterns["tab"].match(line): return "tab" if line.startswith("@") or format_patterns["interval"].match(line): return "interval" if format_patterns["refflat"].match(line): return "refflat" if format_patterns["bed"].match(line): return "bed" raise ValueError( "File %r does not appear to be a recognized " "format! (Any of: %s)\n" "First non-blank line:\n%s" % (fname, ", ".join(format_patterns.keys()), line) )
format_patterns = collections.OrderedDict( [ # ('genepred', re.compile()), # ('genepredext', re.compile()), ("text", re.compile(r"\w+:\d*-\d*.*")), ("tab", re.compile("\t".join(("chromosome", "start", "end")))), ( "interval", re.compile("\t".join((r"\w+", r"\d+", r"\d+", r"[.+-]", r"\S+$"))), ), ( "refflat", re.compile( "\t".join( ( r"\S+", r"\S+", r"\w+", r"[+-]", r"\d+", r"\d+", r"\d+", r"\d+", r"\d+", r"(\d+,)+", r"(\d+,)+$", ) ) ), ), ( "gff", re.compile( "\t".join( ( r"\w+", r"\S+", r"\w+", r"\d+", r"\d+", r"\S+", r"[.?+-]", r"[012.]", r".*", ) ) ), ), ("bed", re.compile("\t".join((r"\S+", r"\d+", r"\d+")))), ] )