Source code for protflow.tools.boltz

"""
ProtFlow runner for Boltz.

This module provides a high-level `Boltz` runner that:
(1) prepares Boltz-compatible YAML inputs from sequences or structures,
(2) composes command lines from global and pose-specific options,
(3) distributes inference across available cores via a `JobStarter`,
and (4) aggregates Boltz outputs (confidence, affinity, NPZ artifacts) into a
single score table for downstream orchestration.

The typical workflow is:

1. Ensure paths and environment hooks for Boltz are configured
   (see Notes on `BOLTZ_PATH`, `BOLTZ_PYTHON`, `BOLTZ_PRE_CMD`).
2. Provide inputs as a `Poses` collection (FASTA, PDB/CIF, or already
   Boltz-formatted YAML). If needed, convert to YAML with
   `convert_poses_to_boltz_yaml`.
3. Call `Boltz.run(...)` with command-line `options` and optional
   `pose_options` to fan-out runs.
4. Consume the returned `Poses` object whose `.df` is augmented with a
   per-model score table and file locations of produced artifacts.

Notes
-----
- Configuration keys
  The runner reads its defaults from ProtFlow’s config via:
  `BOLTZ_PATH` (path to the `boltz` CLI entry point or module),
  `BOLTZ_PYTHON` (interpreter used to invoke Boltz), and
  `BOLTZ_PRE_CMD` (shell prefix such as environment activation).
  Use `protflow.config` utilities to set these once per environment.
- MSA handling
  Boltz can run with an empty MSA or fetch MSAs from a server. The runner
  exposes `msa_setting` to steer YAML content (`"empty"` vs `"server"`),
  while the CLI switch `--use_msa_server` remains the source of truth for
  server fetching. See `Boltz._parse_msa_setting` and
  `convert_chain_seq_dict_to_yaml_dict`.

Examples
--------
Run Boltz on a batch of structures, writing outputs to a fresh work directory
and collecting scores:

>>> from protflow.tools.boltz import Boltz
>>> from protflow.poses import Poses
>>> poses = Poses(
...     files=["A.pdb", "B.pdb", "C.pdb"],
...     work_dir="work/boltz_demo"
... )
>>> runner = Boltz()  # uses config defaults (BOLTZ_PATH/PYTHON/PRE_CMD)
>>> poses = runner.run(
...     poses=poses,
...     prefix="boltz_run",
...     options="--num_samples 4 --use_msa_server",
...     overwrite=False,
... )
>>> poses.df.columns[:8]  # score columns will include confidence & file paths
...

"""
# generals
import os
import json
import shutil
import logging
from glob import glob
from pathlib import Path

# dependencies
import yaml
import pandas as pd

# custom
from ..poses import Poses, get_format
from .. import load_config_path, require_config
from ..jobstarters import JobStarter, split_list
from ..runners import Runner, RunnerOutput, parse_generic_options, options_flags_to_string
from ..utils.biopython_tools import load_sequence_from_fasta, get_sequence_from_pose, biopython_load_protein

