Source code for protflow.jobstarters

"""
jobstarters
===========

This module, `jobstarters`, provides a set of classes and methods to facilitate the submission and management 
of computing jobs on various job scheduling systems. JobStarters are passed to Runner objects in their .run()
methods to facilitate a standardized execution of commands generated by the Runner. JobStarters can also be
executed outside of Runner classes as is shown in the examples.

The JobStarter class defines a base `JobStarter` class with methods that need to be implemented by subclasses 
to start jobs and wait for their completion.

Overview
--------
The module includes the following classes and methods:

Classes
-------
- `JobStarter`: An abstract base class that defines the interface for all jobstarters.
- `SbatchArrayJobstarter`: A concrete implementation of `JobStarter` for managing SLURM job arrays.
- `LocalJobStarter`: A concrete implementation of `JobStarter` for managing local jobs.

Usage
-----
To use a jobstarter, instantiate an appropriate subclass (e.g., `SbatchArrayJobstarter`) and call its `start` method with the desired commands and options. Use the `wait_for_job` method if you need to wait for job completion.

Example:
    >>> from protflow.jobstarters import SbatchArrayJobstarter
    >>> job_starter = SbatchArrayJobstarter(max_cores=50, remove_cmdfile=True)
    >>> job_starter.start(cmds=["echo 'Hello World!'"], jobname="test_job", wait=True, output_path="/path/to/output")

Note
----
This module is designed to be extended with additional jobstarters for different scheduling systems as needed. If you want to implement your own JobStarter and need help, please contact any of the ProtFlow authors. We are happy about every contribution!

"""
# imports
import os
import time
import logging
import itertools
import subprocess
from multiprocessing import ProcessError

# dependencies
import numpy as np

class JobStarter:
    """
    Abstract base class for job starters.

    This class defines the interface shared by all job starters. Subclasses
    implement :meth:`start` and :meth:`wait_for_job` for a concrete execution
    backend. The base class itself only stores the core limit and the tail of
    the most recent error log.

    Attributes
    ----------
    max_cores : int or None
        Maximum number of cores (i.e. concurrently running commands) to use.
    last_error_message : str or None
        Tail of the error log recorded by :meth:`set_last_error_message`,
        or None if no log has been recorded.

    Examples
    --------
    Example subclass implementation:

    .. code-block:: python

        class CustomJobStarter(JobStarter):
            def start(self, cmds, jobname, wait, output_path):
                # Implementation for starting jobs
                pass

            def wait_for_job(self, jobname, interval):
                # Implementation for waiting for job completion
                pass
    """

    def __init__(self, max_cores: int = None):
        """
        Initializes the JobStarter with an optional maximum number of cores.

        Parameters
        ----------
        max_cores : int, optional
            The maximum number of cores that can be used for the jobs. Default is None.
        """
        self.max_cores = max_cores
        self.last_error_message = None

    def start(self, cmds: list, jobname: str, wait: bool, output_path: str) -> None:
        """
        Submits a list of commands as jobs to the scheduling system.

        Parameters
        ----------
        cmds : list
            A list of commands to be submitted as jobs.
        jobname : str
            The name of the job.
        wait : bool
            Whether to wait for the job to complete before proceeding.
        output_path : str
            The path where output files should be stored.

        Raises
        ------
        NotImplementedError
            Always, unless the method is overridden in a subclass.
        """
        raise NotImplementedError("Jobstarter 'start' function was not overwritten!")

    def wait_for_job(self, jobname: str, interval: float) -> None:
        """
        Waits for a job to complete before proceeding.

        Parameters
        ----------
        jobname : str
            The name of the job to wait for.
        interval : float
            The interval (in seconds) at which to check the job status.

        Raises
        ------
        NotImplementedError
            Always, unless the method is overridden in a subclass.
        """
        raise NotImplementedError("Jobstarter 'wait_for_job' function was not overwritten!")

    def set_max_cores(self, cores: int) -> None:
        """
        Sets the maximum number of cores available for the jobs.

        Parameters
        ----------
        cores : int
            The maximum number of cores to set.
        """
        self.max_cores = cores

    def set_last_error_message(self, error_path: str, read_bytes: int = 16384):
        """
        Stores the tail of an error logfile in ``self.last_error_message``.

        If `error_path` is not an existing file, ``self.last_error_message``
        is reset to None. Otherwise at most the last `read_bytes` bytes of the
        file are read and decoded as UTF-8. When the file is larger than
        `read_bytes`, a one-line notice containing the absolute path of the
        log is prepended to the stored text.

        Parameters
        ----------
        error_path : str
            The path to the error logfile.
        read_bytes : int, optional
            How many bytes to read, counted from the end of the file.
            Default is 16384.
        """
        if not os.path.isfile(error_path):
            self.last_error_message = None
            return

        with open(error_path, 'rb') as f:
            f.seek(0, os.SEEK_END)
            filesize = f.tell()
            f.seek(max(0, filesize - read_bytes))  # jump back at most read_bytes from the end
            # The tail may begin in the middle of a multi-byte character;
            # errors="ignore" drops such undecodable bytes instead of raising.
            data = f.read().decode("utf-8", errors="ignore")

        if filesize > read_bytes:
            data = f"Error message truncated. See full output at {os.path.abspath(error_path)}.\n{data}"
        self.last_error_message = data
