"""
General Utility Functions for ProtFlow
This module provides a collection of general utility functions designed to support various
operations within the ProtFlow package. These utilities include functions for parsing data files,
calculating molecular interactions, and other common tasks needed in bioinformatics and structural
biology workflows.
Examples:
Here is an example of how to use the `parse_fasta_to_dict` function:
.. code-block:: python
# Parse a FASTA file
fasta_dict = parse_fasta_to_dict('example.fasta')
for desc, seq in fasta_dict.items():
print(f"{desc}: {seq}")
This module is designed to provide essential utilities for common tasks encountered in
bioinformatics and structural biology, facilitating the development of more complex workflows
within the ProtFlow package.
Authors
-------
Markus Braun, Adrian Tripp
"""
# imports
import os
import pandas as pd
[docs]
def parse_fasta_to_dict(fasta_path: str, encoding: str = "UTF-8") -> dict[str, str]:
    '''
    Parse a FASTA file into a dictionary mapping sequence descriptions to sequences.

    The file is read line by line. A line whose first non-whitespace character is
    '>' starts a new record; the rest of that line (stripped) is the description.
    All following non-empty lines up to the next header are stripped and
    concatenated into the sequence of that record.

    Parameters
    ----------
    fasta_path : str
        Path to the FASTA file. Passed directly to ``open``; a missing file
        raises ``FileNotFoundError``.
    encoding : str, optional
        Character encoding used to read the file. Defaults to "UTF-8".

    Returns
    -------
    dict[str, str]
        ``{description: sequence}``. Descriptions do not include the leading '>'.
        Multi-line sequences are joined into a single string.

    Notes
    -----
    - A '>' character that occurs *inside* a description or sequence line does
      not start a new record; only a leading '>' does.
    - Records without any sequence line are skipped.
    - Lines that appear before the first header are ignored.
    - If a description occurs more than once, the last record with a sequence wins.

    Examples
    --------
    Assuming a FASTA file ``example.fasta`` with the following content::

        >seq1
        AGTCAGTC
        >seq2
        GTCAACGT

    >>> fasta_dict = parse_fasta_to_dict('example.fasta')
    >>> fasta_dict['seq1']
    'AGTCAGTC'
    >>> fasta_dict['seq2']
    'GTCAACGT'
    '''
    fasta_dict: dict[str, str] = {}
    description = None
    chunks: list[str] = []

    with open(fasta_path, 'r', encoding=encoding) as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith(">"):
                # flush the previous record before starting a new one
                if description is not None and chunks:
                    fasta_dict[description] = "".join(chunks)
                description = line[1:].strip()
                chunks = []
            elif description is not None:
                chunks.append(line)

    # flush the last record of the file
    if description is not None and chunks:
        fasta_dict[description] = "".join(chunks)

    return fasta_dict
[docs]
def sequence_dict_to_fasta(seq_dict: dict, out_path: str, combined_filename: str = None) -> None:
    '''
    Write sequences from a ``{description: sequence}`` dictionary to FASTA files.

    Parameters
    ----------
    seq_dict : dict
        Mapping of description to sequence. The description is written after
        the '>' of each record.
    out_path : str
        Output directory. It is created (including parents) if it does not exist.
    combined_filename : str, optional
        If set (non-empty), all records are written into the single file
        ``<out_path>/<combined_filename>``. Otherwise every record is written
        to its own file ``<out_path>/<description>.fa``.

    Returns
    -------
    None

    Notes
    -----
    - Existing files with the same name are overwritten.
    - Descriptions are used verbatim as file names in per-record mode.
    - With an empty ``seq_dict`` and ``combined_filename`` set, an empty file
      is written.
    '''
    # make sure out_path exists
    os.makedirs(out_path, exist_ok=True)

    # if combined_filename is specified, write everything into one file.
    if combined_filename:
        with open(f"{out_path}/{combined_filename}", 'w', encoding="UTF-8") as f:
            # stream record by record; every record is newline-terminated
            f.writelines(f">{desc}\n{seq}\n" for desc, seq in seq_dict.items())
        return

    # otherwise, write every sequence into its own .fa file, named after its description
    for description, seq in seq_dict.items():
        with open(f"{out_path}/{description}.fa", 'w', encoding="UTF-8") as f:
            f.write(f">{description}\n{seq}\n")