class Boltz(Runner):
    """
    ProtFlow runner that executes Boltz predictions and gathers their scores.

    The runner prepares inputs (optionally batching them by core), assembles
    Boltz command lines, dispatches them through a `JobStarter` and stores the
    collected scores in a single score file inside the run directory.

    Parameters
    ----------
    boltz_path : str, optional
        Executable or module path used with the `predict` subcommand.
        Falls back to `BOLTZ_PATH` from the ProtFlow config.
    boltz_python : str, optional
        Python interpreter used to execute Boltz. Falls back to `BOLTZ_PYTHON`.
    pre_cmd : str, optional
        Shell prefix prepended to each command (e.g. environment activation).
        Falls back to `BOLTZ_PRE_CMD`.
    jobstarter : JobStarter, optional
        Default jobstarter used when none is passed to `run()`.

    Attributes
    ----------
    name : str
        Fixed runner name: `"Boltz"`.
    index_layers : int
        Number of index layers used when merging outputs (always 2).
    jobstarter : JobStarter or None
        Optional default jobstarter stored on the instance.
    boltz_path, boltz_python, pre_cmd : str
        Resolved executable path, interpreter and shell prefix.

    Notes
    -----
    - Score caching: if a score file already exists for the given `prefix`
      and neither `overwrite` nor `--override` is set, the stored scores are
      returned without re-running Boltz.
    - Batching: without `pose_options`, inputs are partitioned into at most
      `jobstarter.max_cores` batch folders.

    Examples
    --------
    >>> runner = Boltz()
    >>> poses = runner.run(
    ...     poses, prefix="demo",
    ...     options="--num_samples 2 --use_msa_server"
    ... )
    """

    def __init__(self, boltz_path: str = None, boltz_python: str = None, pre_cmd: str = None, jobstarter: JobStarter = None):
        """
        Initialize the runner; explicit arguments win over config values.

        Parameters
        ----------
        boltz_path : str, optional
            Path to the Boltz program. Defaults to `BOLTZ_PATH`.
        boltz_python : str, optional
            Interpreter to call Boltz with. Defaults to `BOLTZ_PYTHON`.
        pre_cmd : str, optional
            Shell prefix. Defaults to `BOLTZ_PRE_CMD`.
        jobstarter : JobStarter, optional
            Default jobstarter for `run(jobstarter=None)`.
        """
        config = require_config()
        self.boltz_path = boltz_path or load_config_path(config, "BOLTZ_PATH")
        self.boltz_python = boltz_python or load_config_path(config, "BOLTZ_PYTHON")
        self.pre_cmd = pre_cmd or load_config_path(config, "BOLTZ_PRE_CMD", is_pre_cmd=True)
        self.name = "Boltz"
        # Boltz can output many samples per input, so index layers are always added.
        self.index_layers = 2
        self.jobstarter = jobstarter

    def __str__(self):
        """Return the literal runner name ``"Boltz"``."""
        return "Boltz"

    def _parse_msa_setting(self, options: str, msa_setting: str | None) -> str:
        """
        Resolve the MSA mode used when generating pose YAMLs.

        Resolution order:
        1) a truthy `msa_setting` is validated and returned as is;
        2) otherwise ``"server"`` if ``--use_msa_server`` occurs in `options`;
        3) otherwise ``"empty"``.

        Parameters
        ----------
        options : str
            Command-line options that will be passed to Boltz. ``None`` is
            treated like an empty string.
        msa_setting : str or None
            ``"server"``, ``"empty"``, or a falsy value to auto-detect.

        Returns
        -------
        str
            Either ``"server"`` or ``"empty"``.

        Raises
        ------
        ValueError
            If `msa_setting` is truthy but neither ``"server"`` nor ``"empty"``.

        Notes
        -----
        A warning is logged (via `logging.warning`) when `msa_setting` is
        ``"empty"`` while ``--use_msa_server`` is present in `options`.
        """
        # tolerate options=None so the helper is safe outside of run()
        options = options or ""
        use_server = "--use_msa_server" in options

        if msa_setting == "empty" and use_server:
            logging.warning("msa_setting was set to :empty: while --use_msa_server was in options. This will lead to unexpected behavior.")

        # an explicit msa_setting has priority over whatever is in options
        if msa_setting:
            allowed_settings = {"server", "empty", None}
            if msa_setting not in allowed_settings:
                raise ValueError(f"paramter :msa_setting: can be only one of {allowed_settings}! Your setting: {msa_setting}")
            return msa_setting

        return "server" if use_server else "empty"

    def _parse_options(self, poses: Poses, options: str, pose_options: str|list[str], max_cores: int, out_dir: str, overwrite: bool = False) -> list[str]:
        """
        Build the option string(s) for the Boltz commands.

        With `pose_options`, one option string per pose is produced. Without
        them, a single option string is replicated `max_cores` times (one per
        potential batch). `out_dir` is always injected, and ``--override`` is
        appended when `overwrite` is set and the flag is not yet present.

        Parameters
        ----------
        poses : Poses
            Input poses (used to expand `pose_options`).
        options : str
            Global CLI options.
        pose_options : str or list of str
            Pose-specific options; expanded through `self.prep_pose_options`.
        max_cores : int
            Number of option strings emitted in batch mode.
        out_dir : str
            Directory Boltz should write its outputs to.
        overwrite : bool, optional
            If `True`, make sure the ``override`` flag is set.

        Returns
        -------
        list of str
            One option string per Boltz command.
        """
        if pose_options:
            pose_options = self.prep_pose_options(poses, pose_options)
            parsed_options_raw = [parse_generic_options(options, pose_option) for pose_option in pose_options]

            for opts_dict, flags in parsed_options_raw:
                opts_dict["out_dir"] = out_dir
                if overwrite and "override" not in flags:
                    flags.append("override")

            return [
                options_flags_to_string(opts, flags, sep="--", no_quotes=False)
                for opts, flags in parsed_options_raw
            ]

        # no pose_options: all batches share the same option string
        opts, flags = parse_generic_options(options=options, pose_options=None, sep="--")
        opts["out_dir"] = out_dir
        if overwrite and "override" not in flags:
            flags.append("override")
        options_str = options_flags_to_string(opts, flags, sep="--", no_quotes=False)
        return [options_str for _ in range(max_cores)]

    def _parse_poses(self, poses: Poses, pose_options: str|list[str], work_dir: str, max_cores: int) -> list[str]:
        """
        Determine the Boltz inputs: individual pose files or batch folders.

        With `pose_options`, every pose file is its own input. Otherwise the
        pose files are partitioned with `split_list` and copied into
        ``{work_dir}/batch_inputs/batch_XXXX/`` folders.

        Parameters
        ----------
        poses : Poses
            Input collection (its `.poses_list()` is consulted).
        pose_options : str or list of str
            Presence disables batching.
        work_dir : str
            Working directory of this run.
        max_cores : int
            Maximum number of batch folders.

        Returns
        -------
        list of str
            Pose file paths or batch directories.

        Notes
        -----
        The ``batch_inputs`` folder only holds copies and is rebuilt on every
        call. Otherwise files left over from an earlier run (different pose
        set or different `max_cores`) would be predicted again.
        """
        if pose_options:
            return poses.poses_list()

        logging.info("Pose options not specified. Running in batch mode.")
        batch_root = os.path.join(work_dir, "batch_inputs")
        # remove stale copies from previous runs before repopulating
        shutil.rmtree(batch_root, ignore_errors=True)

        boltz_inputs = []
        poses_sublists = split_list(poses.poses_list(), n_sublists=max_cores)
        for i, pose_sublist in enumerate(poses_sublists, start=1):
            # do not spawn a Boltz job on an empty folder
            if len(pose_sublist) == 0:
                continue
            subdir_name = os.path.join(batch_root, f"batch_{str(i).zfill(4)}")
            os.makedirs(subdir_name, exist_ok=True)
            for pose in pose_sublist:
                shutil.copy(pose, subdir_name)
            boltz_inputs.append(subdir_name)

        return boltz_inputs

    def _write_cmds(self, boltz_inputs: list[str], parsed_options: list[str]) -> list[str]:
        """
        Compose one shell command per (input, options) pair.

        Each command has the form
        ``{pre_cmd} {boltz_python} {boltz_path} predict {input} {options}``.
        Pairing uses `zip`, so surplus option strings (batch mode emits
        `max_cores` of them) are ignored.

        Parameters
        ----------
        boltz_inputs : list of str
            Per-command input path (pose file or batch directory).
        parsed_options : list of str
            Per-command option strings from `_parse_options`.

        Returns
        -------
        list of str
            Shell commands for `JobStarter.start()`.
        """
        return [
            f"{self.pre_cmd} {self.boltz_python} {self.boltz_path} predict {input_fn} {opts}".strip()
            for input_fn, opts in zip(boltz_inputs, parsed_options)
        ]

    def run(
            self,
            poses: Poses,
            prefix: str,
            jobstarter: JobStarter = None,
            options: str = None,
            pose_options: str|list[str] = None,
            params: "BoltzParams" = None,
            overwrite: bool = False,
            msa_setting: str = ""
        ) -> Poses:
        """
        Execute Boltz on `poses` and merge the collected scores into them.

        Steps: set up the work directory, return cached scores if available,
        convert non-YAML poses to Boltz YAML, optionally apply `params`,
        build and dispatch the commands, collect and store the scores.

        Parameters
        ----------
        poses : Poses
            Input poses (FASTA, PDB/CIF or Boltz YAML). Non-YAML poses are
            converted with `convert_poses_to_boltz_yaml`.
        prefix : str
            Run prefix. Boltz writes into the ``outputs`` folder of the
            work directory returned by `generic_run_setup`.
        jobstarter : JobStarter, optional
            Candidates are handed to `generic_run_setup` in the order:
            this argument, the instance default, `poses.default_jobstarter`.
        options : str, optional
            Global CLI options for Boltz.
        pose_options : str or list of str, optional
            Pose-specific options; if provided, batching is disabled.
        params : BoltzParams, optional
            If given, its `generate_yaml_files` is called to write new pose
            YAMLs into ``boltz_inputs`` inside the work directory.
        overwrite : bool, optional
            If `True` (or if ``--override`` is in `options`), an existing
            score file is ignored and Boltz is run again.
        msa_setting : str, optional
            ``"server"``, ``"empty"`` or empty to auto-resolve from `options`.

        Returns
        -------
        Poses
            The poses returned by `RunnerOutput(...).return_poses()`.

        Raises
        ------
        RuntimeError
            If no scores could be collected after the Boltz run.

        Examples
        --------
        >>> from protflow.tools.boltz import Boltz, BoltzParams
        >>> params = BoltzParams()
        >>> params.add_ligand(ligand="CC(=O)O", id="LIG", ligand_type="smiles")
        >>> poses = Boltz().run(
        ...     poses=poses,
        ...     prefix="boltz_with_ligand",
        ...     params=params,
        ...     options="--num_samples 4",
        ...     overwrite=True
        ... )

        Notes
        -----
        ``--override`` is only detected in `options`, not in `pose_options`.
        """
        # setup runner
        work_dir, jobstarter = self.generic_run_setup(
            poses=poses,
            prefix=prefix,
            jobstarters=[jobstarter, self.jobstarter, poses.default_jobstarter]
        )
        boltz_out_dir = os.path.join(work_dir, "outputs")
        os.makedirs(boltz_out_dir, exist_ok=True)

        # sanitize
        options = options or ""

        # reuse scores of a previous run unless the user asked for a re-run
        scorefile = os.path.join(work_dir, f"{self.name}_scores.{poses.storage_format}")
        if os.path.isfile(scorefile) and not (overwrite or "--override" in options):
            scores = get_format(scorefile)(scorefile)  # loader matching the scorefile format
            logging.info(f"Found existing scorefile at {scorefile}. Returning {len(scores.index)} poses from previous run without running calculations.")
            return RunnerOutput(poses=poses, results=scores, prefix=prefix, index_layers=self.index_layers).return_poses()

        # resolve MSA mode and make sure all poses are Boltz YAMLs
        msa_setting = self._parse_msa_setting(options, msa_setting)
        if not all(fp.endswith(".yaml") for fp in poses.poses_list()):
            convert_poses_to_boltz_yaml(poses, prefix=f"{prefix}/poses_yaml", msa=msa_setting)

        # let BoltzParams write new pose YAMLs if requested
        if params:
            boltz_input_dir = os.path.join(work_dir, "boltz_inputs")
            params.generate_yaml_files(poses, boltz_input_dir)

        # with pose_options, poses are run one by one; otherwise they are batched
        boltz_inputs = self._parse_poses(
            poses=poses,
            pose_options=pose_options,
            work_dir=work_dir,
            max_cores=jobstarter.max_cores
        )
        parsed_options = self._parse_options(
            poses=poses,
            options=options,
            pose_options=pose_options,
            max_cores=jobstarter.max_cores,
            out_dir=boltz_out_dir,
            overwrite=overwrite
        )
        cmds = self._write_cmds(boltz_inputs, parsed_options)

        # run boltz
        jobstarter.start(
            cmds=cmds,
            jobname=f"{self.name}",
            output_path=work_dir
        )

        # collect scores
        scores = collect_boltz_scores(boltz_out_dir)
        if len(scores) == 0:
            raise RuntimeError(f"Boltz crashed. Check output logs and output directory for error logs: {work_dir}")

        logging.info(f"Saving scores of {self} at {scorefile}")
        self.save_runner_scorefile(scores=scores, scorefile=scorefile)

        logging.info(f"{self} finished. Returning {len(scores.index)} poses.")
        return RunnerOutput(poses=poses, results=scores, prefix=prefix, index_layers=self.index_layers).return_poses()