class SbatchArrayJobstarter(JobStarter):
    """
    Jobstarter that manages the submission of job arrays to SLURM clusters.

    Commands are written to a command file (one command per line) and
    submitted with ``sbatch`` as a job array in which each array task
    evaluates the line of the command file matching its
    ``SLURM_ARRAY_TASK_ID``.

    Parameters
    ----------
    max_cores : int, optional
        Maximum number of array tasks running at the same time (used as the
        ``%`` limit of the ``sbatch -a`` array specification). Default is 100.
    remove_cmdfile : bool, optional
        Whether to remove the command file after job completion. Default is False.
    options : str or list, optional
        Additional SBATCH options to be used when submitting jobs. Default is None.
    gpus : bool or int, optional
        If truthy, ``--gpus-per-node <gpus>`` is appended to the SBATCH
        options (``True`` is passed on as ``1``). Default is False.
    batch_cmds : int, optional
        If set, command lists longer than this number are merged into this
        many array tasks. Default is None.

    Raises
    ------
    TypeError
        If the options parameter is not a string, a list or None.

    Examples
    --------
    >>> from protflow.jobstarters import SbatchArrayJobstarter
    >>> job_starter = SbatchArrayJobstarter(max_cores=50, remove_cmdfile=True, options="--time=10:00", gpus=True)
    >>> job_starter.start(cmds=["echo 'Hello World!'"], jobname="test_job", wait=True, output_path="/path/to/output")
    """

    def __init__(self, max_cores: int = 100, remove_cmdfile: bool = False, options: str = None, gpus: bool = False, batch_cmds: int = None):
        """
        Initializes the SbatchArrayJobstarter with optional parameters.

        Parameters
        ----------
        max_cores : int, optional
            Maximum number of array tasks running at the same time. Default is 100.
        remove_cmdfile : bool, optional
            Whether to remove the command file after job completion. Default is False.
        options : str or list, optional
            Additional SBATCH options to be used when submitting jobs. Default is None.
        gpus : bool or int, optional
            Whether (or how many) GPUs per node to request. Default is False.
        batch_cmds : int, optional
            Number of array tasks the input commands are merged into when
            more commands than this are supplied. Default is None.

        Note
        ----
        The options parameter must be set when the Jobstarter is created
        (or via :meth:`set_options`), not when `.start` is executed.
        """
        super().__init__()  # runs init-function of parent class (JobStarter)
        self.max_cores = max_cores
        self.remove_cmdfile = remove_cmdfile
        self.batch_cmds = batch_cmds
        self.set_options(options, gpus=gpus)
        self.bash = False
        self.last_job_name = None

        # static attribute, can be changed depending on slurm settings:
        self.slurm_max_arrayjobs = 1000

    def start(self, cmds: list, jobname: str, wait: bool = True, output_path: str = "./", batch_cmds: int = None) -> None:
        """
        Writes commands into a command file and starts an SBATCH job array running it.

        Pre-processing is applied exactly once, in this order:

        1. If bash execution was enabled via ``_use_bash``, every command is
           wrapped as ``/bin/bash -c "<cmd>"``.
        2. If `batch_cmds` (or ``self.batch_cmds``) is set and smaller than
           the number of commands, the commands are merged into that many
           array tasks; commands within one task are joined with ``"; "``.
        3. If more than ``self.slurm_max_arrayjobs`` tasks remain, they are
           submitted as several consecutive job arrays.

        Parameters
        ----------
        cmds : list
            List of commands to be executed as part of the job array.
        jobname : str
            Name of the job. A timestamp suffix is appended for every submitted array.
        wait : bool, optional
            Whether to wait for the job to complete before returning. Default is True.
        output_path : str, optional
            Path where command file and log files are written. Default is "./".
        batch_cmds : int, optional
            Overrides ``self.batch_cmds`` for this call. Default is None.

        Raises
        ------
        subprocess.CalledProcessError
            If the ``sbatch`` submission exits with a non-zero status.
        """
        if self.bash:
            cmds = [f'/bin/bash -c "{cmd}"' for cmd in cmds]

        # batch input cmds to the requested number of array tasks if specified
        batch_cmds = batch_cmds or self.batch_cmds
        if batch_cmds and len(cmds) > batch_cmds:
            cmds = ["; ".join(sublist) for sublist in split_list(cmds, n_sublists=batch_cmds)]

        # SLURM limits the size of a single array: split oversized lists into several arrays.
        if len(cmds) > self.slurm_max_arrayjobs:
            logging.info("The commands-list you supplied is longer than self.slurm_max_arrayjobs. Your job will be subdivided into multiple arrays.")
            for sublist in split_list(cmds, self.slurm_max_arrayjobs):
                # Submit the already pre-processed chunk directly. Going through
                # start() again would wrap/batch the commands a second time.
                self._submit_array(list(sublist), jobname, wait, output_path)
            return None

        self._submit_array(cmds, jobname, wait, output_path)
        return None

    def _submit_array(self, cmds: list, jobname: str, wait: bool, output_path: str) -> None:
        '''Write one command file and submit it as a single SLURM job array.'''
        # write cmd-file
        jobname = add_timestamp(jobname)
        cmdfile = f"{output_path}/{jobname}_cmds"
        with open(cmdfile, 'w', encoding="UTF-8") as f:
            f.write("\n".join(cmds))

        # Per-submission options are kept local so that self.options does not
        # accumulate -e/-o/-vvv flags over repeated calls.
        log_file = os.path.join(output_path, f"{jobname}_slurm.out")
        error_file = os.path.join(output_path, f"{jobname}_slurm.err")
        options = f"{self.options} -vvv -e {error_file} -o {log_file} --open-mode=append"

        # chr(92) is a backslash: it escapes the backticks and the '$' so that
        # they are evaluated inside the array task, not by the submitting shell.
        backslash = chr(92)
        sbatch_cmd = (
            f'sbatch -a 1-{len(cmds)}%{self.max_cores} -J {jobname} {options} '
            f'--wrap "eval {backslash}`sed -n {backslash}${{SLURM_ARRAY_TASK_ID}}p {cmdfile}{backslash}`"'
        )

        # save last job name, e.g. for timer wrapper
        self.last_job_name = jobname

        with open(f"{output_path}/{jobname}_jobstarter.log", "w", encoding="UTF-8") as out_file:
            # Run the sbatch command and direct both stdout and stderr to the log file
            subprocess.run(sbatch_cmd, shell=True, stdout=out_file, stderr=out_file, check=True)

        # wait for job and clean up
        if wait:
            self.wait_for_job(jobname)

        if self.remove_cmdfile:
            if wait:
                os.remove(cmdfile)
            else:
                # The array tasks read their command from this file when they
                # start, so it must not be deleted while the job may still be queued.
                logging.warning(f"Command file {cmdfile} was not removed because the job was started with wait=False.")

        self.set_last_error_message(error_file)
        return None

    def _use_bash(self, use_bash: bool) -> None:
        '''Configure whether to use bash shell to execute commands or default.'''
        self.bash = use_bash

    def parse_options(self, options: object) -> str:
        """
        Parses the SBATCH options.

        Parameters
        ----------
        options : object
            SBATCH options as string, list of strings, or None.

        Returns
        -------
        str
            The options as a single string (list items joined by spaces,
            empty string for None).

        Raises
        ------
        TypeError
            If the options parameter is not a string, list or None.
        """
        if options is None:
            return ""
        if isinstance(options, str):
            return options
        if isinstance(options, list):
            return " ".join(options)
        raise TypeError(f"Unsupported type for argument options: {type(options)}. Supported types: [str, list]")

    def set_options(self, options: object, gpus: int) -> None:
        """
        Sets the SBATCH options.

        Parameters
        ----------
        options : object
            SBATCH options in string or list format.
        gpus : int
            Number of GPUs to be used per node. A boolean ``True`` is
            interpreted as ``1``; falsy values add no GPU option.
        """
        self.options = self.parse_options(options)
        if gpus:
            # bool -> int so that gpus=True yields "--gpus-per-node 1" rather than "... True"
            n_gpus = int(gpus) if isinstance(gpus, bool) else gpus
            # separate from preceding options with a space (none needed if there are no options)
            self.options = f"{self.options} --gpus-per-node {n_gpus}".strip()

    def wait_for_job(self, jobname: str, interval: float = 5) -> None:
        """
        Waits for SLURM jobs to be finished.

        Polls ``squeue -n <jobname>`` every `interval` seconds until its
        output contains no more than one line (i.e. at most the header).

        Parameters
        ----------
        jobname : str
            Name of the job to wait for.
        interval : float, optional
            Interval (in seconds) at which to check the job status. Default is 5.

        Notes
        -----
        ``squeue`` is run with ``check=False``; a failing ``squeue`` call
        produces empty output and is therefore treated like a finished job.
        """
        squeue_cmd = f'squeue -n {jobname} -o "%A"'
        while True:
            listing = subprocess.run(squeue_cmd, shell=True, capture_output=True, text=True, check=False).stdout.strip().split("\n")
            if len(listing) <= 1:
                break
            time.sleep(interval)

        logging.info(f"Job {jobname} completed.\n")
        time.sleep(interval)
        return None
