Source code for skfeaturellm.transformations.pipeline

"""
Transformation pipeline for applying feature transformations to DataFrames.
"""

import json
import warnings
from collections.abc import Mapping
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Type, Union

import pandas as pd

from skfeaturellm.transformations.base import BaseTransformation, TransformationError


class TransformationParseError(TransformationError):
    """Signal that a transformation definition could not be parsed."""
# Module-level registry mapping a transformation type name (the "type" value
# used in configs) to the transformation class registered under that name.
# Populated by the ``register_transformation`` decorator.
_TRANSFORMATION_REGISTRY: Dict[str, Type[BaseTransformation]] = {}
def register_transformation(name: str):
    """
    Build a class decorator that files a transformation class under ``name``.

    Parameters
    ----------
    name : str
        The type name used in JSON/YAML configs (e.g., "add", "div")

    Returns
    -------
    callable
        A decorator that stores the decorated class in the registry under
        ``name`` and hands the class back unchanged.
    """

    def _register(cls: Type[BaseTransformation]) -> Type[BaseTransformation]:
        # A later registration under the same name replaces the earlier one.
        _TRANSFORMATION_REGISTRY.update({name: cls})
        return cls

    return _register
def get_registered_transformations() -> Dict[str, Type[BaseTransformation]]:
    """Return a shallow copy of the registry, so callers cannot mutate it."""
    return dict(_TRANSFORMATION_REGISTRY)
def get_transformation_types_for_prompt() -> str:
    """
    Render the available transformation types as text for LLM prompts.

    The registry is walked in sorted order and each registered class is asked
    for its ``get_prompt_description()``; every entry becomes one bullet line.

    Returns
    -------
    str
        Newline-separated bullet list, one line per registered transformation
    """
    return "\n".join(
        f'- "{type_name}": {transformation_cls.get_prompt_description()}'
        for type_name, transformation_cls in sorted(_TRANSFORMATION_REGISTRY.items())
    )
def get_unary_operation_types() -> Set[str]:
    """
    Collect the type names of all registered unary operations.

    Returns
    -------
    Set[str]
        Names whose registered class is a subclass of ``UnaryTransformation``
        (e.g., {"log", "sqrt", "abs"})
    """
    # Deferred to call time to avoid circular imports
    from skfeaturellm.transformations.unary.arithmetic import UnaryTransformation

    return {
        type_name
        for type_name, transformation_cls in _TRANSFORMATION_REGISTRY.items()
        if issubclass(transformation_cls, UnaryTransformation)
    }
def get_binary_operation_types() -> Set[str]:
    """
    Collect the type names of all registered binary operations.

    Returns
    -------
    Set[str]
        Names whose registered class is a subclass of
        ``BinaryArithmeticTransformation`` (e.g., {"add", "sub", "mul", "div"})
    """
    # Deferred to call time to avoid circular imports
    from skfeaturellm.transformations.binary.arithmetic import (
        BinaryArithmeticTransformation,
    )

    return {
        type_name
        for type_name, transformation_cls in _TRANSFORMATION_REGISTRY.items()
        if issubclass(transformation_cls, BinaryArithmeticTransformation)
    }
def get_all_operation_types() -> Set[str]:
    """
    List every operation type name currently in the registry.

    Returns
    -------
    Set[str]
        Set of all operation names
    """
    # Iterating a dict yields its keys, so this is the set of registered names.
    return set(_TRANSFORMATION_REGISTRY)