def convert_poses_to_boltz_yaml(poses: Poses, prefix: str, msa: str = None, overwrite: bool = True, reset_poses: bool = True) -> None:
    """
    Convert input poses to Boltz-compatible YAML files.

    Writes one YAML per pose to ``{poses.work_dir}/{prefix}``. Only the chain
    sequences (plus the MSA choice) are encoded; ligands and other entities
    are not read from the poses. Optionally points ``poses.df["poses"]`` to
    the new YAML files.

    Parameters
    ----------
    poses : Poses
        Input poses. All poses must be FASTA files (``.fa``, ``.fas``,
        ``.fasta``) or all must be structure files (``.pdb``, ``.cif``,
        ``.mmcif``); mixing both kinds is not supported.
    prefix : str
        Subdirectory under ``poses.work_dir`` where the YAMLs are written.
    msa : str or None
        Forwarded unchanged to `convert_chain_seq_dict_to_yaml_dict`
        (presumably ``"server"``, ``"empty"`` or an ``.a3m`` path; validation
        happens in that helper, not here).
    overwrite : bool, optional
        If ``False`` and all target YAMLs already exist, nothing is written.
    reset_poses : bool, optional
        If ``True``, replace the ``poses`` column with the YAML paths.

    Returns
    -------
    None

    Raises
    ------
    KeyError
        If ``{prefix}_location`` or ``{prefix}_description`` already exists
        in ``poses.df``.
    TypeError
        If the poses are neither all FASTA nor all structure files.

    Examples
    --------
    >>> convert_poses_to_boltz_yaml(poses, prefix="boltz_inputs", msa="empty")
    >>> convert_poses_to_boltz_yaml(poses, prefix="boltz_inputs_srv", msa="server", reset_poses=False)
    """
    fasta_exts = (".fa", ".fas", ".fasta")
    # The former check used the dot-less suffix "cif", which matched any name ending
    # in "cif". ".mmcif" is listed so files accepted before are still accepted.
    structure_exts = (".pdb", ".cif", ".mmcif")

    def _check_prefix(poses, prefix):
        if f"{prefix}_location" in poses.df.columns or f"{prefix}_description" in poses.df.columns:
            raise KeyError(f"Column {prefix} found in Poses DataFrame! Pick different Prefix!")

    def _determine_split_char(seq: str) -> str:
        # chains in a FASTA entry are separated by ':' or, failing that, '/'
        return ":" if ":" in seq else "/"

    # create output folder
    out_dir = os.path.join(os.path.abspath(poses.work_dir), prefix)
    os.makedirs(out_dir, exist_ok=True)

    pose_files = poses.poses_list()

    # target names: same basename, extension replaced by .yaml
    out_fn_list = [
        os.path.join(out_dir, os.path.splitext(os.path.basename(pose))[0] + ".yaml")
        for pose in pose_files
    ]

    # reuse existing YAMLs unless the caller wants them rewritten
    if not overwrite and all(os.path.isfile(out_fn) for out_fn in out_fn_list):
        logging.info(f"Boltz yaml files exist at {out_dir}. Skipping creation to save time.")
        if reset_poses:
            poses.df["poses"] = out_fn_list
        return None

    # sanity
    _check_prefix(poses, prefix)

    # extract {chain: sequence} per pose; the loader depends on the pose file type
    if all(pose.endswith(fasta_exts) for pose in pose_files):
        sequences = [load_sequence_from_fasta(pose, return_multiple_entries=False) for pose in pose_files]
        # chain IDs are assigned by position through idx_to_char
        sequence_dict_list = [
            {idx_to_char(i): chain_seq for i, chain_seq in enumerate(seq.split(_determine_split_char(seq)))}
            for seq in sequences
        ]
    elif all(pose.endswith(structure_exts) for pose in pose_files):
        sequence_dict_list = [
            get_sequence_from_pose(biopython_load_protein(pose, model_id=0), with_chains=True)
            for pose in pose_files
        ]
    else:
        raise TypeError("Boltz only supports files in .pdb, .cif, or .fa format!")

    # [{chain: seq, ...}, ...] -> per-pose lists of chain entries
    pose_yamls_raw = [
        convert_chain_seq_dict_to_yaml_dict(pose_dict, msa=msa, ignore_nonexistent_msa_file=True)
        for pose_dict in sequence_dict_list
    ]

    # wrap every chain entry as a protein in the Boltz 'sequences' list
    boltz_pose_yamls = [
        {"sequences": [{"protein": chain_dict} for chain_dict in pose_yaml]}
        for pose_yaml in pose_yamls_raw
    ]

    # store yamls
    for pose_yaml, out_fn in zip(boltz_pose_yamls, out_fn_list):
        boltz_yaml_writer(out_fn, pose_yaml)

    # set new poses
    if reset_poses:
        poses.df["poses"] = out_fn_list
    return None
