"""CNV utilities."""
from __future__ import absolute_import, division, print_function
import sys
import os.path
from itertools import takewhile
from .ngfrills import safe_write
# __________________________________________________________________________
# I/O helpers
[docs]def write_tsv(outfname, rows, colnames=None):
"""Write rows, with optional column header, to tabular file."""
with safe_write(outfname or sys.stdout) as handle:
if colnames:
header = '\t'.join(colnames) + '\n'
handle.write(header)
handle.writelines('\t'.join(map(str, row)) + '\n'
for row in rows)
[docs]def write_text(outfname, text, *more_texts):
"""Write one or more strings (blocks of text) to a file."""
with safe_write(outfname or sys.stdout) as handle:
handle.write(text)
if more_texts:
for mtext in more_texts:
handle.write(mtext)
[docs]def write_dataframe(outfname, dframe, header=True):
"""Write a pandas.DataFrame to a tabular file."""
with safe_write(outfname or sys.stdout) as handle:
dframe.to_csv(handle, header=header,
index=False, sep='\t', float_format='%.6g')
# __________________________________________________________________________
# Sorting key functions
[docs]def sorter_chrom(label):
"""Create a sorting key from chromosome label.
Sort by integers first, then letters or strings. The prefix "chr"
(case-insensitive), if present, is stripped automatically for sorting.
E.g. chr1 < chr2 < chr10 < chrX < chrY < chrM
"""
# Strip "chr" prefix
chrom = (label[3:] if label.lower().startswith('chr')
else label)
if chrom in ('X', 'Y'):
key = (1000, chrom)
else:
# Separate numeric and special chromosomes
nums = ''.join(takewhile(str.isdigit, chrom))
chars = chrom[len(nums):]
nums = int(nums) if nums else 0
if not chars:
key = (nums, '')
elif len(chars) == 1:
key = (2000 + nums, chars)
else:
key = (3000 + nums, chars)
return key
[docs]def sorter_chrom_at(index):
"""Create a sort key function that gets chromosome label at a list index."""
return lambda row: sorter_chrom(row[index])
# __________________________________________________________________________
# More helpers
[docs]def assert_equal(msg, **values):
"""Evaluate and compare two or more values for equality.
Sugar for a common assertion pattern. Saves re-evaluating (and retyping)
the same values for comparison and error reporting.
Example:
>>> assert_equal("Mismatch", expected=1, saw=len(['xx', 'yy']))
...
ValueError: Mismatch: expected = 1, saw = 2
"""
ok = True
key1, val1 = values.popitem()
msg += ": %s = %r" % (key1, val1)
for okey, oval in values.iteritems():
msg += ", %s = %r" % (okey, oval)
if oval != val1:
ok = False
if not ok:
raise ValueError(msg)
[docs]def check_unique(items, title):
"""Ensure all items in an iterable are identical; return that one item."""
its = set(items)
assert len(its) == 1, ("Inconsistent %s keys: %s"
% (title, ' '.join(map(str, sorted(its)))))
return its.pop()
[docs]def fbase(fname):
"""Strip directory and all extensions from a filename."""
return os.path.basename(fname).split('.', 1)[0]
[docs]def rbase(fname):
"""Strip directory and final extension from a filename."""
return os.path.basename(fname).rsplit('.', 1)[0]