class LocalJobStarter(JobStarter):
    """
    Jobstarter that runs jobs locally using subprocesses.

    Commands are started with ``subprocess.Popen`` through ``/bin/bash``.
    At most ``max_cores`` commands run at the same time, and stdout/stderr of
    every command are written to ``process_<i>.out`` / ``process_<i>.err``
    in the output directory.

    Parameters
    ----------
    max_cores : int, optional
        The maximum number of commands running concurrently. Default is 1.

    Raises
    ------
    ProcessError
        If a subprocess exits with a non-zero return code.

    Examples
    --------
    >>> from protflow.jobstarters import LocalJobStarter
    >>> job_starter = LocalJobStarter(max_cores=2)
    >>> job_starter.start(cmds=["echo 'Hello World!'"], jobname="test_job", wait=True, output_path="/path/to/output")
    """

    def __init__(self, max_cores: int = 1):
        """
        Initializes the LocalJobStarter with an optional parameter for maximum cores.

        Parameters
        ----------
        max_cores : int, optional
            The maximum number of commands running concurrently. Default is 1.
        """
        super().__init__()
        self.max_cores = max_cores

    def start(self, cmds: list, jobname: str, wait: bool = True, output_path: str = "./") -> None:
        """
        Runs a list of commands locally, managing the execution and logging of each command.

        The commands are also written to ``<output_path>/<jobname>_cmds.txt``.
        After all commands finished, the error log of the last started command
        is stored via ``set_last_error_message``; if a command fails, the
        error log of that failed command is stored instead.

        Parameters
        ----------
        cmds : list
            List of commands to be executed locally.
        jobname : str
            Name of the job (used for the name of the command file).
        wait : bool, optional
            Accepted for interface compatibility; this method always blocks
            until all commands have finished. Default is True.
        output_path : str, optional
            Path where output files should be stored. Default is "./".

        Raises
        ------
        ProcessError
            If a subprocess exits with a non-zero return code. Other
            subprocesses that are still running at that point are not stopped.
        """
        def start_process(command, log_file, error_file):
            # The parent closes its handles when the with-block exits; the child keeps its own.
            with open(log_file, 'w', encoding="UTF-8") as out_file, \
                 open(error_file, 'w', encoding="UTF-8") as err_file:
                process = subprocess.Popen(command, env=env, executable="/bin/bash", shell=True, stdout=out_file, stderr=err_file)
            # custom attributes for later error traceability
            process.command = command  # type: ignore
            process.error_file = error_file  # type: ignore
            return process

        def update_active_processes(active_processes: list) -> list:
            '''Return the processes that are still running; raise if one of the finished ones failed.'''
            still_running = []
            for process in active_processes:
                returncode = process.poll()
                if returncode is None:
                    still_running.append(process)
                elif returncode != 0:
                    # report the log of the process that actually crashed
                    self.set_last_error_message(process.error_file)
                    raise ProcessError(f"Subprocess Crashed. Check last output log of Subprocess! Command: {process.command}")
            return still_running

        # collect environment context
        env = os.environ.copy()

        # write cmds to file:
        cmdfile_path = f"{output_path}/{jobname}_cmds.txt"
        with open(cmdfile_path, 'w', encoding='UTF-8') as f:
            f.write("\n".join(cmds) + "\n")

        # job loop: start commands in the given order, never exceeding max_cores
        active_processes = []
        last_error_file = None
        for i, cmd in enumerate(cmds):
            log_file = os.path.join(output_path, f"process_{i}.out")
            error_file = os.path.join(output_path, f"process_{i}.err")

            # wait until a slot is free
            while len(active_processes) >= self.max_cores:
                active_processes = update_active_processes(active_processes)
                if len(active_processes) >= self.max_cores:
                    time.sleep(1)  # avoid busy waiting

            active_processes.append(start_process(cmd, log_file, error_file))
            last_error_file = error_file

        # wait for completion of the remaining processes
        while active_processes:
            active_processes = update_active_processes(active_processes)
            if active_processes:
                time.sleep(1)

        if last_error_file is None:
            # no command was started, so there is no log to report
            self.last_error_message = None
        else:
            self.set_last_error_message(last_error_file)
        return None

    def wait_for_job(self, jobname: str, interval: float) -> None:
        """
        (No-op) Method for waiting for started jobs.

        Parameters
        ----------
        jobname : str
            Name of the job to wait for.
        interval : float
            Interval (in seconds) at which to check the job status.
        """
        return None