def edit_boltz_yaml(*args, **kwargs) -> None:
    """
    Stub reserved for future YAML editing utilities.

    All positional and keyword arguments are accepted and ignored.

    Raises
    ------
    NotImplementedError
        On every call; nothing is implemented yet.
    """
    raise NotImplementedError()
[docs] class BoltzParams: """ Builder for per-pose Boltz YAML content. Collects entries for proteins, nucleic acids, ligands, constraints, templates, and arbitrary properties. Each field value can be provided either as a *literal* or as a reference to a column in ``poses.df``. Column-referenced values are marked by passing their keys via ``poses_cols`` and are resolved at YAML generation time. Notes ----- - Each added entity is stored internally and later rendered into the final YAML structure via :meth:`generate_yaml_files`. - For sequence modifications, use a list of dicts with at least ``{"position": <int>, "ccd": <str>}``. """
[docs] def __init__(self): """ Initialize an empty parameter collection. The instance accumulates lists: ``proteins``, ``dna``, ``rna``, ``ligands``, ``constraints``, ``templates``, and ``properties``—all of which are reflected into the resulting YAML during :meth:`generate_yaml_files`. """ self.proteins = [] self.dna = [] self.rna = [] self.ligands = [] self.constraints = [] self.templates = [] self.properties = []
def _check_modifications_format(self, modifications) -> list[dict]|None: """ Validate the format of residue modifications. Parameters ---------- modifications : list[dict] or None A list of dicts with keys like ``"position"`` (int) and ``"ccd"`` (str), e.g. ``[{"position": 42, "ccd": "MSE"}]``; or ``None``. Returns ------- list[dict] or None The validated list (or ``None``) for downstream use. Raises ------ ValueError If ``modifications`` is not a list of dicts. KeyError If any dict lacks required keys such as ``"position"`` or ``"ccd"``. """ if modifications is None: return None if not (isinstance(modifications, list) and all(isinstance(elem, dict) for elem in modifications)): raise ValueError(f':modifications: parameter has to be in format [{"position": RES_IDX, "ccd": CCD}, ...]. modifications: {modifications}') for mod in modifications: if "position" not in mod or "ccd" not in mod: raise KeyError(f'One of your modifications is missing a "ccd" or "position" key. :modifications: parameter has to be in format: [{"position": RES_IDX, "ccd": CCD}, ...]. culprit: {mod}') return modifications
[docs] def add_protein(self, sequence: str, id: str|list[str], msa: str|bool = False, modifications: list[dict]|str = None, cyclic: bool = False, poses_cols: list[str] = None) -> None: # pylint: disable=W0622 ## we adhere to Boltz naming convention here, so id overwrite will be ignored in the sake of user experience. '''Helper to add protein entry. Parameters ---------- sequence : str Amino-acid sequence; may be a literal or a column name (see Notes). id : str or list[str] Chain ID(s) to use in the YAML; may be literal or a column name. modifications : list[dict] or None, optional Per-residue modifications (see :meth:`_check_modifications_format`). e.g. [{"position": RES_IDX, "ccd": CCD}, ...] (can also be a string pointing to a column in poses.df that contains the modifications dicts) cyclic : bool, optional Whether the peptide is cyclic. poses_cols : list[str], optional Keys that should be **read from** ``poses.df`` instead of used literally, e.g. ``["sequence", "id", "modifications"]``. Returns ------- None Examples -------- >>> bp.add_protein(sequence="ACDE...", id="A") >>> bp.add_protein(sequence="seq_col", id="chain_id_col", poses_cols=["sequence", "id"]) Notes ----- Any key named in ``poses_cols`` is treated as a reference to a column in the current pose row when rendering YAML. ''' # instantiate default value poses_cols = poses_cols or [] # compile protein dict in BoltzParams representation. protein_dict = { "id": id, "sequence": sequence, "msa": msa, "modifications": modifications if "modifications" in poses_cols else self._check_modifications_format(modifications), "cyclic": cyclic } protein_dict = {key: (val, key in poses_cols) for key, val in protein_dict.items()} # wrap in poses_cols flag! # add proteins entry to BoltzParams instance. self.proteins.append(protein_dict)
[docs] def add_dna(self, sequence: str, id: str|list[str], modifications: list[dict] = None, cyclic: bool = False, poses_cols: list[str] = None) -> None: # pylint: disable=W0622 ## we adhere to Boltz naming convention here, so id overwrite will be ignored in the sake of user experience. """ Add a DNA entry. Parameters ---------- sequence : str Nucleotide sequence (literal or column name). id : str or list[str] Identifier(s) for the DNA entry. modifications : list[dict] or None, optional Residue-level modifications for DNA. cyclic : bool, optional Whether the polymer is cyclic. poses_cols : list[str], optional Keys to interpret as column names in ``poses.df``. Returns ------- None """ # instantiate default value poses_cols = poses_cols or [] # compile dna dict in BoltzParams representation dna_dict = { "id": id, "sequence": sequence, "modifications": modifications if "modifications" in poses_cols else self._check_modifications_format(modifications), "cyclic": cyclic } dna_dict = {key: (val, key in poses_cols) for key, val in dna_dict.items()} # wrap in poses_cols! # add dna entry to BoltzParams instance self.dna.append(dna_dict)
[docs] def add_rna(self, sequence: str, id: str|list[str], modifications: list[dict] = None, cyclic: bool = False, poses_cols: list[str] = None) -> None: # pylint: disable=W0622 ## we adhere to Boltz naming convention here, so id overwrite will be ignored in the sake of user experience. """ Add an RNA entry. Parameters ---------- sequence : str Nucleotide sequence (literal or column name). id : str or list[str] Identifier(s) for the RNA entry. modifications : list[dict] or None, optional Residue-level modifications for RNA. cyclic : bool, optional Whether the polymer is cyclic. poses_cols : list[str], optional Keys to interpret as column names in ``poses.df``. Returns ------- None """ # instantiate default value poses_cols = poses_cols or [] # compile dna dict in BoltzParams representation rna_dict = { "id": id, "sequence": sequence, "modifications": modifications if "modifications" in poses_cols else self._check_modifications_format(modifications), "cyclic": cyclic } rna_dict = {key: (val, key in poses_cols) for key, val in rna_dict.items()} # wrap in poses_cols! # add rna entry to BoltzParams instance self.rna.append(rna_dict)
[docs] def add_ligand(self, ligand: str, id: str|list[str], ligand_type: str = "smiles", poses_cols: list[str] = None) -> None: # pylint: disable=W0622 ## we adhere to Boltz naming convention here, so id overwrite will be ignored in the sake of user experience. """ Add a ligand entry. Parameters ---------- ligand : str The ligand specification. For ``ligand_type="smiles"``, provide a SMILES; for ``"ccd"``, provide an RCSB CCD ID. id : str or list[str] Ligand ID(s) in the output YAML. ligand_type : {"smiles", "ccd"} How to interpret ``ligand``. poses_cols : list[str], optional Keys (e.g., ``["ligand", "id"]``) to read from ``poses.df``. ``"ligand_type"`` is not supported as a pose-column. Returns ------- None Raises ------ ValueError If ``"ligand_type"`` is included in ``poses_cols``. """ # instantiate default value poses_cols = poses_cols or [] # sanity if "ligand_type" in poses_cols: raise ValueError("We are sorry, but ligand_type is not yet supported in 'poses_cols'.") # verify ligand type if ligand_type.lower() not in {"smiles", "ccd"}: raise ValueError(f"Parameter :ligand_type: can be only one of {{'smiles', 'ccd'}}. ligand_type: {ligand_type}") # compile ligand dict in BoltzParams representation ligand_dict = { "id": (id, "id" in poses_cols), ligand_type.lower(): (ligand, "ligand" in poses_cols), } # add ligands entry to BoltzParams instance self.ligands.append(ligand_dict)
[docs] def add_constraint(self, constraint_type: str, poses_cols: list[str] = None, **kwargs) -> None: """ Add a geometric or pocket constraint. Parameters ---------- constraint_type : str One of typical types such as ``"bond"``, ``"angle"``, ``"dihedral"``, ``"contact"``, or ``"pocket"`` (see Notes for expected fields). poses_cols : list[str], optional Keys in ``kwargs`` that should be read from ``poses.df``. **kwargs Constraint parameters (literal values or column names if listed in ``poses_cols``). Returns ------- None Examples -------- Contact constraint between two tokens: >>> bp.add_constraint( ... "contact", ... token1=["A", 42], token2=["B", "CA"], max_distance=6.0 ... ) Notes ----- - ``bond/angle/dihedral`` expect standard token lists like ``["CHAIN", RES_IDX/ATOM_NAME]``. - ``pocket`` typically expects a ``binder`` (chain) and a list of pocket ``contacts`` plus an optional ``max_distance``. """ if constraint_type.lower() not in {"bond", "pocket", "contact"}: raise ValueError(f"Parameter :constraint_type: has to be one of {'bond', 'pocket', 'contact'}, your constraint_type: {constraint_type}") # instantiate default value poses_cols = poses_cols or [] # wrap keys for constraints in poses_cols flags: processed_kwargs = {key: (val, key in poses_cols) for key, val in kwargs.items()} # create dictionary that stores constraints and their kwargs. constraint_dict = {constraint_type.lower(): dict(processed_kwargs)} # add constraint entry to BoltzParams instance self.constraints.append(constraint_dict)
[docs] def add_template(self, template: str, template_type: str, poses_cols: list[str] = None, **kwargs) -> None: ''' Add a structural template. In ``**kwargs``, add the parameters of the given template that you want to use. Parameters ---------- template : str Path or identifier of the template (literal or column name). template_type : {"pdb", "cif"} Template format. poses_cols : list[str], optional Keys (including any in ``kwargs``) to be read from ``poses.df``. **kwargs Additional template parameters supported by Boltz (e.g., chain selection, residue ranges). Returns ------- None See the original Boltz documentation for details: https://github.com/jwohlwend/boltz/blob/main/docs/prediction.md ''' if template_type.lower() not in {"cif", "pdb"}: raise ValueError(f"Parameter :template_type: can only be one of {{'cif', 'pdb'}}, your template_type: {template_type}") # instantiate default value poses_cols = poses_cols or [] # wrap keys for templates in poses_cols flags processed_kwargs = {key: (val, key in poses_cols) for key, val in kwargs.items()} # create dictionary that stores constraints and their kwargs: templates_dict = {template_type.lower(): (template, "template" in poses_cols), **processed_kwargs} self.templates.append(templates_dict)
def add_property(self, property_type: str, poses_cols: list[str] = None, **kwargs) -> None:
    """
    Register a property entry that is later written into each pose's YAML.

    Parameters
    ----------
    property_type : str
        Property category. Only ``"affinity"`` is supported. The name is
        matched case-insensitively (like constraint and template types) and
        stored lower-cased.
    poses_cols : list[str], optional
        Keys in ``kwargs`` whose values are column names in ``poses.df``
        rather than literal values.
    **kwargs
        Property payload, each value stored as ``(value, is_pose_col)``.

    Returns
    -------
    None
        The entry ``{property_type: {key: (value, is_pose_col), ...}}`` is
        appended to ``self.properties``.

    Raises
    ------
    ValueError
        If ``property_type`` is not a supported property.

    Examples
    --------
    >>> bp.add_property('affinity', binder="binder_chain_col", poses_cols=["binder"])
    >>> bp.add_property('affinity', binder="B")
    """
    supported_properties = {"affinity"}

    # normalize case so that e.g. "Affinity" is handled like the other add_* methods handle their types
    property_key = property_type.lower()
    if property_key not in supported_properties:
        raise ValueError(f"property {property_type} not supported. Supported properties: {supported_properties}")

    # parse poses cols
    poses_cols = poses_cols or []

    # flag every payload value as literal (False) or pose-column (True)
    processed_kwargs = {key: (val, key in poses_cols) for key, val in kwargs.items()}
    self.properties.append({property_key: processed_kwargs})
