Source code for fit.ensemble.boosting

"""
Boosting ensemble methods.

This module implements boosting algorithms like AdaBoost that
sequentially fit weak learners and combine them into a strong learner.
"""

import numpy as np
from typing import Optional, Union, Any

from fit.core.tensor import Tensor
from fit.ensemble.base import BaseEnsemble


class AdaBoostClassifier(BaseEnsemble):
    """
    AdaBoost classifier implementation.

    AdaBoost fits a sequence of weak learners on repeatedly modified
    versions of the data. The predictions from all of them are then
    combined through a weighted majority vote.

    Binary problems are handled internally with labels in {-1, +1}
    (``classes_[0]`` maps to -1, ``classes_[1]`` to +1). Problems with any
    other number of classes are handled with labels encoded as indices
    into ``classes_`` and a weighted plurality vote at prediction time.

    Examples:
        >>> from fit.ensemble import AdaBoostClassifier
        >>> from fit.simple.models import MLP
        >>>
        >>> # Create AdaBoost classifier
        >>> ada = AdaBoostClassifier(
        ...     base_estimator=MLP([4, 2]),
        ...     n_estimators=50,
        ...     learning_rate=1.0
        ... )
        >>> ada.fit(X_train, y_train)
        >>> predictions = ada.predict(X_test)
    """

    def __init__(
        self,
        base_estimator=None,
        n_estimators: int = 50,
        learning_rate: float = 1.0,
        random_state: Optional[int] = None,
    ):
        """
        Initialize AdaBoost classifier.

        Args:
            base_estimator: Base estimator to boost. When None, a small
                default MLP is created by ``_make_estimator``.
            n_estimators: Maximum number of estimators
            learning_rate: Learning rate shrinks the contribution of each
                classifier
            random_state: Random state for reproducibility (forwarded to
                the base class)
        """
        super().__init__(n_estimators=n_estimators, random_state=random_state)
        self.base_estimator = base_estimator
        self.learning_rate = learning_rate
        self.estimator_weights_ = []
        self.estimator_errors_ = []

    def _make_estimator(self) -> Any:
        """
        Create a new estimator instance.

        Returns:
            New estimator instance
        """
        if self.base_estimator is None:
            # Default to a simple weak learner (single layer perceptron).
            # NOTE(review): the layer sizes are hard-coded; presumably this
            # only suits 1-feature inputs -- confirm against the MLP API.
            from fit.simple.models import MLP

            return MLP([1, 1], activation="tanh")

        if hasattr(self.base_estimator, "copy"):
            return self.base_estimator.copy()

        # Copy the prototype rather than calling its class with no
        # arguments: a bare constructor call drops the configuration the
        # caller supplied and fails when the constructor needs arguments.
        import copy

        try:
            return copy.deepcopy(self.base_estimator)
        except (TypeError, copy.Error):
            # Prototype cannot be deep-copied; fall back to a default
            # instance of the same class.
            return self.base_estimator.__class__()

    def _fit_estimator(
        self, estimator, X: np.ndarray, y: np.ndarray, sample_weights: np.ndarray
    ):
        """
        Fit a single estimator with sample weights.

        Weighted training is simulated by drawing a bootstrap sample in
        which each row is picked with probability proportional to its
        weight. The draw uses ``np.random.choice``, i.e. NumPy's global
        random state.

        Args:
            estimator: The estimator to fit
            X: Training data
            y: Target values
            sample_weights: Weights for each sample
        """
        n_samples = len(X)
        weighted_indices = np.random.choice(
            n_samples,
            size=n_samples,
            replace=True,
            p=sample_weights / sample_weights.sum(),
        )

        # Estimators without a ``fit`` method are silently left untouched.
        if hasattr(estimator, "fit"):
            estimator.fit(X[weighted_indices], y[weighted_indices])

    def _predict_estimator(self, estimator, X: np.ndarray) -> np.ndarray:
        """
        Make predictions with a single estimator.

        Args:
            estimator: The fitted estimator
            X: Input data

        Returns:
            For binary problems (two entries in ``classes_``) an array of
            -1 / +1 values; otherwise the estimator's class predictions
            (arg-max indices for multi-column outputs).

        Raises:
            ValueError: If the estimator has neither ``predict`` nor
                ``forward``.
        """
        if hasattr(estimator, "predict"):
            predictions = estimator.predict(X)
        elif hasattr(estimator, "forward"):
            # For neural network models
            predictions = estimator.forward(Tensor(X)).data
        else:
            raise ValueError(f"Estimator {estimator} has no predict or forward method")

        if isinstance(predictions, Tensor):
            predictions = predictions.data
        predictions = np.asarray(predictions)

        if predictions.ndim == 2 and predictions.shape[1] == 1:
            # A single output column is a score, not per-class scores;
            # arg-max over it would always be 0.
            predictions = predictions[:, 0]
        elif predictions.ndim > 1:
            predictions = np.argmax(predictions, axis=1)

        classes = getattr(self, "classes_", None)
        if classes is not None and len(classes) == 2:
            # Fixed mapping (independent of which values happen to occur in
            # this batch): -1 and index 0 -> -1, +1 and index 1 -> +1.
            return np.where(predictions > 0, 1, -1)

        return predictions

    def fit(
        self, X: Union[np.ndarray, Tensor], y: Union[np.ndarray, Tensor]
    ) -> "AdaBoostClassifier":
        """
        Build a boosted classifier from the training set.

        Boosting stops early when an estimator is perfect on the weighted
        training set, when an estimator's weighted error reaches 0.5, or
        when the sample weights are back to uniform.

        Args:
            X: Training data
            y: Target values

        Returns:
            Self for method chaining

        Raises:
            ValueError: If the training set is empty, or if the very first
                estimator has a weighted error of 0.5 or more.
        """
        if isinstance(X, Tensor):
            X = X.data
        if isinstance(y, Tensor):
            y = y.data
        X = np.asarray(X)
        y = np.asarray(y)

        n_samples = X.shape[0]
        if n_samples == 0:
            raise ValueError("Cannot fit AdaBoostClassifier on an empty training set.")

        self.classes_ = np.unique(y)
        if len(self.classes_) == 2:
            # Binary classification: targets in {-1, +1}
            y_encoded = np.where(y == self.classes_[0], -1, 1)
        else:
            # Otherwise: targets are indices into classes_, so they are
            # comparable with arg-max predictions for arbitrary labels.
            y_encoded = np.searchsorted(self.classes_, y)

        # Initialize sample weights uniformly
        sample_weights = np.ones(n_samples) / n_samples

        # Clear previous results
        self.estimators_ = []
        self.estimator_weights_ = []
        self.estimator_errors_ = []

        for _ in range(self.n_estimators):
            estimator = self._make_estimator()
            self._fit_estimator(estimator, X, y_encoded, sample_weights)

            y_predict = self._predict_estimator(estimator, X)
            incorrect = y_predict != y_encoded
            estimator_error = np.average(incorrect, weights=sample_weights)

            if estimator_error <= 0:
                # Perfect classifier: keep it with unit weight and stop.
                self.estimators_.append(estimator)
                self.estimator_weights_.append(1.0)
                self.estimator_errors_.append(estimator_error)
                break

            if estimator_error >= 0.5:
                # Worse than random: discard it and stop.
                if not self.estimators_:
                    raise ValueError(
                        "BaseClassifier in AdaBoostClassifier "
                        "ensemble is worse than random, ensemble "
                        "can not be fitted."
                    )
                break

            # Estimator weight
            alpha = (
                self.learning_rate
                * 0.5
                * np.log((1 - estimator_error) / estimator_error)
            )

            self.estimators_.append(estimator)
            self.estimator_weights_.append(alpha)
            self.estimator_errors_.append(estimator_error)

            # With the half-log alpha above, the AdaBoost update scales
            # misclassified samples by exp(+alpha) and correctly classified
            # ones by exp(-alpha) before re-normalising.
            sample_weights *= np.exp(np.where(incorrect, alpha, -alpha))
            sample_weights /= sample_weights.sum()

            # If all samples have equal weight again, stop
            if np.abs(sample_weights - 1.0 / n_samples).sum() < 1e-10:
                break

        self.is_fitted_ = True
        return self

    def _weighted_scores(self, X: np.ndarray) -> np.ndarray:
        """Return the weight-scaled sum of all estimators' predictions."""
        scores = np.zeros(X.shape[0])
        for estimator, weight in zip(self.estimators_, self.estimator_weights_):
            scores += weight * self._predict_estimator(estimator, X)
        return scores

    def _class_votes(self, X: np.ndarray) -> np.ndarray:
        """Return an (n_samples, n_classes) array of weighted votes."""
        n_samples = X.shape[0]
        n_classes = len(self.classes_)
        votes = np.zeros((n_samples, n_classes))
        rows = np.arange(n_samples)
        for estimator, weight in zip(self.estimators_, self.estimator_weights_):
            raw = np.asarray(self._predict_estimator(estimator, X), dtype=float)
            indices = np.rint(raw).astype(int)
            # Ignore predictions that do not name a known class index.
            valid = (indices >= 0) & (indices < n_classes)
            votes[rows[valid], indices[valid]] += weight
        return votes

    def predict(self, X: Union[np.ndarray, Tensor]) -> np.ndarray:
        """
        Predict classes for samples in X.

        Args:
            X: Input data

        Returns:
            Predicted class labels, taken from ``classes_``

        Raises:
            ValueError: If the classifier has not been fitted.
        """
        if not self.is_fitted_:
            raise ValueError("This AdaBoostClassifier instance is not fitted yet.")

        if isinstance(X, Tensor):
            X = X.data
        X = np.asarray(X)

        if len(self.classes_) == 2:
            # Binary classification: sign of the weighted sum; a score of
            # exactly 0 goes to classes_[1].
            scores = self._weighted_scores(X)
            return np.where(scores >= 0, self.classes_[1], self.classes_[0])

        # Otherwise: the class with the largest total estimator weight wins
        # (ties go to the class that sorts first).
        return self.classes_[np.argmax(self._class_votes(X), axis=1)]

    def decision_function(self, X: Union[np.ndarray, Tensor]) -> np.ndarray:
        """
        Compute the decision function of X.

        Args:
            X: Input data

        Returns:
            One value per sample: the sum over estimators of
            ``weight * prediction``. For binary problems the sign selects
            the class. For other problems the predictions are class
            indices, so the sum is not a meaningful score; it is kept for
            backward compatibility.

        Raises:
            ValueError: If the classifier has not been fitted.
        """
        if not self.is_fitted_:
            raise ValueError("This AdaBoostClassifier instance is not fitted yet.")

        if isinstance(X, Tensor):
            X = X.data
        X = np.asarray(X)

        return self._weighted_scores(X)

    def _combine_predictions(self, predictions: np.ndarray) -> np.ndarray:
        """Not used in AdaBoost as it uses weighted voting."""
        raise NotImplementedError("AdaBoost uses weighted voting logic")
