"""
jobstarters
===========
This module, `jobstarters`, provides a set of classes and methods to facilitate the submission and management
of computing jobs on various job scheduling systems. JobStarters are passed to Runner objects in their .run()
methods to facilitate a standardized execution of commands generated by the Runner. JobStarters can also be
executed outside of Runner classes as is shown in the examples.
The module defines a base `JobStarter` class whose methods must be implemented by subclasses
to start jobs and wait for their completion.
Overview
--------
The module includes the following classes and methods:
Classes
-------
- `JobStarter`: An abstract base class that defines the interface for all jobstarters.
- `SbatchArrayJobstarter`: A concrete implementation of `JobStarter` for managing SLURM job arrays.
- `LocalJobStarter`: A concrete implementation of `JobStarter` for managing local jobs.
Usage
-----
To use a jobstarter, instantiate an appropriate subclass (e.g., `SbatchArrayJobstarter`) and call its `start` method with the desired commands and options. Use the `wait_for_job` method if you need to wait for job completion.
Example:
>>> from jobstarters import SbatchArrayJobstarter
>>> job_starter = SbatchArrayJobstarter(max_cores=50, remove_cmdfile=True)
>>> job_starter.start(cmds=["echo 'Hello World!'"], jobname="test_job", wait=True, output_path="/path/to/output")
Note
----
This module is designed to be extended with additional jobstarters for different scheduling systems as needed. If you want to implement your own JobStarter and need help, please contact any of the ProtFlow authors. We welcome every contribution!
"""
# imports
import os
import time
import logging
import itertools
import subprocess
from multiprocessing import ProcessError
# dependencies
import numpy as np
class JobStarter:
    """
    Abstract base class for job starters.

    This class defines the interface for all job starters. Subclasses should implement methods
    to start jobs and wait for their completion. It also provides helpers to set the maximum
    number of cores available for the jobs and to store the tail of an error log.

    Examples
    --------
    This class is designed to be extended by other classes that implement specific job
    scheduling systems.

    Example subclass implementation:

    .. code-block:: python

        class CustomJobStarter(JobStarter):
            def start(self, cmds, jobname, wait, output_path):
                # Implementation for starting jobs
                pass

            def wait_for_job(self, jobname, interval):
                # Implementation for waiting for job completion
                pass
    """

    def __init__(self, max_cores: int = None):
        """
        Initializes the JobStarter with an optional maximum number of cores.

        Parameters
        ----------
        max_cores : int, optional
            The maximum number of cores that can be used for the jobs. Default is None.
        """
        self.max_cores = max_cores
        # Tail of the most recently read error log (see set_last_error_message); None if unavailable.
        self.last_error_message = None

    def start(self, cmds: list, jobname: str, wait: bool, output_path: str) -> None:
        """
        Submits a list of commands as jobs to the scheduling system.

        Parameters
        ----------
        cmds : list
            A list of commands to be submitted as jobs.
        jobname : str
            The name of the job.
        wait : bool
            Whether to wait for the job to complete before proceeding.
        output_path : str
            The path where output files should be stored.

        Raises
        ------
        NotImplementedError
            If this method is not implemented in a subclass.
        """
        raise NotImplementedError("Jobstarter 'start' function was not overwritten!")

    def wait_for_job(self, jobname: str, interval: float) -> None:
        """
        Waits for a job to complete before proceeding.

        Parameters
        ----------
        jobname : str
            The name of the job to wait for.
        interval : float
            The interval (in seconds) at which to check the job status.

        Raises
        ------
        NotImplementedError
            If this method is not implemented in a subclass.
        """
        raise NotImplementedError("Jobstarter 'wait_for_job' function was not overwritten!")

    def set_max_cores(self, cores: int) -> None:
        """
        Sets the maximum number of cores available for the jobs.

        Parameters
        ----------
        cores : int
            The maximum number of cores to set.
        """
        self.max_cores = cores

    def set_last_error_message(self, error_path: str, read_bytes: int = 16384) -> None:
        """
        Stores the tail of an error logfile in ``self.last_error_message``.

        Parameters
        ----------
        error_path : str
            The path to the error logfile.
        read_bytes : int, optional
            How many bytes of the log file should be read, counted from the end of the file.
            Default is 16384.

        Notes
        -----
        If the file is larger than ``read_bytes``, a note pointing to the full log is prepended
        to the stored text. If ``error_path`` is not an existing file, ``last_error_message`` is
        set to None. Bytes that do not decode as UTF-8 (e.g. a multi-byte character cut at the
        read boundary) are dropped.
        """
        if not os.path.isfile(error_path):
            self.last_error_message = None
            return None

        with open(error_path, 'rb') as f:
            f.seek(0, os.SEEK_END)  # go to end to learn the file size
            filesize = f.tell()
            f.seek(max(0, filesize - read_bytes))  # jump back to the start of the tail
            data = f.read().decode("utf-8", errors="ignore")

        if filesize > read_bytes:
            data = f"Error message truncated. See full output at {os.path.abspath(error_path)}.\n{data}"
        self.last_error_message = data
        return None