def generate_yaml_files(self, poses: Poses, out_dir: str, reset_poses: bool = True) -> None:
    '''
    Render the accumulated parameters into one Boltz YAML file per pose.

    Every pose YAML is read, extended with the registered proteins, DNA, RNA,
    ligands, constraints, templates and properties, and written to ``out_dir``
    under its original file name. Values that were registered as pose-columns
    are looked up in the pose's row.

    Parameters
    ----------
    poses : Poses
        Poses whose table provides column values for pose-bound fields. All
        poses must already be ``.yaml`` files.
    out_dir : str
        Output directory where YAML files are written (created if missing).
    reset_poses : bool, optional
        If ``True``, replace ``poses.df["poses"]`` with the new YAML paths.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If any pose is not a ``.yaml`` file.
    KeyError
        If a requested pose-column is missing from the pose's row.
    '''
    def _parse_dict_for_pose(pose: pd.Series, entity_dict: dict) -> dict:
        '''Resolves (value, is_pose_col) pairs; pose-bound values are read from the pose row.'''
        return {
            key: pose[val] if is_pose_col else val
            for key, (val, is_pose_col) in entity_dict.items()
        }

    # sanity
    if not all(fp.endswith(".yaml") for fp in poses.poses_list()):
        raise TypeError(
            "Poses must be in boltz-compatible .yaml format. "
            "Use the function 'protflow.tools.boltz.convert_poses_to_boltz_yaml()' for this!"
        )

    # create output dir
    os.makedirs(out_dir, exist_ok=True)

    # all four entity kinds live under the same top-level "sequences" list
    sequence_entities = (
        ("protein", self.proteins),
        ("dna", self.dna),
        ("rna", self.rna),
        ("ligand", self.ligands),
    )

    # operate per-pose
    new_poses = []
    for pose in poses:
        # an empty YAML file parses to None; start from an empty document then
        pose_yaml = boltz_yaml_reader(pose["poses"]) or {}

        # add sequences; setdefault creates the list for every entity kind,
        # not only when proteins happen to be registered
        for entity_type, entity_dicts in sequence_entities:
            for entity_dict in entity_dicts:
                pose_yaml.setdefault("sequences", []).append(
                    {entity_type: _parse_dict_for_pose(pose, entity_dict)}
                )

        # add constraints (stored as {type: fields}, one level deeper than sequences)
        for constraint_dict in self.constraints:
            pose_yaml.setdefault("constraints", []).append(
                {cst_type: _parse_dict_for_pose(pose, cst_dict) for cst_type, cst_dict in constraint_dict.items()}
            )

        # add templates (flat dicts)
        for template_dict in self.templates:
            pose_yaml.setdefault("templates", []).append(_parse_dict_for_pose(pose, template_dict))

        # add properties (stored as {type: fields})
        for property_dict in self.properties:
            pose_yaml.setdefault("properties", []).append(
                {property_type: _parse_dict_for_pose(pose, property_args) for property_type, property_args in property_dict.items()}
            )

        # write output under the original file name
        new_pose_fn = os.path.join(out_dir, os.path.basename(pose["poses"]))
        boltz_yaml_writer(new_pose_fn, pose_yaml)

        # remember new filename for integration into poses later
        new_poses.append(new_pose_fn)

    # set new poses
    if reset_poses:
        poses.df["poses"] = new_poses

    logging.info(
        f"Finished converting poses to .yaml files based on BoltzParams.\n"
        f"Added {len(self.proteins)} proteins, {len(self.ligands)} ligands, {len(self.dna)} DNA molecules, and {len(self.rna)} RNA molecules.\n"
        f"Added {len(self.constraints)} constraints, {len(self.templates)} templates, and {len(self.properties)} properties."
    )