def add_timestamp(x: str) -> str:
    """
    Appends a time-derived numeric suffix to a string.

    The suffix consists of the digits after the decimal point of the string
    representation of ``time.time()``, i.e. the sub-second part of the
    current time. It is joined to the input with an underscore.

    Parameters
    ----------
    x : str
        The input string to which the suffix will be added.

    Returns
    -------
    str
        ``"<x>_<digits>"``.

    Examples
    --------
    The digits depend on the moment of the call:

    >>> add_timestamp("jobname")  # doctest: +SKIP
    'jobname_4821337'

    Notes
    -----
    Only the fractional part of the timestamp is used, not the full number of
    seconds since the epoch. The suffix therefore makes name clashes unlikely
    but does not guarantee uniqueness.
    """
    fraction = str(time.time()).rsplit('.', maxsplit=1)[-1]
    return f"{x}_{fraction}"
def split_list(input_list: list, element_length: int = None, n_sublists: int = None) -> list:
    """
    Splits a list into nested sublists with specified lengths or number of sublists.

    Exactly one of `element_length` or `n_sublists` has to be given. The
    elements are never converted: the sublists contain the very same objects
    as `input_list`, in the same order.

    Parameters
    ----------
    input_list : list
        The list to be split into sublists.
    element_length : int, optional
        The maximum length of each sublist. All sublists have this length,
        except possibly the last one, which holds the remainder.
    n_sublists : int, optional
        The desired number of sublists. The sublists are as evenly sized as
        possible; if the list cannot be divided evenly, the first sublists
        are one element longer than the remaining ones.

    Returns
    -------
    list
        A nested list containing the sublists. An empty input yields ``[]``.

    Raises
    ------
    ValueError
        If both `element_length` and `n_sublists` are specified, if neither
        is specified, or if `n_sublists` is negative.

    Examples
    --------
    >>> split_list([1, 2, 3, 4, 5, 6], element_length=2)
    [[1, 2], [3, 4], [5, 6]]

    >>> split_list([1, 2, 3, 4, 5, 6, 7], n_sublists=3)
    [[1, 2, 3], [4, 5], [6, 7]]

    Notes
    -----
    If `n_sublists` is greater than the length of the input list, the number
    of sublists equals the length of the input list.
    """
    # safety
    if element_length and n_sublists:
        raise ValueError("Only either element_length or n_sublists can be specified, but not both!")
    if not element_length and not n_sublists:
        raise ValueError("At least one of arguments 'element_length or n_sublists has to be given!")

    # handling n_sublists
    if n_sublists:
        items = list(input_list)
        if not items:
            return []

        # if initial list is shorter than n_sublists, take minimum value
        n_parts = int(min(n_sublists, len(items)))
        if n_parts < 1:
            raise ValueError("n_sublists must be a positive number!")

        # The first `extra` sublists receive one additional element.
        base, extra = divmod(len(items), n_parts)
        result = []
        start = 0
        for idx in range(n_parts):
            stop = start + base + (1 if idx < extra else 0)
            result.append(items[start:stop])
            start = stop
        return result

    # handling element_length: consume the input in chunks of element_length
    result = []
    iterator = iter(input_list)
    while chunk := list(itertools.islice(iterator, element_length)):
        result.append(chunk)
    return result