class SbatchArrayJobstarter(JobStarter):
    """
    Jobstarter that manages the submission of job arrays to SLURM clusters.

    This class extends the `JobStarter` base class to provide functionality specific to SLURM job arrays.
    It handles tasks such as generating command files, submitting jobs using `sbatch`, and waiting for job
    completion. It also supports options for GPU usage and automatic cleanup of command files after job
    completion.

    Parameters
    ----------
    max_cores : int, optional
        The maximum number of array tasks allowed to run simultaneously (the ``%`` throttle of
        ``sbatch -a``). Default is 100.
    remove_cmdfile : bool, optional
        Whether to remove the command file after job completion. Only applied when ``start``
        waits for the job. Default is False.
    options : str or list, optional
        Additional SBATCH options to be used when submitting jobs. Default is None.
    gpus : bool or int, optional
        Number of GPUs to request per node. ``True`` requests one GPU per node; a falsy value
        requests none. Default is False.
    batch_cmds : int, optional
        If set, commands are grouped into at most this many array tasks. Default is None.

    Raises
    ------
    TypeError
        If the options parameter is not a string, list or None.

    Examples
    --------
    Example usage:

    >>> from jobstarters import SbatchArrayJobstarter
    >>> job_starter = SbatchArrayJobstarter(max_cores=50, remove_cmdfile=True, options="--time=10:00", gpus=True)
    >>> job_starter.start(cmds=["echo 'Hello World!'"], jobname="test_job", wait=True, output_path="/path/to/output")
    """

    def __init__(self, max_cores: int = 100, remove_cmdfile: bool = False, options: str = None, gpus: bool = False, batch_cmds: int = None):
        """
        Initializes the SbatchArrayJobstarter with optional parameters.

        Parameters
        ----------
        max_cores : int, optional
            The maximum number of array tasks allowed to run simultaneously. Default is 100.
        remove_cmdfile : bool, optional
            Whether to remove the command file after job completion. Default is False.
        options : str or list, optional
            Additional SBATCH options to be used when submitting jobs. Default is None.
        gpus : bool or int, optional
            Number of GPUs per node (``True`` means 1). Default is False.
        batch_cmds : int, optional
            If set, commands are grouped into at most this many array tasks. Default is None.

        Note
        ----
        The options parameter must be set when the Jobstarter is created, not when the `.start` function is executed.
        """
        super().__init__()  # runs init-function of parent class (JobStarter)
        self.max_cores = max_cores
        self.remove_cmdfile = remove_cmdfile
        self.batch_cmds = batch_cmds
        self.set_options(options, gpus=gpus)
        # if True, every command is wrapped in /bin/bash -c "..." (see _use_bash)
        self.bash = False
        # static attribute, can be changed depending on slurm settings:
        self.slurm_max_arrayjobs = 1000

    def start(self, cmds: list, jobname: str, wait: bool = True, output_path: str = "./", batch_cmds: int = None) -> None:
        """
        Writes commands into a command file and starts an SBATCH array job running the command file.

        Each line of the command file becomes one array task. Command lists longer than
        ``self.slurm_max_arrayjobs`` are submitted as several consecutive arrays.

        Parameters
        ----------
        cmds : list
            List of commands to be executed as part of the job array.
        jobname : str
            Name of the job. A timestamp suffix is appended for every submitted array.
        wait : bool, optional
            Whether to wait for the job to complete before returning. Default is True.
        output_path : str, optional
            Directory where the command file and SLURM logs are written. Default is "./".
        batch_cmds : int, optional
            If set and smaller than ``len(cmds)``, the commands are split into this many chunks and
            the commands of each chunk are joined with ``"; "`` into one array task. Falls back to
            ``self.batch_cmds`` when not given. Default is None.

        Raises
        ------
        subprocess.CalledProcessError
            If the ``sbatch`` submission returns a non-zero exit status.
        """
        if self.bash:
            cmds = [f'/bin/bash -c "{cmd}"' for cmd in cmds]

        # batch input cmds into the requested number of array tasks if specified
        batch_cmds = batch_cmds or self.batch_cmds
        if batch_cmds and len(cmds) > batch_cmds:
            cmds = ["; ".join(sublist) for sublist in split_list(cmds, n_sublists=batch_cmds)]

        # SLURM limits the size of a single array: split longer lists into several arrays.
        # The chunks are submitted directly (not via start) so that bash-wrapping and batching
        # above are applied exactly once.
        if len(cmds) > self.slurm_max_arrayjobs:
            logging.info("The commands-list you supplied is longer than self.slurm_max_arrayjobs. Your job will be subdivided into multiple arrays.")
            for sublist in split_list(cmds, self.slurm_max_arrayjobs):
                self._submit_array(cmds=sublist, jobname=jobname, wait=wait, output_path=output_path)
            return None

        self._submit_array(cmds=cmds, jobname=jobname, wait=wait, output_path=output_path)
        return None

    def _submit_array(self, cmds: list, jobname: str, wait: bool, output_path: str) -> None:
        """Writes ``cmds`` to a command file and submits them as one SLURM array job."""
        jobname = add_timestamp(jobname)

        # write cmd-file; array task N executes line N of this file
        cmdfile = os.path.join(output_path, f"{jobname}_cmds")
        with open(cmdfile, 'w', encoding="UTF-8") as f:
            f.write("\n".join(cmds))

        # Per-submission options are built locally so that self.options does not accumulate
        # -e/-o flags across calls.
        log_file = os.path.join(output_path, f"{jobname}_slurm.out")
        error_file = os.path.join(output_path, f"{jobname}_slurm.err")
        options = f"{self.options} -vvv -e {error_file} -o {log_file} --open-mode=append"
        # chr(92) is a backslash: it escapes the backticks and ${SLURM_ARRAY_TASK_ID} from the
        # submitting shell, so they are evaluated inside each array task instead.
        sbatch_cmd = f'sbatch -a 1-{len(cmds)}%{self.max_cores} -J {jobname} {options} --wrap "eval {chr(92)}`sed -n {chr(92)}${{SLURM_ARRAY_TASK_ID}}p {cmdfile}{chr(92)}`"'
        with open(os.path.join(output_path, f"{jobname}_jobstarter.log"), "w", encoding="UTF-8") as out_file:
            # Run the sbatch command and direct both stdout and stderr to the log file
            subprocess.run(sbatch_cmd, shell=True, stdout=out_file, stderr=out_file, check=True)

        # wait for job and clean up
        if wait:
            self.wait_for_job(jobname)
            # array tasks read the cmd-file at runtime, so it is only removed once they are done
            if self.remove_cmdfile:
                os.remove(cmdfile)
        self.set_last_error_message(error_file)
        return None

    def _use_bash(self, use_bash: bool) -> None:
        '''Configure whether to use bash shell to execute commands or default.'''
        self.bash = use_bash

    def parse_options(self, options: object) -> str:
        """
        Parses the SBATCH options.

        Parameters
        ----------
        options : object
            SBATCH options in string or list format (or None).

        Returns
        -------
        str
            Parsed SBATCH options. A list is joined with spaces; None becomes "".

        Raises
        ------
        TypeError
            If the options parameter is not a string, list or None.
        """
        if isinstance(options, list):
            return " ".join(options)
        if isinstance(options, str):
            return options
        if options is None:
            return ""
        raise TypeError(f"Unsupported type for argument options: {type(options)}. Supported types: [str, list]")

    def set_options(self, options: object, gpus: int) -> None:
        """
        Sets the SBATCH options.

        Parameters
        ----------
        options : object
            SBATCH options in string or list format.
        gpus : bool or int
            Number of GPUs to be requested per node. ``True`` is treated as 1; a falsy value adds
            no GPU option.
        """
        self.options = self.parse_options(options)
        if gpus:
            # the documented boolean usage (gpus=True) would otherwise render as "True"
            n_gpus = 1 if gpus is True else gpus
            gpu_option = f"--gpus-per-node {n_gpus}"
            # keep a separating space between user options and the GPU option
            self.options = f"{self.options} {gpu_option}" if self.options else gpu_option

    def wait_for_job(self, jobname: str, interval: float = 5) -> None:
        """
        Waits for SLURM jobs to be finished.

        Polls ``squeue -n <jobname>`` every ``interval`` seconds until it lists no job IDs below
        its header line, then sleeps one more interval.

        Parameters
        ----------
        jobname : str
            Name of the job to wait for.
        interval : float, optional
            Interval (in seconds) at which to check the job status. Default is 5.

        Notes
        -----
        NOTE(review): a failing ``squeue`` call (empty stdout) is indistinguishable from
        "no jobs left" and ends the wait early.
        """
        squeue_cmd = f'squeue -n {jobname} -o "%A"'
        while True:
            # output is a "JOBID" header plus one line per queued/running job
            job_output = subprocess.run(squeue_cmd, shell=True, capture_output=True, text=True, check=False).stdout.strip().split("\n")
            if len(job_output) <= 1:
                break
            time.sleep(interval)
        logging.info(f"Job {jobname} completed.\n")
        # presumably gives SLURM time to flush output files - TODO confirm
        time.sleep(interval)
        return None