[docs]
def vdw_radii() -> "dict[str, float | None]":
    '''
    Return van der Waals radii keyed by lower-case element symbol.

    Values were taken from
    https://en.wikipedia.org/wiki/Atomic_radii_of_the_elements_(data_page),
    accessed 30.1.2023. They appear to be given in angstrom (the picometre
    values of the cited table divided by 100) -- TODO confirm.

    Returns
    -------
    dict[str, float | None]
        Mapping of element symbol (lower case, H through Ac) to radius.
        Elements without a tabulated value map to ``None``. Vanadium ("v") is
        deliberately set to 0 because Rosetta uses V as virtual atoms.
        A new dictionary is built on every call, so callers may modify the
        result freely.
    '''
    radii = {
        "h": 1.2,
        "he": 1.4,
        "li": 1.82,
        "be": 1.53,
        "b": 1.92,
        "c": 1.7,
        "n": 1.55,
        "o": 1.52,
        "f": 1.47,
        "ne": 1.54,
        "na": 2.27,
        "mg": 1.73,
        "al": 1.84,
        "si": 2.1,
        "p": 1.8,
        "s": 1.8,
        "cl": 1.75,
        "ar": 1.88,  # was 0.71, which is argon's calculated atomic radius, not its vdW radius
        "k": 2.75,
        "ca": 2.31,
        "sc": 2.11,
        "ti": None,
        "v": 0,  # set to zero because Rosetta uses V as virtual atoms
        "cr": None,
        "mn": None,
        "fe": 2.44,
        "co": None,
        "ni": 1.63,
        "cu": 1.4,
        "zn": 1.39,
        "ga": 1.87,
        "ge": 2.11,
        "as": 1.85,
        "se": 1.9,
        "br": 1.85,
        "kr": 2.02,
        "rb": 3.03,
        "sr": 2.49,
        "y": None,
        "zr": None,
        "nb": None,
        "mo": 2.45,
        "tc": None,
        "ru": 1.46,  # NOTE(review): other compilations list 2.46 for Ru -- possible typo, TODO confirm
        "rh": None,
        "pd": 1.63,
        "ag": 1.72,
        "cd": 1.58,
        "in": 1.93,
        "sn": 2.17,
        "sb": 2.06,
        "te": 2.06,
        "i": 1.98,
        "xe": 2.16,
        "cs": 3.43,
        "ba": 2.68,
        "la": None,
        "ce": None,
        "pr": None,
        "nd": None,
        "pm": None,
        "sm": None,
        "eu": None,
        "gd": None,
        "tb": None,
        "dy": None,
        "ho": None,
        "er": None,
        "tm": None,
        "yb": None,
        "lu": None,
        "hf": None,
        "ta": None,
        "w": None,
        "re": None,
        "os": None,
        "ir": None,
        "pt": 1.75,
        "au": 1.66,
        "hg": 1.55,
        "tl": 1.96,
        "pb": 2.02,
        "bi": 2.07,
        "po": 1.97,
        "at": 2.02,
        "rn": 2.2,
        "fr": 3.48,
        "ra": 2.83,
        "ac": None,
    }
    return radii
def _mutually_exclusive(opt_a, name_a: str, opt_b, name_b: str, none_ok: bool = False) -> None:
    '''
    Validate that at most one of two options is set.

    An option counts as "set" when it is truthy, so ``None``, ``0``, ``""``
    and empty containers are all treated as not set.

    Parameters
    ----------
    opt_a, opt_b
        The two option values to check.
    name_a, name_b : str
        Parameter names used in the error messages.
    none_ok : bool, optional
        If True, it is acceptable for neither option to be set.
        Defaults to False.

    Raises
    ------
    ValueError
        If both options are set, or if neither is set and ``none_ok`` is False.
    '''
    if opt_a and opt_b:
        raise ValueError(f"Parameters '{name_a}' and '{name_b}' are mutually exclusive. Specify either one of them, but not both.")
    if not (opt_a or opt_b or none_ok):
        raise ValueError(f"At least one of parameters {name_a} or {name_b} must be set.")