class TransformationPipeline:
    """
    Executes a set of transformations against a DataFrame.

    The executor can be initialized with transformations directly, or loaded
    from JSON/YAML configuration files.

    Parameters
    ----------
    transformations : List[BaseTransformation], optional
        List of transformation objects to execute
    raise_on_error : bool, default=True
        If True, raise exceptions on transformation errors.
        If False, skip failed transformations with a warning.

    Examples
    --------
    Direct instantiation:

    >>> from skfeaturellm.transformations import AddTransformation, DivTransformation
    >>> executor = TransformationPipeline(transformations=[
    ...     DivTransformation("ratio", "a", right_column="b"),
    ...     AddTransformation("sum", "a", right_column="b"),
    ... ])
    >>> result_df = executor.fit(df).transform(df)

    From JSON file:

    >>> executor = TransformationPipeline.from_json("transformations.json")
    >>> result_df = executor.fit(df).transform(df)

    From dict (e.g., LLM output):

    >>> config = {"transformations": [{"type": "add", "feature_name": "sum", ...}]}
    >>> executor = TransformationPipeline.from_dict(config)
    >>> result_df = executor.fit(df).transform(df)
    """

    def __init__(
        self,
        transformations: Optional[List[BaseTransformation]] = None,
        raise_on_error: bool = True,
    ):
        # None (or an empty list) means "no transformations".
        self.transformations = transformations or []
        self.raise_on_error = raise_on_error

    @classmethod
    def from_dict(
        cls,
        config: Dict[str, Any],
        raise_on_error: bool = True,
    ) -> "TransformationPipeline":
        """
        Create an executor from a dictionary configuration.

        Parameters
        ----------
        config : Dict[str, Any]
            Configuration dict with a "transformations" key containing
            a list of transformation definitions
        raise_on_error : bool, default=True
            If True, raise exceptions on transformation errors

        Returns
        -------
        TransformationPipeline
            Configured executor instance

        Raises
        ------
        TransformationParseError
            If the configuration is not a mapping, lacks the
            "transformations" key, that key does not hold a list, or any
            individual definition is invalid
        """
        # Loaded documents are not guaranteed to be mappings (e.g. an empty
        # YAML document is loaded as None); report that as a parse error
        # instead of leaking a TypeError from the membership test.
        if not isinstance(config, Mapping) or "transformations" not in config:
            raise TransformationParseError(
                "Configuration must contain a 'transformations' key"
            )

        definitions = config["transformations"]
        if not isinstance(definitions, (list, tuple)):
            raise TransformationParseError(
                "'transformations' must be a list of transformation definitions, "
                f"got {type(definitions).__name__}"
            )

        transformations = [
            cls._parse_transformation(definition, index=i)
            for i, definition in enumerate(definitions)
        ]
        return cls(transformations=transformations, raise_on_error=raise_on_error)

    @classmethod
    def from_json(
        cls,
        path: Union[str, Path],
        raise_on_error: bool = True,
    ) -> "TransformationPipeline":
        """
        Create an executor from a JSON configuration file.

        Parameters
        ----------
        path : Union[str, Path]
            Path to the JSON configuration file
        raise_on_error : bool, default=True
            If True, raise exceptions on transformation errors

        Returns
        -------
        TransformationPipeline
            Configured executor instance

        Raises
        ------
        TransformationParseError
            If the loaded configuration is invalid (see ``from_dict``)
        """
        # Read as UTF-8 explicitly rather than relying on the platform default.
        with Path(path).open(encoding="utf-8") as f:
            config = json.load(f)

        return cls.from_dict(config, raise_on_error=raise_on_error)

    @classmethod
    def from_yaml(
        cls,
        path: Union[str, Path],
        raise_on_error: bool = True,
    ) -> "TransformationPipeline":
        """
        Create an executor from a YAML configuration file.

        Parameters
        ----------
        path : Union[str, Path]
            Path to the YAML configuration file
        raise_on_error : bool, default=True
            If True, raise exceptions on transformation errors

        Returns
        -------
        TransformationPipeline
            Configured executor instance

        Raises
        ------
        ImportError
            If PyYAML is not installed
        TransformationParseError
            If the loaded configuration is invalid (see ``from_dict``)
        """
        try:
            import yaml
        except ImportError as exc:
            # Chain the original error so the real import failure stays visible.
            raise ImportError(
                "PyYAML is required for YAML support. "
                "Install it with: pip install pyyaml"
            ) from exc

        # Read as UTF-8 explicitly rather than relying on the platform default.
        with Path(path).open(encoding="utf-8") as f:
            config = yaml.safe_load(f)

        return cls.from_dict(config, raise_on_error=raise_on_error)

    @classmethod
    def _parse_transformation(
        cls, config: Dict[str, Any], index: int
    ) -> BaseTransformation:
        """
        Parse a single transformation definition into a transformation object.

        Parameters
        ----------
        config : Dict[str, Any]
            Transformation configuration dict
        index : int
            Index of the transformation (for error messages)

        Returns
        -------
        BaseTransformation
            The parsed transformation object

        Raises
        ------
        TransformationParseError
            If the definition is not a mapping, has no "type" field, names an
            unregistered type, or the transformation class rejects the
            remaining fields with a TypeError or ValueError
        """
        if not isinstance(config, Mapping):
            raise TransformationParseError(
                f"Transformation at index {index} must be a mapping, "
                f"got {type(config).__name__}"
            )

        if "type" not in config:
            raise TransformationParseError(
                f"Transformation at index {index} missing required 'type' field"
            )

        t_type = config["type"]

        # The isinstance guard keeps an unhashable "type" value (e.g. a list)
        # from raising TypeError inside the registry lookup.
        if not isinstance(t_type, str) or t_type not in _TRANSFORMATION_REGISTRY:
            available = list(_TRANSFORMATION_REGISTRY.keys())
            raise TransformationParseError(
                f"Unknown transformation type '{t_type}' at index {index}. "
                f"Available types: {available}"
            )

        t_class = _TRANSFORMATION_REGISTRY[t_type]

        # Every field except "type" is forwarded to the constructor as a keyword.
        kwargs = {k: v for k, v in config.items() if k != "type"}

        try:
            return t_class(**kwargs)
        except TypeError as e:
            raise TransformationParseError(
                f"Invalid arguments for transformation '{t_type}' at index {index}: {e}"
            ) from e
        except ValueError as e:
            # Keep the type/index context and the original cause.
            raise TransformationParseError(
                f"Invalid configuration for transformation '{t_type}' "
                f"at index {index}: {e}"
            ) from e

    def fit(self, df: pd.DataFrame) -> "TransformationPipeline":
        """
        Fit all transformations to training data.

        When ``raise_on_error`` is False, a transformation whose ``fit``
        raises ``TransformationError`` is reported with a warning and fitting
        continues with the next one.

        Parameters
        ----------
        df : pd.DataFrame
            The training DataFrame

        Returns
        -------
        TransformationPipeline
            self
        """
        for transformation in self.transformations:
            try:
                transformation.fit(df)
            except TransformationError as e:
                if self.raise_on_error:
                    raise
                warnings.warn(
                    f"Fitting transformation '{transformation.feature_name}' failed: {e}. "
                    f"Skipping.",
                    stacklevel=2,  # attribute the warning to the caller of fit()
                )
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Apply all fitted transformations and return a DataFrame with new features.

        Each transformation receives the original ``df``, not the accumulated
        result, so features created earlier in the same call are not visible
        to later transformations.

        Parameters
        ----------
        df : pd.DataFrame
            The input DataFrame

        Returns
        -------
        pd.DataFrame
            A copy of the input DataFrame with new feature columns added
        """
        if not self.transformations:
            warnings.warn("No transformations to apply.", stacklevel=2)
            return df.copy()

        result_df = df.copy()

        for transformation in self.transformations:
            try:
                feature_series = transformation.transform(df)
                result_df[transformation.feature_name] = feature_series
            except TransformationError as e:
                if self.raise_on_error:
                    raise
                warnings.warn(
                    f"Transformation '{transformation.feature_name}' failed: {e}. "
                    f"Skipping.",
                    stacklevel=2,  # attribute the warning to the caller of transform()
                )

        return result_df

    def get_required_columns(
        self, transformations: Optional[List[BaseTransformation]] = None
    ) -> Set[str]:
        """
        Get all column names required by transformations.

        Parameters
        ----------
        transformations : List[BaseTransformation], optional
            List of transformations to analyze. If not provided,
            uses self.transformations.

        Returns
        -------
        Set[str]
            Set of required column names
        """
        # An explicitly passed empty list is honored; only None falls back.
        transforms = (
            transformations if transformations is not None else self.transformations
        )

        required: Set[str] = set()
        for transformation in transforms:
            required.update(transformation.get_required_columns())
        return required