class GradientBoostingClassifier(BaseEnsemble):
    """
    Gradient Boosting classifier.

    This implementation is simplified and focuses on the core concept
    of gradient boosting for educational purposes. Each stage fits a small
    MLP (used as a stand-in for a regression tree) to the current residuals.

    Examples:
        >>> from fit.ensemble import GradientBoostingClassifier
        >>>
        >>> # Create gradient boosting classifier
        >>> gb = GradientBoostingClassifier(
        ...     n_estimators=100,
        ...     learning_rate=0.1,
        ...     max_depth=3
        ... )
        >>> gb.fit(X_train, y_train)
        >>> predictions = gb.predict(X_test)
    """

    def __init__(
        self,
        n_estimators: int = 100,
        learning_rate: float = 0.1,
        max_depth: int = 3,
        random_state: Optional[int] = None,
    ):
        """
        Initialize Gradient Boosting classifier.

        Args:
            n_estimators: Number of boosting stages
            learning_rate: Learning rate shrinks contribution of each stage
            max_depth: Maximum depth of individual regression estimators
                (stored, but not consulted by the MLP surrogate built in
                ``_make_estimator``)
            random_state: Random state for reproducibility (forwarded to
                the base class)
        """
        super().__init__(n_estimators=n_estimators, random_state=random_state)
        self.learning_rate = learning_rate
        self.max_depth = max_depth

        # Constant starting score; set by fit().
        self.init_prediction_ = None

    def _make_estimator(self) -> Any:
        """
        Create a new estimator instance (decision tree surrogate).

        Returns:
            New estimator instance
        """
        # For simplicity, use a small MLP as a surrogate for decision trees.
        # NOTE(review): the layer sizes are hard-coded; presumably this only
        # suits 1-feature inputs -- confirm against the MLP API.
        from fit.simple.models import MLP

        return MLP([1, 4, 1], activation="tanh")

    def _fit_estimator(
        self,
        estimator,
        X: np.ndarray,
        residuals: np.ndarray,
        sample_indices: np.ndarray,
    ):
        """
        Fit a single estimator to residuals.

        Args:
            estimator: The estimator to fit
            X: Training data
            residuals: Current residuals to fit
            sample_indices: Sample indices (not used in basic GB)
        """
        # Estimators without a ``fit`` method are silently left untouched.
        if hasattr(estimator, "fit"):
            estimator.fit(X, residuals)

    def _predict_estimator(self, estimator, X: np.ndarray) -> np.ndarray:
        """
        Make predictions with a single estimator.

        Args:
            estimator: The fitted estimator
            X: Input data

        Returns:
            Flat float array of predictions from the estimator

        Raises:
            ValueError: If the estimator has neither ``predict`` nor
                ``forward``.
        """
        if hasattr(estimator, "predict"):
            predictions = estimator.predict(X)
        elif hasattr(estimator, "forward"):
            # For neural network models
            predictions = estimator.forward(Tensor(X)).data
        else:
            raise ValueError(f"Estimator {estimator} has no predict or forward method")

        if isinstance(predictions, Tensor):
            predictions = predictions.data
        # Flatten on both paths so e.g. an (n, 1) output can be added
        # in place to the 1-D running scores.
        return np.asarray(predictions, dtype=np.float64).reshape(-1)

    def fit(
        self, X: Union[np.ndarray, Tensor], y: Union[np.ndarray, Tensor]
    ) -> "GradientBoostingClassifier":
        """
        Fit the gradient boosting model.

        Args:
            X: Training data
            y: Target values

        Returns:
            Self for method chaining

        Raises:
            ValueError: If the training set is empty.
        """
        if isinstance(X, Tensor):
            X = X.data
        if isinstance(y, Tensor):
            y = y.data
        X = np.asarray(X, dtype=np.float64)
        y = np.asarray(y)

        n_samples = X.shape[0]
        if n_samples == 0:
            raise ValueError(
                "Cannot fit GradientBoostingClassifier on an empty training set."
            )

        self.classes_ = np.unique(y)
        is_binary = len(self.classes_) == 2

        if is_binary:
            # Binary classification: targets in {0, 1}, start from the
            # log-odds of the positive class.
            y_encoded = np.where(y == self.classes_[0], 0, 1)
            pos_rate = np.clip(np.mean(y_encoded), 1e-15, 1 - 1e-15)  # Avoid log(0)
            self.init_prediction_ = np.log(pos_rate / (1 - pos_rate))
        else:
            # Otherwise: targets are indices into classes_, start from the
            # index of the most frequent class.
            y_encoded = np.searchsorted(self.classes_, y)
            self.init_prediction_ = np.bincount(y_encoded).argmax()

        # Force a float array: the non-binary start value is an integer and
        # an integer array cannot take the in-place float updates below.
        current_predictions = np.full(
            n_samples, self.init_prediction_, dtype=np.float64
        )

        # Clear previous estimators
        self.estimators_ = []

        all_indices = np.arange(n_samples)
        for _ in range(self.n_estimators):
            # Residuals (negative gradient)
            if is_binary:
                # Logistic loss gradient
                residuals = y_encoded - self._sigmoid(current_predictions)
            else:
                # Simplified residuals on the class-index scale
                residuals = y_encoded - current_predictions

            estimator = self._make_estimator()
            self._fit_estimator(estimator, X, residuals, all_indices)

            stage_predictions = self._predict_estimator(estimator, X)
            current_predictions += self.learning_rate * stage_predictions

            self.estimators_.append(estimator)

        self.is_fitted_ = True
        return self

    def predict(self, X: Union[np.ndarray, Tensor]) -> np.ndarray:
        """
        Predict class labels for samples in X.

        Args:
            X: Input data

        Returns:
            Predicted class labels, taken from ``classes_``

        Raises:
            ValueError: If the classifier has not been fitted.
        """
        if not self.is_fitted_:
            raise ValueError(
                "This GradientBoostingClassifier instance is not fitted yet."
            )

        decision_values = self.decision_function(X)

        if len(self.classes_) == 2:
            # Binary classification: non-negative score -> classes_[1]
            predictions = (decision_values >= 0).astype(int)
            return self.classes_[predictions]

        # Otherwise: round the score to the nearest valid class index
        predictions = np.round(decision_values).astype(int)
        predictions = np.clip(predictions, 0, len(self.classes_) - 1)
        return self.classes_[predictions]

    def decision_function(self, X: Union[np.ndarray, Tensor]) -> np.ndarray:
        """
        Compute the decision function of X.

        Args:
            X: Input data

        Returns:
            One value per sample: the initial prediction plus
            ``learning_rate`` times the sum of all stage predictions.

        Raises:
            ValueError: If the classifier has not been fitted.
        """
        if not self.is_fitted_:
            raise ValueError(
                "This GradientBoostingClassifier instance is not fitted yet."
            )

        if isinstance(X, Tensor):
            X = X.data
        X = np.asarray(X, dtype=np.float64)

        # Float dtype is required: the non-binary initial prediction is an
        # integer and the in-place additions below are floats.
        predictions = np.full(X.shape[0], self.init_prediction_, dtype=np.float64)

        for estimator in self.estimators_:
            predictions += self.learning_rate * self._predict_estimator(estimator, X)

        return predictions

    def predict_proba(self, X: Union[np.ndarray, Tensor]) -> np.ndarray:
        """
        Predict class probabilities for samples in X.

        Args:
            X: Input data

        Returns:
            Array with two columns: probability of ``classes_[0]`` and of
            ``classes_[1]``.

        Raises:
            ValueError: If the classifier has not been fitted.
            NotImplementedError: If the model was not fitted on exactly
                two classes.
        """
        # Check the fitted state before reading classes_, which fit() sets.
        if not self.is_fitted_:
            raise ValueError(
                "This GradientBoostingClassifier instance is not fitted yet."
            )

        if len(self.classes_) != 2:
            raise NotImplementedError(
                "predict_proba only implemented for binary classification"
            )

        probabilities = self._sigmoid(self.decision_function(X))

        # Return probabilities for both classes
        return np.column_stack([1 - probabilities, probabilities])

    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        """
        Compute sigmoid function with numerical stability.

        Args:
            x: Input values

        Returns:
            Sigmoid values
        """
        # Clip to prevent overflow in exp()
        x = np.clip(x, -500, 500)
        return 1 / (1 + np.exp(-x))

    def _combine_predictions(self, predictions: np.ndarray) -> np.ndarray:
        """Not used in Gradient Boosting."""
        raise NotImplementedError("Gradient Boosting uses sequential fitting")
