"""
jobstarters
===========
This module, `jobstarters`, provides a set of classes and methods to facilitate the submission and management
of computing jobs on various job scheduling systems. JobStarters are passed to Runner objects in their .run()
methods to facilitate a standardized execution of commands generated by the Runner. JobStarters can also be
executed outside of Runner classes as is shown in the examples.
The module defines a base `JobStarter` class whose methods must be implemented by subclasses
to start jobs and wait for their completion.
Overview
--------
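The module includes the following classes and helper functions:
Classes
-------
- `JobStarter`: An abstract base class that defines the interface for all jobstarters.
- `SbatchArrayJobstarter`: A concrete implementation of `JobStarter` for managing SLURM job arrays.
- `LocalJobStarter`: A concrete implementation of `JobStarter` for managing local jobs.
Functions
---------
- `add_timestamp`: Appends a time-based suffix to a string (used to timestamp job names).
- `split_list`: Splits a list into sublists of a given length or into a given number of sublists.
- `get_SLURM_stats`: Aggregates ``sacct`` accounting data for a SLURM job array.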
Usage
-----
To use a jobstarter, instantiate an appropriate subclass (e.g., `SbatchArrayJobstarter`) and call its `start` method with the desired commands and options. Use the `wait_for_job` method if you need to wait for job completion.
Example:
>>> from jobstarters import SbatchArrayJobstarter
>>> job_starter = SbatchArrayJobstarter(max_cores=50, remove_cmdfile=True)
>>> job_starter.start(cmds=["echo 'Hello World!'"], jobname="test_job", wait=True, output_path="/path/to/output")
Note
----
This module is designed to be extended with additional jobstarters for different scheduling systems as needed. If you want to implement your own JobStarter and need assistance, please contact any of the authors of ProtFlow. We welcome every contribution!
"""
# imports
import os
import time
import logging
import itertools
import subprocess
from multiprocessing import ProcessError
# dependencies
import numpy as np
class JobStarter:
    """
    Abstract base class for job starters.

    This class defines the interface for all job starters. Subclasses should implement methods
    to start jobs and wait for their completion. It also includes a method to set the maximum
    number of cores available for the jobs, and a helper that stores the tail of an error log
    in ``last_error_message``.

    Examples
    --------
    This class is designed to be extended by other classes that implement specific job
    scheduling systems.

    Example subclass implementation:

    .. code-block:: python

        class CustomJobStarter(JobStarter):
            def start(self, cmds, jobname, wait, output_path):
                # Implementation for starting jobs
                pass

            def wait_for_job(self, jobname, interval):
                # Implementation for waiting for job completion
                pass
    """

    def __init__(self, max_cores: int = None):
        """
        Initializes the JobStarter with an optional maximum number of cores.

        Parameters
        ----------
        max_cores : int, optional
            The maximum number of cores that can be used for the jobs. Default is None.
        """
        self.max_cores = max_cores
        # Content of the most recently inspected error log (see `set_last_error_message`), or None.
        self.last_error_message = None

    def start(self, cmds: list, jobname: str, wait: bool, output_path: str) -> None:
        """
        Submits a list of commands as jobs to the scheduling system.

        Parameters
        ----------
        cmds : list
            A list of commands to be submitted as jobs.
        jobname : str
            The name of the job.
        wait : bool
            Whether to wait for the job to complete before proceeding.
        output_path : str
            The path where output files should be stored.

        Raises
        ------
        NotImplementedError
            If this method is not implemented in a subclass.
        """
        raise NotImplementedError("Jobstarter 'start' function was not overwritten!")

    def wait_for_job(self, jobname: str, interval: float) -> None:
        """
        Waits for a job to complete before proceeding.

        Parameters
        ----------
        jobname : str
            The name of the job to wait for.
        interval : float
            The interval (in seconds) at which to check the job status.

        Raises
        ------
        NotImplementedError
            If this method is not implemented in a subclass.
        """
        raise NotImplementedError("Jobstarter 'wait_for_job' function was not overwritten!")

    def set_max_cores(self, cores: int) -> None:
        """
        Sets the maximum number of cores available for the jobs.

        Parameters
        ----------
        cores : int
            The maximum number of cores to set.
        """
        self.max_cores = cores

    def set_last_error_message(self, error_path: str, read_bytes: int = 16384) -> None:
        """
        Store the (tail of the) content of an error logfile in ``self.last_error_message``.

        Parameters
        ----------
        error_path : str
            The path to the error logfile. If it is not an existing file,
            ``last_error_message`` is reset to None.
        read_bytes : int, optional
            How many bytes of the log file to read, counted from the end of the file.
            Default is 16384.

        Notes
        -----
        If the file is larger than ``read_bytes``, the stored message is prefixed with a note
        containing the absolute path of the full log. Bytes that do not decode as UTF-8 (e.g.
        a multi-byte character cut in half at the read boundary) are dropped.
        """
        if not os.path.isfile(error_path):
            self.last_error_message = None
            return
        with open(error_path, "rb") as f:
            filesize = f.seek(0, os.SEEK_END)  # seek() returns the new absolute position
            f.seek(max(0, filesize - read_bytes))  # jump back at most `read_bytes` bytes
            data = f.read().decode("utf-8", errors="ignore")
        if filesize > read_bytes:
            data = f"Error message truncated. See full output at {os.path.abspath(error_path)}.\n{data}"
        self.last_error_message = data
class SbatchArrayJobstarter(JobStarter):
    """
    Jobstarter that manages the submission of job arrays to SLURM clusters.

    This class extends the `JobStarter` base class to provide functionality specific to SLURM job arrays.
    It handles tasks such as generating command files, submitting jobs using `sbatch`, and waiting for job
    completion. It also supports options for GPU usage and automatic cleanup of command files after job
    completion.

    Parameters
    ----------
    max_cores : int, optional
        Maximum number of array tasks allowed to run at the same time (the ``%`` throttle of
        ``sbatch -a``). Default is 100.
    remove_cmdfile : bool, optional
        Whether to remove the command file after job completion (only done when waiting for
        the job). Default is False.
    options : str or list, optional
        Additional SBATCH options to be used when submitting jobs. Default is None.
    gpus : bool or int, optional
        GPUs to request per node. ``True`` requests one GPU per node; any other truthy value is
        passed to ``--gpus-per-node`` unchanged. Default is False (no GPU request).
    batch_cmds : int, optional
        If set, commands are joined into at most this many array tasks. Default is None.

    Raises
    ------
    TypeError
        If the options parameter is not a string or list.

    Examples
    --------
    Example usage:

    >>> from jobstarters import SbatchArrayJobstarter
    >>> job_starter = SbatchArrayJobstarter(max_cores=50, remove_cmdfile=True, options="--time=10:00", gpus=True)
    >>> job_starter.start(cmds=["echo 'Hello World!'"], jobname="test_job", wait=True, output_path="/path/to/output")
    """

    def __init__(self, max_cores: int = 100, remove_cmdfile: bool = False, options: str = None, gpus: bool = False, batch_cmds: int = None):
        """
        Initializes the SbatchArrayJobstarter with optional parameters.

        Parameters
        ----------
        max_cores : int, optional
            Maximum number of array tasks allowed to run at the same time. Default is 100.
        remove_cmdfile : bool, optional
            Whether to remove the command file after job completion. Default is False.
        options : str or list, optional
            Additional SBATCH options to be used when submitting jobs. Default is None.
        gpus : bool or int, optional
            GPUs to request per node (``True`` means one). Default is False.
        batch_cmds : int, optional
            If set, commands are joined into at most this many array tasks. Default is None.

        Note
        ----
        The options parameter must be set when the Jobstarter is created, not when the `.start` function is executed.
        """
        super().__init__()  # runs init-function of parent class (JobStarter)
        self.max_cores = max_cores
        self.remove_cmdfile = remove_cmdfile
        self.batch_cmds = batch_cmds
        self.set_options(options, gpus=gpus)
        # If True, every command is wrapped in `/bin/bash -c "..."` (see `_use_bash`).
        self.bash = False
        # Timestamped name of the most recently submitted array, e.g. for timer wrappers.
        self.last_job_name = None
        # static attribute, can be changed depending on slurm settings:
        self.slurm_max_arrayjobs = 1000

    def start(self, cmds: list, jobname: str, wait: bool = True, output_path: str = "./", batch_cmds: int = None) -> None:
        """
        Writes commands into a command file and starts an SBATCH job array running the command file.

        If more than ``self.slurm_max_arrayjobs`` commands remain after optional batching, they are
        submitted as several consecutive arrays, each with its own timestamped job name. With
        ``wait=True`` each array is waited for before the next one is submitted.

        Parameters
        ----------
        cmds : list
            List of commands to be executed as part of the job array.
        jobname : str
            Name of the job (a timestamp is appended for every submitted array).
        wait : bool, optional
            Whether to wait for the job to complete before returning. Default is True.
        output_path : str, optional
            Path where command, log, and error files are written. Default is "./".
        batch_cmds : int, optional
            Join the commands into at most this many array tasks (the commands of one task are
            separated by ``"; "``). Overrides ``self.batch_cmds`` for this call. Default is None.

        Raises
        ------
        subprocess.CalledProcessError
            If the ``sbatch`` submission fails.
        """
        if len(cmds) == 0:
            # `sbatch -a 1-0` is not a valid array specification; nothing to submit.
            logging.warning(f"No commands supplied for job {jobname}. Nothing was submitted.")
            return None
        if self.bash:
            cmds = [f'/bin/bash -c "{cmd}"' for cmd in cmds]
        # batch input cmds to number of available cores if specified
        batch_cmds = batch_cmds or self.batch_cmds
        if batch_cmds and len(cmds) > batch_cmds:
            cmds = ["; ".join(sublist) for sublist in split_list(cmds, n_sublists=batch_cmds)]
        # SLURM limits the size of a single array: split into several arrays if required.
        if len(cmds) > self.slurm_max_arrayjobs:
            logging.info("The commands-list you supplied is longer than self.slurm_max_arrayjobs. Your job will be subdivided into multiple arrays.")
            chunks = split_list(cmds, self.slurm_max_arrayjobs)
        else:
            chunks = [cmds]
        # Submit the chunks directly instead of recursing into `start`, so that the bash
        # wrapping and batching above are applied exactly once.
        for chunk in chunks:
            self._submit_array(chunk, jobname, wait=wait, output_path=output_path)
        return None

    def _submit_array(self, cmds: list, jobname: str, wait: bool, output_path: str) -> None:
        """Write `cmds` to a command file and submit them as a single SLURM array job."""
        jobname = add_timestamp(jobname)
        cmdfile = f"{output_path}/{jobname}_cmds"
        with open(cmdfile, "w", encoding="UTF-8") as f:
            f.write("\n".join(cmds))
        log_file = os.path.join(output_path, f"{jobname}_slurm.out")
        error_file = os.path.join(output_path, f"{jobname}_slurm.err")
        # Combine the per-submission flags locally. Appending them to self.options would make
        # the option string grow with every submission.
        options = f"{self.options} -vvv -e {error_file} -o {log_file} --open-mode=append"
        # Every array task takes line number $SLURM_ARRAY_TASK_ID of the command file (sed -n Np)
        # and evals it. The backslashes (chr(92)) keep the submitting shell from expanding the
        # backticks and the variable, so that they are evaluated inside the job.
        sbatch_cmd = (
            f'sbatch -a 1-{len(cmds)}%{self.max_cores} -J {jobname} {options} '
            f'--wrap "eval {chr(92)}`sed -n {chr(92)}${{SLURM_ARRAY_TASK_ID}}p {cmdfile}{chr(92)}`"'
        )
        # save last job name, e.g. for timer wrapper
        self.last_job_name = jobname
        with open(f"{output_path}/{jobname}_jobstarter.log", "w", encoding="UTF-8") as out_file:
            # Run the sbatch command and direct both stdout and stderr to the log file
            subprocess.run(sbatch_cmd, shell=True, stdout=out_file, stderr=out_file, check=True)
        # wait for job and clean up (the cmd file is read by the running tasks, so it can
        # only be removed once the array has finished)
        if wait:
            self.wait_for_job(jobname)
            if self.remove_cmdfile:
                os.remove(cmdfile)
        self.set_last_error_message(error_file)

    def _use_bash(self, use_bash: bool) -> None:
        '''Configure whether to use bash shell to execute commands or default.'''
        self.bash = use_bash

    def parse_options(self, options: object) -> str:
        """
        Parses the SBATCH options.

        Parameters
        ----------
        options : object
            SBATCH options in string or list format (or None).

        Returns
        -------
        str
            Parsed SBATCH options (list entries joined by spaces; "" for None).

        Raises
        ------
        TypeError
            If the options parameter is not a string or list.
        """
        if isinstance(options, list):
            return " ".join(options)
        if isinstance(options, str):
            return options
        if options is None:
            return ""
        raise TypeError(f"Unsupported type for argument options: {type(options)}. Supported types: [str, list]")

    def set_options(self, options: object, gpus: int) -> None:
        """
        Sets the SBATCH options.

        Parameters
        ----------
        options : object
            SBATCH options in string or list format.
        gpus : bool or int
            Number of GPUs to be used per node. ``True`` is interpreted as one GPU; falsy values
            add no GPU request.
        """
        self.options = self.parse_options(options)
        if gpus:
            # `True` must not be rendered literally ("--gpus-per-node True" is invalid).
            gpus_per_node = 1 if gpus is True else gpus
            # Join with a space so the flag never fuses with the preceding user options.
            self.options = " ".join(part for part in (self.options, f"--gpus-per-node {gpus_per_node}") if part)

    def wait_for_job(self, jobname: str, interval: float = 5) -> None:
        """
        Waits for SLURM jobs to be finished.

        Parameters
        ----------
        jobname : str
            Name of the job to wait for.
        interval : float, optional
            Interval (in seconds) at which to check the job status. Default is 5.
        """
        squeue_cmd = f'squeue -n {jobname} -o "%A"'

        def queued_lines() -> list:
            result = subprocess.run(squeue_cmd, shell=True, capture_output=True, text=True, check=False)
            return result.stdout.strip().split("\n")

        # squeue is called without -h, so its output carries a header line. More than one line
        # therefore means jobs named <jobname> are still queued or running.
        # NOTE(review): a failing squeue call also yields a single (empty) line and is treated
        # as completion.
        while len(queued_lines()) > 1:
            time.sleep(interval)
        logging.info(f"Job {jobname} completed.\n")
        # NOTE(review): presumably a grace period for output files to appear — confirm.
        time.sleep(interval)
        return None
class LocalJobStarter(JobStarter):
    """
    Jobstarter that runs jobs locally using subprocesses.

    This class extends the `JobStarter` base class to provide functionality for running jobs
    locally on the machine. It executes the commands with ``/bin/bash`` subprocesses, limits the
    number of concurrent processes to ``max_cores``, and captures the output and error logs of
    each command in separate files.

    Parameters
    ----------
    max_cores : int, optional
        The maximum number of processes run at the same time. Default is 1.

    Raises
    ------
    ProcessError
        If a subprocess exits with a non-zero return code.

    Examples
    --------
    Example usage:

    >>> from jobstarters import LocalJobStarter
    >>> job_starter = LocalJobStarter(max_cores=2)
    >>> job_starter.start(cmds=["echo 'Hello World!'"], jobname="test_job", wait=True, output_path="/path/to/output")
    """

    def __init__(self, max_cores: int = 1):
        """
        Initializes the LocalJobStarter with an optional parameter for maximum cores.

        Parameters
        ----------
        max_cores : int, optional
            The maximum number of processes run at the same time. Default is 1.
        """
        super().__init__()
        self.max_cores = max_cores

    def start(self, cmds: list, jobname: str, wait: bool = True, output_path: str = "./") -> None:
        """
        Runs a list of commands locally, managing the execution and logging of each command.

        The commands are written to ``{output_path}/{jobname}_cmds.txt``. Command number ``i``
        (0-based) writes its stdout to ``process_{i}.out`` and its stderr to
        ``process_{i}.err`` in ``output_path``.

        Parameters
        ----------
        cmds : list
            List of commands to be executed locally.
        jobname : str
            Name of the job (used for the command file name).
        wait : bool, optional
            Accepted for interface compatibility. This method always runs all commands to
            completion before returning. Default is True.
        output_path : str, optional
            Path where output files should be stored. Default is "./".

        Raises
        ------
        ProcessError
            If a subprocess exits with a non-zero return code. ``last_error_message`` is then
            filled from that process's error file.
        """
        # collect environment context
        env = os.environ.copy()

        def start_process(command: str, log_file: str, error_file: str) -> subprocess.Popen:
            # The parent's handles are closed when the `with` block exits; the child keeps
            # its own inherited copies of the descriptors.
            with open(log_file, "w", encoding="UTF-8") as out_file, \
                    open(error_file, "w", encoding="UTF-8") as err_file:
                process = subprocess.Popen(command, env=env, executable="/bin/bash", shell=True, stdout=out_file, stderr=err_file)
            # custom attributes so that a crash can be traced to its command and its own error log
            process.command = command  # type: ignore[attr-defined]
            process.error_file = error_file  # type: ignore[attr-defined]
            return process

        def reap_finished(active_processes: list) -> None:
            """Remove finished processes from `active_processes`; raise on a non-zero exit code."""
            for process in list(active_processes):  # iterate over a copy: we remove while looping
                if process.poll() is None:  # still running
                    continue
                if process.wait() != 0:
                    # NOTE(review): processes still running are not terminated before raising.
                    self.set_last_error_message(process.error_file)
                    raise ProcessError(f"Subprocess Crashed. Check last output log of Subprocess! Command: {process.command}")
                active_processes.remove(process)

        # write cmds to file:
        cmdfile_path = f"{output_path}/{jobname}_cmds.txt"
        with open(cmdfile_path, "w", encoding="UTF-8") as f:
            f.write("\n".join(cmds) + "\n")

        active_processes = []
        last_error_file = None
        for i, cmd in enumerate(cmds):
            # throttle: never run more than self.max_cores processes at once
            while len(active_processes) >= self.max_cores:
                reap_finished(active_processes)
                if len(active_processes) >= self.max_cores:
                    time.sleep(1)  # avoid busy waiting
            log_file = os.path.join(output_path, f"process_{i}.out")
            last_error_file = os.path.join(output_path, f"process_{i}.err")
            active_processes.append(start_process(cmd, log_file, last_error_file))

        # wait for completion loop
        while active_processes:
            reap_finished(active_processes)
            if active_processes:
                time.sleep(1)

        # On success, store the stderr of the last started process (None if nothing was run).
        if last_error_file is None:
            self.last_error_message = None
        else:
            self.set_last_error_message(last_error_file)
        return None

    def wait_for_job(self, jobname: str, interval: float) -> None:
        """
        (No-op) Method for waiting for started jobs; `start` already blocks until completion.

        Parameters
        ----------
        jobname : str
            Name of the job to wait for.
        interval : float
            Interval (in seconds) at which to check the job status.
        """
        return None
def add_timestamp(x: str) -> str:
    """
    Append a time-based suffix to a string.

    The suffix is the current time in nanoseconds since the epoch (``time.time_ns()``),
    separated from `x` by an underscore. It therefore contains both the whole seconds and
    the fractional part. Consecutive calls are unlikely to collide, but this is not guaranteed
    on platforms with a coarse clock.

    Parameters
    ----------
    x : str
        The input string to which the timestamp will be added.

    Returns
    -------
    str
        The input string with ``"_<nanoseconds since epoch>"`` appended.

    Examples
    --------
    >>> add_timestamp("jobname")  # doctest: +SKIP
    'jobname_1632417284123456789'
    """
    # Previously only the digits after the decimal point of time.time() were kept, which
    # dropped the seconds entirely; time_ns() keeps both as a single integer.
    return f"{x}_{time.time_ns()}"
def split_list(input_list: list, element_length: int = None, n_sublists: int = None) -> list:
    """
    Splits a list into nested sublists with specified lengths or number of sublists.

    This function divides the input list into a nested list of sublists. The division can be
    based on the maximum length of each sublist or the desired number of sublists. Only one
    of the parameters, `element_length` or `n_sublists`, should be specified at a time.
    The original elements are kept as they are (no type conversion).

    Parameters
    ----------
    input_list : list
        The list to be split into sublists.
    element_length : int, optional
        The maximum length of each sublist. If specified, the input list will be split
        into sublists each having up to `element_length` elements.
    n_sublists : int, optional
        The desired number of sublists. If specified, the input list will be divided into
        `n_sublists` sublists whose lengths differ by at most one (longer sublists first).

    Returns
    -------
    list
        A nested list containing the sublists.

    Raises
    ------
    ValueError
        If both `element_length` and `n_sublists` are specified, if neither is specified,
        or if `n_sublists` is smaller than 1 for a non-empty list.

    Examples
    --------
    Splitting a list into sublists of a specified maximum length:

    >>> split_list([1, 2, 3, 4, 5, 6], element_length=2)
    [[1, 2], [3, 4], [5, 6]]

    Splitting a list into a specified number of sublists:

    >>> split_list([1, 2, 3, 4, 5, 6, 7], n_sublists=3)
    [[1, 2, 3], [4, 5], [6, 7]]

    Notes
    -----
    - If `n_sublists` is greater than the length of the input list, the number of sublists
      will be equal to the length of the input list.
    - An empty input list yields an empty result.
    """
    # safety
    if element_length and n_sublists:
        raise ValueError("Only either element_length or n_sublists can be specified, but not both!")
    if not element_length and not n_sublists:
        raise ValueError("At least one of arguments 'element_length or n_sublists has to be given!")
    items = list(input_list)

    # handling n_sublists
    if n_sublists:
        if not items:
            return []
        # if initial list is shorter than n_sublists, take minimum value
        split_n = int(min(n_sublists, len(items)))
        if split_n < 1:
            raise ValueError(f"n_sublists must be at least 1, got {n_sublists!r}")
        # Same layout as numpy.array_split: the first `extra` sublists get one more element.
        base, extra = divmod(len(items), split_n)
        result = []
        start = 0
        for idx in range(split_n):
            stop = start + base + (1 if idx < extra else 0)
            result.append(items[start:stop])
            start = stop
        return result

    # handling element_length
    result = []
    iterator = iter(items)
    while sublist := list(itertools.islice(iterator, element_length)):
        result.append(sublist)
    return result
def get_SLURM_stats(job_name, start_time=None):
    """
    Query ``sacct`` and return aggregated resource statistics for a SLURM job array.

    Runs the SLURM ``sacct`` command to retrieve per-task timing and CPU accounting data for
    all tasks in the array job identified by `job_name`. The raw per-task records are
    aggregated into a single summary dictionary, which is returned to the caller.

    .. warning::
       This function must be called from a host that has the ``sacct`` binary in its
       ``PATH`` and access to SLURM's accounting database (typically the cluster login
       node). If ``sacct`` cannot be run, an ``error`` dictionary is returned.

    Parameters
    ----------
    job_name : str
        The SLURM job name to query (passed to ``sacct --name``). This corresponds to the
        timestamped job name stored in
        :attr:`~protflow.jobstarters.SbatchArrayJobstarter.last_job_name` after each
        submission.
    start_time : str, optional
        ISO-8601 datetime string (``YYYY-MM-DDTHH:MM:SS``) passed to ``sacct --starttime``
        to restrict results to jobs that began at or after this timestamp. When omitted,
        ``sacct`` returns all matching records regardless of age, which may cause false
        matches against stale jobs with the same name from earlier sessions. It is strongly
        recommended to pass the :attr:`~SbatchArrayRunnerTimer.session_start` attribute of
        the enclosing :class:`SbatchArrayRunnerTimer` to avoid this.

    Returns
    -------
    dict
        A dictionary containing aggregated statistics.

        **On success**, keys include:

        job_name : str
            The `job_name` argument echoed back.
        total_cpu_sec : int
            Sum of ``CPUTimeRaw`` across all tasks.
        avg_task_runtime_sec : float
            Mean wall-clock elapsed time per task in seconds (2 decimal places).
        max_task_runtime_sec : int
            Wall-clock elapsed time of the longest-running task.
        min_task_runtime_sec : int
            Wall-clock elapsed time of the shortest-running task.
        num_tasks : int
            Total number of task records returned.
        total_cpus_reserved : int
            Sum of ``AllocCPUS`` across all tasks (a missing value counts as 1).
        state : str
            ``"COMPLETED"`` or ``"MIXED (<states>)"`` with the states sorted alphabetically.
        queried_after : str or None
            The `start_time` argument echoed back.

        **On failure**, keys include:

        job_name : str
            The `job_name` argument echoed back.
        error : str
            Human-readable description of the failure.

    Raises
    ------
    None
        This function does **not** propagate exceptions. All errors are caught and returned
        as a dictionary with an ``error`` key.

    Notes
    -----
    * ``sacct`` is invoked with ``-X`` (suppress sub-step records),
      ``--format JobName,ElapsedRaw,CPUTimeRaw,AllocCPUS,State``, ``-n`` (no header), and
      ``-P`` (pipe-delimited output).
    * Each record is split on ``|`` from the right, so a ``|`` inside the job name cannot
      shift the numeric columns.
    * ``ElapsedRaw`` is SLURM's wall-clock elapsed time for each individual task in seconds;
      ``CPUTimeRaw`` is ``ElapsedRaw × AllocCPUS`` and reflects total CPU-core-seconds
      reserved (not necessarily consumed).
    * The command is passed to :func:`subprocess.run` as an argument list without a shell,
      so job names or timestamps containing spaces or shell metacharacters reach ``sacct``
      verbatim.
    * Empty or whitespace-only lines in ``sacct``'s stdout are filtered before parsing.
    * The ``state`` aggregation logic is strict: ``"COMPLETED"`` is only returned when
      **every** task's state is exactly ``"COMPLETED"`` (set equality). A single failed or
      cancelled task will produce a ``"MIXED"`` state.

    Examples
    --------
    Query statistics for a recently submitted job::

        from protflow.jobstarters import get_SLURM_stats
        stats = get_SLURM_stats("caliby_seqdes", start_time="2025-06-01T12:00:00")
        print(stats)
    """
    # Argument vector (no shell): nothing in job_name/start_time is interpreted by a shell.
    cmd = [
        "sacct",
        "--name", job_name,
        "-X",
        "--format", "JobName,ElapsedRaw,CPUTimeRaw,AllocCPUS,State",
        "-n", "-P",
    ]
    if start_time:
        cmd += ["--starttime", start_time]
    try:
        res = subprocess.run(cmd, capture_output=True, text=True, check=True)
        lines = [line for line in res.stdout.splitlines() if line.strip()]
        if not lines:
            return {"job_name": job_name, "error": f"No records found since {start_time or 'beginning'}"}

        total_cpu_sec = 0
        total_cpus_allocated = 0
        task_runtimes = []
        states = set()
        for line in lines:
            # JobName is the only free-text column: split from the right so that a "|" in
            # the name cannot shift the numeric fields.
            _name, elapsed, cpu_raw, cpus, state = line.rsplit("|", 4)
            task_runtimes.append(int(elapsed) if elapsed else 0)  # wall-clock of this task
            total_cpu_sec += int(cpu_raw) if cpu_raw else 0       # Elapsed * AllocCPUS
            total_cpus_allocated += int(cpus) if cpus else 1
            states.add(state)

        # Sorted so the reported state string is deterministic across runs.
        final_state = "COMPLETED" if states == {"COMPLETED"} else f"MIXED ({', '.join(sorted(states))})"
        return {
            "job_name": job_name,
            "total_cpu_sec": total_cpu_sec,
            "avg_task_runtime_sec": round(sum(task_runtimes) / len(task_runtimes), 2),
            "max_task_runtime_sec": max(task_runtimes),
            "min_task_runtime_sec": min(task_runtimes),
            "num_tasks": len(task_runtimes),
            "total_cpus_reserved": total_cpus_allocated,
            "state": final_state,
            "queried_after": start_time,
        }
    except subprocess.CalledProcessError as e:
        return {"job_name": job_name, "error": f"SLURM Error: {e.stderr}"}
    except Exception as e:  # documented contract: never propagate, report instead
        return {"job_name": job_name, "error": str(e)}