class LocalJobStarter(JobStarter):
    """
    Jobstarter that runs jobs locally using subprocess.Popen().

    This class extends the `JobStarter` base class to provide functionality for running jobs
    locally on the machine. It handles the execution of commands using subprocesses, manages
    the maximum number of concurrent processes, and captures the output and error logs for
    each command.

    Parameters
    ----------
    max_cores : int, optional
        The maximum number of commands run concurrently. Default is 1.

    Raises
    ------
    ProcessError
        If a subprocess exits with a non-zero return code.

    Examples
    --------
    Example usage:

    >>> from jobstarters import LocalJobStarter
    >>> job_starter = LocalJobStarter(max_cores=2)
    >>> job_starter.start(cmds=["echo 'Hello World!'"], jobname="test_job", wait=True, output_path="/path/to/output")
    """

    def __init__(self, max_cores: int = 1):
        """
        Initializes the LocalJobStarter with an optional parameter for maximum cores.

        Parameters
        ----------
        max_cores : int, optional
            The maximum number of commands run concurrently. Default is 1.
        """
        super().__init__()
        self.max_cores = max_cores

    def start(self, cmds: list, jobname: str, wait: bool = True, output_path: str = "./") -> None:
        """
        Runs a list of commands locally, at most ``self.max_cores`` at a time.

        The commands are written to ``<output_path>/<jobname>_cmds.txt``. Command number ``i``
        (0-based) writes its stdout/stderr to ``process_<i>.out`` / ``process_<i>.err`` in
        ``output_path``. Each command is executed with ``/bin/bash`` in a copy of the current
        environment.

        Parameters
        ----------
        cmds : list
            List of commands to be executed locally.
        jobname : str
            Name of the job (used for the command file name).
        wait : bool, optional
            Accepted for interface compatibility. Commands always run to completion before this
            method returns. Default is True.
        output_path : str, optional
            Path where output files should be stored. Default is "./".

        Raises
        ------
        ProcessError
            If a command exits with a non-zero return code. That command's error log is stored
            in ``last_error_message`` first. Other still-running commands are not terminated.
        """
        # collect environment context
        env = os.environ.copy()

        def start_process(command: str, log_file: str, error_file: str) -> subprocess.Popen:
            # the child gets its own copies of the file descriptors, so the parent may close them
            with open(log_file, 'w', encoding="UTF-8") as out_file, \
                    open(error_file, 'w', encoding="UTF-8") as err_file:
                process = subprocess.Popen(command, env=env, executable="/bin/bash", shell=True, stdout=out_file, stderr=err_file)
            # custom attributes for later error traceability
            process.command = command  # type: ignore[attr-defined]
            process.error_file = error_file  # type: ignore[attr-defined]
            return process

        def update_active_processes(active_processes: list) -> list:
            '''Removes finished processes from the list (in place); raises ProcessError on failure.'''
            for process in list(active_processes):  # iterate over a copy: items are removed below
                returncode = process.poll()
                if returncode is None:  # still running
                    continue
                if returncode != 0:
                    self.set_last_error_message(process.error_file)
                    raise ProcessError(f"Subprocess Crashed. Check last output log of Subprocess! Command: {process.command}")
                active_processes.remove(process)
            return active_processes

        # write cmds to file:
        cmdfile_path = os.path.join(output_path, f"{jobname}_cmds.txt")
        with open(cmdfile_path, 'w', encoding='UTF-8') as f:
            f.write("\n".join(cmds) + "\n")

        if not cmds:
            # nothing to run, so there is no error log to read
            self.last_error_message = None
            return None

        # a limit below 1 would never free a slot and loop forever
        max_parallel = max(1, self.max_cores)
        active_processes = []
        error_file = None
        for i, cmd in enumerate(cmds):
            log_file = os.path.join(output_path, f"process_{i}.out")
            error_file = os.path.join(output_path, f"process_{i}.err")
            # wait for a free slot (also detects crashed processes early)
            while len(update_active_processes(active_processes)) >= max_parallel:
                time.sleep(1)  # avoid busy waiting
            active_processes.append(start_process(cmd, log_file, error_file))

        # wait for completion loop
        while update_active_processes(active_processes):
            time.sleep(1)

        # store the error log of the last started command
        self.set_last_error_message(error_file)
        return None

    def wait_for_job(self, jobname: str, interval: float) -> None:
        """
        (No-op) Method for waiting for started jobs; ``start`` already blocks until completion.

        Parameters
        ----------
        jobname : str
            Name of the job to wait for.
        interval : float
            Interval (in seconds) at which to check the job status.
        """
        return None
