"""BAM utilities."""
import logging
import os
from itertools import islice
import numpy as np
import pandas as pd
import pysam
from io import StringIO
from pathlib import Path,PurePath
[docs]def idxstats(bam_fname, drop_unmapped=False, fasta=None):
"""Get chromosome names, lengths, and number of mapped/unmapped reads.
Use the BAM index (.bai) to get the number of reads and size of each
chromosome. Contigs with no mapped reads are skipped.
"""
handle = StringIO(pysam.idxstats(bam_fname, split_lines=False, reference_filename=fasta))
table = pd.read_csv(handle, sep='\t', header=None,
names=['chromosome', 'length', 'mapped', 'unmapped'])
if drop_unmapped:
table = table[table.mapped != 0].drop('unmapped', axis=1)
return table
[docs]def bam_total_reads(bam_fname, fasta=None):
"""Count the total number of mapped reads in a BAM file.
Uses the BAM index to do this quickly.
"""
table = idxstats(bam_fname, drop_unmapped=True, fasta=fasta)
return table.mapped.sum()
[docs]def ensure_bam_index(bam_fname):
"""Ensure a BAM file is indexed, to enable fast traversal & lookup.
For MySample.bam, samtools will look for an index in these files, in order:
- MySample.bam.bai
- MySample.bai
"""
if PurePath(bam_fname).suffix == ".cram":
if os.path.isfile(bam_fname + '.crai'):
# MySample.cram.crai
bai_fname = bam_fname + '.crai'
else:
# MySample.crai
bai_fname = bam_fname[:-1] + 'i'
if not is_newer_than(bai_fname, bam_fname):
logging.info("Indexing CRAM file %s", bam_fname)
pysam.index(bam_fname)
bai_fname = bam_fname + '.crai'
assert os.path.isfile(bai_fname), \
"Failed to generate cram index " + bai_fname
else:
if os.path.isfile(bam_fname + '.bai'):
# MySample.bam.bai
bai_fname = bam_fname + '.bai'
else:
# MySample.bai
bai_fname = bam_fname[:-1] + 'i'
if not is_newer_than(bai_fname, bam_fname):
logging.info("Indexing BAM file %s", bam_fname)
pysam.index(bam_fname)
bai_fname = bam_fname + '.bai'
assert os.path.isfile(bai_fname), \
"Failed to generate bam index " + bai_fname
return bai_fname
[docs]def ensure_bam_sorted(bam_fname, by_name=False, span=50, fasta=None):
"""Test if the reads in a BAM file are sorted as expected.
by_name=True: reads are expected to be sorted by query name. Consecutive
read IDs are in alphabetical order, and read pairs appear together.
by_name=False: reads are sorted by position. Consecutive reads have
increasing position.
"""
if by_name:
# Compare read IDs
def out_of_order(read, prev):
return not (prev is None or
prev.qname <= read.qname)
else:
# Compare read locations
def out_of_order(read, prev):
return not (prev is None or
read.tid != prev.tid or
prev.pos <= read.pos)
# ENH - repeat at 50%, ~99% through the BAM
bam = pysam.Samfile(bam_fname, 'rb', reference_filename=fasta)
last_read = None
for read in islice(bam, span):
if out_of_order(read, last_read):
return False
last_read = read
bam.close()
return True
[docs]def is_newer_than(target_fname, orig_fname):
"""Compare file modification times."""
if not os.path.isfile(target_fname):
return False
return (os.stat(target_fname).st_mtime >= os.stat(orig_fname).st_mtime)
[docs]def get_read_length(bam, span=1000, fasta=None):
"""Get (median) read length from first few reads in a BAM file.
Illumina reads all have the same length; other sequencers might not.
Parameters
----------
bam : str or pysam.Samfile
Filename or pysam-opened BAM file.
n : int
Number of reads used to calculate median read length.
"""
was_open = False
if isinstance(bam, str):
bam = pysam.Samfile(bam, 'rb', reference_filename=fasta)
else:
was_open = True
lengths = [read.query_length for read in islice(bam, span)
if read.query_length > 0]
if was_open:
bam.seek(0)
else:
bam.close()
return np.median(lengths)