Module epiclass.utils.notebooks.paper.paper_utilities

Utility functions for the paper notebooks.

Functions

def add_second_highest_prediction(df: pd.DataFrame, pred_cols: List[str]) ‑> pandas.core.frame.DataFrame

Return the DataFrame with added columns for the second highest prediction class.

Adds columns:
- '2nd pred class': The class with the second highest prediction.
- '1rst/2nd prob diff': The difference between the highest and second highest prediction probabilities.
- '1rst/2nd prob ratio': The ratio of the highest to second highest prediction probabilities.
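
Example (a minimal usage sketch; the toy DataFrame and class names below are illustrative assumptions, only the function signature and the added column names come from this module):

import pandas as pd

from epiclass.utils.notebooks.paper.paper_utilities import add_second_highest_prediction

# Hypothetical prediction-score columns; use the class columns of your own results dataframe.
pred_cols = ["rna_seq", "wgbs", "h3k27ac"]
df = pd.DataFrame(
    {
        "True class": ["rna_seq", "wgbs"],
        "Predicted class": ["rna_seq", "rna_seq"],
        "rna_seq": [0.7, 0.5],
        "wgbs": [0.2, 0.4],
        "h3k27ac": [0.1, 0.1],
    }
)

df = add_second_highest_prediction(df, pred_cols)
print(df[["2nd pred class", "1rst/2nd prob diff", "1rst/2nd prob ratio"]])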

def check_label_coherence(dataframe, categories, column_templates: Dict[str, str] | None = None)

Check that the predicted and true labels for each category are consistent.

Args

dataframe
DataFrame containing the predictions and true classes.
categories
List of categories to check.
column_templates
Dictionary mapping column types to template strings. Requires the "True" and "Predicted" keys. If None, uses default templates.

Raises ValueError if the predicted and true labels are not consistent.
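
Example (a minimal sketch; the template strings and the category name are assumptions based on column names used elsewhere in this module):

import pandas as pd

from epiclass.utils.notebooks.paper.paper_utilities import check_label_coherence

df = pd.DataFrame(
    {
        "True class (sex)": ["male", "female"],
        "Predicted class (sex)": ["male", "female"],
    }
)
column_templates = {"True": "True class ({})", "Predicted": "Predicted class ({})"}

# Raises ValueError if the predicted and true labels are inconsistent for any category.
check_label_coherence(df, categories=["sex"], column_templates=column_templates)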

def create_mislabel_corrector(paper_dir: Path) ‑> Tuple[Dict[str, str], Dict[str, Dict[str, str]]]

Obtain information necessary to correct sex and life_stage mislabels.

Returns:
- Dict[str, str]: {md5sum: EpiRR_no-v}
- Dict[str, Dict[str, str]]: {label_category: {EpiRR_no-v: corrected_label}}
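
Example (a minimal sketch; "path/to/paper_dir" is a placeholder for the paper directory this module expects):

from pathlib import Path

from epiclass.utils.notebooks.paper.paper_utilities import create_mislabel_corrector

md5sum_to_epirr, epirr_to_corrections = create_mislabel_corrector(Path("path/to/paper_dir"))

# Outer keys of the second mapping are label categories (sex / life_stage corrections);
# inner mappings go from EpiRR (without version suffix) to the corrected label.
print(list(epirr_to_corrections.keys()))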

def display_perc(df: pd.DataFrame | pd.Series)

Display a DataFrame with percentages.

def extract_data_from_files(parent_folder: Path, file_pattern: str, search_line: str, extract_pattern: str, type_cast: type = builtins.int, unique: bool = True) ‑> Dict[str, Set[Any]]

Extracts data from files matching a specific pattern within each directory of a parent folder.

Args

parent_folder : Path
The directory containing subdirectories to search.
file_pattern : str
Glob pattern for files to search within each subdirectory.
search_line : str
Line identifier to search for data extraction.
extract_pattern : str
Regex pattern to extract the desired data from the identified line.
type_cast : type
Type to cast the extracted data to.

Returns

Dict[str, Set[type]]
A dictionary with subdirectory names as keys and sets of extracted data as values.
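
Example (a minimal sketch; the folder layout, search line, and regex below are illustrative assumptions):

from pathlib import Path

from epiclass.utils.notebooks.paper.paper_utilities import extract_data_from_files

# Hypothetical layout: path/to/jobs/<job_dir>/*.o files containing lines like "Input size: 30321".
results = extract_data_from_files(
    parent_folder=Path("path/to/jobs"),
    file_pattern="*.o",
    search_line="Input size",
    extract_pattern=r"Input size: (\d+)",
    type_cast=int,
)
print(results)  # e.g. {"job_dir": {30321}}
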
def extract_experiment_keys_from_output_files(parent_folder: Path) ‑> Dict[str, Set[str]]

Extracts experiment keys from output (.o) files.

def extract_input_sizes_from_output_files(parent_folder: Path) ‑> Dict[str, Set[int]]

Extracts model input sizes from output (.o) files.

def extract_node_jobs_from_error_files(parent_folder: Path) ‑> Dict[str, Set[int]]

Extracts SLURM job IDs from error (.e) files.

def filter_biomat_LS(df: pd.DataFrame, biomaterial_cat_name: str, col_templates: Dict[str, str], predScore_threshold: float = 0.8, verbose: bool = True) ‑> pandas.core.frame.DataFrame

Filter biomaterials to enable more life stage predictions.

Remove samples expected to be from cell lines.

For samples with an unknown biomaterial type, retain those whose biomaterial prediction is not a cell line and has predScore > predScore_threshold.

Args

df
DataFrame with biomaterial type predictions.
biomaterial_cat_name
metadata category label (e.g. 'biomaterial_type')
col_templates
Column templates for 'true_col', 'pred_col', and 'max_pred' columns.
predScore_threshold
threshold for minimum prediction score
verbose
Print intermediate results
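
Example (a minimal sketch; the column template keys/values and the toy columns are assumptions, only the call signature comes from this module):

import pandas as pd

from epiclass.utils.notebooks.paper.paper_utilities import filter_biomat_LS

# Assumed keys for col_templates; assumed true/pred/score column naming.
col_templates = {
    "true_col": "True class ({})",
    "pred_col": "Predicted class ({})",
    "max_pred": "Max pred ({})",
}
df = pd.DataFrame(
    {
        "True class (biomaterial_type)": ["cell line", "unknown", "primary tissue"],
        "Predicted class (biomaterial_type)": ["cell line", "primary cell", "primary tissue"],
        "Max pred (biomaterial_type)": [0.99, 0.92, 0.85],
    }
)

filtered_df = filter_biomat_LS(
    df,
    biomaterial_cat_name="biomaterial_type",
    col_templates=col_templates,
    predScore_threshold=0.8,
    verbose=True,
)
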
def find_target_recall(df: pd.DataFrame, category_name: str, col_templates: Dict[str, str], class_of_interest: str, target_recall: float = 0.9, verbose: bool = False, iterations: int = 250, minimum_threshold: float = 0.5)

Find the first threshold such that recall ≥ target_recall among predictions with prediction score ≥ threshold.

Args

df
DataFrame
category_name
metadata category (e.g. 'biomaterial_type')
col_templates
Column templates for 'true_col', 'pred_col', and 'max_pred' columns.
class_of_interest
Label of class within category
target_recall
The target recall value
verbose
Print intermediate results
iterations
Number of thresholds to try in between minimum_threshold and 1

Returns

threshold, recall
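
Example (a minimal sketch; the column templates and class labels are assumptions, only the call signature comes from this module):

import pandas as pd

from epiclass.utils.notebooks.paper.paper_utilities import find_target_recall

col_templates = {
    "true_col": "True class ({})",
    "pred_col": "Predicted class ({})",
    "max_pred": "Max pred ({})",
}
df = pd.DataFrame(
    {
        "True class (biomaterial_type)": ["cell line", "primary tissue", "cell line"],
        "Predicted class (biomaterial_type)": ["cell line", "cell line", "cell line"],
        "Max pred (biomaterial_type)": [0.99, 0.65, 0.91],
    }
)

threshold, recall = find_target_recall(
    df,
    category_name="biomaterial_type",
    col_templates=col_templates,
    class_of_interest="cell line",
    target_recall=0.9,
)
print(threshold, recall)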

def format_labels(df: pd.DataFrame, columns: List[str]) ‑> pandas.core.frame.DataFrame

For each given column, format the labels by replacing spaces with underscores and converting them to lowercase.

def merge_life_stages(df: pd.DataFrame, lifestage_column_name: str = 'harmonized_donor_life_stage', column_name_templates: List[str] | None = None, exact_replace: bool = False, verbose: bool = True)

Merge perinatal stages into one category, for given column names.

New columns for f"{LIFE_STAGE}_merged" will be added for each column name template.

Args

df : pd.DataFrame
DataFrame to merge columns in.
lifestage_column_name : str
Name of the life stage category.
column_name_templates : List[str] | None
List of column name templates to merge, e.g. ["{}", "True class ({})", "Predicted class ({})"].
exact_replace : bool
If True, raise error if column not found or replacement already exists.
verbose : bool
If True, print the column names that are being merged or skipped.

Returns

pd.DataFrame
DataFrame with merged columns.

Raises

KeyError
If exact_replace=True and column operations fail.
ValueError
If remapping fails due to unmapped values.
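
Example (a minimal sketch; which life-stage labels count as perinatal is an assumption):

import pandas as pd

from epiclass.utils.notebooks.paper.paper_utilities import merge_life_stages

df = pd.DataFrame(
    {"harmonized_donor_life_stage": ["embryonic", "fetal", "newborn", "adult"]}
)

# "{}" applies the merge to the bare life-stage column itself (template example from the docstring).
merged_df = merge_life_stages(
    df,
    lifestage_column_name="harmonized_donor_life_stage",
    column_name_templates=["{}"],
    verbose=True,
)
print(merged_df.filter(like="_merged").head())
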
def merge_similar_assays(df: pd.DataFrame) ‑> pandas.core.frame.DataFrame

Attempt to merge rna-seq/wgbs categories, including the prediction scores.

Expects the dataframe to have the following columns:
- rna_seq
- mrna_seq
- wgbs-standard
- wgbs-pbat

The output dataframe will have the following columns:
- rna_seq
- wgbs

The dataframe is modified in place.

Expecteds "True class" or "Expected class", and "Predicted class" to be present.

A ValueError is raised if the columns are not present.
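
Example (a minimal sketch using the documented input/output columns; the prediction scores are illustrative):

import pandas as pd

from epiclass.utils.notebooks.paper.paper_utilities import merge_similar_assays

df = pd.DataFrame(
    {
        "True class": ["mrna_seq", "wgbs-pbat"],
        "Predicted class": ["rna_seq", "wgbs-standard"],
        "rna_seq": [0.50, 0.05],
        "mrna_seq": [0.40, 0.05],
        "wgbs-standard": [0.05, 0.50],
        "wgbs-pbat": [0.05, 0.40],
    }
)

merged_df = merge_similar_assays(df)  # df is modified in place and returned
print(merged_df[["rna_seq", "wgbs"]])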

def print_column_content(df: pd.DataFrame, col: str) ‑> None

Print absolute and relative count of a string column.

def rename_columns(df, remapper: Dict[str, str], exact_match: bool = False, verbose: bool = False) ‑> pandas.core.frame.DataFrame

Rename columns in a DataFrame using a dictionary of old to new column names.

If exact_match is True, the columns are renamed exactly as specified in the dictionary. If exact_match is False, a regular expression is used to replace all occurrences of each old string in the column names with the new string.
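
Example (a minimal sketch):

import pandas as pd

from epiclass.utils.notebooks.paper.paper_utilities import rename_columns

df = pd.DataFrame(columns=["Predicted class (sex)", "Max pred (sex)"])

# With exact_match=False (default), every occurrence of the old string in a column name is replaced.
df = rename_columns(df, remapper={"sex": "harmonized_donor_sex"})
print(list(df.columns))  # e.g. ["Predicted class (harmonized_donor_sex)", "Max pred (harmonized_donor_sex)"]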

def set_file_id(df: pd.DataFrame, input_col: str = 'Unnamed: 0', output_col: str = 'md5sum') ‑> pandas.core.frame.DataFrame

Standardizes a filename column by extracting the prefix and ensuring it is the first column.

This function renames a given column (input_col) by extracting the prefix (before _) and moves it to the first position in the DataFrame. If input_col and output_col are the same, the column is updated in place and repositioned.

Handled filename cases:
- recount3: sra.base_sums.SRP076599_SRR3669968.ALL_[resolution][filters].hdf5 -> SRR3669968
- ENCODE/ChIP-Atlas: [file_db_accession][resolution][filters].hdf5 -> [file_db_accession]
- EpiATLAS: [md5sum][resolution]_[filters].hdf5 -> [md5sum]

Args

df : pd.DataFrame
The input DataFrame.
input_col : str, optional
The name of the column to process. Defaults to "Unnamed: 0".
output_col : str, optional
The name of the resulting column. Defaults to "md5sum".

Returns

pd.DataFrame
A modified DataFrame with the processed column as the first column.

Raises

KeyError
If input_col is not found in the DataFrame.
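
Example (a minimal sketch; the EpiATLAS-style filename below is illustrative):

import pandas as pd

from epiclass.utils.notebooks.paper.paper_utilities import set_file_id

df = pd.DataFrame(
    {
        "Unnamed: 0": ["0123456789abcdef0123456789abcdef_100kb_all_none.hdf5"],
        "Predicted class": ["rna_seq"],
    }
)

df = set_file_id(df, input_col="Unnamed: 0", output_col="md5sum")
print(df.columns[0])  # "md5sum" is now the first column, holding the extracted prefix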

Classes

class IHECColorMap (base_fig_dir: Path)

Class to handle IHEC color map.

Source code
class IHECColorMap:
    """Class to handle IHEC color map."""

    def __init__(self, base_fig_dir: Path):
        self.base_fig_dir = base_fig_dir
        self.ihec_colormap_name = "IHEC_EpiATLAS_IA_colors_Apl01_2024.json"
        self.ihec_color_map = self.get_IHEC_color_map(
            base_fig_dir, self.ihec_colormap_name
        )
        self.assay_color_map = self.create_assay_color_map()
        self.cell_type_color_map = self.create_cell_type_color_map()
        self.sex_color_map = self.create_sex_color_map()

    @staticmethod
    def get_IHEC_color_map(folder: Path, name: str) -> List[Dict]:
        """Get the IHEC color map."""
        color_map_path = folder / name
        with open(color_map_path, "r", encoding="utf8") as color_map_file:
            ihec_color_map = json.load(color_map_file)
        return ihec_color_map

    def _create_color_map(self, label_category: str) -> Dict[str, str]:
        """Create a rbg color map from IHEC rgb strings"""
        color_dict = [elem for elem in self.ihec_color_map if label_category in elem][0][
            label_category
        ][0]
        for name, color in list(color_dict.items()):
            rbg = color.split(",")
            color_dict[name] = f"rgb({rbg[0]},{rbg[1]},{rbg[2]})"
            color_dict[name.lower()] = color_dict[name]
            color_dict[name.lower().replace("-", "_")] = color_dict[name]
        return color_dict

    def create_sex_color_map(self) -> Dict[str, str]:
        """Create a rbg color map for ihec sex label category."""
        color_dict = self._create_color_map(SEX)
        return color_dict

    def create_assay_color_map(self) -> Dict[str, str]:
        """Create a rbg color map for ihec assays."""
        category_label = "experiment"
        color_dict = self._create_color_map(category_label)
        color_dict["mrna_seq"] = color_dict["rna_seq"]
        for assay in ["wgbs-pbat", "wgbs-standard"]:
            color_dict[assay] = color_dict["wgbs"]
            color_dict[assay.replace("-", "_")] = color_dict["wgbs"]
        return color_dict

    def create_cell_type_color_map(self) -> Dict[str, str]:
        """Create the rbg color map for ihec cell types."""
        return self._create_color_map(CELL_TYPE)

Static methods

def get_IHEC_color_map(folder: Path, name: str) ‑> List[Dict[~KT, ~VT]]

Get the IHEC color map.

Methods

def create_assay_color_map(self) ‑> Dict[str, str]

Create an rgb color map for ihec assays.

def create_cell_type_color_map(self) ‑> Dict[str, str]

Create the rgb color map for ihec cell types.

def create_sex_color_map(self) ‑> Dict[str, str]

Create an rgb color map for ihec sex label category.
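
Example (a minimal sketch; base_fig_dir is a placeholder and must contain the IHEC color map JSON named in the constructor):

from pathlib import Path

from epiclass.utils.notebooks.paper.paper_utilities import IHECColorMap

color_maps = IHECColorMap(base_fig_dir=Path("path/to/figures"))

print(color_maps.assay_color_map["rna_seq"])  # an "rgb(r,g,b)" string
print(sorted(color_maps.sex_color_map))       # keys include lower-cased variants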

class MetadataHandler (paper_dir: Path | str)

Class to handle Metadata objects.

Source code
class MetadataHandler:
    """Class to handle Metadata objects."""

    def __init__(self, paper_dir: Path | str):
        self.paper_dir = Path(paper_dir)
        self.data_dir = self.paper_dir / "data"

        self.version_names = {
            "v1": "hg38_2023-epiatlas_dfreeze_formatted_JR.json",
            "v2": "hg38_2023-epiatlas-dfreeze-pospurge-nodup_filterCtl.json",
            "v2-encode": "hg38_2023-epiatlas-dfreeze_v2.1_w_encode_noncore_2.json",
        }

    def load_metadata(self, version: str) -> Metadata:
        """Return metadata for a specific version.

        Args:
            version (str): The version of the metadata to load.
            One of 'v1', 'v2', 'v2-encode'.

        Example of epiRR unique to v1: IHECRE00003355.2
        """
        if version not in self.version_names:
            raise ValueError(f"Version must be one of {self.version_names.keys()}")

        metadata = Metadata(
            self.data_dir / "metadata" / "epiatlas" / self.version_names[version]
        )
        return metadata

    def load_any_metadata(
        self, path: Path | str, as_dataframe: bool = False
    ) -> Metadata | pd.DataFrame:
        """Return metadata for a specific file."""
        metadata = Metadata(Path(path))
        if as_dataframe:
            metadata = metadata.to_df()
        return metadata

    def load_metadata_df(self, version: str, merge_assays: bool = True) -> pd.DataFrame:
        """Load a metadata dataframe for a given version.

        merge_assays: Merge similar assays (rna-seq x2 / wgbs x2)
        """
        metadata = self.load_metadata(version)
        metadata_df = self.metadata_to_df(metadata, merge_assays)
        return metadata_df

    @staticmethod
    def metadata_to_df(metadata: Metadata, merge_assays: bool = True) -> pd.DataFrame:
        """Convert the metadata to a dataframe.

        merge_assays: Merge similar assays (rna-seq x2 / wgbs x2)
        """
        metadata_df = pd.DataFrame.from_records(list(metadata.datasets))
        metadata_df.set_index("md5sum", inplace=True)
        if merge_assays:
            metadata_df.replace({ASSAY: ASSAY_MERGE_DICT}, inplace=True)
        return metadata_df

    @staticmethod
    def join_metadata(df: pd.DataFrame, metadata: Metadata) -> pd.DataFrame:
        """Join the metadata to the results dataframe."""
        metadata_df = pd.DataFrame(metadata.datasets)
        metadata_df.set_index("md5sum", inplace=True)
        metadata_df["md5sum"] = metadata_df.index

        diff_set = set(df.index) - set(metadata_df.index)
        if diff_set:
            err_df = pd.DataFrame(diff_set, columns=["md5sum"])
            print(err_df.to_string(), file=sys.stderr)
            raise AssertionError(
                f"{len(diff_set)} md5sums in the results dataframe are not present in the metadata dataframe. Saved error md5sums to join_missing_md5sums.csv."
            )

        merged_df = df.merge(
            metadata_df,
            how="left",
            left_index=True,
            right_index=True,
            suffixes=(None, "_delete"),
        )
        if len(merged_df) != len(df):
            raise AssertionError(
                "Merged dataframe has different length than original dataframe"
            )
        to_drop = [col for col in merged_df.columns if "_delete" in col]
        merged_df.drop(columns=to_drop, inplace=True)

        return merged_df

    @staticmethod
    def uniformize_metadata_for_plotting(
        epiatlas_metadata: Metadata,
        ca_pred_df: pd.DataFrame | None = None,
        enc_pred_df: pd.DataFrame | None = None,
        recount3_metadata: pd.DataFrame | None = None,
    ) -> pd.DataFrame:
        """Simplify metadata with chip-atlas metadata for plotting."""
        columns_to_keep = ["id", ASSAY, "track_type", "source", "plot_label"]
        to_concat = []

        epiatlas_df = pd.DataFrame.from_records(list(epiatlas_metadata.datasets))
        epiatlas_df["source"] = ["epiatlas"] * len(epiatlas_df)
        epiatlas_df["id"] = epiatlas_df["md5sum"]
        if "plot_label" not in epiatlas_df.columns:
            epiatlas_df["plot_label"] = [None] * len(epiatlas_df)
        epiatlas_df = epiatlas_df[columns_to_keep]

        to_concat.append(epiatlas_df)

        if ca_pred_df is not None:
            ca_df = ca_pred_df.copy(deep=True)
            ca_df["source"] = ["C-A"] * len(ca_df)
            ca_df[ASSAY] = ca_df["manual_target_consensus"].str.lower()
            ca_df["track_type"] = ["raw"] * len(ca_df)
            ca_df["id"] = ca_df["Experimental-id"]
            if "plot_label" not in ca_df.columns:
                ca_df["plot_label"] = [None] * len(ca_df)

            ca_df = ca_df[columns_to_keep]
            to_concat.append(ca_df)

        if enc_pred_df is not None:
            enc_df = enc_pred_df.copy(deep=True)
            enc_df["source"] = ["encode"] * len(enc_df)
            enc_df[ASSAY] = enc_df[ASSAY].str.lower()
            enc_df["track_type"] = ["pval"] * len(enc_df)
            enc_df["id"] = enc_df["FILE_accession"]
            if "plot_label" not in enc_df.columns:
                enc_df["plot_label"] = [None] * len(enc_df)

            enc_df = enc_df[columns_to_keep]
            to_concat.append(enc_df)

        if recount3_metadata is not None:
            recount3_df = recount3_metadata.copy(deep=True)
            recount3_df["source"] = ["recount3"] * len(recount3_df)
            recount3_df[ASSAY] = recount3_df["harmonized_assay"].str.lower()
            recount3_df["track_type"] = ["unique_raw"] * len(recount3_df)
            recount3_df["id"] = recount3_df["ID"]
            if "plot_label" not in recount3_df.columns:
                recount3_df["plot_label"] = [None] * len(recount3_df)

            recount3_df = recount3_df[columns_to_keep]
            to_concat.append(recount3_df)

        return pd.concat(to_concat)

Static methods

def join_metadata(df: pd.DataFrame, metadata: Metadata) ‑> pandas.core.frame.DataFrame

Join the metadata to the results dataframe.

def metadata_to_df(metadata: Metadata, merge_assays: bool = True) ‑> pandas.core.frame.DataFrame

Convert the metadata to a dataframe.

merge_assays: Merge similar assays (rna-seq x2 / wgbs x2)

def uniformize_metadata_for_plotting(epiatlas_metadata: Metadata, ca_pred_df: pd.DataFrame | None = None, enc_pred_df: pd.DataFrame | None = None, recount3_metadata: pd.DataFrame | None = None)

Simplify metadata with chip-atlas metadata for plotting.

Methods

def load_any_metadata(self, path: Path | str, as_dataframe: bool = False)

Return metadata for a specific file.

def load_metadata(self, version: str) ‑> Metadata

Return metadata for a specific version.

Args

version : str
The version of the metadata to load.

One of 'v1', 'v2', 'v2-encode'. Example of epiRR unique to v1: IHECRE00003355.2

def load_metadata_df(self, version: str, merge_assays: bool = True) ‑> pandas.core.frame.DataFrame

Load a metadata dataframe for a given version.

merge_assays: Merge similar assays (rna-seq x2 / wgbs x2)
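
Example (a minimal sketch; "path/to/paper_dir" is a placeholder and is assumed to contain data/metadata/epiatlas/ with the version JSON files):

from pathlib import Path

from epiclass.utils.notebooks.paper.paper_utilities import MetadataHandler

handler = MetadataHandler(Path("path/to/paper_dir"))

metadata = handler.load_metadata("v2")  # Metadata object
metadata_df = handler.load_metadata_df("v2", merge_assays=True)  # md5sum-indexed DataFrame
print(metadata_df.shape)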

class SplitResultsHandler

Class to handle split results.

Source code
class SplitResultsHandler:
    """Class to handle split results."""

    @staticmethod
    def add_max_pred(
        df: pd.DataFrame,
        target_label: str = "True class",
        expected_classes: List[str] | None = None,
    ) -> pd.DataFrame:
        """Add the max prediction column ("Max pred") to the results dataframe.

        The dataframe must not contain extra metadata columns.
        target_label: Column used to ascertain the output class columns.
        """
        if "Max pred" not in df.columns:
            df = df.copy(deep=True)
            classes_test = (
                df[target_label].astype(str).unique().tolist()
                + df["Predicted class"].astype(str).unique().tolist()
            )
            classes_test = list(set(classes_test))

            if expected_classes:
                classes_test = expected_classes

            classes = list(df.columns[2:])
            if any(label in classes for label in ["TRUE", "FALSE"]):
                print(
                    "WARNING: Found TRUE or FALSE in pred vector columns. Changing column names."
                )
                df.rename(columns={"TRUE": "True", "FALSE": "False"}, inplace=True)
                classes = list(df.columns[2:])

            for col in ["md5sum", "split", "Same?", "Predicted class", "True class"]:
                try:
                    classes.remove(col)
                except ValueError:
                    pass
            for class_label in classes:
                if class_label not in classes_test:
                    raise ValueError(
                        f"""Dataframe contains extra metadata columns, cannot ascertain classes: {classes}.
                        Column '{class_label}' is not in '{classes_test}'"""
                    )
            df["Max pred"] = df[classes].max(axis=1)
        return df

    @staticmethod
    def verify_md5_consistency(dfs: Dict[str, pd.DataFrame]) -> None:
        """Verify that all dataframes have the same md5sums.

        Used to compare the md5sums of the same split across different classifier methods.

        Args:
            dfs (Dict[str, pd.DataFrame]): {classifier_name: results_df}
                Results dataframes need to have md5sums as index.
        """
        md5s = {}
        for key, df in dfs.items():
            md5s[key] = set(df.index)

        first_key = list(dfs.keys())[0]
        base_md5s = set(dfs[first_key].index)
        if not base_md5s.intersection(*list(md5s.values())) == base_md5s:
            raise AssertionError("Not all dataframes have the same md5sums")

    @staticmethod
    def compute_acc_per_assay(
        split_results: Dict[str, pd.DataFrame], metadata_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Compute accuracy per assay for each split.

        Args:
        - split_results: {split_name: results_df}.
        - metadata_df: The metadata dataframe.
        """

        assay_acc = defaultdict(dict)
        for split_name, split_result_df in split_results.items():
            # Merge metadata
            split_result_df = split_result_df.merge(
                metadata_df, left_index=True, right_index=True
            )

            # Compute accuracy per assay
            assay_groupby = split_result_df.groupby(ASSAY)
            for assay, assay_df in assay_groupby:
                assay_acc[assay][split_name] = np.mean(
                    assay_df["True class"].astype(str).str.lower()
                    == assay_df["Predicted class"].astype(str).str.lower()
                )

        return pd.DataFrame(assay_acc)

    @staticmethod
    def gather_split_results_across_methods(
        results_dir: Path, label_category: str, only_NN: bool = False
    ) -> Dict[str, Dict[str, pd.DataFrame]]:
        """Gather split results for each classifier type.

        Args:
            results_dir: The directory containing the results. Child directories should be task/classifier names.
            label_category: The label category for the results.
            only_NN: A boolean flag to only gather the NN results.

        Returns:
            Dict[str, Dict[str, pd.DataFrame]]: {split_name:{classifier_name: results_df}}
        """
        base_NN_path = results_dir / f"{label_category}_1l_3000n"
        if label_category == ASSAY:
            NN_csv_path_template = base_NN_path / "11c"
            if not NN_csv_path_template.exists():
                NN_csv_path_template = base_NN_path
        elif label_category == CELL_TYPE:
            NN_csv_path_template = base_NN_path
        else:
            raise ValueError("Label category not supported.")

        NN_csv_path_template = str(
            NN_csv_path_template
            / "10fold-oversampling"
            / "{split}"
            / "validation_prediction.csv"
        )
        all_split_dfs = {}
        for split in [f"split{i}" for i in range(10)]:
            # Get the csv paths
            NN_csv_path = Path(NN_csv_path_template.format(split=split))  # type: ignore
            other_csv_root = results_dir / f"{label_category}" / "predict-10fold"

            if not only_NN:
                if not other_csv_root.exists():
                    raise FileNotFoundError(f"Could not find {other_csv_root}")
                other_csv_paths = other_csv_root.glob(
                    f"*/*_{split}_validation_prediction.csv"
                )

                other_csv_paths = list(other_csv_paths)
                if len(other_csv_paths) != 4:
                    raise AssertionError(
                        f"Expected 4 other_csv_paths, got {len(other_csv_paths)}"
                    )

            # Load the dataframes
            dfs: Dict[str, pd.DataFrame] = {}
            dfs["NN"] = pd.read_csv(NN_csv_path, header=0, index_col=0, low_memory=False)

            if not only_NN:
                for path in other_csv_paths:
                    category = path.name.split("_", maxsplit=1)[0]
                    dfs[category] = pd.read_csv(
                        path, header=0, index_col=0, low_memory=False
                    )

                # Verify md5sum consistency
                SplitResultsHandler.verify_md5_consistency(dfs)

            all_split_dfs[split] = dfs

        return all_split_dfs

    @staticmethod
    def read_split_results(parent_dir: Path) -> Dict[str, pd.DataFrame]:
        """Read split results from the given parent directory.

        Args:
            parent_dir: The parent directory containing the split results.

        Returns:
            Dict[str, pd.DataFrame]: {split_name: results_df}
        """
        csv_path_template = "split*/validation_prediction.csv"
        experiment_dict = {}
        for split_result_csv in parent_dir.glob(csv_path_template):
            split_name = split_result_csv.parent.name
            df = pd.read_csv(split_result_csv, header=0, index_col=0, low_memory=False)
            experiment_dict[split_name] = df
        return experiment_dict

    @staticmethod
    def gather_split_results_across_categories(
        parent_results_dir: Path, verbose: bool = False
    ) -> Dict[str, Dict[str, pd.DataFrame]]:
        """Gather NN split results for each classification task in the given folder children.

        Returns:
            Dict[str, Dict[str, pd.DataFrame]]: {general_name:{split_name: results_df}}
        """
        all_dfs = {}

        for category_dir in parent_results_dir.iterdir():
            if not category_dir.is_dir():
                continue
            for experiment_dir in category_dir.iterdir():
                if not experiment_dir.is_dir():
                    continue
                experiment_name = experiment_dir.name
                category_name = category_dir.name
                general_name = f"{category_name}_{experiment_name}"
                if verbose:
                    print(f"Reading {general_name} from {experiment_dir}")

                experiment_dict = SplitResultsHandler.read_split_results(experiment_dir)
                if verbose:
                    print(
                        f"Found {len(experiment_dict)} split results for {general_name}"
                    )

                all_dfs[general_name] = experiment_dict

        return all_dfs

    @staticmethod
    def concatenate_split_results(
        split_dfs: Dict[str, Dict[str, pd.DataFrame]] | Dict[str, pd.DataFrame],
        concat_first_level: bool = False,
        depth: int = 2,
    ) -> Dict[str, pd.DataFrame] | pd.DataFrame:
        """Concatenate split results for each different classifier or split name based on the order.

        This method supports two structures of input dictionaries:
        1. {split_name: {classifier_name: results_df}} when concat_first_level is False.
        2. {classifier_name: {split_name: results_df}} when concat_first_level is True.

        Args:
            split_dfs: A nested dictionary of DataFrames to be concatenated. The structure
                       depends on the concat_first_level flag.
            concat_first_level: A boolean flag that indicates the structure of the split_dfs dictionary.
                                If True, the first level is the classifier name. Otherwise, the first
                                level is the split name.
            depth: The depth of the dictionary structure. Must be 1 or 2.

        Returns:
            Dict[str, pd.DataFrame] : {classifier_name: concatenated_dataframe}

        Raises:
            AssertionError: If the index of any concatenated DataFrame is not of type str, indicating
                            an unexpected index format.
        """
        if depth not in [1, 2]:
            raise ValueError(f"Depth must be 1 or 2, got {depth}")

        # Handle basic case, code wasn't made for that, so a hack is done
        if depth == 1:
            to_concat = {"classifier": split_dfs}
            return SplitResultsHandler.concatenate_split_results(to_concat, concat_first_level=True, depth=2)["classifier"]  # type: ignore

        # Check for user error
        if depth == 2 and isinstance(next(iter(split_dfs.values())), pd.DataFrame):
            raise ValueError(
                "Depth given is two, but the input is not a nested dictionary."
            )

        if concat_first_level:
            # Reverse the nesting of the dictionary if concatenating by the first level.
            reversed_dfs = defaultdict(dict)
            for outer_key, inner_dict in split_dfs.items():
                for inner_key, df in inner_dict.items():
                    reversed_dfs[inner_key][outer_key] = df
            split_dfs = reversed_dfs

        to_concat_dfs = defaultdict(list)
        for split_name, dfs in split_dfs.items():
            for classifier, df in dfs.items():
                try:
                    df = df.assign(split=int(split_name.split("split")[-1]))
                except ValueError as e:
                    if "base 10" in str(e):
                        raise ValueError(
                            "Wrong concat_first_level value was probably used"
                        ) from e
                    raise e
                to_concat_dfs[classifier].append(df)

        concatenated_dfs = {
            classifier: pd.concat(dfs, axis=0)
            for classifier, dfs in to_concat_dfs.items()
        }

        # Verify index is still md5sum
        for df in concatenated_dfs.values():
            if not isinstance(df.index[0], str):
                raise AssertionError("Index is not md5sum")
            df["md5sum"] = df.index
            df.index.name = "md5sum"

        return concatenated_dfs

    @staticmethod
    def calculate_metrics_for_single_df(
        df: pd.DataFrame,
        task_name: str = "unknown task",
        logging_name: str = "df",
    ) -> Dict[str, float]:
        """Compute metrics for a single task's DataFrame.

        Expects a DataFrame with the following columns:
            md5sum, True class, predicted class, predicted probabilities

        Output is a dictionary of metrics with the following keys:
            Accuracy, F1_macro, AUC_micro, AUC_macro, count
        """
        df = df.copy()

        # Ensure 'True class' labels match (e.g. int or bool class labels)
        df["True class"] = df["True class"].astype(str).str.lower()
        df.columns = list(df.columns[:2]) + [
            label.lower() for i, label in enumerate(df.columns.str.lower()) if i > 1
        ]
        # One hot encode true class and get predicted probabilities
        # Reindex to ensure that the order of classes is consistent
        classes_order = df.columns[2:]
        onehot_true = (
            pd.get_dummies(df["True class"], dtype=int)
            .reindex(columns=classes_order, fill_value=0)
            .values
        )

        pred_probs = df[classes_order].values

        # Argmax encodings for accuracy/F1
        ravel_true = np.argmax(onehot_true, axis=1)
        ravel_pred = np.argmax(pred_probs, axis=1)

        # fmt: off
        # Core metrics
        task_metrics_dict: Dict[str, float|int] = {
            "Accuracy": accuracy_score(ravel_true, ravel_pred),
            "F1_macro": f1_score(ravel_true, ravel_pred, average="macro", zero_division="warn"), # type: ignore
            "count": len(df),
            }

        # --- AUC Handling ---
        unique_classes = np.unique(ravel_true)
        if unique_classes.shape[0] < 2:
            # Not enough classes -> AUC undefined
            logging.warning(
                "Cannot compute ROC AUC: only one class present in %s for %s (%s)",
                logging_name,
                task_name,
                df["True class"].unique(),
            )
            task_metrics_dict.update({"AUC_micro": np.nan, "AUC_macro": np.nan})
            return task_metrics_dict

        # Try to compute AUC safely
        try:
            # Suppress sklearn warnings explicitly inside block
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=UndefinedMetricWarning)
                task_metrics_dict.update(
                    {
                        "AUC_micro": roc_auc_score(
                            onehot_true, pred_probs, multi_class="ovr", average="micro" # type: ignore
                        ),
                        "AUC_macro": roc_auc_score(
                            onehot_true, pred_probs, multi_class="ovr", average="macro"
                        ),
                    }
                )
        except ValueError as err:
            if "multiclass" in str(err) or "Only one class" in str(err):
                logging.warning(
                    "Cannot compute ROC AUC for %s/%s: %s",
                    logging_name,
                    task_name,
                    err,
                )
                task_metrics_dict.update({"AUC_micro": np.nan, "AUC_macro": np.nan})
            else:
                err_msg = f"Unexpected AUC error in {logging_name} for {task_name}."
                logging.error(err_msg)
                logging.debug("columns: %s", df.columns)
                logging.debug("True class values: %s\n", df["True class"].value_counts())
                raise ValueError(err_msg) from err

        return task_metrics_dict

    @staticmethod
    def compute_split_metrics(
        all_split_dfs: Dict[str, Dict[str, pd.DataFrame]],
        concat_first_level: bool = False,
    ) -> Dict[str, Dict[str, Dict[str, float]]]:
        """Compute desired metrics for each split and classifier, accommodating different dictionary structures.

        This method supports two structures of input dictionaries:
        1. {split_name: {classifier_name: results_df}} when concat_first_level is False.
        2. {classifier_name: {split_name: results_df}} when concat_first_level is True.

        Args:
            all_split_dfs: A nested dictionary of DataFrames. The structure
                           depends on the concat_first_level flag.
            concat_first_level: A boolean flag that indicates the structure of the all_split_dfs dictionary.
                                If True, the first level is the classifier name. Otherwise, the first
                                level is the split name.

        Returns:
            A nested dictionary with metrics computed for each classifier and split. The structure is
            {split_name: {classifier_name: {metric_name: value}}}.
        """
        all_split_dfs = copy.deepcopy(all_split_dfs)
        split_metrics: Dict[str, Dict[str, Dict[str, float]]] = {}

        # Reorganize the dictionary to always work with {split_name: {classifier_name: DataFrame}}
        if concat_first_level:
            temp_dict: Dict[str, Dict[str, pd.DataFrame]] = {}
            for classifier, splits in all_split_dfs.items():
                for split_val, df_val in splits.items():
                    if split_val not in temp_dict:
                        temp_dict[split_val] = {}
                    temp_dict[split_val][classifier] = df_val

            all_split_dfs = temp_dict

        for split in [f"split{i}" for i in range(10)]:
            dfs = all_split_dfs[split]

            metrics: Dict[str, Dict[str, float]] = {}

            for task_name, df in dfs.items():
                metrics[
                    task_name
                ] = SplitResultsHandler().calculate_metrics_for_single_df(
                    df=df, task_name=task_name, logging_name=split
                )

            split_metrics[split] = metrics

        return split_metrics

    @staticmethod
    def invert_metrics_dict(
        metrics: Dict[str, Dict[str, Dict[str, float]]]
    ) -> Dict[str, Dict[str, Dict[str, float]]]:
        """Invert metrics dict so classifier name is parent key.
        Before: {split_name:{classifier_name:{metric_name:val}}}
        After: {classifier_name:{split_name:{metric_name:val}}}
        """
        # Check for correct input
        first_keys = list(metrics.keys())
        if not all("split" in key for key in first_keys):
            raise ValueError("Wrong input dict: first level keys don't contain 'split'.")

        # Invert
        new_metrics = defaultdict(dict)
        for split_name, split_metrics in metrics.items():
            for classifier_name, classifier_metrics in split_metrics.items():
                new_metrics[classifier_name][split_name] = classifier_metrics
        return dict(new_metrics)

    @staticmethod
    def extract_count_from_metrics(metrics) -> Dict[str, int]:
        """Extract total file count from metrics dict (sums on each split).

        Returns a dict {classifier_name: count}
        """
        new_metrics = copy.deepcopy(metrics)

        # Check for correct input
        first_keys = list(new_metrics.keys())
        if all("split" in key for key in first_keys):
            new_metrics = SplitResultsHandler.invert_metrics_dict(new_metrics)

        counts = defaultdict(int)
        for classifier_name, all_split_metrics in new_metrics.items():
            for _, split_metrics in all_split_metrics.items():
                counts[classifier_name] += split_metrics["count"]  # type: ignore

        return dict(counts)

    @staticmethod
    def general_split_metrics(
        results_dir: Path,
        merge_assays: bool,
        exclude_categories: List[str] | None = None,
        exclude_names: List[str] | None = None,
        include_categories: List[str] | None = None,
        include_names: List[str] | None = None,
        return_type: str = "both",
        mislabel_corrections: Tuple[Dict[str, str], Dict[str, Dict[str, str]]]
        | None = None,
        oversampled_only: bool | None = True,
        verbose: bool | None = False,
        min_pred_score: float | None = None,
    ) -> (
        Dict[str, Dict[str, Dict[str, float]]]
        | Dict[str, Dict[str, pd.DataFrame]]
        | Tuple[
            Dict[str, Dict[str, Dict[str, float]]], Dict[str, Dict[str, pd.DataFrame]]
        ]
    ):
        """Create the content data for figure 2a. (get metrics for each task)

        Args:
            results_dir (Path): Directory containing the results. Needs to be the parent of the category folders.
            merge_assays (bool): Merge similar assays (rna-seq x2, wgbs x2)
            exclude_categories (List[str]): Task categories to exclude (first level directory names).
            exclude_names (List[str]): Names of folders to exclude (ex: 7c or no-mix).
            include_categories (List[str]): Task categories to include (first level directory names).
            include_names (List[str]): Names of folders to include (ex: 7c or no-mix).
            return_type (str): Type of data to return ('metrics', 'split_results', 'both').
            mislabel_corrections (Tuple[Dict[str, str], Dict[str, Dict[str, str]]]): ({md5sum: EpiRR_no-v},{label_category: {EpiRR_no-v: corrected_label}})
            oversampled_only (bool): Only include oversampled runs.
            verbose (bool): Print additional information.
            min_pred_score (float): Minimum prediction score to consider. Affects the metrics. Defaults to None.

        Returns:
            Union[Dict[str, Dict[str, Dict[str, float]]],
                Dict[str, Dict[str, pd.DataFrame]],
                Tuple[Dict[str, Dict[str, Dict[str, float]]], Dict[str, Dict[str, pd.DataFrame]]]]
                Depending on return_type, it returns:
                - 'metrics': A metrics dictionary with the structure {split_name: {task_name: metrics_dict}}
                - 'split_results': A split results dictionary with the structure {task_name: {split_name: split_results_df}}
                - 'both': A tuple with both dictionaries described above
        """
        if return_type not in ["metrics", "split_results", "both"]:
            raise ValueError(
                f"Invalid return_type: {return_type}. Choose from 'metrics', 'split_results', or 'both'."
            )

        all_split_results = {}
        split_results_handler = SplitResultsHandler()

        if mislabel_corrections:
            md5sum_to_epirr, epirr_to_corrections = mislabel_corrections
        else:
            md5sum_to_epirr = {}
            epirr_to_corrections = {}

        for parent, _, _ in os.walk(results_dir, followlinks=True):
            # Looking for oversampling only results
            parent = Path(parent)
            if "10fold" not in parent.name:
                if verbose:
                    print(f"Skipping {parent}: not 10fold")
                continue
            if parent.name != "10fold-oversampling" and oversampled_only:
                if verbose:
                    print(f"Skipping {parent}: not oversampled")
                continue

            if verbose:
                print(f"Checking {parent}")

            # Get the category + filter
            relpath = parent.relative_to(results_dir)
            category = relpath.parts[0].replace("_1l_3000n", "")
            if verbose:
                print(f"Checking category: {category}")
            if include_categories is not None:
                if not any(include_str in category for include_str in include_categories):
                    if verbose:
                        print(f"Skipping {category}: not in {include_categories}")
                    continue
            if exclude_categories is not None:
                if any(exclude_str in category for exclude_str in exclude_categories):
                    if verbose:
                        print(f"Skipping {category}: in {exclude_categories}")
                    continue

            # Get the rest of the name, ignore certain runs
            rest_of_name = list(relpath.parts[1:])
            for dir_name in ["10fold", "10fold-oversampling"]:
                try:
                    rest_of_name.remove(dir_name)
                except ValueError:
                    pass

            if len(rest_of_name) > 1:
                raise ValueError(
                    f"Too many parts in the name: {rest_of_name}. Path: {relpath}"
                )
            if rest_of_name:
                rest_of_name = rest_of_name[0]
            if verbose:
                print(f"Rest of name: {rest_of_name}")

            # Filter out certain runs
            if include_names is not None:
                if not any(name in rest_of_name for name in include_names):
                    if verbose:
                        print(
                            f"Skipping {category} {rest_of_name}: not in {include_names}"
                        )
                    continue
            if exclude_names is not None:
                if any(name in rest_of_name for name in exclude_names):
                    if verbose:
                        print(f"Skipping {category} {rest_of_name}: in {exclude_names}")
                    continue

            full_task_name = category
            if rest_of_name:
                full_task_name += f"_{rest_of_name}"

            # Get the split results
            if verbose:
                print(f"Getting split results for {full_task_name}")
            split_results = split_results_handler.read_split_results(parent)
            if not split_results:
                raise ValueError(f"No split results found in {parent}")

            if (
                ("sex" in full_task_name) or ("life_stage" in full_task_name)
            ) and mislabel_corrections:
                corrections = epirr_to_corrections[category]
                for split_name, results_df in list(split_results.items()):
                    current_true_class = results_df["True class"].to_dict()
                    new_true_class = {
                        k: corrections.get(md5sum_to_epirr[k], v)
                        for k, v in current_true_class.items()
                    }
                    results_df["True class"] = new_true_class.values()

                    split_results[split_name] = results_df

            if ("assay" in full_task_name) and merge_assays:
                for split_name, df in split_results.items():
                    try:
                        split_result_df = merge_similar_assays(df)
                    except ValueError as e:
                        print(f"Skipping {full_task_name} assay merging: {e}")
                        break
                    split_results[split_name] = split_result_df

            if min_pred_score:
                for split_name, df in split_results.items():
                    tmp_df = df.copy()
                    tmp_df = SplitResultsHandler.add_max_pred(tmp_df)
                    tmp_df = tmp_df[tmp_df["Max pred"] >= min_pred_score]
                    tmp_df = tmp_df.drop(columns=["Max pred"])
                    split_results[split_name] = tmp_df

            all_split_results[full_task_name] = split_results

        if return_type in ["metrics", "both"]:
            try:
                split_results_metrics = split_results_handler.compute_split_metrics(
                    all_split_results, concat_first_level=True
                )
            except KeyError as e:
                logging.error("KeyError: %s", e)
                logging.error("all_split_results: %s", all_split_results)
                logging.error("check folder: %s", results_dir)
                raise e

        if return_type == "metrics":
            # pylint: disable=possibly-used-before-assignment
            return split_results_metrics
        if return_type == "split_results":
            return all_split_results

        # the default return type is 'both'
        return split_results_metrics, all_split_results

    def obtain_all_feature_set_data(
        self,
        parent_folder: Path,
        merge_assays: bool,
        return_type: str,
        include_sets: List[str] | None = None,
        include_categories: List[str] | None = None,
        exclude_names: List[str] | None = None,
        oversampled_only: bool | None = True,
        verbose: bool | None = False,
    ) -> (
        Dict[str, Dict[str, Dict[str, Dict[str, float]]]]
        | Dict[str, Dict[str, Dict[str, pd.DataFrame]]]
    ):
        """
        Obtain either metrics or split results for all feature sets based on the specified return_type.

        Args:
            parent_folder (Path): The parent folder containing all feature set folders.
                Needs to be the parent of feature set folders.
            merge_assays (bool): Whether to merge similar assays (e.g., RNA-seq x2, WGBS x2).
            return_type (str): Type of data to return, either "metrics" or "split_results".
            include_sets (List[str] | None): Feature sets to include.
            include_categories (List[str] | None): Task categories to include.
            exclude_names (List[str] | None): Names of folders to exclude (e.g., 7c or no-mix).
            oversampled_only (bool): Only include oversampled runs.
            verbose (bool): Print additional information.

        Returns:
            Union[Dict[str, Dict[str, Dict[str, Dict[str, float]]]], Dict[str, Dict[str, Dict[str, pd.DataFrame]]]]:
            A dictionary containing either metrics or results for all feature sets.
            Format for metrics: {feature_set: {task_name: {split_name: metric_dict}}}
            Format for split results: {feature_set: {task_name: {split_name: results_dataframe}}}
        """
        valid_return_types = ["metrics", "split_results"]
        if return_type not in valid_return_types:
            raise ValueError(
                f"Invalid return_type: {return_type}. Choose from {valid_return_types}."
            )

        all_data = {}
        if verbose:
            print(f"Checking folder: {parent_folder}")
        for folder in parent_folder.iterdir():
            if not folder.is_dir():
                continue

            feature_set = folder.name
            if include_sets is not None and feature_set not in include_sets:
                if verbose:
                    print(f"Skipping {feature_set}: not in 'include_sets' list.")
                continue

            try:
                # Fetch either metrics or split results based on the return_type
                split_data = self.general_split_metrics(
                    results_dir=folder,
                    merge_assays=merge_assays,
                    return_type=return_type,
                    include_categories=include_categories,
                    exclude_names=exclude_names,
                    oversampled_only=oversampled_only,
                    verbose=verbose,
                )

                # Invert if metrics, otherwise keep as is
                if return_type == "metrics":
                    inverted_data = self.invert_metrics_dict(split_data)  # type: ignore
                    all_data[feature_set] = inverted_data
                elif return_type == "split_results":
                    all_data[feature_set] = split_data  # type: ignore

            except ValueError as err:
                raise ValueError(f"Problem with {feature_set}") from err

        return all_data

Static methods

def add_max_pred(df: pd.DataFrame, target_label: str = 'True class', expected_classes: List[str] | None = None)

Add the max prediction column ("Max pred") to the results dataframe.

The dataframe must not contain extra metadata columns. target_label: Column used to ascertain the output class columns.

def calculate_metrics_for_single_df(df: pd.DataFrame, task_name: str = 'unknown task', logging_name: str = 'df') ‑> Dict[str, float]

Compute metrics for a single task's DataFrame.

Expects a DataFrame with the following columns: md5sum, True class, predicted class, predicted probabilities

Output is a dictionary of metrics with the following keys: Accuracy, F1_macro, AUC_micro, AUC_macro, count

def compute_acc_per_assay(split_results: Dict[str, pd.DataFrame], metadata_df: pd.DataFrame) ‑> pandas.core.frame.DataFrame

Compute accuracy per assay for each split.

Args:
- split_results: {split_name: results_df}.
- metadata_df: The metadata dataframe.

def compute_split_metrics(all_split_dfs: Dict[str, Dict[str, pd.DataFrame]], concat_first_level: bool = False) ‑> Dict[str, Dict[str, Dict[str, float]]]

Compute desired metrics for each split and classifier, accommodating different dictionary structures.

This method supports two structures of input dictionaries: 1. {split_name: {classifier_name: results_df}} when concat_first_level is False. 2. {classifier_name: {split_name: results_df}} when concat_first_level is True.

Args

all_split_dfs
A nested dictionary of DataFrames. The structure depends on the concat_first_level flag.
concat_first_level
A boolean flag that indicates the structure of the all_split_dfs dictionary. If True, the first level is the classifier name. Otherwise, the first level is the split name.

Returns

A nested dictionary with metrics computed for each classifier and split. The structure is {split_name: {classifier_name: {metric_name: value}}}.

def concatenate_split_results(split_dfs: Dict[str, Dict[str, pd.DataFrame]] | Dict[str, pd.DataFrame], concat_first_level: bool = False, depth: int = 2)

Concatenate split results for each different classifier or split name based on the order.

This method supports two structures of input dictionaries: 1. {split_name: {classifier_name: results_df}} when concat_first_level is False. 2. {classifier_name: {split_name: results_df}} when concat_first_level is True.

Args

split_dfs
A nested dictionary of DataFrames to be concatenated. The structure depends on the concat_first_level flag.
concat_first_level
A boolean flag that indicates the structure of the split_dfs dictionary. If True, the first level is the classifier name. Otherwise, the first level is the split name.
depth
The depth of the dictionary structure. Must be 1 or 2.

Returns

Dict[str, pd.DataFrame]
{classifier_name: concatenated_dataframe}

Raises

AssertionError
If the index of any concatenated DataFrame is not of type str, indicating an unexpected index format.
def extract_count_from_metrics(metrics) ‑> Dict[str, int]

Extract total file count from metrics dict (sums on each split).

Returns a dict {classifier_name: count}

def gather_split_results_across_categories(parent_results_dir: Path, verbose: bool = False) ‑> Dict[str, Dict[str, pandas.core.frame.DataFrame]]

Gather NN split results for each classification task in the given folder children.

Returns

Dict[str, Dict[str, pd.DataFrame]]
{general_name:{split_name: results_df}}
def gather_split_results_across_methods(results_dir: Path, label_category: str, only_NN: bool = False) ‑> Dict[str, Dict[str, pandas.core.frame.DataFrame]]

Gather split results for each classifier type.

Args

results_dir
The directory containing the results. Child directories should be task/classifier names.
label_category
The label category for the results.
only_NN
A boolean flag to only gather the NN results.

Returns

Dict[str, Dict[str, pd.DataFrame]]
{split_name:{classifier_name: results_df}}
def general_split_metrics(results_dir: Path, merge_assays: bool, exclude_categories: List[str] | None = None, exclude_names: List[str] | None = None, include_categories: List[str] | None = None, include_names: List[str] | None = None, return_type: str = 'both', mislabel_corrections: Tuple[Dict[str, str], Dict[str, Dict[str, str]]] | None = None, oversampled_only: bool | None = True, verbose: bool | None = False, min_pred_score: float | None = None)

Create the content data for figure 2a. (get metrics for each task)

Args

results_dir : Path
Directory containing the results. Needs to be the parent of the category folders.
merge_assays : bool
Merge similar assays (rna-seq x2, wgbs x2)
exclude_categories : List[str]
Task categories to exclude (first level directory names).
exclude_names : List[str]
Names of folders to exclude (ex: 7c or no-mix).
include_categories : List[str]
Task categories to include (first level directory names).
include_names : List[str]
Names of folders to include (ex: 7c or no-mix).
return_type : str
Type of data to return ('metrics', 'split_results', 'both').
mislabel_corrections : Tuple[Dict[str, str], Dict[str, Dict[str, str]]]
({md5sum: EpiRR_no-v},{label_category: {EpiRR_no-v: corrected_label}})
oversampled_only : bool
Only include oversampled runs.
verbose : bool
Print additional information.
min_pred_score : float
Minimum prediction score to consider. Affects the metrics. Defaults to None.

Returns

Union[Dict[str, Dict[str, Dict[str, float]]], Dict[str, Dict[str, pd.DataFrame]], Tuple[Dict[str, Dict[str, Dict[str, float]]], Dict[str, Dict[str, pd.DataFrame]]]]
Depending on return_type, it returns:
- 'metrics': A metrics dictionary with the structure {split_name: {task_name: metrics_dict}}
- 'split_results': A split results dictionary with the structure {task_name: {split_name: split_results_df}}
- 'both': A tuple with both dictionaries described above

def invert_metrics_dict(metrics: Dict[str, Dict[str, Dict[str, float]]]) ‑> Dict[str, Dict[str, Dict[str, float]]]

Invert metrics dict so classifier name is parent key.
Before: {split_name:{classifier_name:{metric_name:val}}}
After: {classifier_name:{split_name:{metric_name:val}}}

def read_split_results(parent_dir: Path) ‑> Dict[str, pandas.core.frame.DataFrame]

Read split results from the given parent directory.

Args

parent_dir
The parent directory containing the split results.

Returns

Dict[str, pd.DataFrame]
{split_name: results_df}
def verify_md5_consistency(dfs: Dict[str, pd.DataFrame]) ‑> None

Verify that all dataframes have the same md5sums.

Used to compare the md5sums of the same split across different classifier methods.

Args

dfs : Dict[str, pd.DataFrame]
{classifier_name: results_df} Results dataframes need to have md5sums as index.

Methods

def obtain_all_feature_set_data(self, parent_folder: Path, merge_assays: bool, return_type: str, include_sets: List[str] | None = None, include_categories: List[str] | None = None, exclude_names: List[str] | None = None, oversampled_only: bool | None = True, verbose: bool | None = False)

Obtain either metrics or split results for all feature sets based on the specified return_type.

Args

parent_folder : Path
The parent folder containing all feature set folders. Needs to be the parent of feature set folders.
merge_assays : bool
Whether to merge similar assays (e.g., RNA-seq x2, WGBS x2).
return_type : str
Type of data to return, either "metrics" or "split_results".
include_sets : List[str] | None
Feature sets to include.
include_categories : List[str] | None
Task categories to include.
exclude_names : List[str] | None
Names of folders to exclude (e.g., 7c or no-mix).
oversampled_only : bool
Only include oversampled runs.
verbose : bool
Print additional information.

Returns

Union[Dict[str, Dict[str, Dict[str, Dict[str, float]]]], Dict[str, Dict[str, Dict[str, pd.DataFrame]]]]:
A dictionary containing either metrics or results for all feature sets.
Format for metrics
{feature_set: {task_name: {split_name: metric_dict}}}
Format for split results
{feature_set: {task_name: {split_name: results_dataframe}}}
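
Example (a minimal sketch; the directory is a placeholder and is assumed to contain split0/ ... split9/ folders with validation_prediction.csv files):

from pathlib import Path

from epiclass.utils.notebooks.paper.paper_utilities import SplitResultsHandler

handler = SplitResultsHandler()

split_results = handler.read_split_results(Path("path/to/task/10fold-oversampling"))

# Wrap the results under a task name, as expected with concat_first_level=True.
metrics = handler.compute_split_metrics({"my_task": split_results}, concat_first_level=True)
print(metrics["split0"]["my_task"])  # Accuracy, F1_macro, AUC_micro, AUC_macro, count
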
class TemporaryLogFilter (filter_obj, logger=None)

Context manager for adding a filter to a logger.

Source code
class TemporaryLogFilter:
    """Context manager for adding a filter to a logger."""

    def __init__(self, filter_obj, logger=None):
        self.logger = logger or logging.getLogger()
        self.filter = filter_obj

    def __enter__(self):
        self.logger.addFilter(self.filter)

    def __exit__(self, exc_type, exc_value, traceback):
        self.logger.removeFilter(self.filter)
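
Example (a minimal sketch; the filter class below is illustrative):

import logging

from epiclass.utils.notebooks.paper.paper_utilities import TemporaryLogFilter


class DropAUCWarnings(logging.Filter):
    """Illustrative filter: drop records that mention ROC AUC."""

    def filter(self, record: logging.LogRecord) -> bool:
        return "ROC AUC" not in record.getMessage()


# The filter is only attached to the root logger inside the with-block.
with TemporaryLogFilter(DropAUCWarnings()):
    logging.warning("Cannot compute ROC AUC: only one class present")  # suppressed
logging.warning("This warning is emitted normally")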