Source code for protflow.poses

"""
poses Module
============

This module provides functionalities for handling and manipulating protein data within the ProtFlow framework. It focuses on managing protein data represented as Pandas DataFrames, allowing for efficient parsing, storage, and manipulation of protein data across various file formats. The module facilitates complex protein study workflows and integrates seamlessly with other components of the ProtFlow package.

Detailed Description
--------------------
The `poses` module offers a robust class, `Poses`, designed to encapsulate the functionality necessary to manage protein data. It supports various operations such as setting up work directories, parsing protein data, and integrating outputs from different computational processes. The module ensures that the results are organized and accessible for further analysis within the ProtFlow ecosystem.
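
As a rough illustration of the work-directory setup, the sketch below mirrors what `Poses.set_work_dir` does in this module: it creates the working directory together with `scores`, `filter`, and `plots` subdirectories. The helper name `make_work_dir` is hypothetical and not part of ProtFlow; only the standard library is used.

```python
import os
import tempfile


def make_work_dir(work_dir):
    # Hypothetical helper: create work_dir and the three subdirectories
    # that Poses.set_work_dir sets up (scores, filter, plots).
    work_dir = os.path.abspath(work_dir)
    subdirs = {name: os.path.join(work_dir, name) for name in ("scores", "filter", "plots")}
    for path in subdirs.values():
        os.makedirs(path, exist_ok=True)  # also creates work_dir itself
    return subdirs


with tempfile.TemporaryDirectory() as tmp:
    layout = make_work_dir(os.path.join(tmp, "design_run"))
    created = sorted(os.listdir(os.path.join(tmp, "design_run")))
    print(created)
```

Because `os.makedirs` is called with `exist_ok=True`, running the helper a second time on the same directory does not raise an error.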

Key Features
------------
- **Parsing Protein Data**: Supports reading protein data from various file formats like JSON, CSV, Pickle, Feather, and Parquet.
- **Data Storage and Retrieval**: Allows storing and retrieving protein data in multiple formats, facilitating easy data management.
- **Integration with ProtFlow**: Seamlessly integrates with ProtFlow's job management components, enhancing its utility in distributed computing environments.
- **Advanced Data Manipulation**: Provides functionalities to merge and prefix data from various sources, making it easier to handle complex datasets.
- **Flexible and Customizable**: Users can customize the data handling processes through various parameters, enabling tailored data management solutions.
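
The storage-format handling can be sketched as follows. This is a simplified illustration that assumes the mapping in `FORMAT_STORAGE_DICT` and the naming rule used by `Poses.set_scorefile`; the function names `validate_format` and `scorefile_path` are made up for this example and are not part of ProtFlow.

```python
import os

# Copy of the format table defined in this module
# (format name -> name of the DataFrame writer method).
FORMAT_STORAGE_DICT = {"json": "to_json", "csv": "to_csv", "pkl": "to_pickle", "pickle": "to_pickle"}


def validate_format(storage_format):
    # Hypothetical helper: reject formats that have no writer method.
    if storage_format.lower() not in FORMAT_STORAGE_DICT:
        raise KeyError(f"Format {storage_format} not available. Choose one of {list(FORMAT_STORAGE_DICT)}")
    return storage_format


def scorefile_path(work_dir, storage_format):
    # Hypothetical helper: <work_dir>/<basename of work_dir>_scores.<format>,
    # falling back to ./poses_scores.<format> when no work_dir is set.
    base = os.path.join(work_dir, os.path.basename(work_dir)) if work_dir else "./poses"
    return f"{base}_scores.{validate_format(storage_format)}"


print(scorefile_path("runs/design1", "json"))
print(scorefile_path(None, "csv"))
```

Note that, under this rule, a format such as `feather` is rejected for storage even though files with that extension can be read as input.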

Usage
-----
To use this module, create an instance of the `Poses` class and utilize its methods to manage protein data. Here is an example demonstrating its usage within a ProtFlow pipeline:

.. code-block:: python

    from protflow.poses import Poses

    # Initialize the Poses class with protein data and a working directory
    poses_instance = Poses(poses=my_protein_data, work_dir='path/to/work_dir')
    
    # Further operations using poses_instance
    poses_instance.save_scores('path/to/save/scores')
    poses_instance.filter_poses_by_rank(n=10, score_col='score', prefix='filtered_poses')

Examples
--------
Here is an example of how to initialize and use the `Poses` class for managing protein data:

.. code-block:: python

    from protflow.poses import Poses

    # Create an instance of the Poses class
    poses_instance = Poses(poses=my_protein_data, work_dir='path/to/work_dir')

    # Perform various operations using the instance
    poses_instance.set_work_dir('new/work/dir')
    poses_instance.save_scores('path/to/save/scores', out_format='csv')
    filtered_poses = poses_instance.filter_poses_by_value(score_col='score', value=0.5, operator='>')

Further Details
---------------
- **Edge Cases**: The module handles various edge cases such as empty pose lists and the need to overwrite previous results. It includes robust error handling and logging for easier debugging and verification.
- **Customizability**: Users can customize the data handling process through multiple parameters, including storage formats, pose-specific parameters, and job management settings.
- **Integration**: The module integrates seamlessly with other components of the ProtFlow framework, leveraging shared configurations and data structures to provide a cohesive user experience.
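
Two of the edge cases above can be illustrated with the standard library alone. The sketch assumes that a pose description is the file name without directory and extension (as the `parse_descriptions` docstring states) and that FASTA descriptions are sanitized by replacing `/ - : . | ,` and spaces with underscores (as in `split_multiline_fasta`). `describe` and `sanitize` are hypothetical names, not ProtFlow functions.

```python
import os
import re


def describe(path):
    # Assumed rule: strip the directory and the last file extension.
    return os.path.splitext(os.path.basename(path))[0]


def sanitize(description):
    # Same character set as the module's pattern; the hyphen is placed last
    # in the character class so that no escaping is needed.
    return re.sub("[/:. |,-]", "_", description)


poses = []  # an empty pose list simply yields an empty description list
print([describe(p) for p in poses])
print(describe("path/to/pose1.pdb"))
print(sanitize("sp|P12345|KINASE human-variant"))
```

Sanitizing descriptions before using them as file names avoids accidental subdirectories (from `/`) and ambiguous extensions (from `.`).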

This module is intended for researchers and developers who need to manage protein data within their computational workflows. By automating many of the setup and execution steps, it allows users to focus on interpreting results and advancing their scientific inquiries.

Notes
-----
This module is part of the ProtFlow package and is designed to work in tandem with other components of the package, especially those related to job management in HPC environments.

Authors
-------
Markus Braun, Adrian Tripp

Version
-------
0.1.0
"""

import os
from glob import glob
import re
from typing import Union
import shutil
import logging
import ast

# dependencies
import pandas as pd
import Bio.PDB

# customs
from . import jobstarters
from .jobstarters import JobStarter
from .residues import ResidueSelection
from .utils.utils import parse_fasta_to_dict
from .utils.biopython_tools import load_structure_from_pdbfile, get_sequence_from_pose
from .utils import plotting as plots

FORMAT_STORAGE_DICT = {
    "json": "to_json",
    "csv": "to_csv",
    "pkl": "to_pickle",
    "pickle": "to_pickle"
}

class Poses:
    """
    Poses Class
    ===========

    The `Poses` class within the ProtFlow package is designed for handling protein data, enabling the parsing, storage, and manipulation of protein data represented as Pandas DataFrames. This class facilitates the management of complex protein study workflows and integrates seamlessly with other components of the ProtFlow framework.

    Detailed Description
    --------------------
    The `Poses` class encapsulates the functionality necessary for comprehensive management of protein data. It supports various operations, including setting up work directories, parsing protein data from different sources, integrating outputs from different runners, and handling protein data in multiple file formats. This class is essential for users looking to streamline their protein data management within computational workflows.

    Key Features
    ------------
    - **Work Directory Setup**: Easily sets up and manages work directories for storing intermediate and final results.
    - **Data Parsing**: Parses protein data from various sources and formats, including JSON, CSV, Pickle, Feather, and Parquet.
    - **Data Storage and Retrieval**: Stores and retrieves protein data in multiple file formats, ensuring flexibility in data management.
    - **Job Management Integration**: Integrates with ProtFlow's job management components, facilitating the handling of protein data in distributed computing environments.
    - **Advanced Data Manipulation**: Supports operations like merging, prefixing, and duplicating data, providing robust data manipulation capabilities.
    - **Filtering and Scoring**: Offers methods to filter protein data based on various criteria and calculate composite scores for better data analysis.
    - **Pose Handling**: Manages protein poses, including loading, saving, and converting between different formats (e.g., PDB to FASTA).

    Usage
    -----
    To use this class, create an instance of the `Poses` class and utilize its methods to manage protein data. Here is an example demonstrating its usage within a ProtFlow pipeline:

    .. code-block:: python

        from protflow.poses import Poses

        # Initialize the Poses class with protein data and a working directory
        poses_instance = Poses(poses=my_protein_data, work_dir='path/to/work_dir')

        # Set up the work directory
        poses_instance.set_work_dir('path/to/new_work_dir')

        # Parse and manipulate poses
        poses_instance.set_poses(poses=my_protein_data)
        poses_instance.save_scores('path/to/save/scores', out_format='csv')

        # Filter poses
        filtered_poses = poses_instance.filter_poses_by_rank(n=10, score_col='score', prefix='filtered_poses')

        # Calculate a composite score
        poses_instance.calculate_composite_score(name='composite_score', scoreterms=['score1', 'score2'], weights=[0.5, 0.5], plot=True)

    Further Details
    ---------------
    - **Edge Cases**: The class handles various edge cases, such as empty pose lists, the need to overwrite previous results, and handling multiline FASTA inputs.
    - **Customizability**: Users can customize the data handling process through multiple parameters, including storage formats, pose-specific parameters, and job management settings.
    - **Integration**: The class integrates seamlessly with other components of the ProtFlow framework, leveraging shared configurations and data structures to provide a cohesive user experience.
    - **Error Handling**: Includes robust error handling and logging for easier debugging and verification of data processing steps.

    Attributes
    ----------
    - `df` : pd.DataFrame
        A DataFrame to store protein data.
    - `work_dir` : str
        The working directory for storing data and results.
    - `storage_format` : str
        The format for storing protein data (e.g., 'json', 'csv').
    - `default_jobstarter` : JobStarter
        The default job starter for managing jobs.

    Notes
    -----
    This class is part of the ProtFlow package and is designed to work in tandem with other components of the package, especially those related to job management in HPC environments.

    Author
    ------
    Markus Braun, Adrian Tripp

    Version
    -------
    0.1.0
    """
    ############################################# SETUP #########################################
    def __init__(self, poses: list = None, work_dir: str = None, storage_format: str = "json", glob_suffix: str = None, jobstarter: JobStarter = jobstarters.SbatchArrayJobstarter()):
        """
        Initializes the Poses class with optional parameters for poses, working directory, storage format, glob suffix, and job starter.

        Parameters
        ----------
        poses : list, optional
            A list of paths to the protein data files to be managed. If not provided, an empty DataFrame is initialized.
        work_dir : str, optional
            The working directory where intermediate and final results will be stored. If not provided, no working directory is created and the scorefile path defaults to the current directory.
        storage_format : str, optional
            The format used for storing protein data (default is 'json'). Supported formats are the keys of `FORMAT_STORAGE_DICT`: 'json', 'csv', 'pkl', and 'pickle'.
        glob_suffix : str, optional
            A suffix used for globbing multiple files. This allows for batch processing of files matching the given pattern.
        jobstarter : JobStarter, optional
            An instance of the JobStarter class used to manage job submissions. The default is an instance of SbatchArrayJobstarter from the jobstarters module.

        Attributes
        ----------
        df : pd.DataFrame
            A DataFrame to store protein data.
        work_dir : str
            The working directory for storing data and results.
        storage_format : str
            The format for storing protein data.
        default_jobstarter : JobStarter
            The default job starter for managing jobs.

        Notes
        -----
        This method initializes the Poses class and sets up various attributes required for managing protein data. It prepares the environment for subsequent data manipulation and analysis operations.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses

            # Initialize the Poses class with protein data and a working directory
            poses_instance = Poses(poses=my_protein_data, work_dir='path/to/work_dir')
        """
        self.df = None
        self.set_work_dir(work_dir, set_scorefile=False)
        self.set_poses(poses, glob_suffix=glob_suffix)

        # setup poses.storage_format and poses.scorefile.
        self.set_storage_format(storage_format)
        self.set_scorefile(self.work_dir)

        # setup jobstarter
        self.default_jobstarter = jobstarter

        # set other empty attributes
        self.motifs = []

    def __iter__(self):
        for _, row in self.df.iterrows():
            yield row

    def __len__(self):
        return len(self.df)

    ############################################# SETUP METHODS #########################################
    def set_scorefile(self, work_dir: str) -> None:
        """
        Sets the scorefile path for storing protein scores.

        Parameters
        ----------
        work_dir : str
            The working directory where the scorefile will be stored. If the work directory is not set, the scorefile is stored in the current directory.

        Attributes
        ----------
        scorefile : str
            The path to the scorefile where protein scores are stored.

        Notes
        -----
        This method configures the path for the scorefile based on the provided working directory. The file is named after the working directory (``<work_dir>/<basename>_scores.<storage_format>``). If no working directory is specified, the scorefile is stored in the current directory as ``./poses_scores.<storage_format>``.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses

            # Initialize the Poses class
            poses_instance = Poses()

            # Set the scorefile path
            poses_instance.set_scorefile(work_dir='path/to/work_dir')
        """
        # if no work_dir is set, store scores in current directory.
        scorefile_path = os.path.join(work_dir, os.path.basename(work_dir)) if work_dir else "./poses"
        self.scorefile = f"{scorefile_path}_scores.{self.storage_format}"

    def set_storage_format(self, storage_format: str) -> None:
        """
        Sets the storage format for storing protein data.

        Parameters
        ----------
        storage_format : str
            The format used for storing protein data. Supported formats are the keys of `FORMAT_STORAGE_DICT`: 'json', 'csv', 'pkl', and 'pickle'.

        Raises
        ------
        KeyError
            If the provided storage format is not supported.

        Notes
        -----
        This method configures the storage format for protein data. It ensures that the format is one of the supported formats and raises an error if the format is invalid.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses

            # Initialize the Poses class
            poses_instance = Poses()

            # Set the storage format to 'csv'
            poses_instance.set_storage_format('csv')
        """
        if storage_format.lower() not in FORMAT_STORAGE_DICT:
            raise KeyError(f"Format {storage_format} not available. Format must be one of {list(FORMAT_STORAGE_DICT)}")
        self.storage_format = storage_format # removed .lower() maybe there is a storage format that needs caps letters.

    def set_work_dir(self, work_dir: str, set_scorefile: bool = True) -> None:
        """
        Sets up and configures the working directory for storing data and results.

        Parameters
        ----------
        work_dir : str
            The working directory where data and results will be stored. If the directory does not exist, it will be created.
        set_scorefile : bool, optional
            If True, also sets the path for the scorefile in the specified working directory (default is True).

        Further Details
        ---------------
        This method creates the necessary subdirectories within the specified working directory to organize score files, filter results, and plots. It ensures that the required directory structure is in place for subsequent data management operations.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses

            # Initialize the Poses class
            poses_instance = Poses()

            # Set the working directory
            poses_instance.set_work_dir('path/to/new_work_dir')

        Notes
        -----
        - The method will log the creation of directories if they do not already exist.
        - If `set_scorefile` is set to True, the scorefile path will be configured within the working directory.
        """
        def set_dir(dir_name: str, work_dir: str) -> str:
            '''Creates the directory work_dir/{dir_name} and returns its path. Returns None if work_dir is None.'''
            if work_dir is None:
                return None
            dir_ = os.path.join(work_dir, dir_name)
            os.makedirs(dir_, exist_ok=True)
            return dir_

        # setup and create work_dir if it does not already exist
        if work_dir:
            work_dir = os.path.abspath(work_dir)
            os.makedirs(work_dir, exist_ok=True)
            logging.info(f"Creating directory {os.path.abspath(work_dir)}")
            self.work_dir = os.path.abspath(work_dir)
        else:
            self.work_dir = None

        # setup common directories for workflows:
        self.scores_dir = set_dir("scores", work_dir)
        self.filter_dir = set_dir("filter", work_dir)
        self.plots_dir = set_dir("plots", work_dir)

        # setup scorefile if option is provided (default: True)
        if set_scorefile:
            self.set_scorefile(work_dir)

    def set_logger(self) -> None:
        """
        Configures the logger for the Poses class.

        Further Details
        ---------------
        This method sets up the logging configuration for the Poses class. It creates a logger that outputs log messages to both the console and a log file in the working directory (if set). This aids in debugging and tracking the progress of data processing operations.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses

            # Initialize the Poses class
            poses_instance = Poses(work_dir='path/to/work_dir')

            # Set up the logger
            poses_instance.set_logger()

        Notes
        -----
        - The log file is named after the working directory and stored within it.
        - The logging level is set to INFO, and log messages include timestamps, logger names, log levels, and messages.
        """
        # Create a logger
        if self.work_dir:
            logfile_path = os.path.join(self.work_dir, f"{os.path.basename(self.work_dir)}.log")
        else:
            logfile_path = None

        # Configure the basic logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[logging.StreamHandler()]
        )
        if logfile_path:
            logging.getLogger().addHandler(logging.FileHandler(logfile_path))

    def set_jobstarter(self, jobstarter: JobStarter) -> None:
        """
        Configures the job starter for managing job submissions.

        Parameters
        ----------
        jobstarter : JobStarter
            An instance of the JobStarter class used to manage job submissions.

        Further Details
        ---------------
        This method sets the job starter for the Poses class, which is used to manage job submissions in distributed computing environments. It allows the user to specify a custom job starter for handling computational tasks.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses
            from protflow.jobstarters import CustomJobStarter

            # Initialize the Poses class
            poses_instance = Poses()

            # Set a custom job starter
            custom_jobstarter = CustomJobStarter()
            poses_instance.set_jobstarter(custom_jobstarter)

        Notes
        -----
        - The job starter must be an instance of the JobStarter class or a subclass thereof.
        - This method enables customization of job management to suit specific computational workflows.
        """
        self.default_jobstarter = jobstarter

    def change_poses_dir(self, poses_dir: str, copy: bool = False, overwrite: bool = False) -> "Poses":
        """
        Changes the directory of the stored poses, with options to copy or overwrite existing poses.

        Parameters
        ----------
        poses_dir : str
            The new directory where the poses will be located.
        copy : bool, optional
            If True, the poses will be copied to the new directory (default is False).
        overwrite : bool, optional
            If True, existing files in the new directory will be overwritten (default is False).

        Returns
        -------
        Poses
            The updated Poses instance with poses located in the new directory.

        Further Details
        ---------------
        This method updates the paths of the stored poses to a new directory. If the `copy` parameter is set to True, the poses are copied to the new directory. The `overwrite` parameter controls whether existing files in the new directory are overwritten.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses

            # Initialize the Poses class
            poses_instance = Poses(poses=my_protein_data, work_dir='path/to/work_dir')

            # Change the directory of the poses
            poses_instance.change_poses_dir('path/to/new_poses_dir', copy=True, overwrite=True)

        Notes
        -----
        - If `copy` is set to False, the method only updates the paths in the DataFrame without moving the files.
        - Raises a ValueError if the new directory does not exist or if the poses do not exist in the specified directory (when `copy` is False).
        - Ensures the integrity of the poses by verifying their existence in the new directory.
        """
        # define new poses:
        new_poses = [os.path.join(poses_dir, os.path.basename(pose)) for pose in self.poses_list()]

        # exchange with check if poses_dir is a directory and the poses exist
        if not copy:
            # just change the name of the directory in the poses_df, don't copy the poses anywhere
            if not os.path.isdir(poses_dir):
                raise ValueError(":poses_dir: has to be an existing directory!")
            if not all((os.path.isfile(pose) for pose in new_poses)):
                raise ValueError("Poses do not exist at specified directory. If you want to copy the poses there, set the parameter :copy: to True!")

        else:
            # actually copy the poses to a new directory (for whatever reason)
            if not os.path.isdir(poses_dir):
                os.makedirs(poses_dir)
            if overwrite:
                for old_path, new_path in zip(self.poses_list(), new_poses):
                    shutil.copy(old_path, new_path)
            else:
                # if overwrite is False, check if the file exists first. This should save read/write speed.
                for old_path, new_path in zip(self.poses_list(), new_poses):
                    if not os.path.isfile(new_path):
                        shutil.copy(old_path, new_path)

        # change path in self.df["poses"] column
        self.df["poses"] = new_poses
        return self

    def parse_poses(self, poses: Union[list,str] = None, glob_suffix: str = None) -> list:
        """
        Parses the input poses, which can be provided as a list or a directory with a glob suffix.

        Parameters
        ----------
        poses : Union[list, str], optional
            A list of file paths or a directory containing the protein data files. If not provided, an empty list is returned.
        glob_suffix : str, optional
            A suffix used for globbing multiple files in the specified directory.

        Returns
        -------
        list
            A list of parsed pose file paths.

        Further Details
        ---------------
        This method handles various input types for parsing poses. It can parse a list of file paths directly or glob files in a specified directory using a suffix. The method ensures that all specified files exist and raises appropriate errors if they do not.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses

            # Initialize the Poses class
            poses_instance = Poses()

            # Parse poses from a directory with a glob suffix
            parsed_poses = poses_instance.parse_poses(poses='path/to/pose_dir', glob_suffix='*.pdb')

        Notes
        -----
        - Raises FileNotFoundError if any specified files do not exist.
        - Raises TypeError if `poses` is neither a list, a string, nor None.
        - Supports both single file and multiple file (via globbing) inputs.
        - Ensures that the returned list contains valid file paths.
        """
        if isinstance(poses, str) and glob_suffix:
            parsed_poses = glob(f"{poses}/{glob_suffix}")
            if not parsed_poses:
                raise FileNotFoundError(f"No {glob_suffix} files were found in {poses}. Did you mean to glob? Was the path correct?")
            return parsed_poses
        if isinstance(poses, str) and not glob_suffix:
            if not os.path.isfile(poses):
                raise FileNotFoundError(f"File {poses} not found!")
            return [poses]
        if isinstance(poses, list):
            if not all((os.path.isfile(path) for path in poses)):
                raise FileNotFoundError("Not all files listed in poses were found.")
            return poses
        if poses is None:
            return []
        raise TypeError(f"Unrecognized input type {type(poses)} for function parse_poses(). Allowed types: [list, str]")

    def parse_descriptions(self, poses: list = None) -> list:
        """
        Parses descriptions from the provided pose file paths.

        Parameters
        ----------
        poses : list, optional
            A list of pose file paths from which descriptions are extracted.

        Returns
        -------
        list
            A list of descriptions parsed from the pose file paths.

        Further Details
        ---------------
        This method extracts descriptions from the provided list of pose file paths. Descriptions are derived from the file names by stripping the directory path and file extension.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses

            # Initialize the Poses class
            poses_instance = Poses()

            # Parse descriptions from pose file paths
            descriptions = poses_instance.parse_descriptions(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

        Notes
        -----
        - This method is useful for generating a list of concise descriptions based on file names.
        - Ensures that descriptions are derived in a consistent format, suitable for use in data management and analysis.
        """
        return [description_from_path(pose) for pose in poses]

    def set_poses(self, poses: Union[list,str,pd.DataFrame] = None, glob_suffix: str = None) -> None:
        """
        Sets the poses for the Poses instance, parsing the input if necessary.

        Parameters
        ----------
        poses : Union[list, str, pd.DataFrame], optional
            A list of file paths, a directory containing the protein data files, a path to a stored poses table (csv, json, parquet, pickle, or feather), or a DataFrame containing the poses. If not provided, an empty DataFrame is initialized.
        glob_suffix : str, optional
            A suffix used for globbing multiple files in the specified directory.

        Further Details
        ---------------
        This method initializes the poses for the Poses instance. It can accept various input types, including a list of file paths, a directory for globbing files, or a DataFrame. The method ensures that the poses are correctly parsed and set up for further processing.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses

            # Initialize the Poses class
            poses_instance = Poses()

            # Set poses from a directory with a glob suffix
            poses_instance.set_poses(poses='path/to/pose_dir', glob_suffix='*.pdb')

            # Set poses from a list of file paths
            poses_instance.set_poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

        Notes
        -----
        - If a DataFrame is provided, it is directly used as the poses DataFrame after integrity checks.
        - The method supports parsing multiline FASTA inputs and handles them appropriately.
        - Ensures that the poses DataFrame contains necessary columns for subsequent operations.
        """
        # if DataFrame is passed, load directly.
        if isinstance(poses, pd.DataFrame):
            self.df = self.check_poses_df_integrity(poses)
            self.convert_resselection_cols(resselection_col="import_resselection_cols")
            return None

        if isinstance(poses, str) and any([poses.endswith(ext) for ext in ['csv', 'json', 'parquet', 'pickle', 'feather']]):
            self.df = get_format(poses)(poses)
            # importing .csv files results in the index column being read in as Unnamed: 0, it can be dropped
            if 'Unnamed: 0' in self.df.columns:
                self.df.drop('Unnamed: 0', axis=1, inplace=True)
            self.df = self.check_poses_df_integrity(self.df)
            self.convert_resselection_cols(resselection_col="import_resselection_cols")
            return None

        # if Poses are initialized freshly (with input poses as strings:)
        poses = self.parse_poses(poses, glob_suffix=glob_suffix)

        # handle multiline .fa inputs for poses!
        # iterate over a copy, because the list is modified inside the loop.
        for pose in list(poses):
            if not pose.endswith(".fa") and not pose.endswith(".fasta"):
                continue
            if len(parse_fasta_to_dict(pose)) > 1:
                poses.remove(pose)
                poses += self.split_multiline_fasta(pose)

        self.df = pd.DataFrame({"input_poses": poses, "poses": poses, "poses_description": self.parse_descriptions(poses)})
        return None

    def check_prefix(self, prefix: str) -> None:
        """
        Checks if the given prefix is already used in the poses DataFrame.

        Parameters
        ----------
        prefix : str
            The prefix to be checked in the poses DataFrame.

        Raises
        ------
        KeyError
            If the prefix is already used in the poses DataFrame.
        ValueError
            If the prefix contains a slash ('/').

        Further Details
        ---------------
        This method verifies whether the specified prefix is already in use within the poses DataFrame. It is useful for ensuring that new prefixes do not conflict with existing ones, maintaining data integrity.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses

            # Initialize the Poses class
            poses_instance = Poses()

            # Check if a prefix is already used
            poses_instance.check_prefix('new_prefix')

        Notes
        -----
        - The method raises a KeyError if a column `{prefix}_location` or `{prefix}_description` is found in the DataFrame, indicating a conflict.
        - Ensures that new prefixes are unique and can be safely used for new columns or attributes.
        """
        if f"{prefix}_location" in self.df.columns or f"{prefix}_description" in self.df.columns:
            raise KeyError(f"Prefix {prefix} is already taken in poses.df")
        if "/" in prefix:
            raise ValueError("Prefix must not contain a slash '/' as this will raise problems with runner directories")

    def check_poses_df_integrity(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Checks the integrity of the poses DataFrame, ensuring it contains necessary columns.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame to be checked for integrity.

        Returns
        -------
        pd.DataFrame
            The validated poses DataFrame.

        Raises
        ------
        KeyError
            If the DataFrame does not contain the mandatory columns 'input_poses', 'poses', and 'poses_description'.

        Further Details
        ---------------
        This method verifies that the poses DataFrame contains the necessary columns required for proper functioning. It ensures that the DataFrame has 'input_poses', 'poses', and 'poses_description' columns, which are essential for various operations.

        Example
        -------
        .. code-block:: python

            from protflow.poses import Poses
            import pandas as pd

            # Initialize the Poses class
            poses_instance = Poses()

            # Create a sample DataFrame
            sample_df = pd.DataFrame({
                'input_poses': ['path/to/pose1.pdb'],
                'poses': ['path/to/pose1.pdb'],
                'poses_description': ['pose1']
            })

            # Check the integrity of the DataFrame
            validated_df = poses_instance.check_poses_df_integrity(sample_df)

        Notes
        -----
        - The method raises a KeyError if any of the mandatory columns are missing.
        - Ensures that the DataFrame is properly structured for further data manipulation and analysis.
        """
        cols = ["input_poses", "poses", "poses_description"]
        for col in cols:
            if col not in df.columns:
                raise KeyError(f"Corrupted Format: DataFrame does not contain mandatory Poses column {col}")
        return df

    def convert_resselection_cols(self, resselection_col:str="import_resselection_cols"):
        """
        Converts per-row residue selection descriptors into ``ResidueSelection`` objects for the columns listed in a list-like selector column, mutating the DataFrame in place.

        Parameters
        ----------
        resselection_col : str, optional
            Name of the column that, for each row, contains a list/tuple of target column names to convert (default is ``import_resselection_cols``). When reading from CSV, this field may be a stringified list (e.g., ``['a','b']``), which will be parsed automatically.

        Returns
        -------
        None
            This method modifies self.df in place and returns None. If resselection_col is not present in self.df, the method exits early.

        Raises
        ------
        KeyError
            If a row's value in ``resselection_col`` exists but is not a list or tuple (after optional string-to-list parsing).
        ValueError
            If parsing a stringified list with ``ast.literal_eval`` fails due to an invalid literal.
        SyntaxError
            If parsing a malformed stringified list triggers a syntax error.
        TypeError
            If constructing a ``ResidueSelection`` from a cell value raises a type error.

        Further Details
        ---------------
        For each row, the method reads the list of target column names from ``resselection_col`` and attempts to convert the corresponding cells:

        - If a target column listed for a row does not exist in ``self.df``, a warning is logged and that column is skipped for the row.
        - If the target cell is already a ``ResidueSelection`` instance, it is left unchanged.
        - If the target cell is a ``str``, it is converted via ``ResidueSelection(value)`` (useful for CSV imports).
        - If the target cell is a ``dict``, it is converted via ``ResidueSelection(value, from_scorefile=True)`` (useful for JSON imports).
        - Empty selector lists are allowed and simply result in no action for that row.
        - Cells that are falsy (e.g., ``None``, empty string, empty dict) are skipped.

        Example
        -------
        .. code-block:: python

            import pandas as pd
            from protflow.poses import Poses

            # Sample DataFrame where each row specifies which columns to convert
            df = pd.DataFrame({
                # mandatory Poses columns (see check_poses_df_integrity)
                "input_poses": ["a.pdb", "b.pdb", "c.pdb"],
                "poses": ["a.pdb", "b.pdb", "c.pdb"],
                "poses_description": ["a", "b", "c"],
                "import_resselection_cols": [
                    ["fixed_residues", "motif_residues"],  # row 0: convert two columns
                    "['motif_residues']",                   # row 1: stringified list (from CSV)
                    []                                      # row 2: nothing to convert
                ],
                "fixed_residues": [
                    "A12,A34,A56",  # str -> ResidueSelection(str)
                    None,           # skipped
                    "A1"
                ],
                "motif_residues": [
                    {"residues":[["A",164],["A",165],["A",166],["A",167]]},  # dict -> ResidueSelection(dict, from_scorefile=True)
                    "B5-B9",                                                 # str -> ResidueSelection(str)
                    {}
                ]
            })

            poses = Poses(df)
            poses.convert_resselection_cols()  # mutates poses.df in place

            # After this call:
            # - df.loc[0, "fixed_residues"] is a ResidueSelection instance
            # - df.loc[0, "motif_residues"] is a ResidueSelection instance (from dict)
            # - df.loc[1, "motif_residues"] is a ResidueSelection instance
            # - Row 2 remains unchanged due to empty selector and falsy cells

        Notes
        -----
        - Missing target columns are not fatal; a warning is logged and processing continues.
        - When importing from CSV, stringified lists in ``resselection_col`` are parsed with ``ast.literal_eval``; malformed strings will raise ``ValueError`` or ``SyntaxError``.
        - ``ResidueSelection`` construction is delegated; any errors it raises will propagate.
        """
        if resselection_col not in self.df.columns:
            return None

        for idx, cols in self.df[resselection_col].items():
            # if input was a csv file, lists are imported as str
            if isinstance(cols, str) and cols.startswith("[") and cols.endswith("]"):
                cols = ast.literal_eval(cols)

            # skip if no resselection cols are defined
            if not cols:
                continue

            # check for wrong content in col
            if not isinstance(cols, (list, tuple)):
                raise KeyError(f"Could not import residue selection columns from {resselection_col}!")

            for col in cols:
                if col not in self.df.columns:
                    logging.warning(f"Could not find column {col} in poses dataframe for conversion to ResidueSelection for pose {self.df.at[idx, 'poses_description']}!")
                    continue

                cell_value = self.df.at[idx, col]
                if cell_value:
                    # skip if already a ResidueSelection (e.g. when importing from pickle)
                    if isinstance(cell_value, ResidueSelection):
                        continue

                    # pandas 3 infers string-like columns as StringDtype by default, which
                    # cannot hold ResidueSelection objects unless we widen the column first.
                    if not pd.api.types.is_object_dtype(self.df[col].dtype):
                        self.df[col] = self.df[col].astype(object)

                    # if importing from csv
                    if isinstance(cell_value, str):
                        self.df.at[idx, col] = ResidueSelection(cell_value)

                    # if importing from json
                    if isinstance(cell_value, dict):
                        self.df.at[idx, col] = ResidueSelection(cell_value, from_scorefile=True)
    def split_multiline_fasta(self, path: str, encoding: str = "UTF-8") -> list[str]:
        """
        Splits a multiline FASTA file into individual FASTA files, each containing a single sequence.

        Parameters
        ----------
        path : str
            The path to the multiline FASTA file.
        encoding : str, optional
            The encoding of the FASTA file (default is "UTF-8").

        Returns
        -------
        list[str]
            A list of file paths to the individual FASTA files.

        Further Details
        ---------------
        This method reads a multiline FASTA file and splits it into individual FASTA files, each containing a single sequence. The individual FASTA files are stored in a subdirectory named 'input_fastas_split' within the working directory.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class with a working directory
            poses_instance = Poses(work_dir='path/to/work_dir')

            # Split a multiline FASTA file
            individual_fasta_paths = poses_instance.split_multiline_fasta('path/to/multiline.fasta')

        Notes
        -----
        - The method creates a subdirectory named 'input_fastas_split' within the working directory to store the individual FASTA files.
        - The descriptions in the FASTA file are sanitized to replace special characters with underscores.
        - Raises an AttributeError if the working directory is not set.
        """
        logging.warning(f"Multiline Fasta detected as input to poses. Splitting up the multiline fasta into multiple poses. Split fastas are stored at work_dir/input_fastas_split/")
        if not self.work_dir:
            raise AttributeError("Set up a work_dir attribute (Poses.set_work_dir()) for your poses class.")

        # read multiline-fasta file and split into individual poses
        fasta_dict = parse_fasta_to_dict(path, encoding=encoding)

        # prepare descriptions in fasta_dict for writing:
        symbols_to_replace = r"[\/\-\:\ \.\|\,]"
        fasta_dict = {re.sub(symbols_to_replace, "_", description): seq for description, seq in fasta_dict.items()}

        # setup fasta directory self.work_dir/input_fastas_split/
        output_dir = os.path.abspath(os.path.join(self.work_dir, "input_fastas_split"))
        if not os.path.isdir(output_dir):
            os.makedirs(output_dir, exist_ok=True)

        # write individual poses in fasta directory:
        out_poses = []
        for description, seq in fasta_dict.items():
            fp = os.path.join(output_dir, f"{description}.fa")
            try:
                # check if the split file is already there. If contents do not match, write the new fasta-file
                subfasta_dict = parse_fasta_to_dict(fp, encoding=encoding)
                x_desc = list(subfasta_dict.keys())[0]
                x_seq = list(subfasta_dict.values())[0]
                if description != x_desc or seq != x_seq:
                    raise FileNotFoundError
            except FileNotFoundError:
                with open(fp, 'w', encoding=encoding) as f:
                    f.write(f">{description}\n{seq}")

            # add fasta path to out_poses:
            out_poses.append(fp)

        # return list containing paths to .fa files as poses.
        return out_poses
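The description sanitising step in `split_multiline_fasta` can be checked in isolation. The sketch below reuses the same character class; `sanitize_description` and `find_collisions` are hypothetical helper names introduced only for illustration. The collision check is an addition that the method itself does not perform.

```python
import re

# Same character class as in split_multiline_fasta: / - : space . | ,
SYMBOLS_TO_REPLACE = r"[\/\-\:\ \.\|\,]"


def sanitize_description(description: str) -> str:
    """Replace characters that are awkward in file names with underscores."""
    return re.sub(SYMBOLS_TO_REPLACE, "_", description)


def find_collisions(descriptions):
    """Return sanitized names produced by more than one input (hypothetical check)."""
    seen = {}
    for description in descriptions:
        seen.setdefault(sanitize_description(description), []).append(description)
    return {name: originals for name, originals in seen.items() if len(originals) > 1}


print(sanitize_description("sp|P12345|PROT_HUMAN design-1.0"))
print(find_collisions(["design-1", "design.1", "design_2"]))
```

Because the sanitized descriptions are used as dictionary keys, two headers that map to the same name would leave only one sequence in the result, so a check like this may be worth running on unfamiliar FASTA files.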
    def determine_pose_type(self, pose_col: str = None) -> list:
        """
        Determines the file types of the poses based on their extensions.

        Parameters
        ----------
        pose_col : str, optional
            The column in the DataFrame containing the pose file paths (default is 'poses').

        Returns
        -------
        list
            A list of unique file extensions found in the pose file paths.

        Further Details
        ---------------
        This method extracts and identifies the file extensions of the pose file paths in the specified column. It returns a list of unique file extensions, which helps in understanding the types of files being managed.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class with some poses
            poses_instance = Poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

            # Determine the pose file types
            pose_types = poses_instance.determine_pose_type()

        Notes
        -----
        - The method logs a warning if multiple file extensions are found.
        - If no file extensions are found, it logs a warning indicating the inability to determine file types.
        - Ensures that the returned list contains only unique file extensions.
        """
        def extract_extension(file_path):
            _, ext = os.path.splitext(file_path)
            return ext

        pose_col = pose_col or 'poses'

        # extract extensions and create a set containing only unique values
        ext = list(set(self.df[pose_col].apply(extract_extension).to_list()))
        if len(ext) > 1:
            logging.warning(f"Multiple file extensions present in poses: {ext}")
            return ext
        if len(ext) == 1:
            if ext[0] == "":
                logging.warning(f"Could not determine file extension from poses!")
            else:
                logging.info(f"Poses identified as {ext} files")
            return ext
        return []
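The extension detection relies on `os.path.splitext`, whose behaviour on compound or missing extensions is easy to misjudge. A minimal standalone sketch, assuming plain path strings instead of a DataFrame column (`unique_extensions` is a hypothetical name, and the result is sorted here only to make it deterministic):

```python
import os


def unique_extensions(paths):
    """Return the sorted, de-duplicated file extensions of the given paths."""
    return sorted({os.path.splitext(path)[1] for path in paths})


print(unique_extensions(["path/to/pose1.pdb", "path/to/pose2.pdb"]))
print(unique_extensions(["x.pdb", "y.fa", "z"]))
print(unique_extensions(["model.pdb.gz"]))
```

Only the last suffix counts, so a compressed file such as `model.pdb.gz` is reported as `.gz`, and a path without a suffix yields an empty string.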
############################################ Input Methods ######################################
    def load_poses(self, poses_path: str) -> "Poses":
        """
        Loads poses from a specified file and updates the Poses instance.

        Parameters
        ----------
        poses_path : str
            The path to the file containing the poses to be loaded.

        Returns
        -------
        Poses
            The updated Poses instance with poses loaded from the specified file.

        Further Details
        ---------------
        This method reads a file containing poses and updates the Poses instance with the data. The file format is automatically detected based on the file extension, and the corresponding loading function is used to read the data into a DataFrame.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class
            poses_instance = Poses()

            # Load poses from a file
            poses_instance.load_poses('path/to/poses.json')

        Notes
        -----
        - The method supports various file formats, including JSON, CSV, Pickle, Feather, and Parquet.
        - Ensures that the loaded DataFrame contains the necessary columns and updates the Poses instance accordingly.
        """
        # read format
        load_function = get_format(poses_path)

        # load df from file:
        self.set_poses(poses=load_function(poses_path))
        return self
############################################ Output Methods ######################################
    def save_scores(self, out_path: str = None, out_format: str = None) -> None:
        """
        Saves the scores DataFrame to a specified file path in the desired format.

        Parameters
        ----------
        out_path : str, optional
            The file path where the scores will be saved. If not provided, the default scorefile path is used.
        out_format : str, optional
            The format in which to save the scores. If not provided, the default storage format is used.

        Further Details
        ---------------
        This method saves the scores DataFrame to the specified file path in the desired format. It ensures that the file name conforms to the specified format by appending the correct file extension if necessary.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class with some scores
            poses_instance = Poses()

            # Save scores to a specific path in CSV format
            poses_instance.save_scores(out_path='path/to/scores.csv', out_format='csv')

        Notes
        -----
        - Supports various file formats, including JSON, CSV, Pickle, Feather, and Parquet.
        - The method automatically appends the correct file extension if it is not already present in the out_path.
        - Ensures that the scores are saved in a format suitable for further analysis and processing.
        """
        # setup defaults
        out_path = out_path or self.scorefile
        out_format = out_format or self.storage_format
        temp_df = class_in_df(self.df, ResidueSelection, "import_resselection_cols")

        # make sure the filename conforms to format
        if not out_path.endswith(f".{out_format}"):
            out_path += f".{out_format}"

        if (save_method_name := FORMAT_STORAGE_DICT.get(out_format.lower())):
            getattr(temp_df, save_method_name)(out_path)
    def save_poses(self, out_path: str, poses_col: str = "poses", overwrite: bool = True) -> None:
        """
        Saves the poses to a specified directory, with an option to overwrite existing files.

        Parameters
        ----------
        out_path : str
            The directory where the poses will be saved.
        poses_col : str, optional
            The column in the DataFrame containing the pose file paths (default is 'poses').
        overwrite : bool, optional
            If True, existing files in the target directory will be overwritten (default is True).

        Further Details
        ---------------
        This method copies the pose files from their current locations into the specified directory. Each file keeps its original file name; the source directory structure is not reproduced. The `overwrite` parameter controls whether existing files in the target directory are overwritten.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class with some poses
            poses_instance = Poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

            # Save poses to a new directory
            poses_instance.save_poses(out_path='path/to/new_poses_dir', overwrite=False)

        Notes
        -----
        - The method ensures that the target directory exists, creating it if necessary.
        - If `overwrite` is set to False and all poses already exist in the target directory, the method skips copying entirely. If at least one pose is missing, all poses are copied.
        - Logs the saving process, including a skipped save due to the overwrite setting.
        """
        poses = self.df[poses_col].to_list()
        new_poses = [os.path.join(out_path, os.path.basename(pose)) for pose in poses]
        if not os.path.isdir(out_path):
            os.makedirs(out_path, exist_ok=True)

        # check if poses are already at out_path, skip if overwrite is set to False
        if all((os.path.isfile(pose) for pose in new_poses)) and not overwrite:
            logging.info(f"Poses already found at {out_path} and overwrite is set to 'False'. Skipping save_poses.")
            return

        # save poses
        logging.info(f"Storing poses from column {poses_col} at {out_path}")
        for pose, new_pose in zip(poses, new_poses):
            shutil.copy(pose, new_pose)
    def poses_list(self) -> list[str]:
        """
        Returns a list of pose file paths from the DataFrame.

        Returns
        -------
        list
            A list of pose file paths.

        Further Details
        ---------------
        This method extracts the pose file paths from the 'poses' column of the DataFrame and returns them as a list. It provides a convenient way to access the stored pose file paths.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class with some poses
            poses_instance = Poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

            # Get the list of pose file paths
            pose_paths = poses_instance.poses_list()

        Notes
        -----
        - The method assumes that the 'poses' column exists in the DataFrame.
        - Provides a simple way to retrieve all pose file paths managed by the Poses instance.
        """
        return self.df["poses"].to_list()
########################################## Operations ###############################################
    def get_pose(self, pose_description: str, all_models: bool = False) -> Bio.PDB.Model.Model | Bio.PDB.Structure.Structure:
        """
        Retrieves a pose structure based on its description.

        Parameters
        ----------
        pose_description : str
            The description of the pose to be retrieved.
        all_models : bool, optional
            Whether all models in the input PDB are returned (True) or only the first one (False, the default). If False, a Bio.PDB Model is returned; if True, a Bio.PDB Structure is returned.

        Returns
        -------
        Bio.PDB.Model.Model or Bio.PDB.Structure.Structure
            The Bio.PDB Model or Structure object corresponding to the specified pose description.

        Raises
        ------
        KeyError
            If the pose description is not found in the poses DataFrame.

        Further Details
        ---------------
        This method locates the pose file based on its description and loads it as a Bio.PDB Model (default) or Structure object. It is useful for accessing specific pose structures for further analysis or manipulation.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class with some poses
            poses_instance = Poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

            # Retrieve a specific pose structure
            pose_structure = poses_instance.get_pose('pose1')

        Notes
        -----
        - The method uses the 'poses_description' column to locate the specified pose.
        - If several rows share the same description, the first match is loaded.
        """
        if pose_description not in self.df["poses_description"].to_list():
            raise KeyError(f"Pose {pose_description} not Found in Poses DataFrame!")
        return load_structure_from_pdbfile(self.df[self.df["poses_description"] == pose_description]["poses"].values[0], all_models=all_models)
    def reindex_poses(self, prefix: str, group_col: str = None, remove_layers: int = None, force_reindex: bool = False, sep: str = "_", overwrite: bool = False) -> None:
        """
        Removes index layers from poses. Saves reindexed poses to an output directory.

        Parameters
        ----------
        prefix : str
            The name of the output directory (created inside the working directory) where the reindexed poses are saved. It is also the prefix for the DataFrame columns that store the original paths and descriptions.
        group_col : str, optional
            The poses dataframe column on which to group to create new descriptions. Must be a column in 'poses_description' or 'poses' format (e.g. from a previous state, before runners appended index layers).
        remove_layers : int, optional
            The number of index layers to remove.
        force_reindex : bool, optional
            Add a new index layer to all poses (default is False).
        sep : str, optional
            The separator used to split the description column into layers (default is "_").
        overwrite : bool, optional
            If True, existing files in the output directory are overwritten (default is False).

        Further Details
        ---------------
        This method removes index layers from poses (_0001, _0002, etc). If a group column is provided, the poses are assigned names according to the group. If remove_layers is above 0, that number of trailing layers is removed from each description and the poses are grouped by the shortened description. If force_reindex is True, one new index layer is added to all poses.

        Notes
        -----
        - The method creates the output directory if it does not exist.
        - Raises a KeyError if both group_col and remove_layers are set.
        - Raises a TypeError if remove_layers is not an integer.
        - Raises a RuntimeError if multiple poses with identical description after index layer removal are found and force_reindex is False.
        """
        if group_col and remove_layers:
            raise KeyError("<group_col> and <remove_layers> are mutually exclusive!")

        out_dir = os.path.join(self.work_dir, prefix)
        os.makedirs(out_dir, exist_ok=True)

        self.df[f"{prefix}_pre_reindexing_poses_description"] = self.df['poses_description']
        self.df[f"{prefix}_pre_reindexing_poses"] = self.df['poses']

        if remove_layers == 0:
            remove_layers = None

        # create temporary description column with removed index layers
        if remove_layers:
            if not isinstance(remove_layers, int):
                raise TypeError(f"ERROR: only value of type 'int' allowed for remove_layers. You set it to {type(remove_layers)}")
            self.df["tmp_layer_column"] = self.df['poses_description'].str.split(sep).str[:-1*int(remove_layers)].str.join(sep)
        elif group_col:
            col_in_df(self.df, group_col)
            self.df["tmp_layer_column"] = [description_from_path(path) for path in self.df[group_col]]
        else:
            self.df["tmp_layer_column"] = self.df['poses_description']

        self.df.sort_values(["tmp_layer_column", "poses_description"], inplace=True) # sort to make sure that all poses are in the same order after grouping

        # group by temporary description column, reindex
        out = []
        if any([len(group_df.index) > 1 for name, group_df in self.df.groupby("tmp_layer_column", sort=False)]) and not force_reindex:
            raise RuntimeError(f'Multiple files with identical description found after removing index layers. Set <force_reindex> to True if new index layers should be added.')

        for name, group_df in self.df.groupby("tmp_layer_column", sort=False):
            group_df.reset_index(drop=True, inplace=True) # resetting index, otherwise index of original poses df would be used
            # adding new index layer since multiple
            for i, ser in group_df.iterrows():
                ext = os.path.splitext(ser['poses'])[1]
                if force_reindex:
                    description = f"{name}{sep}{str(i+1).zfill(4)}"
                else:
                    description = name
                path = os.path.join(out_dir, f"{description}{ext}")
                if overwrite or not os.path.isfile(path):
                    shutil.copy(ser['poses'], path)
                ser['poses'] = path
                ser['poses_description'] = description
                out.append(ser)

        self.df = pd.DataFrame(out)
        self.df.reset_index(inplace=True, drop=True)

        # drop temporary description column
        self.df.drop("tmp_layer_column", inplace=True, axis=1)
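The layer arithmetic behind `reindex_poses` can be followed on plain strings. This is a minimal sketch under the assumption that descriptions are layered with a single separator; `remove_index_layers` and `reindex` are hypothetical helpers that imitate the `remove_layers` path with `force_reindex=True`, without copying any files.

```python
from collections import defaultdict


def remove_index_layers(description: str, n_layers: int, sep: str = "_") -> str:
    """Strip the last n_layers separator-delimited fields from a description."""
    if n_layers <= 0:
        return description
    return sep.join(description.split(sep)[:-n_layers])


def reindex(descriptions, n_layers: int, sep: str = "_"):
    """Group descriptions by their shortened name and number each group from 0001."""
    groups = defaultdict(list)
    for description in sorted(descriptions):
        groups[remove_index_layers(description, n_layers, sep)].append(description)

    renamed = {}
    for name, members in groups.items():
        for i, old in enumerate(members, start=1):
            renamed[old] = f"{name}{sep}{str(i).zfill(4)}"
    return renamed


print(remove_index_layers("design_0001_0002", 1))
print(reindex(["design_0001_0005", "design_0001_0009", "other_0003_0002"], 1))
```

Note that a description containing the separator in its base name (for example `my_design_0001`) loses part of that name once more layers are removed than were appended.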
    def duplicate_poses(self, output_dir: str, n_duplicates: int, overwrite: bool = False) -> None:
        """
        Duplicates poses a specified number of times and saves them to an output directory.

        Parameters
        ----------
        output_dir : str
            The directory where the duplicated poses will be saved.
        n_duplicates : int
            The number of duplicates to create for each pose.
        overwrite : bool, optional
            If True, existing files in the output directory are overwritten (default is False).

        Further Details
        ---------------
        This method creates multiple copies of each pose file and saves them to the specified output directory. The duplicated files are named with an incremented index to distinguish them.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class with some poses
            poses_instance = Poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

            # Duplicate the poses
            poses_instance.duplicate_poses(output_dir='path/to/duplicates', n_duplicates=3)

        Notes
        -----
        - The method creates the output directory if it does not exist.
        - Ensures that the duplicated files have unique names by appending an index.
        - Replaces the poses DataFrame with one row per duplicate, so it grows by a factor of n_duplicates.
        """
        def _insert_index_layer(dir_: str, input_path: str, n: int, sep: str = "_") -> str:
            in_file = os.path.basename(input_path)
            description, extension = os.path.splitext(in_file)
            out_path = os.path.join(dir_, f"{description}{sep}{str(n).zfill(4)}{extension}")
            return out_path

        # create output directory
        os.makedirs(output_dir, exist_ok=True)

        # iterate over poses and copy them to new location with one additional index layer
        duplicates = []
        for n in range(1, n_duplicates+1):
            new_df = self.df.copy(deep=True)
            new_paths = [_insert_index_layer(output_dir, pose, n, "_") for pose in new_df["poses"].to_list()]
            new_descriptions = [description_from_path(path) for path in new_paths]
            for old_pose, new_pose in zip(new_df["poses"].to_list(), new_paths):
                if overwrite or not os.path.isfile(new_pose):
                    shutil.copy(old_pose, new_pose)

            new_df["poses"] = new_paths
            new_df["poses_description"] = new_descriptions
            duplicates.append(new_df)

        self.df = pd.concat(duplicates)
        self.df.reset_index(drop=True, inplace=True)
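To see which file names `duplicate_poses` produces, the naming rule of `_insert_index_layer` can be applied without touching the file system. A small sketch; `duplicate_paths` is a hypothetical function name, and no files are copied.

```python
import os


def duplicate_paths(output_dir: str, pose_path: str, n_duplicates: int, sep: str = "_"):
    """List the target paths created for one pose (no files are copied here)."""
    description, extension = os.path.splitext(os.path.basename(pose_path))
    return [
        os.path.join(output_dir, f"{description}{sep}{str(n).zfill(4)}{extension}")
        for n in range(1, n_duplicates + 1)
    ]


for path in duplicate_paths("duplicates", "path/to/pose1.pdb", 3):
    print(path)
```

The index is zero-padded to four digits, so file names sort correctly up to 9999 duplicates per pose.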
    def reset_poses(self, new_poses_col: str = 'input_poses', force_reset_df: bool = False):
        """
        Resets the poses DataFrame to the original input poses, with an option to force reset.

        Parameters
        ----------
        new_poses_col : str, optional
            The column in the DataFrame containing the new pose file paths (default is 'input_poses').
        force_reset_df : bool, optional
            If True, forces a reset of the DataFrame even if the number of new poses does not match the original (default is False).

        Further Details
        ---------------
        This method resets the poses DataFrame to use the original input poses. It handles multiline FASTA inputs and ensures that the DataFrame structure is preserved or reset based on the force_reset_df parameter.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class with some poses
            poses_instance = Poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

            # Reset the poses to the original input poses
            poses_instance.reset_poses()

        Notes
        -----
        - The method ensures that the new poses are unique and properly formatted.
        - Raises a RuntimeError if the number of new poses does not match the original and force_reset_df is False.
        - Logs warnings about the reset process when the number of poses changes.
        """
        def unique_ordered_list(original_list):
            seen = set()  # Initialize an empty set to track seen elements
            unique_list = []
            for item in original_list:
                if item not in seen:  # Check membership in the set, which is O(1) (faster lookup in sets than in lists)
                    unique_list.append(item)
                    seen.add(item)  # Add the item to the set
            return unique_list

        col_in_df(self.df, new_poses_col)

        new_poses = self.df[new_poses_col].to_list()

        # handle multiline .fa inputs for poses!
        # iterate over a copy, because new_poses is modified inside the loop
        for pose in list(new_poses):
            if not pose.endswith(".fa") and not pose.endswith(".fasta"):
                continue
            if len(parse_fasta_to_dict(pose)) > 1:
                new_poses.remove(pose)
                new_poses += self.split_multiline_fasta(pose)

        # create unique poses
        new_poses = unique_ordered_list(new_poses)

        if len(new_poses) != len(self.df.index):
            logging.warning(f"Different number of new poses ({len(new_poses)}) than number of original poses ({len(self.df.index)})!")
            if force_reset_df:
                logging.warning(f"Resetting poses dataframe. Be aware of the consequences like possibly reading in false outputs when reusing prefixes!")
                self.df = pd.DataFrame({"input_poses": new_poses, "poses": new_poses, "poses_description": self.parse_descriptions(new_poses)})
            else:
                raise RuntimeError(f"Could not preserve original dataframe. You can set <force_reset_df> if you want to delete it, but be aware of the consequences like possibly reading in false outputs when reusing prefixes!")
        else:
            self.df['poses'] = new_poses
            self.df['poses_description'] = self.parse_descriptions(poses=self.df['poses'].to_list())
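Expanding multi-sequence FASTA inputs and removing duplicates, as `reset_poses` does, is easiest to reason about when a new list is built instead of editing the list being looped over. The following is a sketch under stated assumptions: `expand_and_deduplicate` and its `split_multiline` callback are hypothetical, and unlike `reset_poses` the split paths are inserted in place of the original entry instead of being appended at the end.

```python
def expand_and_deduplicate(paths, split_multiline):
    """Expand multi-sequence FASTA paths, then drop duplicates while keeping order.

    split_multiline is a hypothetical callback: it returns a list of new paths
    for a multi-sequence FASTA file and None for every other input.
    """
    expanded = []
    for path in paths:  # build a new list instead of mutating `paths` while iterating
        parts = split_multiline(path)
        if parts is None:
            expanded.append(path)
        else:
            expanded.extend(parts)
    # dict keys are unique and keep insertion order (Python 3.7+)
    return list(dict.fromkeys(expanded))


def fake_split(path):
    """Stand-in splitter used only for this demonstration."""
    return ["m_1.fa", "m_2.fa"] if path == "multi.fasta" else None


print(expand_and_deduplicate(["a.pdb", "multi.fasta", "a.pdb", "b.pdb"], fake_split))
```

`dict.fromkeys` gives the same order-preserving de-duplication as the `unique_ordered_list` helper in a single expression.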
    def set_motif(self, motif_col: str) -> None:
        """
        Sets a motif column in the poses DataFrame for further analysis.

        Parameters
        ----------
        motif_col : str
            The column in the DataFrame containing the motifs to be set.

        Raises
        ------
        KeyError
            If the specified motif column is not found in the poses DataFrame.
        TypeError
            If the objects in the specified motif column are not of type ResidueSelection.

        Further Details
        ---------------
        This method sets a column in the poses DataFrame to be used as motifs for further analysis. The motifs must be instances of the ResidueSelection class.

        Example
        -------
        .. code-block:: python

            from poses import Poses
            from protflow.residues import ResidueSelection

            # Initialize the Poses class with some poses
            poses_instance = Poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

            # Assume we have a column 'motifs' with ResidueSelection objects
            poses_instance.set_motif('motifs')

        Notes
        -----
        - The method ensures that the specified column exists and contains ResidueSelection objects.
        - Logs any errors encountered during the process for easier debugging and verification.
        """
        # check if motif_col exists. check if all entries in motif col are ResidueSelection objects.
        col_in_df(self.df, motif_col)
        if not all([isinstance(motif, ResidueSelection) for motif in self.df[motif_col].to_list()]):
            raise TypeError(f"Setting a motif requires the objects in 'motif_col' to be of type ResidueSelection. Check documentation of protflow.residues module for how to create the object (it's simple).")

        # set motif
        self.motifs.append(motif_col)
    def convert_pdb_to_fasta(self, prefix: str, update_poses: bool = False, chain_sep: str = ":") -> None:
        """
        Converts PDB pose files to FASTA format and optionally updates the poses. Paths to the FASTA files are saved in the poses dataframe under column <prefix>_fasta_location.

        Parameters
        ----------
        prefix : str
            The prefix used for naming the output directory and the DataFrame column that holds the FASTA paths.
        update_poses : bool, optional
            If True, updates the poses DataFrame to use the new FASTA files (default is False).
        chain_sep : str, optional
            The separator used for chain identifiers in the FASTA file (default is ":").

        Raises
        ------
        RuntimeError
            If the poses are not of type PDB.

        Further Details
        ---------------
        This method converts PDB pose files to FASTA format and stores them in a directory named <prefix>_fasta_location inside the working directory. Each FASTA file is named after the description of its pose. The method can also update the poses DataFrame to use the new FASTA files if specified.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class with some PDB poses
            poses_instance = Poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

            # Convert the PDB files to FASTA format
            poses_instance.convert_pdb_to_fasta(prefix='converted', update_poses=True)

        Notes
        -----
        - The method checks that the poses are of type PDB before conversion.
        - Creates a new directory within the working directory to store the FASTA files.
        - Existing FASTA files with the same name are overwritten.
        """
        if not self.determine_pose_type() == ['.pdb']:
            raise RuntimeError(f"Poses must be of type .pdb, not {self.determine_pose_type()}")

        os.makedirs(fasta_dir := os.path.join(self.work_dir, f'{prefix}_fasta_location'), exist_ok=True)

        seqs = [get_sequence_from_pose(load_structure_from_pdbfile(path_to_pdb=pose), chain_sep=chain_sep) for pose in self.df['poses'].to_list()]

        fasta_paths = []
        for name, seq in zip(self.df['poses_description'].to_list(), seqs):
            fasta_path = os.path.join(fasta_dir, f'{name}.fasta')
            fasta_paths.append(fasta_path)
            with open(fasta_path, 'w', encoding="UTF-8") as f:
                f.write(f">{name}\n{seq}")

        self.df[f'{prefix}_fasta_location'] = fasta_paths
        if update_poses:
            self.df['poses'] = fasta_paths
########################################## Filtering ###############################################
    def filter_poses_by_rank(self, n: float, score_col: str, group_col: str = None, remove_layers: int = None, layer_col: str = "poses_description", sep = "_", ascending: bool = True, prefix: str = None, plot: bool = False, plot_cols: list[str] = None, overwrite: bool = True, storage_format: str = None) -> "Poses":
        """
        Filters poses based on their rank in a specified score column, with options to handle layers and generate plots.

        Parameters
        ----------
        n : float
            The number of top-ranked poses to keep. If n < 1, it represents a fraction of the total poses.
        score_col : str
            The column in the DataFrame containing the scores used for ranking.
        group_col : str, optional
            Group dataframe by this column and filter individual groups.
        remove_layers : int, optional
            The number of layers to remove from the pose descriptions before ranking. This helps in grouping similar poses.
        layer_col : str, optional
            The column used for layer-based grouping of poses (default is "poses_description").
        sep : str, optional
            The separator used in the layer descriptions (default is "_").
        ascending : bool, optional
            If True, ranks poses in ascending order of scores; otherwise, in descending order (default is True).
        prefix : str, optional
            The prefix used for naming the output filtered poses file and plot.
        plot : bool, optional
            If True, generates a plot comparing scores before and after filtering (default is False). Requires <prefix>.
        plot_cols : list[str], optional
            Add additional plotting data to the output filtering plot.
        overwrite : bool, optional
            If True, overwrites existing filtered poses files (default is True). If False and a filter output for <prefix> already exists, that output is loaded instead of filtering again.
        storage_format : str, optional
            The format used for storing the filtered poses (default is None, which uses the existing storage format).

        Returns
        -------
        Poses
            The updated Poses instance with filtered poses.

        Further Details
        ---------------
        This method filters the poses DataFrame to retain only the top-ranked poses based on their scores. It supports fractional ranking, layer-based grouping, and optional plot generation for visualizing the filtering process. The filtered poses can be saved to a file with a specified prefix and storage format.

        Example
        -------
        .. code-block:: python

            from poses import Poses

            # Initialize the Poses class with some scores
            poses_instance = Poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb'])

            # Filter poses by rank
            poses_instance.filter_poses_by_rank(n=10, score_col='score', prefix='top_poses', plot=True)

        Notes
        -----
        - The method creates a filtered poses file and an optional plot in the specified working directory.
        - Ensures that the DataFrame is properly sorted and filtered based on the provided parameters.
        - Raises a KeyError if both group_col and remove_layers are set.
        """
        if group_col and remove_layers:
            raise KeyError("<group_col> and <remove_layers> are mutually exclusive!")

        # define filter output if <prefix> is provided, make sure output directory exists
        if prefix:
            if self.filter_dir is None:
                raise AttributeError("Filter directory was not set! Did you set a working directory? work_dir can be set with Poses.set_work_dir() and sets up a filter_dir automatically.")
            os.makedirs(self.filter_dir, exist_ok=True)

            # make sure output format is available
            storage_format = storage_format or self.storage_format
            if storage_format not in FORMAT_STORAGE_DICT:
                raise KeyError(f"Format {storage_format} not available. Format must be one of {list(FORMAT_STORAGE_DICT)}")

            # set filter output name
            output_name = os.path.join(self.filter_dir, f"{prefix}_filter.{storage_format}")

            # load previous filter output if it exists and <overwrite> = False, set poses_df as filtered dataframe and return the updated instance
            if not overwrite and os.path.isfile(output_name):
                filter_df = get_format(output_name)(output_name)
                self.df = filter_df
                return self

        # Filter df down to the number of poses specified with <n>
        orig_len = str(len(self.df))
        filter_df = filter_dataframe_by_rank(df=self.df, col=score_col, n=n, group_col=group_col, remove_layers=remove_layers, layer_col=layer_col, sep=sep, ascending=ascending).reset_index(drop=True)
        logging.info(f"Filtered poses from {orig_len} to {str(len(filter_df))} poses according to {score_col}.")

        # save filtered dataframe if prefix is provided
        if prefix:
            logging.info(f"Saving filter output to {output_name}.")
            save_method_name = FORMAT_STORAGE_DICT.get(storage_format)
            getattr(filter_df, save_method_name)(output_name)

        # create filter-plots if specified.
        if plot:
            if not prefix:
                raise RuntimeError(f"<prefix> was not set, but is mandatory for plotting!")
            if self.plots_dir is None:
                raise AttributeError(f"Plots directory was not set! Did you set a working directory?")
            os.makedirs(self.plots_dir, exist_ok=True)
            out_path = os.path.join(self.plots_dir, f"{prefix}_filter.png")
            logging.info(f"Creating filter plot at {out_path}.")
            if plot_cols:
                for col in plot_cols:
                    col_in_df(self.df, col)
                cols = [score_col] + plot_cols
            else:
                cols = [score_col]
            plots.violinplot_multiple_cols_dfs(
                dfs=[self.df, filter_df],
                df_names=["Before Filtering", "After Filtering"],
                cols=cols,
                y_labels=cols,
                out_path=out_path,
                show_fig=False
            )

        # update object attributes [df]
        self.df = filter_df
        logging.info(f"Filtering completed.")
        return self
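The meaning of `n` in rank filtering (an absolute count, or a fraction when `n < 1`) can be illustrated with plain dictionaries. This is only a sketch: `keep_top_ranked` is a hypothetical function, the helper `filter_dataframe_by_rank` used above is not shown in this listing, and rounding a fractional `n` up to the next whole pose is an assumption made here, not a documented behaviour.

```python
import math


def keep_top_ranked(rows, score_key, n, ascending=True):
    """Keep the n best rows; 0 < n < 1 is read as a fraction of all rows."""
    ranked = sorted(rows, key=lambda row: row[score_key], reverse=not ascending)
    # Assumption: fractional n is rounded up so that at least one row survives.
    n_keep = math.ceil(n * len(ranked)) if 0 < n < 1 else int(n)
    return ranked[:n_keep]


rows = [
    {"poses_description": "a", "score": 3.0},
    {"poses_description": "b", "score": 1.0},
    {"poses_description": "c", "score": 2.0},
    {"poses_description": "d", "score": 4.0},
]

print([row["poses_description"] for row in keep_top_ranked(rows, "score", 2)])
print([row["poses_description"] for row in keep_top_ranked(rows, "score", 0.5, ascending=False)])
```

With `ascending=True` the lowest scores are kept, which suits energy-like metrics; for metrics where higher is better, `ascending=False` is the matching choice.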
[docs] def filter_poses_by_value(self, score_col: str, value, operator: str, prefix: str = None, plot: bool = False, plot_cols: list[str] = None, overwrite: bool = True, storage_format: str = None, fail_on_empty: bool = True) -> "Poses": """ Filters poses based on a specified value in a score column, with options to generate plots. Parameters ---------- score_col : str The column in the DataFrame containing the scores used for filtering. value : float or int The value used as the threshold for filtering poses. operator : str The comparison operator used for filtering ('>', '>=', '<', '<=', '=', '!='). prefix : str, optional The prefix used for naming the output filtered poses file and plot. plot : bool, optional If True, generates a plot comparing scores before and after filtering (default is False). plot_cols : list[str], optional Add additional plotting data to the output filtering plot. overwrite : bool, optional If True, overwrites existing filtered poses files (default is True). storage_format : str, optional The format used for storing the filtered poses (default is None, which uses the existing storage format). Returns ------- Poses The updated Poses instance with filtered poses. Raises ------ ValueError If all poses are removed based on the filtering criteria. Further Details --------------- This method filters the poses DataFrame based on a specified value in a score column, using the provided comparison operator. It supports optional plot generation for visualizing the filtering process and allows saving the filtered poses to a file with a specified prefix and storage format. Example ------- .. 
code-block:: python from poses import Poses # Initialize the Poses class with some scores poses_instance = Poses(poses=['path/to/pose1.pdb', 'path/to/pose2.pdb']) # Filter poses by value poses_instance.filter_poses_by_value(score_col='score', value=0.5, operator='>', prefix='filtered_poses', plot=True) Notes ----- - The method creates a filtered poses file and an optional plot in the specified working directory. - Ensures that the DataFrame is properly filtered based on the provided criteria. - Logs the filtering process, including any errors or warnings related to the filtering criteria. - Raises a ValueError if the filtering criteria remove all poses, ensuring that the Poses instance retains valid data. """ logging.info(f"Filtering poses according to column {score_col} with operator {operator} and target value {value}") # define filter output if <prefix> is provided, make sure output directory exists if prefix: if self.filter_dir is None: raise AttributeError(f"Filter directory was not set! Did you set a working directory?") os.makedirs(self.filter_dir, exist_ok=True) # make sure output format is available storage_format = storage_format or self.storage_format if storage_format not in FORMAT_STORAGE_DICT: raise KeyError(f"Format {storage_format} not available. Format must be one of {list(FORMAT_STORAGE_DICT)}") # set filter output name output_name = os.path.join(self.filter_dir, f"{prefix}_filter.{storage_format}") # load previous filter output if it exists and <overwrite> = False, set poses_df as filtered dataframe and return filtered dataframe if not overwrite and os.path.isfile(output_name): filter_df = get_format(output_name)(output_name) self.df = filter_df return filter_df # Filter df down to the number of poses specified with <n> orig_len = len(self.df) filter_df = filter_dataframe_by_value(df=self.df, col=score_col, value=value, operator=operator).reset_index(drop=True) # make sure there are still poses left in the Poses class. 
if len(filter_df) == 0: logging.warning(f"All poses removed from Poses object. No pose fulfills the filtering criterion {operator} {value} for score {score_col}") if fail_on_empty: raise ValueError(f"None of your poses pass the filter {score_col}{operator}{value}. This filter operation removes all poses from your poses.df! Either choose a less stringent filter, or set the parameter 'fail_on_empty=False'.") logging.info(f"Filtered poses from {orig_len} to {len(filter_df.index)} poses.") # save filtered dataframe if prefix is provided if prefix: logging.info(f"Saving filter output to {output_name}.") save_method_name = FORMAT_STORAGE_DICT.get(storage_format) getattr(filter_df, save_method_name)(output_name) if plot: if not prefix: raise RuntimeError("<prefix> was not set, but is mandatory for plotting!") if self.plots_dir is None: raise AttributeError("Plots directory was not set! Did you set a working directory?") os.makedirs(self.plots_dir, exist_ok=True) out_path = os.path.join(self.plots_dir, f"{prefix}_filter.png") logging.info(f"Creating filter plot at {out_path}.") if plot_cols: col_in_df(self.df, plot_cols) cols = [score_col] + plot_cols else: cols = [score_col] plots.violinplot_multiple_cols_dfs( dfs=[self.df, filter_df], df_names=["Before Filtering", "After Filtering"], cols=cols, y_labels=cols, out_path=out_path, show_fig=False ) # update object attribute [df] self.df = filter_df logging.info("Filtering completed.") return self
########################################## Score manipulation ###############################################
def calculate_composite_score(self, name: str, scoreterms: list[str], weights: list[float], plot: bool = False, scale_output: bool = False) -> "Poses": """ Calculates a composite score from specified score columns, applying weights and normalization, and optionally generates a plot. Parameters ---------- name : str The name of the new composite score column to be created. scoreterms : list[str] The list of score columns to be included in the composite score. weights : list[float] The list of weights corresponding to each score column. plot : bool, optional If True, generates a plot of the composite score and the individual score terms (default is False). scale_output : bool, optional If True, scales the composite score to a range between 0 and 1 (default is False). Returns ------- Poses The updated Poses instance with the new composite score column. Raises ------ ValueError If the number of scoreterms and weights do not match, or if any score column contains non-numeric values. Further Details --------------- This method calculates a composite score from multiple score columns by applying the specified weights and normalizing the columns. The normalization process involves subtracting the median and dividing by the standard deviation for each score column. Optionally, the composite score can be scaled to a range between 0 and 1. The method ensures that each score column contains numeric values and applies the normalization process as follows: 1. Calculate the median and standard deviation of each score column. 2. Normalize the column by subtracting the median and dividing by the standard deviation. 3. Multiply each normalized column by its weight and sum the results; optionally scale the resulting composite score to a range between 0 and 1. Example ------- ..
code-block:: python from poses import Poses # Initialize the Poses class with some scores poses_instance = Poses() # Calculate a composite score poses_instance.calculate_composite_score( name='composite_score', scoreterms=['score1', 'score2'], weights=[0.5, 0.5], plot=True, scale_output=True ) Notes ----- - The method ensures that the number of scoreterms and weights match. - Normalization helps in making the scores comparable by removing scale differences. - Generates a violin plot if the plot parameter is set to True, showing the distribution of the composite score and individual score terms. """ logging.info(f"Creating composite score {name} for scoreterms {scoreterms} with weights {weights}") # check if output column already exists in dataframe if name in self.df: logging.warning(f"Column {name} already exists in poses dataframe! It will be overwritten!") # calculate composite score self.df[name] = combine_dataframe_score_columns(df=self.df, scoreterms=scoreterms, weights=weights, scale=scale_output) if plot: if self.plots_dir is None: raise AttributeError(f"Plots directory was not set! Did you set a working directory?") os.makedirs(self.plots_dir, exist_ok=True) out_path = os.path.join(self.plots_dir, f"{name}_comp_score.png") logging.info(f"Creating composite score plot at {out_path}.") plot_scoreterms = scoreterms + [name] plots.violinplot_multiple_cols( dataframe=self.df, cols=plot_scoreterms, titles=plot_scoreterms, y_labels=plot_scoreterms, dims=None, out_path=out_path, show_fig=False ) self.save_scores() logging.info("Composite score creation completed.") return self
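The normalization and weighting steps described in the docstring above can be sketched on a plain DataFrame. This is a minimal illustration and not the ProtFlow implementation: the helper `normalize` and the column names `plddt` and `rmsd` are hypothetical. It also illustrates one consequence of a weighted sum, stated here as an assumption about typical use rather than documented behavior: a negative weight flips the direction of a score term, so a "lower is better" score can be combined with a "higher is better" one.

```python
import pandas as pd

def normalize(ser: pd.Series) -> pd.Series:
    """Median/std normalization as described above; constant columns become zeros."""
    if ser.nunique() == 1:
        return pd.Series(0.0, index=ser.index)
    return (ser - ser.median()) / ser.std()

df = pd.DataFrame({
    "plddt": [70.0, 80.0, 90.0],  # hypothetical score where higher is better
    "rmsd": [3.0, 2.0, 1.0],      # hypothetical score where lower is better
})
scoreterms, weights = ["plddt", "rmsd"], [1.0, -1.0]

# weighted sum of the normalized columns; the negative weight inverts rmsd
composite = sum(normalize(df[col]) * w for col, w in zip(scoreterms, weights))
print(composite.tolist())  # [-2.0, 0.0, 2.0]
```

With these weights the third pose (highest `plddt`, lowest `rmsd`) receives the highest composite score.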
def calculate_mean_score(self, name: str, score_col: str, skipna: bool = False, remove_layers: int = None, sep: str = "_"): """ Calculate the mean score of the selected score column. If remove_layers is set, calculates mean scores over poses grouped by the description column with the set number of index layers removed. Parameters ---------- name : str The name of the new column where the mean scores will be stored. score_col : str The name of the column from which to calculate the mean scores. skipna : bool, optional Whether to skip NA/null values. Default is False. remove_layers : int, optional The number of layers to remove from the index for grouping. If None or 0, no layers are removed and poses are grouped by their full description. Default is None. sep : str, optional The separator used in the 'poses_description' column for splitting and joining layers. Default is "_". Returns ------- self The instance of the class with the mean scores added to the DataFrame. Raises ------ TypeError If `remove_layers` is not an integer. KeyError If `score_col` does not exist in the DataFrame. Example ------- .. code-block:: python from poses import Poses # Initialize the Poses class with some scores poses_instance = Poses() # Calculate the mean score poses_instance.calculate_mean_score( name='mean_score1', score_col='score1', skipna=True, remove_layers=1, ) """ col_in_df(self.df, score_col) df_layers = self.df.copy() if remove_layers == 0: remove_layers = None # create temporary description column with removed index layers if remove_layers: if not isinstance(remove_layers, int): raise TypeError(f"ERROR: only value of type 'int' allowed for remove_layers. 
You set it to {type(remove_layers)}") df_layers["tmp_layer_column"] = df_layers['poses_description'].str.split(sep).str[:-1*int(remove_layers)].str.join(sep) else: df_layers["tmp_layer_column"] = df_layers['poses_description'] df = [] for _, group_df in df_layers.groupby("tmp_layer_column", sort=False): group_df[name] = group_df[score_col].mean(skipna=skipna) df.append(group_df) df = pd.concat(df).reset_index(drop=True) # keep only the description and the new score column (drops the temporary description column) df = df[['poses_description', name]] self.df = self.df.merge(df, on='poses_description') return self
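The layer-based grouping used by the score aggregation methods can be sketched on a plain DataFrame. This is a minimal illustration under stated assumptions, not the ProtFlow implementation: the helper `grouped_mean` and the sample descriptions are hypothetical, and the sketch uses `groupby(...).transform` in place of the explicit loop and merge above.

```python
import pandas as pd

def grouped_mean(df: pd.DataFrame, score_col: str, remove_layers: int = 1, sep: str = "_") -> pd.Series:
    """Hypothetical helper: mean of score_col over poses that share a parent description.

    Assumes remove_layers >= 1; a slice of [:-0] would drop every layer.
    """
    # strip the last <remove_layers> index layers, e.g. "a_0001_0002" -> "a_0001"
    parent = df["poses_description"].str.split(sep).str[:-remove_layers].str.join(sep)
    # broadcast each group's mean back onto its member rows
    return df.groupby(parent, sort=False)[score_col].transform("mean")

df = pd.DataFrame({
    "poses_description": ["a_0001_0001", "a_0001_0002", "a_0002_0001", "a_0002_0002"],
    "score1": [1.0, 3.0, 10.0, 20.0],
})
df["mean_score1"] = grouped_mean(df, "score1", remove_layers=1)
print(df["mean_score1"].tolist())  # [2.0, 2.0, 15.0, 15.0]
```

Every pose keeps its own row; only the aggregated value is shared within a group.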
def calculate_median_score(self, name: str, score_col: str, skipna: bool = False, remove_layers: int = None, sep: str = "_"): """ Calculate the median score of the selected score column. If remove_layers is set, calculates median scores over poses grouped by the description column with the set number of index layers removed. Parameters ---------- name : str The name of the new column where the median scores will be stored. score_col : str The name of the column from which to calculate the median scores. skipna : bool, optional Whether to skip NA/null values. Default is False. remove_layers : int, optional The number of layers to remove from the index for grouping. If None or 0, no layers are removed and poses are grouped by their full description. Default is None. sep : str, optional The separator used in the 'poses_description' column for splitting and joining layers. Default is "_". Returns ------- self The instance of the class with the median scores added to the DataFrame. Raises ------ TypeError If `remove_layers` is not an integer. KeyError If `score_col` does not exist in the DataFrame. Example ------- .. code-block:: python from poses import Poses # Initialize the Poses class with some scores poses_instance = Poses() # Calculate the median score poses_instance.calculate_median_score( name='median_score1', score_col='score1', skipna=True, remove_layers=1, ) """ col_in_df(self.df, score_col) df_layers = self.df.copy() if remove_layers == 0: remove_layers = None # create temporary description column with removed index layers if remove_layers: if not isinstance(remove_layers, int): raise TypeError(f"ERROR: only value of type 'int' allowed for remove_layers. 
You set it to {type(remove_layers)}") df_layers["tmp_layer_column"] = df_layers['poses_description'].str.split(sep).str[:-1*int(remove_layers)].str.join(sep) else: df_layers["tmp_layer_column"] = df_layers['poses_description'] df = [] for _, group_df in df_layers.groupby("tmp_layer_column", sort=False): group_df[name] = group_df[score_col].median(skipna=skipna) df.append(group_df) df = pd.concat(df).reset_index(drop=True) # keep only the description and the new score column (drops the temporary description column) df = df[['poses_description', name]] self.df = self.df.merge(df, on='poses_description') return self
def calculate_std_score(self, name: str, score_col: str, skipna: bool = False, remove_layers: int = None, sep: str = "_"): """ Calculate the standard deviation of the selected score column. If remove_layers is set, calculates standard deviations over poses grouped by the description column with the set number of index layers removed. Parameters ---------- name : str The name of the new column where the standard deviations will be stored. score_col : str The name of the column from which to calculate the standard deviation. skipna : bool, optional Whether to skip NA/null values. Default is False. remove_layers : int, optional The number of layers to remove from the index for grouping. If None or 0, no layers are removed and poses are grouped by their full description. Default is None. sep : str, optional The separator used in the 'poses_description' column for splitting and joining layers. Default is "_". Returns ------- self The instance of the class with the standard deviations added to the DataFrame. Raises ------ TypeError If `remove_layers` is not an integer. KeyError If `score_col` does not exist in the DataFrame. Example ------- .. code-block:: python from poses import Poses # Initialize the Poses class with some scores poses_instance = Poses() # Calculate the standard deviation poses_instance.calculate_std_score( name='std_score1', score_col='score1', skipna=True, remove_layers=1, ) """ col_in_df(self.df, score_col) df_layers = self.df.copy() if remove_layers == 0: remove_layers = None # create temporary description column with removed index layers if remove_layers: if not isinstance(remove_layers, int): raise TypeError(f"ERROR: only value of type 'int' allowed for remove_layers. 
You set it to {type(remove_layers)}") df_layers["tmp_layer_column"] = df_layers['poses_description'].str.split(sep).str[:-1*int(remove_layers)].str.join(sep) else: df_layers["tmp_layer_column"] = df_layers['poses_description'] df = [] for _, group_df in df_layers.groupby("tmp_layer_column", sort=False): group_df[name] = group_df[score_col].std(skipna=skipna) df.append(group_df) df = pd.concat(df).reset_index(drop=True) # keep only the description and the new score column (drops the temporary description column) df = df[['poses_description', name]] self.df = self.df.merge(df, on='poses_description') return self
def calculate_max_score(self, name: str, score_col: str, skipna: bool = False, remove_layers: int = None, sep: str = "_"): """ Calculate the maximum value of the selected score column. If remove_layers is set, calculates the maximum value over poses grouped by the description column with the set number of index layers removed. Parameters ---------- name : str The name of the new column where the maximum values will be stored. score_col : str The name of the column from which to calculate the maximum value. skipna : bool, optional Whether to skip NA/null values. Default is False. remove_layers : int, optional The number of layers to remove from the index for grouping. If None or 0, no layers are removed and poses are grouped by their full description. Default is None. sep : str, optional The separator used in the 'poses_description' column for splitting and joining layers. Default is "_". Returns ------- self The instance of the class with the maximum values added to the DataFrame. Raises ------ TypeError If `remove_layers` is not an integer. KeyError If `score_col` does not exist in the DataFrame. Example ------- .. code-block:: python from poses import Poses # Initialize the Poses class with some scores poses_instance = Poses() # Calculate the maximum values poses_instance.calculate_max_score( name='max_score1', score_col='score1', skipna=True, remove_layers=1, ) """ col_in_df(self.df, score_col) df_layers = self.df.copy() if remove_layers == 0: remove_layers = None # create temporary description column with removed index layers if remove_layers: if not isinstance(remove_layers, int): raise TypeError(f"ERROR: only value of type 'int' allowed for remove_layers. 
You set it to {type(remove_layers)}") df_layers["tmp_layer_column"] = df_layers['poses_description'].str.split(sep).str[:-1*int(remove_layers)].str.join(sep) else: df_layers["tmp_layer_column"] = df_layers['poses_description'] df = [] for _, group_df in df_layers.groupby("tmp_layer_column", sort=False): group_df[name] = group_df[score_col].max(skipna=skipna) df.append(group_df) df = pd.concat(df).reset_index(drop=True) # keep only the description and the new score column (drops the temporary description column) df = df[['poses_description', name]] self.df = self.df.merge(df, on='poses_description') return self
def calculate_min_score(self, name: str, score_col: str, skipna: bool = False, remove_layers: int = None, sep: str = "_"): """ Calculate the minimum value of the selected score column. If remove_layers is set, calculates the minimum value over poses grouped by the description column with the set number of index layers removed. Parameters ---------- name : str The name of the new column where the minimum values will be stored. score_col : str The name of the column from which to calculate the minimum value. skipna : bool, optional Whether to skip NA/null values. Default is False. remove_layers : int, optional The number of layers to remove from the index for grouping. If None or 0, no layers are removed and poses are grouped by their full description. Default is None. sep : str, optional The separator used in the 'poses_description' column for splitting and joining layers. Default is "_". Returns ------- self The instance of the class with the minimum values added to the DataFrame. Raises ------ TypeError If `remove_layers` is not an integer. KeyError If `score_col` does not exist in the DataFrame. Example ------- .. code-block:: python from poses import Poses # Initialize the Poses class with some scores poses_instance = Poses() # Calculate the minimum values poses_instance.calculate_min_score( name='min_score1', score_col='score1', skipna=True, remove_layers=1, ) """ col_in_df(self.df, score_col) df_layers = self.df.copy() if remove_layers == 0: remove_layers = None # create temporary description column with removed index layers if remove_layers: if not isinstance(remove_layers, int): raise TypeError(f"ERROR: only value of type 'int' allowed for remove_layers. 
You set it to {type(remove_layers)}") df_layers["tmp_layer_column"] = df_layers['poses_description'].str.split(sep).str[:-1*int(remove_layers)].str.join(sep) else: df_layers["tmp_layer_column"] = df_layers['poses_description'] df = [] for _, group_df in df_layers.groupby("tmp_layer_column", sort=False): group_df[name] = group_df[score_col].min(skipna=skipna) df.append(group_df) df = pd.concat(df).reset_index(drop=True) # keep only the description and the new score column (drops the temporary description column) df = df[['poses_description', name]] self.df = self.df.merge(df, on='poses_description') return self
[docs] def normalize_series(ser: pd.Series, scale: bool = False) -> pd.Series: """ Normalizes a pandas Series by subtracting the median and dividing by the standard deviation, with an option to scale the values. Parameters ---------- ser : pd.Series The pandas Series to be normalized. scale : bool, optional If True, scales the normalized values to a range between 0 and 1 (default is False). Returns ------- pd.Series The normalized (and optionally scaled) Series. Further Details --------------- This function normalizes a pandas Series by first subtracting the median and then dividing by the standard deviation. If the `scale` parameter is set to True, the normalized values are further scaled to a range between 0 and 1. This normalization process centers the data around zero and adjusts for variability, making the values comparable. Example ------- .. code-block:: python import pandas as pd from poses import normalize_series # Create a sample pandas Series sample_series = pd.Series([10, 20, 30, 40, 50]) # Normalize the Series normalized_series = normalize_series(sample_series, scale=True) Notes ----- - If all values in the Series are the same, the function returns a Series of zeros. - The optional scaling step ensures that the values are adjusted to a standardized range. """ ser = ser.copy() # calculate median and standard deviation median = ser.median() std = ser.std() # check if all values in <score_col> are the same, return 0 if yes if ser.nunique() == 1: ser[:] = 0 return ser # normalize score by subtracting median and dividing by standard deviation ser = (ser - median) / std # scale output to values between 0 and 1 if scale: ser = scale_series(ser) return ser
[docs] def scale_series(ser: pd.Series) -> pd.Series: """ Scales a pandas Series to a range between 0 and 1. Parameters ---------- ser : pd.Series The pandas Series to be scaled. Returns ------- pd.Series The scaled Series with values between 0 and 1. Further Details --------------- This function scales a pandas Series to a range between 0 and 1. It ensures that the minimum value in the Series becomes 0 and the maximum value becomes 1, with all other values adjusted proportionately. Example ------- .. code-block:: python import pandas as pd from poses import scale_series # Create a sample pandas Series sample_series = pd.Series([10, 20, 30, 40, 50]) # Scale the Series scaled_series = scale_series(sample_series) Notes ----- - If all values in the Series are the same, the function returns a Series of zeros. - The scaling process adjusts the values to fit within a standardized range, making them comparable. """ ser = ser.copy() # check if all values in <score_col> are the same, set all values to 0 if yes as no scaling is possible if ser.nunique() == 1: return pd.Series(0, index=ser.index) # scale all values between 0 and 1 return (ser - ser.min()) / (ser.max() - ser.min())
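The two transformations documented above, median/standard-deviation normalization followed by min-max scaling, can be traced by hand on the sample Series from the docstrings. This is a standalone sketch using only pandas; the variable names are illustrative and it does not call the ProtFlow functions.

```python
import pandas as pd

ser = pd.Series([10, 20, 30, 40, 50], dtype=float)

# median/std normalization (pandas uses the sample standard deviation, ddof=1)
normalized = (ser - ser.median()) / ser.std()

# min-max scaling to the interval [0, 1]
scaled = (normalized - normalized.min()) / (normalized.max() - normalized.min())

print(normalized.round(3).tolist())  # [-1.265, -0.632, 0.0, 0.632, 1.265]
print(scaled.round(3).tolist())      # [0.0, 0.25, 0.5, 0.75, 1.0]
```

Here the median is 30 and the sample standard deviation is the square root of 250 (about 15.81), so the normalized values are symmetric around zero and the scaled values are evenly spaced.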
def combine_dataframe_score_columns(df: pd.DataFrame, scoreterms: list[str], weights: list[float], scale: bool = False) -> pd.Series: """ Combines multiple score columns in a DataFrame into a single composite score, applying weights and normalization. Parameters ---------- df : pd.DataFrame The DataFrame containing the score columns. scoreterms : list[str] The list of score columns to be combined. weights : list[float] The list of weights corresponding to each score column. scale : bool, optional If True, scales the composite score to a range between 0 and 1 (default is False). Returns ------- pd.Series The composite score as a pandas Series. Raises ------ ValueError If the number of scoreterms and weights do not match, or if any score column contains non-numeric values. Further Details --------------- This function combines multiple score columns in a DataFrame into a single composite score. Each score column is normalized by subtracting the median and dividing by the standard deviation. The normalized scores are then weighted according to the specified weights and summed to create the composite score. Optionally, the composite score can be scaled to a range between 0 and 1. Example ------- .. code-block:: python import pandas as pd from poses import combine_dataframe_score_columns # Create a sample DataFrame data = { 'score1': [10, 20, 30, 40, 50], 'score2': [15, 25, 35, 45, 55] } df = pd.DataFrame(data) # Combine score columns into a composite score composite_score = combine_dataframe_score_columns(df, scoreterms=['score1', 'score2'], weights=[0.5, 0.5], scale=True) Notes ----- - The function operates on a copy of the DataFrame, so the input score columns are left unchanged. - Normalization helps in making the scores comparable by removing scale differences. - Raises a ValueError if the number of scoreterms and weights do not match, ensuring correct input. - The optional scaling step ensures that the composite score remains within a standardized range. 
""" if not len(scoreterms) == len(weights): raise ValueError(f"Number of scoreterms ({len(scoreterms)}) and weights ({len(weights)}) must be equal!") df = df.copy() for col in scoreterms: # check if column contains only floats or integers, raise an error otherwise df[col] = pd.to_numeric(df[col], errors='coerce') if df[col].isna().any(): raise ValueError(f"Column {col} must only contain float or integers!") # normalize scoreterm df[col] = normalize_series(ser=df[col], scale=False) # combine weighted scores combined_col = sum((df[col]*weight for col, weight in zip(scoreterms, weights))) return scale_series(combined_col) if scale else combined_col
def get_format(path: str): """ Returns the appropriate pandas function to load a file based on its extension. Parameters ---------- path : str The path to the file whose format needs to be determined. Returns ------- function The pandas function corresponding to the file format (e.g., pd.read_json, pd.read_csv). Further Details --------------- This function determines the appropriate pandas function to use for loading a file based on its extension. It supports various file formats, including JSON, CSV, Pickle, Feather, and Parquet. The extension is matched case-insensitively. Example ------- .. code-block:: python import pandas as pd from poses import get_format # Determine the format function for a JSON file load_function = get_format('path/to/data.json') # Use the function to load the data df = load_function('path/to/data.json') Notes ----- - Raises a KeyError if the file format is not supported. - Ensures that the appropriate pandas function is returned based on the file extension. """ loading_function_dict = { "json": pd.read_json, "csv": pd.read_csv, "pkl": pd.read_pickle, "pickle": pd.read_pickle, "feather": pd.read_feather, "parquet": pd.read_parquet } return loading_function_dict[path.split(".")[-1].lower()]
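The extension-based dispatch in `get_format` can be tried out in isolation. The sketch below is a hedged illustration and not the ProtFlow code: the `LOADERS` table is a hypothetical, reduced stand-in, and the file is written to a temporary directory so that nothing outside the example is touched.

```python
import os
import tempfile

import pandas as pd

# hypothetical, reduced loader table; keys are lowercase file extensions
LOADERS = {"json": pd.read_json, "csv": pd.read_csv}

with tempfile.TemporaryDirectory() as tmp:
    # upper-case extension on purpose: the lookup lowercases it first
    path = os.path.join(tmp, "scores.CSV")
    pd.DataFrame({"score": [1, 2]}).to_csv(path, index=False)

    extension = path.split(".")[-1].lower()
    loader = LOADERS[extension]  # raises KeyError for unsupported extensions
    df = loader(path)

print(extension, df["score"].tolist())  # csv [1, 2]
```

A dictionary lookup keeps the set of supported formats in one place, at the cost of a bare KeyError when the extension is unknown.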
[docs] def load_poses(poses_path: str) -> Poses: """ Loads poses from a specified file and returns a Poses instance. Parameters ---------- poses_path : str The path to the file containing the poses to be loaded. Returns ------- Poses A Poses instance with poses loaded from the specified file. Further Details --------------- This function reads a file containing poses and returns a Poses instance with the data. The file format is automatically detected based on the file extension, and the corresponding loading function is used to read the data into a DataFrame. Example ------- .. code-block:: python from poses import Poses, load_poses # Load poses from a file poses_instance = load_poses('path/to/poses.json') Notes ----- - The function supports various file formats, including JSON, CSV, Pickle, Feather, and Parquet. - Ensures that the loaded DataFrame contains the necessary columns and updates the Poses instance accordingly. """ return Poses().load_poses(poses_path)
[docs] def col_in_df(df: pd.DataFrame, column: str|list[str]) -> None: """ Checks if the specified column(s) exist in the DataFrame. Parameters ---------- df : pd.DataFrame The DataFrame to be checked. column : str or list[str] The column name or list of column names to check for existence in the DataFrame. Raises ------ KeyError If any of the specified columns are not found in the DataFrame. Further Details --------------- This function checks whether the specified column or list of columns exist in the given DataFrame. It is useful for ensuring that the DataFrame contains the necessary columns before performing further operations. Example ------- .. code-block:: python import pandas as pd from poses import col_in_df # Create a sample DataFrame df = pd.DataFrame({ 'col1': [1, 2, 3], 'col2': [4, 5, 6] }) # Check if a column exists col_in_df(df, 'col1') # Check if multiple columns exist col_in_df(df, ['col1', 'col2']) Notes ----- - The function raises a KeyError if any of the specified columns are not found in the DataFrame. - Ensures that the DataFrame contains the necessary columns for subsequent operations. """ if isinstance(column, list): for col in column: col_in_df(df, col) else: if not column in df.columns: raise KeyError(f"Could not find {column} in poses dataframe! Are you sure you provided the right column name?")
[docs] def filter_dataframe_by_rank(df: pd.DataFrame, col: str, n: float|int, group_col: str = None, remove_layers: int = None, layer_col: str = "poses_description", sep: str = "_", ascending: bool = True) -> pd.DataFrame: """ Filters the DataFrame to retain only the top-ranked rows based on a specified column. Parameters ---------- df : pd.DataFrame The DataFrame to be filtered. col : str The column in the DataFrame used for ranking. n : Union[float, int] The number of top-ranked rows to retain. If n < 1, it represents a fraction of the total rows. group_col : str, optional Group dataframe by this column, then filter individual groups. remove_layers : int, optional The number of layers to remove from the column values before ranking. This helps in grouping similar rows. layer_col : str, optional The column used for layer-based grouping of rows (default is "poses_description"). sep : str, optional The separator used in the layer descriptions (default is "_"). ascending : bool, optional If True, ranks rows in ascending order; otherwise, in descending order (default is True). Returns ------- pd.DataFrame The filtered DataFrame containing only the top-ranked rows. Further Details --------------- This function filters the DataFrame to retain only the top-ranked rows based on the values in a specified column. It supports fractional ranking, layer-based grouping, and sorting in ascending or descending order. The function also allows for removing layers from column values before ranking to handle grouped data. Example ------- .. 
code-block:: python import pandas as pd from poses import filter_dataframe_by_rank # Create a sample DataFrame data = { 'poses_description': ['pose1', 'pose2', 'pose3', 'pose4', 'pose5'], 'score': [10, 20, 30, 40, 50] } df = pd.DataFrame(data) # Filter the DataFrame to retain the 3 top-ranked rows based on the score column (the lowest scores, since ascending=True by default) filtered_df = filter_dataframe_by_rank(df, col='score', n=3) Notes ----- - The function raises a KeyError if the specified column is not found in the DataFrame. - Ensures that the DataFrame is properly sorted and filtered based on the provided parameters. """ def determine_filter_n(df: pd.DataFrame, n: float) -> int: ''' Determines whether n is a fraction or an integer and sets the cutoff for dataframe filtering accordingly. ''' filter_n = float(n) if filter_n <= 0: raise ValueError("ERROR: Argument <n> of filter functions must be greater than 0. If n < 1, the top n fraction is taken from the DataFrame. If n >= 1, the top n rows are taken from the DataFrame.") elif filter_n < 1: filter_n = round(len(df) * filter_n) return int(filter_n) def filter_groups(df: pd.DataFrame, col: str, n: float, group_col: str, ascending: bool) -> pd.DataFrame: ''' Groups a dataframe, then filters individual groups. 
''' # check if group col exists in df col_in_df(df, group_col) # group by group column, filter top n rows per group filtered = [] for _, group_df in df.groupby(group_col, sort=False): filtered.append(group_df.sort_values(by=col, ascending=ascending).head(determine_filter_n(group_df, n))) return pd.concat(filtered).reset_index(drop=True) # make sure <col> exists in <df> col_in_df(df, col) if group_col and remove_layers: raise KeyError("<group_col> and <remove_layers> are mutually exclusive!") # if group_col is set, group dataframe and filter individual groups if group_col: filtered_df = filter_groups(df=df, col=col, n=n, group_col=group_col, ascending=ascending) # if remove_layers is set, compile list of unique pose descriptions after removing the given number of index layers: elif remove_layers: if not isinstance(remove_layers, int): raise TypeError(f"ERROR: only value of type 'int' allowed for remove_layers. You set it to {type(remove_layers)}") # make sure <layer_col> exists in df col_in_df(df, layer_col) # create temporary description column with removed index layers (assign returns a new DataFrame, so the caller's DataFrame is not modified) df = df.assign(tmp_layer_column=df[layer_col].str.split(sep).str[:-1*int(remove_layers)].str.join(sep)) # group by temporary description column, filter top n rows per group filtered_df = filter_groups(df=df, col=col, n=n, group_col="tmp_layer_column", ascending=ascending) # drop temporary description column filtered_df.drop("tmp_layer_column", axis=1, inplace=True) else: filtered_df = df.sort_values(by=col, ascending=ascending).head(determine_filter_n(df, n)) return filtered_df
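How the argument `n` is interpreted (a fraction below 1, a row count otherwise) and how per-group filtering differs from global filtering can be sketched as follows. The helper `resolve_n` is a hypothetical stand-in for the nested cutoff logic above, and the sample data is made up for illustration.

```python
import pandas as pd

def resolve_n(n_rows: int, n: float) -> int:
    """Hypothetical stand-in for the cutoff logic: values below 1 are fractions of the row count."""
    if n <= 0:
        raise ValueError("n must be greater than 0")
    return round(n_rows * n) if n < 1 else int(n)

df = pd.DataFrame({
    "poses_description": ["a_0001", "a_0002", "b_0001", "b_0002"],
    "group": ["a", "a", "b", "b"],
    "score": [1.0, 2.0, 3.0, 4.0],
})

# best half of the whole table (ascending sort keeps the lowest scores)
top_half = df.sort_values("score").head(resolve_n(len(df), 0.5))

# best single row of every group
top_per_group = pd.concat(
    [g.sort_values("score").head(resolve_n(len(g), 1)) for _, g in df.groupby("group", sort=False)]
).reset_index(drop=True)

print(top_half["poses_description"].tolist())       # ['a_0001', 'a_0002']
print(top_per_group["poses_description"].tolist())  # ['a_0001', 'b_0001']
```

Global filtering can remove a whole group, as happens to group `b` in `top_half`, whereas per-group filtering keeps the best members of each group.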
[docs] def filter_dataframe_by_value(df: pd.DataFrame, col: str, value: float|int, operator: str) -> pd.DataFrame: """ Filters the DataFrame based on a specified value in a column using the provided comparison operator. Parameters ---------- df : pd.DataFrame The DataFrame to be filtered. col : str The column in the DataFrame used for filtering. value : Union[float, int] The value used as the threshold for filtering rows. operator : str The comparison operator used for filtering ('>', '>=', '<', '<=', '=', '!='). Returns ------- pd.DataFrame The filtered DataFrame containing only the rows that meet the filtering criteria. Further Details --------------- This function filters the DataFrame based on a specified value in a column, using the provided comparison operator. It supports various comparison operators such as greater than, less than, equal to, and not equal to. Example ------- .. code-block:: python import pandas as pd from poses import filter_dataframe_by_value # Create a sample DataFrame data = { 'poses_description': ['pose1', 'pose2', 'pose3', 'pose4', 'pose5'], 'score': [10, 20, 30, 40, 50] } df = pd.DataFrame(data) # Filter the DataFrame to retain rows where the score is greater than 30 filtered_df = filter_dataframe_by_value(df, col='score', value=30, operator='>') Notes ----- - The function raises a KeyError if the specified column is not found in the DataFrame. - Ensures that the DataFrame is properly filtered based on the provided criteria. """ # make sure <col> exists columns in <df> col_in_df(df, col) # Define the comparison based on the operator if operator == '>': filtered_df = df[df[col] > value] elif operator == '>=': filtered_df = df[df[col] >= value] elif operator == '<': filtered_df = df[df[col] < value] elif operator == '<=': filtered_df = df[df[col] <= value] elif operator == '=': filtered_df = df[df[col] == value] elif operator == '!=': filtered_df = df[df[col] != value] else: raise KeyError("Invalid operator. 
Supported operators are '>','>=', '<', '<=', '=', '!='.") return filtered_df
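The if/elif chain above can also be written as a lookup table that maps each operator string onto a function from Python's standard `operator` module. This is an alternative sketch, not the ProtFlow implementation; the names `COMPARATORS` and `filter_by_value` are hypothetical.

```python
import operator as op

import pandas as pd

# maps the operator strings accepted above onto standard-library comparison functions
COMPARATORS = {">": op.gt, ">=": op.ge, "<": op.lt, "<=": op.le, "=": op.eq, "!=": op.ne}

def filter_by_value(df: pd.DataFrame, col: str, value, operator: str) -> pd.DataFrame:
    """Hypothetical table-driven variant of the comparison chain."""
    if operator not in COMPARATORS:
        raise KeyError(f"Invalid operator. Supported operators are {list(COMPARATORS)}.")
    # the comparison yields a boolean mask that selects the matching rows
    return df[COMPARATORS[operator](df[col], value)]

df = pd.DataFrame({
    "poses_description": ["pose1", "pose2", "pose3", "pose4", "pose5"],
    "score": [10, 20, 30, 40, 50],
})
filtered = filter_by_value(df, col="score", value=30, operator=">")
print(filtered["poses_description"].tolist())  # ['pose4', 'pose5']
```

Adding a new operator then only requires a new dictionary entry, and the error message stays in sync with the supported set.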
def description_from_path(path: str) -> str: '''Extracts "description" from a pose path.''' return os.path.splitext(os.path.basename(path))[0]
[docs] def class_in_df(df: pd.DataFrame, cls, out_col:str) -> pd.DataFrame: """ Return a copy of ``df`` with a column listing, for each row, the names of columns whose values are instances of a given class (or classes). If **no** cells in the DataFrame match ``cls``, the function returns a copy of ``df`` **without** adding ``out_col``. Empty DataFrames are returned unchanged. Elementwise checks use :meth:`pandas.DataFrame.map` (pandas ≥ 2.2). Parameters ---------- df : pandas.DataFrame Input DataFrame to inspect. cls : type or tuple[type, ...] Class (or tuple of classes) to test against, as in :func:`isinstance`. Examples: ``dict`` or ``(dict, list)``. out_col : str Name of the output column to add. Each entry will be a ``list[str]`` of column names whose values in that row are instances of ``cls``. The column is only created if at least one match exists anywhere in ``df``. Returns ------- pandas.DataFrame A copy of ``df``. If any matches are found, the copy contains an added column ``out_col`` with per-row lists of matching column names. If no matches are found (or ``df`` is empty), the copy is returned unchanged. Notes ----- - This function does **not** mutate ``df``; it returns a modified copy. - ``cls`` behaves exactly like the second argument to :func:`isinstance`. - To convert the list results to a delimiter-separated string, you can post-process with: ``out[out_col] = out[out_col].apply('|'.join)``. Examples -------- .. 
code-block:: python import pandas as pd df = pd.DataFrame({ 'a': [1, {'x': 1}, 3], 'b': [{'y': 2}, 5, [1, 2]], 'c': ['hi', 'there', 'world'], }) class_in_df(df, dict, 'resselector_cols') """ check_df = df.copy() if check_df.empty: # prevents crash return check_df # check for target class mask = check_df.map(lambda x: isinstance(x, cls)) # if no cells of the target class are found, return the unchanged copy if not mask.to_numpy().any(): return check_df # write output col with col names containing target class check_df[out_col] = mask.apply(lambda row: row.index[row].tolist(), axis=1) return check_df
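The per-row result of the class check can be reproduced step by step with the sample data from the docstring. This is a minimal sketch, not the function itself: it builds the boolean mask with a column-wise `Series.map`, an assumption made so the example does not depend on `DataFrame.map` being available in the installed pandas version.

```python
import pandas as pd

df = pd.DataFrame({
    "a": [1, {"x": 1}, 3],
    "b": [{"y": 2}, 5, [1, 2]],
    "c": ["hi", "there", "world"],
})

# boolean mask: True where a cell holds a dict
mask = df.apply(lambda column: column.map(lambda x: isinstance(x, dict)))

# for every row, collect the names of the columns flagged in the mask
matches = mask.apply(lambda row: row.index[row].tolist(), axis=1)
print(matches.tolist())  # [['b'], ['a'], []]
```

The list in row 2 of column `b` is not reported, because only `dict` was requested; passing a tuple such as `(dict, list)` to `isinstance` would include it.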