[docs] def convert_chain_seq_dict_to_yaml_dict(chain_seq_dict: dict[str,str], msa: str = None, ignore_nonexistent_msa_file: bool = False) -> dict[str,str]: ''' Converts dictionary that contains {chain: seq, ...} into boltz-compatible protein entries {}. When msa is set to 'server', the function will set <msa: empty> (use option --use_msa_server!) Convert a chain→sequence mapping into Boltz YAML "protein" entries. Parameters ---------- chain_seq_dict : dict[str, str] Mapping from chain ID to amino-acid sequence. msa : {"server", "empty", "auto"} or str or None, optional If ``"server"/"empty"/"auto"/None`` → write ``"msa": "empty"`` per chain. If a string path → use it as the MSA file for all chains (exists unless ``ignore_nonexistent_msa_file=True``). ignore_nonexistent_msa_file : bool, optional If ``True``, skip the existence check for the path given in ``msa``. Returns ------- list of dict One dict per chain with keys ``id``, ``sequence``, and ``msa``. Raises ------ FileNotFoundError If ``msa`` is a path that does not exist and ``ignore_nonexistent_msa_file`` is ``False``. ValueError If ``msa`` is not one of the accepted values. Examples -------- >>> convert_chain_seq_dict_to_yaml_dict({"A": "ACDE", "B": "FGHI"}, msa="empty") [{'id': 'A', 'sequence': 'ACDE', 'msa': 'empty'}, {'id': 'B', 'sequence': 'FGHI', 'msa': 'empty'}] ''' # parse MSA option match msa: case "server" | "empty" | "auto" | None: msa_val = "empty" case str(): msa_val = msa if not os.path.isfile(msa) and not ignore_nonexistent_msa_file: raise FileNotFoundError(f"Specified MSA file not found: {msa}") case _: raise ValueError(f"Not allowed: {msa}. Either provide a path to an existing MSA, None, 'server' (to get msa from msa-server), or 'empty'.") # create protein yaml for each chain. protein_yaml = [ { "id": chain, "sequence": seq, "msa": msa_val } for chain, seq in chain_seq_dict.items() ] return protein_yaml
def _folders_in_dir(dir_path: str) -> list:
    '''Returns the non-hidden sub-directories (as Path objects) directly inside :dir_path:.'''
    # Note: if this causes issues in the future with random folders, add an additional check for the subdirectory
    # to contain at least a file with f'{parent_folder_name}_model_0.{"cif" or "pdb"}'
    visible_dirs = []
    for entry in Path(dir_path).iterdir():
        if not entry.is_dir():
            continue
        if entry.name.startswith("."):
            continue  # exclude hidden folders
        visible_dirs.append(entry)
    return visible_dirs