def get_SLURM_stats(job_name, start_time=None):
    """
    Query ``sacct`` and return aggregated resource statistics for a SLURM job array.

    Runs the SLURM ``sacct`` command to retrieve per-task timing and CPU
    accounting records for all jobs named *job_name* and aggregates them into
    a single summary dictionary.

    .. warning::

        The ``sacct`` binary has to be available in ``PATH`` on the host
        where this function is called, and it needs access to SLURM's
        accounting database.

    Parameters
    ----------
    job_name : str
        The SLURM job name to query (passed to ``sacct --name``), e.g. the
        ``last_job_name`` attribute of a ``SbatchArrayJobstarter``.
    start_time : str, optional
        Passed to ``sacct --starttime`` to restrict the query, e.g. an
        ISO-8601 datetime string (``YYYY-MM-DDTHH:MM:SS``). Passing it is
        recommended to avoid matching older jobs with the same name.

    Returns
    -------
    dict
        **On success**:

        job_name : str
            The *job_name* argument echoed back.
        total_cpu_sec : int
            Sum of ``CPUTimeRaw`` across all records.
        avg_task_runtime_sec : float
            Mean of ``ElapsedRaw`` across all records, rounded to 2 decimals.
        max_task_runtime_sec : int
            Largest ``ElapsedRaw`` value.
        min_task_runtime_sec : int
            Smallest ``ElapsedRaw`` value.
        num_tasks : int
            Number of records returned.
        total_cpus_reserved : int
            Sum of ``AllocCPUS`` across all records.
        state : str
            ``"COMPLETED"`` if every record has exactly this state,
            otherwise ``"MIXED (<states>)"`` with the distinct states in
            alphabetical order.
        queried_after : str or None
            The *start_time* argument echoed back.

        **On failure**:

        job_name : str
            The *job_name* argument echoed back.
        error : str
            Human-readable description of the failure.

    Notes
    -----
    * This function does not propagate exceptions; all errors are reported
      through the ``error`` key of the returned dictionary.
    * ``sacct`` is invoked with ``-X``, ``--format
      JobName,ElapsedRaw,CPUTimeRaw,AllocCPUS,State``, ``-n`` (no header) and
      ``-P`` (pipe-delimited output).
    * The command is executed as an argument list without a shell, so
      *job_name* and *start_time* reach ``sacct`` verbatim even if they
      contain spaces or shell metacharacters.
    * Empty ``ElapsedRaw``/``CPUTimeRaw`` fields count as 0, an empty
      ``AllocCPUS`` field counts as 1.

    Examples
    --------
    ::

        from protflow.jobstarters import get_SLURM_stats
        stats = get_SLURM_stats("caliby_seqdes", start_time="2025-06-01T12:00:00")
        print(stats)
    """
    if not job_name:
        return {"job_name": job_name, "error": "No job name given."}

    try:
        # Base command as argument list (no shell involved)
        cmd = [
            "sacct", "--name", str(job_name), "-X",
            "--format", "JobName,ElapsedRaw,CPUTimeRaw,AllocCPUS,State",
            "-n", "-P"
        ]

        # Add starttime if provided
        if start_time:
            cmd += ["--starttime", str(start_time)]

        res = subprocess.run(cmd, capture_output=True, text=True, check=True)

        lines = [line for line in res.stdout.splitlines() if line.strip()]
        if not lines:
            return {"job_name": job_name, "error": f"No records found since {start_time or 'beginning'}"}

        # Initialize aggregators
        total_cpu_sec = 0
        total_cpus_allocated = 0
        task_runtimes = []
        states = set()

        for line in lines:
            # Split from the right so that the four trailing fields are found
            # even if the job name itself contains a '|'.
            _, elapsed_raw, cpu_raw, alloc_cpus, state = line.rsplit('|', 4)

            task_runtimes.append(int(elapsed_raw) if elapsed_raw else 0)
            total_cpu_sec += int(cpu_raw) if cpu_raw else 0
            total_cpus_allocated += int(alloc_cpus) if alloc_cpus else 1
            states.add(state)

        # If everything is COMPLETED, return COMPLETED.
        # Otherwise, list the unique states (e.g., "COMPLETED, FAILED")
        final_state = "COMPLETED" if states == {"COMPLETED"} else f"MIXED ({', '.join(sorted(states))})"

        return {
            "job_name": job_name,
            "total_cpu_sec": total_cpu_sec,
            "avg_task_runtime_sec": round(sum(task_runtimes) / len(task_runtimes), 2),
            "max_task_runtime_sec": max(task_runtimes),
            "min_task_runtime_sec": min(task_runtimes),
            "num_tasks": len(task_runtimes),
            "total_cpus_reserved": total_cpus_allocated,
            "state": final_state,
            "queried_after": start_time
        }

    except subprocess.CalledProcessError as e:
        return {"job_name": job_name, "error": f"SLURM Error: {e.stderr}"}
    except Exception as e:  # deliberate catch-all: this function reports errors instead of raising
        return {"job_name": job_name, "error": str(e)}