# Source code for cnvlib.parallel

"""Utilities for multi-core parallel processing."""

from __future__ import annotations
import atexit
import tempfile
import gzip
import os
from contextlib import contextmanager, suppress
from concurrent import futures
from typing import TYPE_CHECKING, Any, Union

if TYPE_CHECKING:
    from collections.abc import Callable
    from collections.abc import Iterator
    from concurrent.futures.process import ProcessPoolExecutor


class SerialPool:
    """Mimic the concurrent.futures.Executor interface, but run in serial.

    Work is executed immediately in the calling process. The method
    signatures follow ``concurrent.futures.Executor`` so callers can use this
    in place of a real executor.
    """

    def __init__(self) -> None:
        pass

    def __enter__(self) -> SerialPool:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.shutdown(wait=True)

    def submit(self, func: Callable, *args, **kwargs) -> SerialFuture:
        """Just call the function on the arguments.

        Any ``Exception`` raised by `func` is captured in the returned
        future and re-raised by its ``result()`` method, mirroring how a
        real executor reports task failures.
        """
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            return SerialFuture(exception=exc)
        return SerialFuture(result=result)

    def map(
        self,
        func: Callable,
        iterable: Iterator[Any],
        *iterables: Iterator[Any],
        timeout: float | None = None,
        chunksize: int = 1,
    ) -> map:
        """Just apply the function to `iterable` (and any extra iterables).

        With several iterables, `func` receives one item from each, as with
        the builtin ``map``. `timeout` and `chunksize` are accepted only for
        signature compatibility with ``Executor.map`` and are ignored.
        Results are produced lazily, as the returned iterator is consumed.
        """
        return map(func, iterable, *iterables)

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        """Do nothing; there are no worker processes to stop."""
class SerialFuture:
    """Mimic the concurrent.futures.Future interface.

    The future is created already finished: it holds either the result of a
    call or the exception that call raised.
    """

    def __init__(self, result: Any = None, exception: Exception | None = None) -> None:
        self._result = result
        self._exception = exception

    def done(self) -> bool:
        """Return True; a serial future is complete as soon as it exists."""
        return True

    def exception(self, timeout: float | None = None) -> Exception | None:
        """Return the stored exception, or None if the call succeeded.

        `timeout` is accepted for Future compatibility and ignored, since
        there is never anything to wait for.
        """
        return self._exception

    def result(self, timeout: float | None = None) -> Any:
        """Return the stored result, or raise the stored exception.

        `timeout` is accepted for Future compatibility and ignored.
        """
        if self._exception is not None:
            raise self._exception
        return self._result
@contextmanager
def pick_pool(nprocs: int | None) -> Iterator[Union[SerialPool, ProcessPoolExecutor]]:
    """Provide an executor suited to running with `nprocs` workers.

    Parameters
    ----------
    nprocs : int or None
        1 selects a SerialPool, which runs tasks in the calling process.
        Any other value selects a ProcessPoolExecutor. None or a value below
        1 passes ``max_workers=None``, leaving the worker count to the
        executor's default (the number of processors, per the
        concurrent.futures docs).

    Yields
    ------
    SerialPool or ProcessPoolExecutor
        The process pool is shut down (waiting for pending work) when the
        ``with`` block exits.
    """
    if nprocs == 1:
        yield SerialPool()
        return
    # Checking for None first avoids a TypeError from `None < 1`.
    if nprocs is not None and nprocs < 1:
        nprocs = None
    with futures.ProcessPoolExecutor(max_workers=nprocs) as pool:
        yield pool
def rm(path: str) -> None:
    """Delete the file at `path`, silently ignoring any OSError.

    Intended for best-effort cleanup: a missing file or a failed removal is
    not reported.
    """
    try:
        os.remove(path)
    except OSError:
        pass
def to_chunks(bed_fname: str, chunk_size: int = 5000) -> Iterator[str]:
    """Split a BED file into `chunk_size`-line parts for parallelization.

    Lines starting with '#' are skipped and do not count toward `chunk_size`.
    Each part is written to its own temporary ``.bed`` file. That file is
    closed before its path is yielded, and it is registered with `atexit`
    for removal when the interpreter exits.

    Parameters
    ----------
    bed_fname : str
        Path to the input BED file; a ``.gz`` suffix means gzip-compressed.
    chunk_size : int
        Maximum number of data lines per part; must be at least 1.

    Yields
    ------
    str
        Path of each temporary chunk file, in input order. Input with no
        data lines yields nothing and creates no temporary files.

    Raises
    ------
    ValueError
        If `chunk_size` is less than 1 (raised on first iteration).
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

    def _new_chunk(index: int):
        # Create one temp file and register it for cleanup, so every chunk
        # (not just the first) is removed at exit.
        fd, path = tempfile.mkstemp(suffix=".bed", prefix=f"tmp.{index}.")
        atexit.register(rm, path)
        return os.fdopen(fd, "w"), path

    chunk = 0
    k = 0  # data lines written to the current chunk
    outfile = None
    name = None
    opener = gzip.open if bed_fname.endswith(".gz") else open
    try:
        # gzip.open defaults to binary mode; "rt" yields str lines like open().
        with opener(bed_fname, "rt") as infile:
            for line in infile:
                if line.startswith("#"):
                    continue
                # Open the chunk lazily so no empty trailing file is created.
                if outfile is None:
                    outfile, name = _new_chunk(chunk)
                outfile.write(line)
                k += 1
                if k == chunk_size:
                    outfile.close()
                    outfile = None
                    yield name
                    chunk += 1
                    k = 0
        if outfile is not None:
            outfile.close()
            outfile = None
            yield name
    finally:
        # Close a partially written chunk if reading failed part-way.
        if outfile is not None:
            outfile.close()