def _read_boltz_confidence_file(fp: str) -> pd.Series:
    '''Loads a Boltz JSON score file and returns its top-level content as a pd.Series.'''
    with open(fp, 'r', encoding="UTF-8") as handle:
        return pd.Series(json.load(handle))


def _get_last_dir_name(path: str) -> str:
    '''Returns the name of the last directory in :path: (the path itself if it is a directory, else its parent).'''
    as_path = Path(path)
    # a trailing slash marks a directory even if it does not exist on disk
    names_a_directory = as_path.is_dir() or str(path).endswith("/")
    return as_path.name if names_a_directory else as_path.parent.name
def collect_boltz_scores(boltz_output_dir: str) -> pd.DataFrame:
    """
    Aggregate per-model Boltz outputs into a Pandas DataFrame.

    Expects the Boltz output layout:
    ``{boltz_output_dir}/{input}/predictions/{pose}/`` containing:
    - structure files: ``{pose}_model_*.cif`` or ``.pdb``
    - confidence JSONs: ``confidence_{pose}_model_{i}.json``
    - optional affinity JSON: ``affinity_{pose}.json``
    - NPZ artifacts per model: ``plddt_*``, ``pae_*``, ``pde_*``

    Parameters
    ----------
    boltz_output_dir : str
        Top-level directory passed to Boltz via ``--out_dir``.

    Returns
    -------
    pandas.DataFrame
        One row per model with at least: ``description``, ``location``, and
        paths for ``plddt_location``, ``pae_location``, ``pde_location``;
        plus all JSON keys. Empty if no model files are found.

    Raises
    ------
    FileNotFoundError
        If a model file has no matching confidence JSON.

    Notes
    -----
    The ``description`` column is ``{pose}_model_{rank}`` and ``location``
    points to the corresponding ``.pdb/.cif`` model file. The ``*_location``
    paths are composed from the naming scheme and are not checked for
    existence. Folders in ``boltz_output_dir`` without a ``predictions``
    sub-directory are skipped with a warning.
    """
    npz_file_headers = ("plddt", "pae", "pde")

    # create output aggregation list
    out_l = []

    # loop over output folders {boltz_input}/predictions/{input_file}/ -> multiple input files and multiple diffusion samples
    for run_dir in _folders_in_dir(boltz_output_dir):
        predictions_dir = os.path.join(run_dir, "predictions")
        if not os.path.isdir(predictions_dir):
            # e.g. a failed/unfinished run or an unrelated folder; nothing to collect here
            logging.warning("Skipping %s: no 'predictions' directory found.", run_dir)
            continue

        for input_file in _folders_in_dir(predictions_dir):
            # input_file should be: /path/to/boltz_output_dir/{boltz_input}/predictions/{input_file}/
            # basename of pose
            description = _get_last_dir_name(input_file)

            # the affinity file is per input, not per model: read it once
            affinity_fn = f"{input_file}/affinity_{description}.json"
            affinity_scores = _read_boltz_confidence_file(affinity_fn) if os.path.isfile(affinity_fn) else None

            # loop over output models
            output_models = glob(f"{input_file}/{description}_model_*.cif") + glob(f"{input_file}/{description}_model_*.pdb")
            for pose_fp in output_models:
                # determine rank: the trailing integer of /path/to/{description}_model_{rank}.{cif|pdb}
                model_stem = os.path.splitext(os.path.basename(pose_fp))[0]
                rank = int(model_stem.rsplit("_", maxsplit=1)[-1])

                # collect scores
                pose_confidence_file = f"{input_file}/confidence_{description}_model_{rank}.json"
                confidence_scores = _read_boltz_confidence_file(pose_confidence_file)

                # parse description
                confidence_scores["description"] = f"{description}_model_{rank}"
                confidence_scores["location"] = pose_fp

                # append affinity scores (shared by all models of this input)
                if affinity_scores is not None:
                    confidence_scores = pd.concat([confidence_scores, affinity_scores])

                # add .npz file-locations into the scores
                for header in npz_file_headers:
                    confidence_scores[f"{header}_location"] = f"{input_file}/{header}_{description}_model_{rank}.npz"

                # append model scores to global output list:
                out_l.append(confidence_scores)

    # aggregate scores in DataFrame
    return pd.DataFrame(out_l)
