# Source code for cnvlib.parallel

"""Utilities for multi-core parallel processing."""
from __future__ import absolute_import, division, print_function
from builtins import object

import atexit
import tempfile
import gzip
import os
from contextlib import contextmanager
from concurrent import futures
#  from concurrent.futures import wait

class SerialPool(object):
    """Mimic the concurrent.futures.Executor interface, but run in serial.

    Every task is executed immediately in the calling process, so no worker
    processes are created and there is nothing to clean up afterwards.
    """

    def __init__(self):
        pass

    def submit(self, func, *args, **kwargs):
        """Call `func` on the given arguments and wrap the result.

        Positional and keyword arguments are both forwarded to `func`, the
        same way `Executor.submit` accepts them. The call happens right here,
        so an exception raised by `func` propagates out of `submit` itself.

        Returns a SerialFuture holding the value `func` returned.
        """
        return SerialFuture(func(*args, **kwargs))

    def map(self, func, iterable, *iterables):
        """Apply `func` to the items of `iterable` using the built-in `map`.

        Extra iterables, if given, are consumed in parallel and supply
        additional positional arguments to `func`, as with `Executor.map`.
        """
        return map(func, iterable, *iterables)

    def shutdown(self, wait=True):
        """Do nothing; `wait` is accepted only for interface compatibility."""
        pass
class SerialFuture(object):
    """Mimic the concurrent.futures.Future interface.

    Wraps a value that has already been computed, so retrieving it never
    blocks.
    """

    def __init__(self, result):
        self._result = result

    def result(self, timeout=None):
        """Return the wrapped value.

        `timeout` is accepted so that callers written against
        `Future.result(timeout=...)` also work here; it is ignored because
        the value is already available.
        """
        return self._result
@contextmanager
def pick_pool(nprocs):
    """Context manager yielding a pool suited to `nprocs`.

    - `nprocs == 1`: yield a SerialPool, so work runs in this process.
    - `nprocs < 1`: yield a ProcessPoolExecutor created with
      `max_workers=None`, leaving the worker count to the executor.
    - otherwise: yield a ProcessPoolExecutor with `nprocs` workers.

    The executor is used as a context manager itself, so leaving this
    context also leaves the executor's `with` block.
    """
    if nprocs == 1:
        yield SerialPool()
        return

    # A non-positive request means "no explicit limit" for the executor.
    max_workers = None if nprocs < 1 else nprocs
    with futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
        yield pool
def rm(path):
    """Delete the file at `path`, silently ignoring any OSError.

    Removal is best-effort: a path that is already gone, or that cannot be
    deleted, is not treated as an error.
    """
    try:
        os.remove(path)
    except OSError:
        # Nothing to do if the file could not be removed.
        return
def to_chunks(bed_fname, chunk_size=5000):
    """Split a BED file into `chunk_size`-line parts for parallelization.

    Lines beginning with "#" are skipped and do not count toward a chunk.
    Each chunk is written to its own temporary ".bed" file, and every such
    file is registered with `atexit` so it is removed when the interpreter
    exits.

    Parameters
    ----------
    bed_fname : str
        Path of the input file. A name ending in ".gz" is read with
        `gzip.open`; anything else with the built-in `open`.
    chunk_size : int
        Maximum number of non-comment lines written to each chunk file.

    Yields
    ------
    str
        Path of a closed temporary file holding up to `chunk_size` lines.
        Nothing is yielded if the input has no non-comment lines.
    """
    opener = gzip.open if bed_fname.endswith(".gz") else open
    n_lines, chunk = 0, 0
    outfile = name = None
    try:
        # Explicit text mode: gzip.open defaults to binary, which would hand
        # back bytes that never match "#" and cannot be written to the
        # text-mode chunk file.
        with opener(bed_fname, "rt") as infile:
            for line in infile:
                if line.startswith("#"):
                    continue
                if outfile is None:
                    # Create a chunk file only once there is a line to put in
                    # it, so no empty temp file is left behind, and register
                    # each one (not just the first) for cleanup at exit.
                    fd, name = tempfile.mkstemp(suffix=".bed",
                                                prefix="tmp.%s." % chunk)
                    outfile = os.fdopen(fd, "w")
                    atexit.register(rm, name)
                outfile.write(line)
                n_lines += 1
                if n_lines % chunk_size == 0:
                    outfile.close()
                    outfile = None
                    yield name
                    chunk += 1
        if outfile is not None:
            # Final, partially filled chunk.
            outfile.close()
            outfile = None
            yield name
    finally:
        # Reached on error or if the consumer abandons the generator early;
        # make sure the chunk being written does not leak its file handle.
        if outfile is not None:
            outfile.close()