def add_timestamp(x: str) -> str:
    """
    Appends a time-derived suffix to a string so that repeated names are very likely distinct.

    The suffix is the digits after the decimal point of ``str(time.time())`` (the sub-second
    fraction of the current epoch time), joined to ``x`` with an underscore. The whole-seconds
    part of the timestamp is *not* included, and the number of digits depends on the float's
    string representation.

    Parameters
    ----------
    x : str
        The input string to which the timestamp will be added.

    Returns
    -------
    str
        ``"<x>_<fractional digits>"``.

    Examples
    --------
    >>> add_timestamp("jobname")  # doctest: +SKIP
    'jobname_8123456'

    Notes
    -----
    Uniqueness is not guaranteed. Two calls collide only if their epoch-time fractions render
    to the same digits.
    """
    epoch_time = str(time.time())
    fraction_digits = epoch_time.rsplit('.', maxsplit=1)[-1]
    return f"{x}_{fraction_digits}"
def split_list(input_list: list, element_length: int = None, n_sublists: int = None) -> list:
    """
    Splits a list into nested sublists with specified lengths or number of sublists.

    This function divides the input list into a nested list of sublists. The division can be
    based on the maximum length of each sublist or the desired number of sublists. Only one
    of the parameters, `element_length` or `n_sublists`, should be specified at a time.
    Elements keep their original Python objects and types.

    Parameters
    ----------
    input_list : list
        The list (or any iterable) to be split into sublists.
    element_length : int, optional
        The maximum length of each sublist. If specified, the input list will be split
        into sublists each having up to `element_length` elements.
    n_sublists : int, optional
        The desired number of sublists. If specified, the input list will be divided into
        `n_sublists` sublists whose sizes differ by at most one. The first sublists are the
        longer ones, matching ``numpy.array_split``.

    Returns
    -------
    list
        A nested list containing the sublists. An empty input yields an empty list.

    Raises
    ------
    ValueError
        If both `element_length` and `n_sublists` are specified or if neither is specified,
        or if `n_sublists` is smaller than 1.

    Examples
    --------
    Splitting a list into sublists of a specified maximum length:

    >>> split_list([1, 2, 3, 4, 5, 6], element_length=2)
    [[1, 2], [3, 4], [5, 6]]

    Splitting a list into a specified number of sublists:

    >>> split_list([1, 2, 3, 4, 5, 6], n_sublists=3)
    [[1, 2], [3, 4], [5, 6]]
    >>> split_list([1, 2, 3, 4, 5], n_sublists=2)
    [[1, 2, 3], [4, 5]]

    Notes
    -----
    - If `n_sublists` is specified and is greater than the length of the input list, the number
      of sublists will be equal to the length of the input list.
    - If neither `element_length` nor `n_sublists` is provided, or if both are provided,
      a `ValueError` will be raised.
    """
    # safety
    if element_length and n_sublists:
        raise ValueError("Only either element_length or n_sublists can be specified, but not both!")
    if not element_length and not n_sublists:
        raise ValueError("At least one of the arguments 'element_length' or 'n_sublists' has to be given!")

    # handling n_sublists
    if n_sublists:
        items = list(input_list)
        if not items:
            return []
        # if initial list is shorter than n_sublists, take minimum value
        split_n = int(min(n_sublists, len(items)))
        if split_n < 1:
            raise ValueError(f"n_sublists must be at least 1, got {n_sublists}")
        # Same partition sizes as numpy.array_split, done in pure Python so that elements are not
        # converted to numpy scalars/strings (np.array_split turns e.g. [1, "a"] into strings).
        base, extra = divmod(len(items), split_n)
        result = []
        start = 0
        for idx in range(split_n):
            stop = start + base + (1 if idx < extra else 0)
            result.append(items[start:stop])
            start = stop
        return result

    # handling element_length
    iterator = iter(input_list)
    result = []
    while sublist := list(itertools.islice(iterator, element_length)):
        result.append(sublist)
    return result