def idx_to_char(idx: int) -> str:
    """
    Map a 0-based index to an Excel-style column label.

    0 -> 'A', 25 -> 'Z', 26 -> 'AA', 27 -> 'AB', ...

    Raises ``TypeError`` if ``idx`` is not an int and ``ValueError`` if it is
    negative.
    """
    if not isinstance(idx, int):
        raise TypeError("idx must be an int")
    if idx < 0:
        raise ValueError("idx must be >= 0")

    label = ""
    remaining = idx + 1  # the letter scheme has no zero digit, so work 1-based
    while remaining > 0:
        remaining -= 1
        # least-significant letter first, prepended to build the label left-to-right
        label = chr(ord('A') + remaining % 26) + label
        remaining //= 26
    return label
# --- flow-style helper for specific sequences ---
class FlowSeq(list):
    """
    List subclass that only serves as a marker for YAML *flow style*.

    It adds no behavior of its own. A representer registered for this type
    on :class:`MyDumper` emits such lists as ``[a, b, c]`` on one line
    instead of block style, which keeps IDs and token tuples in Boltz YAMLs
    compact.
    """
def _flow_seq_representer(dumper, data):
    '''Represents :data: as a plain YAML sequence node with flow style enabled.'''
    sequence_tag = 'tag:yaml.org,2002:seq'  # standard YAML sequence tag
    return dumper.represent_sequence(sequence_tag, data, flow_style=True)
class MyDumper(yaml.SafeDumper):
    """
    Dedicated ``yaml.SafeDumper`` subclass for Boltz YAML output.

    Having a separate subclass gives the :class:`FlowSeq` flow-style
    representer its own class to be registered on.
    """
# TODO: write class that autodetects lists in yaml and converts them into flow stuff
def _process_boltz_yaml_for_output(boltz_yaml: dict) -> dict:
    '''
    Marks selected list fields of a Boltz YAML document for inline (flow-style) output.

    The fields are wrapped in FlowSeq *in place* and the same dict is returned:
    - ``id`` of every sequence entry (e.g. ``id: [A, B]``)
    - ``atom1``/``atom2`` (bond), ``contacts`` (pocket), ``token1``/``token2`` (contact)
    - every list-valued field of a template entry

    Fields that are absent or do not hold a list/tuple are left untouched.
    This is a manually maintained mapping; try to convert this with patterns later.
    '''
    # constraint fields that should be written inline, per constraint type
    flow_keys_by_constraint = {
        "bond": ("atom1", "atom2"),
        "pocket": ("contacts",),
        "contact": ("token1", "token2"),
    }

    def _as_flow(container: dict, key: str) -> None:
        '''Wraps container[key] in FlowSeq if it is present and holds a list or tuple.'''
        if isinstance(container.get(key), (list, tuple)):
            container[key] = FlowSeq(container[key])

    # fix id entries in the same line, e.g.: 'id: [A, B]'
    for sequence_entry in boltz_yaml.get("sequences", []):
        (_, entity_dict), = sequence_entry.items()
        _as_flow(entity_dict, "id")

    # same for constraint entries
    for constraint_entry in boltz_yaml.get("constraints", []):
        (constraint_type, constraint_dict), = constraint_entry.items()
        for key in flow_keys_by_constraint.get(constraint_type, ()):
            _as_flow(constraint_dict, key)

    # same for template entries (specifying ID's usually happens in lists if multiple IDs are specified)
    # note: this must iterate the "templates" section, not "constraints"
    for template_entry in boltz_yaml.get("templates", []):
        for template_key in list(template_entry):
            _as_flow(template_entry, template_key)

    return boltz_yaml
def boltz_yaml_writer(out_path: str, boltz_yaml: dict) -> None:
    """
    Dump a Boltz YAML document to ``out_path``.

    Parameters
    ----------
    out_path : str
        Output ``.yaml`` path (opened for writing, UTF-8).
    boltz_yaml : dict
        YAML document to write. It is passed through
        ``_process_boltz_yaml_for_output`` first, which marks list fields for
        flow-style output in place, so the passed dict may be modified.

    Returns
    -------
    None
    """
    # key order is kept as given, block style by default, and lines are effectively never wrapped
    dump_options = {
        "Dumper": MyDumper,
        "sort_keys": False,
        "default_flow_style": False,
        "indent": 2,
        "width": 10**9,
        "allow_unicode": True,
    }

    # make sure FlowSeq lists are emitted inline by this dumper
    MyDumper.add_representer(FlowSeq, _flow_seq_representer)

    document = _process_boltz_yaml_for_output(boltz_yaml)
    with open(out_path, 'w', encoding="UTF-8") as handle:
        yaml.dump(document, handle, **dump_options)
def boltz_yaml_reader(in_path: str) -> dict:
    """
    Read a Boltz YAML file into a Python dictionary.

    Parameters
    ----------
    in_path : str
        Path to a ``.yaml`` file.

    Returns
    -------
    dict
        Parsed YAML document. An empty file yields an empty dict.
    """
    with open(in_path, 'r', encoding="UTF-8") as f:
        out_dict = yaml.safe_load(f)

    # safe_load returns None for an empty document; honor the dict contract instead
    return {} if out_dict is None else out_dict