[docs]
def add_group_statistics(df: pd.DataFrame, group_col: str, prefix: str, statistics: list = ('min', 'mean', 'median', 'max', 'std')) -> pd.DataFrame:
    """
    Add group-based statistical features to the DataFrame for numeric columns only.

    The DataFrame is grouped by ``group_col`` and the requested statistics
    (default: ``min``, ``mean``, ``median``, ``max``, ``std``) are computed for
    all numeric columns whose names start with ``prefix``. The per-group
    results are merged back onto the original rows, with new columns named
    ``<original_column>_<statistic>``.

    Parameters
    ----------
    df : pandas.DataFrame
        The input DataFrame to process. It is not modified.
    group_col : str
        The name of the column to group by.
    prefix : str
        Only numeric columns whose (string) names start with this prefix are
        considered.
    statistics : list of str or str, optional
        Statistical functions to apply. Supported values are ``min``,
        ``mean``, ``median``, ``max``, ``std``, ``sum``, ``count``, ``var``,
        ``prod`` and ``size``. A single string is treated as a one-element
        list; duplicates are ignored.
        Defaults to (``min``, ``mean``, ``median``, ``max``, ``std``).

    Returns
    -------
    pandas.DataFrame
        A new DataFrame containing all original columns plus one new column
        per statistic for each selected column.

    Raises
    ------
    ValueError
        If ``group_col`` is not in ``df``.
    ValueError
        If no numeric columns match ``prefix``.
    ValueError
        If ``statistics`` is empty or contains an unsupported entry.

    Examples
    --------
    >>> import pandas as pd
    >>> data = {
    ...     "group_col": ["A", "A", "B", "B", "B"],
    ...     "start_str1": [10, 20, 30, 40, 50],
    ...     "start_str2": [5, 15, 25, 35, 45],
    ...     "start_str3": ["x", "y", "z", "w", "v"],
    ...     "other_col": [100, 200, 300, 400, 500],
    ... }
    >>> df = pd.DataFrame(data)
    >>> result = add_group_statistics(
    ...     df,
    ...     group_col="group_col",
    ...     prefix="start_str",
    ...     statistics=["min", "mean", "max"]
    ... )
    >>> result.columns.tolist()
    ['group_col', 'start_str1', 'start_str2', 'start_str3', 'other_col', 'start_str1_min', 'start_str1_mean', 'start_str1_max', 'start_str2_min', 'start_str2_mean', 'start_str2_max']
    >>> print(result[["group_col", "start_str1_min", "start_str1_mean", "start_str1_max"]])
      group_col  start_str1_min  start_str1_mean  start_str1_max
    0         A              10             15.0              20
    1         A              10             15.0              20
    2         B              30             40.0              50
    3         B              30             40.0              50
    4         B              30             40.0              50

    Notes
    -----
    - Only numeric columns beginning with ``prefix`` are aggregated; all other
      columns (including non-numeric ones with the prefix) are kept unchanged.
    - The statistics rely on the pandas groupby aggregations of the same name.
    - If a new column name collides with an existing column, the existing
      column is kept and the new one gets the suffix ``_group``.
    - The result comes from ``DataFrame.merge``, so the index of the returned
      DataFrame is a fresh ``RangeIndex``; row order follows ``df``.
    """
    # Check if group_col exists in the DataFrame
    if group_col not in df.columns:
        raise ValueError(f"The group_col '{group_col}' does not exist in the DataFrame.")

    # Select columns that start with the given prefix and are numeric.
    # Non-string column labels cannot carry a string prefix and are skipped.
    cols_with_prefix = [col for col in df.columns if isinstance(col, str) and col.startswith(prefix)]
    numeric_cols = df[cols_with_prefix].select_dtypes(include='number').columns.tolist()
    if not numeric_cols:
        raise ValueError(f"No numeric columns start with the prefix '{prefix}'.")

    # Normalise statistics: accept a single name, drop duplicates (pandas
    # rejects duplicate function names), and always hand a list to .agg().
    if isinstance(statistics, str):
        statistics = [statistics]
    stats = list(dict.fromkeys(statistics))
    if not stats:
        raise ValueError("At least one statistic must be provided.")

    # Verify that all specified statistics are supported
    supported_stats = {'min', 'mean', 'median', 'max', 'std', 'sum', 'count', 'var', 'prod', 'size'}
    unsupported = set(stats) - supported_stats
    if unsupported:
        raise ValueError(f"Unsupported statistics provided: {unsupported}. "
                         f"Supported statistics are: {supported_stats}")

    # Group by the group_col and compute the statistics
    grouped = df.groupby(group_col)[numeric_cols].agg(stats)

    # Flatten the (column, statistic) MultiIndex into '<column>_<statistic>'
    grouped.columns = [f"{col}_{stat}" for col, stat in grouped.columns]

    # Reset index so group_col becomes a regular column for merging
    grouped = grouped.reset_index()

    # Merge the aggregated statistics back to the original DataFrame
    df_merged = df.merge(grouped, on=group_col, how='left', suffixes=('', '_group'))
    return df_merged