class SimpleBoostingClassifier(BaseEnsemble):
    """
    Simplified boosting classifier for educational purposes.

    This is a basic implementation that demonstrates the core concepts
    of boosting without the complexity of AdaBoost or Gradient Boosting.
    Each estimator after the first is trained on a resample that favours
    the rows the previous estimator misclassified, and estimators vote
    with a weight derived from their training accuracy.

    Examples:
        >>> from fit.ensemble import SimpleBoostingClassifier
        >>> from fit.simple.models import MLP
        >>>
        >>> # Create simple boosting classifier
        >>> boost = SimpleBoostingClassifier(
        ...     base_estimator=MLP([4, 2]),
        ...     n_estimators=10
        ... )
        >>> boost.fit(X_train, y_train)
        >>> predictions = boost.predict(X_test)
    """

    def __init__(
        self,
        base_estimator=None,
        n_estimators: int = 10,
        random_state: Optional[int] = None,
    ):
        """
        Initialize simple boosting classifier.

        Args:
            base_estimator: Base estimator to boost. When None, a small
                default MLP is created by ``_make_estimator``.
            n_estimators: Number of estimators
            random_state: Random state for reproducibility (forwarded to
                the base class)
        """
        super().__init__(n_estimators=n_estimators, random_state=random_state)
        self.base_estimator = base_estimator
        self.estimator_weights_ = []

    def _make_estimator(self) -> Any:
        """Create a new estimator instance."""
        if self.base_estimator is None:
            # NOTE(review): the layer sizes are hard-coded; presumably this
            # only suits 1-feature inputs -- confirm against the MLP API.
            from fit.simple.models import MLP

            return MLP([1, 2], activation="tanh")

        if hasattr(self.base_estimator, "copy"):
            return self.base_estimator.copy()

        # Copy the prototype rather than calling its class with no
        # arguments: a bare constructor call drops the configuration the
        # caller supplied and fails when the constructor needs arguments.
        import copy

        try:
            return copy.deepcopy(self.base_estimator)
        except (TypeError, copy.Error):
            # Prototype cannot be deep-copied; fall back to a default
            # instance of the same class.
            return self.base_estimator.__class__()

    def _fit_estimator(
        self, estimator, X: np.ndarray, y: np.ndarray, sample_indices: np.ndarray
    ):
        """Fit estimator on the rows selected by ``sample_indices``."""
        # Estimators without a ``fit`` method are silently left untouched.
        if hasattr(estimator, "fit"):
            estimator.fit(X[sample_indices], y[sample_indices])

    def _predict_estimator(self, estimator, X: np.ndarray) -> np.ndarray:
        """
        Make predictions with estimator.

        Multi-dimensional outputs are reduced to class indices with an
        arg-max over axis 1.

        Raises:
            ValueError: If the estimator has neither ``predict`` nor
                ``forward``.
        """
        if hasattr(estimator, "predict"):
            predictions = estimator.predict(X)
        elif hasattr(estimator, "forward"):
            predictions = estimator.forward(Tensor(X)).data
        else:
            raise ValueError(f"Estimator {estimator} has no predict or forward method")

        # Convert to class predictions if needed
        if predictions.ndim > 1:
            predictions = np.argmax(predictions, axis=1)

        return predictions

    def fit(
        self, X: Union[np.ndarray, Tensor], y: Union[np.ndarray, Tensor]
    ) -> "SimpleBoostingClassifier":
        """
        Fit the simple boosting model.

        Args:
            X: Training data
            y: Target values

        Returns:
            Self for method chaining

        Raises:
            ValueError: If the training set is empty.
        """
        if isinstance(X, Tensor):
            X = X.data
        if isinstance(y, Tensor):
            y = y.data
        X = np.asarray(X)
        y = np.asarray(y)

        n_samples = X.shape[0]
        if n_samples == 0:
            raise ValueError(
                "Cannot fit SimpleBoostingClassifier on an empty training set."
            )

        # Clear previous results
        self.estimators_ = []
        self.estimator_weights_ = []

        # Full-data predictions of the most recently stored estimator,
        # kept so they are not recomputed at the start of the next round.
        prev_predictions = None

        for _ in range(self.n_estimators):
            estimator = self._make_estimator()

            if prev_predictions is None:
                # First iteration: uniform bootstrap from the base class
                sample_indices = self._bootstrap_sample(n_samples)
            else:
                # Later iterations: rows the previous estimator got wrong
                # are twice as likely to be drawn.
                # NOTE(review): predictions are compared with the raw
                # labels in y; presumably labels are 0..K-1 when the
                # estimator outputs arg-max indices -- confirm.
                misclassified = prev_predictions != y
                weights = np.ones(n_samples)
                weights[misclassified] *= 2
                weights /= weights.sum()

                sample_indices = np.random.choice(
                    n_samples, size=n_samples, replace=True, p=weights
                )

            self._fit_estimator(estimator, X, y, sample_indices)

            # Accuracy on the full dataset becomes the voting weight,
            # floored at 0.1 so every estimator keeps some say.
            predictions = self._predict_estimator(estimator, X)
            accuracy = np.mean(predictions == y)
            weight = max(0.1, accuracy)

            self.estimators_.append(estimator)
            self.estimator_weights_.append(weight)
            prev_predictions = predictions

        self.is_fitted_ = True
        return self

    def _combine_predictions(self, predictions: np.ndarray) -> np.ndarray:
        """
        Combine predictions using weighted voting.

        Args:
            predictions: Array of shape (n_estimators, n_samples)

        Returns:
            Integer array with one winning class per sample; ties go to
            the class that sorts first.
        """
        # The estimator weights do not depend on the sample, so build the
        # array once instead of once per sample.
        weights = np.array(self.estimator_weights_)

        n_samples = predictions.shape[1]
        result = np.zeros(n_samples, dtype=int)

        for i in range(n_samples):
            votes = predictions[:, i]
            candidates = np.unique(votes)
            # Total weight of the estimators that voted for each candidate
            totals = np.array([np.sum(weights[votes == cls]) for cls in candidates])
            result[i] = candidates[np.argmax(totals)]

        return result