"""
This module implements specialized operations for tensors.
These operations build on the core tensor capabilities and autograd engine
to provide higher-level functionality.
"""
import numpy as np
from typing import List, Optional, Tuple, Union
from fit.core.autograd import Function, get_function
from fit.core.tensor import Tensor
# Matrix Operations
[docs]
def matmul(a: Tensor, b: Tensor) -> Tensor:
"""
Perform matrix multiplication: a @ b.
Args:
a: First tensor
b: Second tensor
Returns:
Result tensor
"""
return a @ b
[docs]
def transpose(x: Tensor, axes: Optional[Tuple[int, ...]] = None) -> Tensor:
"""
Transpose a tensor.
Args:
x: Input tensor
axes: Permutation of dimensions (default is to reverse dimensions)
Returns:
Transposed tensor
"""
if not x.requires_grad:
return Tensor(np.transpose(x.data, axes))
# Define operation for transpose
class Transpose(Function):
@staticmethod
def apply(ctx, a, axes=None):
ctx["axes"] = axes
return np.transpose(a, axes)
@staticmethod
def backward(ctx, grad_output):
# Transpose the gradient with inverse permutation
axes = ctx["axes"]
if axes is None:
# If axes is None, the transpose reverses the dimensions
return np.transpose(grad_output), None
# Get inverse permutation
n = len(axes)
inverse_axes = np.zeros(n, dtype=int)
for i, ax in enumerate(axes):
inverse_axes[ax] = i
return np.transpose(grad_output, tuple(inverse_axes)), None
# Register function temporarily
function_registry = {}
function_registry["transpose"] = Transpose
# Call the function
result = Transpose.forward(x, axes)
return result
[docs]
def einsum(equation: str, *tensors: Tensor) -> Tensor:
"""
Einstein summation for tensors.
Args:
equation: Summation equation in Einstein notation
*tensors: Input tensors
Returns:
Result tensor
"""
# Check if gradients are needed
requires_grad = any(t.requires_grad for t in tensors)
if not requires_grad:
tensor_data = [t.data for t in tensors]
return Tensor(np.einsum(equation, *tensor_data))
# Define operation for einsum
class Einsum(Function):
@staticmethod
def apply(ctx, equation, *inputs):
ctx["equation"] = equation
ctx["input_shapes"] = [inp.shape for inp in inputs]
return np.einsum(equation, *inputs)
@staticmethod
def backward(ctx, grad_output):
equation = ctx["equation"]
input_shapes = ctx["input_shapes"]
# Parse the equation
input_part, output_part = equation.split("->")
input_subscripts = input_part.split(",")
grads = []
for i, subscript in enumerate(input_subscripts):
# Create gradient equation
grad_equation = f"{output_part},{','.join([s for j, s in enumerate(input_subscripts) if j != i])}->{''.join(subscript)}"
# Determine the order of operands for the gradient computation
operands = [grad_output]
for j, t in enumerate(inputs):
if j != i:
operands.append(t)
# Compute gradient
grads.append(
np.einsum(grad_equation, *operands).reshape(input_shapes[i])
)
# Add None for equation parameter
return (None,) + tuple(grads)
# Get tensor data
tensor_data = [t.data for t in tensors]
# Call the function
result = Einsum.forward(equation, *tensor_data)
# Create output tensor
output = Tensor(result, requires_grad=requires_grad)
if requires_grad:
# Store references to input tensors
output._prev = set(t for t in tensors if t.requires_grad)
# Define backward function
def backward_fn():
if output.grad is None:
return
# Compute input gradients
input_grads = Einsum.backward(
{"equation": equation, "input_shapes": [t.data.shape for t in tensors]},
output.grad,
)
# Skip the first element (None for equation parameter)
input_grads = input_grads[1:]
# Update gradients of input tensors
for t, input_grad in zip(tensors, input_grads):
if t.requires_grad and input_grad is not None:
t.grad = input_grad if t.grad is None else t.grad + input_grad
# Set backward function
output.backward_fn = backward_fn
return output
# Activation Functions
[docs]
def sigmoid(x: Tensor) -> Tensor:
"""
Apply sigmoid activation: 1 / (1 + exp(-x)).
Args:
x: Input tensor
Returns:
Output tensor
"""
# Define sigmoid function
class Sigmoid(Function):
@staticmethod
def apply(ctx, a):
result = 1.0 / (1.0 + np.exp(-a))
ctx["output"] = result
return result
@staticmethod
def backward(ctx, grad_output):
output = ctx["output"]
return (grad_output * output * (1 - output),)
# Call the function
return Sigmoid.forward(x)
[docs]
def softmax(x: Tensor, axis: int = -1) -> Tensor:
"""
Apply softmax activation along specified axis.
Args:
x: Input tensor
axis: Axis along which to apply softmax
Returns:
Output tensor
"""
# Define softmax function
class Softmax(Function):
@staticmethod
def apply(ctx, a, axis):
# For numerical stability, subtract max
a_max = np.max(a, axis=axis, keepdims=True)
exp_a = np.exp(a - a_max)
result = exp_a / np.sum(exp_a, axis=axis, keepdims=True)
ctx["output"] = result
ctx["axis"] = axis
return result
@staticmethod
def backward(ctx, grad_output):
output = ctx["output"]
axis = ctx["axis"]
# Compute Jacobian-vector product efficiently
# For each position i:
# dL/dy_i = Σ_j dL/dx_j * dx_j/dy_i
# dx_j/dy_i = y_i * (δ_ij - y_j)
# Sum(dL/dx_i * y_i)
S = np.sum(grad_output * output, axis=axis, keepdims=True)
# dL/dy_i = y_i * (dL/dx_i - S)
grad_input = output * (grad_output - S)
return grad_input, None
# Call the function
return Softmax.forward(x, axis)
[docs]
def tanh(x: Tensor) -> Tensor:
"""
Apply hyperbolic tangent activation: (exp(x) - exp(-x)) / (exp(x) + exp(-x)).
Args:
x: Input tensor
Returns:
Output tensor
"""
# Define tanh function
class Tanh(Function):
@staticmethod
def apply(ctx, a):
result = np.tanh(a)
ctx["output"] = result
return result
@staticmethod
def backward(ctx, grad_output):
output = ctx["output"]
return (grad_output * (1 - output * output),)
# Call the function
return Tanh.forward(x)
# Mathematical Operations
[docs]
def abs(x: Tensor) -> Tensor:
"""
Compute the absolute value of a tensor.
Args:
x: Input tensor
Returns:
Output tensor
"""
# Define abs function
class Abs(Function):
@staticmethod
def apply(ctx, a):
ctx["input"] = a
return np.abs(a)
@staticmethod
def backward(ctx, grad_output):
input_data = ctx["input"]
# Gradient is sign of input
return (grad_output * np.sign(input_data),)
# Call the function
return Abs.forward(x)
[docs]
def sqrt(x: Tensor) -> Tensor:
"""
Compute the square root of a tensor.
Args:
x: Input tensor
Returns:
Output tensor
"""
# Define sqrt function
class Sqrt(Function):
@staticmethod
def apply(ctx, a):
result = np.sqrt(a)
ctx["output"] = result
return result
@staticmethod
def backward(ctx, grad_output):
output = ctx["output"]
return (grad_output / (2 * output),)
# Call the function
return Sqrt.forward(x)
[docs]
def sin(x: Tensor) -> Tensor:
"""
Compute the sine of a tensor (in radians).
Args:
x: Input tensor
Returns:
Output tensor
"""
# Define sin function
class Sin(Function):
@staticmethod
def apply(ctx, a):
ctx["input"] = a
return np.sin(a)
@staticmethod
def backward(ctx, grad_output):
input_data = ctx["input"]
return (grad_output * np.cos(input_data),)
# Call the function
return Sin.forward(x)
[docs]
def cos(x: Tensor) -> Tensor:
"""
Compute the cosine of a tensor (in radians).
Args:
x: Input tensor
Returns:
Output tensor
"""
# Define cos function
class Cos(Function):
@staticmethod
def apply(ctx, a):
ctx["input"] = a
return np.cos(a)
@staticmethod
def backward(ctx, grad_output):
input_data = ctx["input"]
return (grad_output * (-np.sin(input_data)),)
# Call the function
return Cos.forward(x)
# Tensor Creation and Manipulation
[docs]
def zeros(shape: Tuple[int, ...], requires_grad: bool = False) -> Tensor:
"""
Create a tensor filled with zeros.
Args:
shape: Shape of the tensor
requires_grad: Whether the tensor requires gradient computation
Returns:
Tensor filled with zeros
"""
return Tensor(np.zeros(shape), requires_grad=requires_grad)
[docs]
def ones(shape: Tuple[int, ...], requires_grad: bool = False) -> Tensor:
"""
Create a tensor filled with ones.
Args:
shape: Shape of the tensor
requires_grad: Whether the tensor requires gradient computation
Returns:
Tensor filled with ones
"""
return Tensor(np.ones(shape), requires_grad=requires_grad)
[docs]
def randn(shape: Tuple[int, ...], requires_grad: bool = False) -> Tensor:
"""
Create a tensor filled with random values from a standard normal distribution.
Args:
shape: Shape of the tensor
requires_grad: Whether the tensor requires gradient computation
Returns:
Tensor filled with random values
"""
return Tensor(np.random.randn(*shape), requires_grad=requires_grad)
[docs]
def concatenate(tensors: List[Tensor], axis: int = 0) -> Tensor:
"""
Concatenate tensors along specified axis.
Args:
tensors: List of tensors to concatenate
axis: Axis along which to concatenate
Returns:
Concatenated tensor
"""
# Check if gradients are needed
requires_grad = any(t.requires_grad for t in tensors)
if not requires_grad:
tensor_data = [t.data for t in tensors]
return Tensor(np.concatenate(tensor_data, axis=axis))
# Define operation for concatenation
class Concatenate(Function):
@staticmethod
def apply(ctx, inputs, axis):
shapes = [inp.shape for inp in inputs]
ctx["shapes"] = shapes
ctx["axis"] = axis
return np.concatenate(inputs, axis=axis)
@staticmethod
def backward(ctx, grad_output):
shapes = ctx["shapes"]
axis = ctx["axis"]
# Compute indices for slicing
indices = [0]
for shape in shapes:
indices.append(indices[-1] + shape[axis])
# Slice gradient for each input
grads = []
for i in range(len(shapes)):
shape = shapes[i]
start, end = indices[i], indices[i + 1]
# Create slice object
slices = [slice(None)] * grad_output.ndim
slices[axis] = slice(start, end)
grads.append(grad_output[tuple(slices)])
# Add None for concatenation parameters
return grads + [None]
# Get tensor data
tensor_data = [t.data for t in tensors]
# Create output tensor
result = Concatenate.forward(tensor_data, axis)
output = Tensor(result, requires_grad=requires_grad)
if requires_grad:
# Store references to input tensors
output._prev = set(t for t in tensors if t.requires_grad)
# Define backward function
def backward_fn():
if output.grad is None:
return
# Compute input gradients
grads = Concatenate.backward(
{"shapes": [t.data.shape for t in tensors], "axis": axis}, output.grad
)
# Skip the last element (None for axis parameter)
grads = grads[:-1]
# Update gradients of input tensors
for t, grad in zip(tensors, grads):
if t.requires_grad:
t.grad = grad if t.grad is None else t.grad + grad
# Set backward function
output.backward_fn = backward_fn
return output
# Statistical and Reduction Operations
[docs]
def std(x: Tensor, axis: Optional[int] = None, keepdims: bool = False) -> Tensor:
"""
Compute the standard deviation of a tensor.
Args:
x: Input tensor
axis: Axis along which to compute standard deviation
keepdims: Whether to keep the reduced dimensions
Returns:
Standard deviation tensor
"""
# Use variance then square root
return sqrt(var(x, axis, keepdims))
[docs]
def var(x: Tensor, axis: Optional[int] = None, keepdims: bool = False) -> Tensor:
"""
Compute the variance of a tensor.
Args:
x: Input tensor
axis: Axis along which to compute variance
keepdims: Whether to keep the reduced dimensions
Returns:
Variance tensor
"""
# Define var function
class Var(Function):
@staticmethod
def apply(ctx, a, axis, keepdims):
mean = np.mean(a, axis=axis, keepdims=True)
ctx["input"] = a
ctx["mean"] = mean
ctx["axis"] = axis
ctx["keepdims"] = keepdims
# Sum of squared deviations
result = np.mean((a - mean) ** 2, axis=axis, keepdims=keepdims)
return result
@staticmethod
def backward(ctx, grad_output):
input_data = ctx["input"]
mean = ctx["mean"]
axis = ctx["axis"]
keepdims = ctx["keepdims"]
# Reshape grad_output for broadcasting if needed
if not keepdims and axis is not None:
grad_output = np.expand_dims(grad_output, axis=axis)
# Compute gradient: 2 * (x - mean) / n
n = input_data.shape[axis] if axis is not None else input_data.size
grad_input = 2 * (input_data - mean) * grad_output / n
return grad_input, None, None
# Call the function
return Var.forward(x, axis, keepdims)
[docs]
def argmax(x: Tensor, axis: Optional[int] = None) -> Tensor:
"""
Return the indices of the maximum values along specified axis.
Args:
x: Input tensor
axis: Axis along which to find maximum values
Returns:
Indices tensor
"""
# This operation is not differentiable, so we don't need backward
return Tensor(np.argmax(x.data, axis=axis))
[docs]
def logsumexp(x: Tensor, axis: Optional[int] = None, keepdims: bool = False) -> Tensor:
"""
Compute the log of the sum of exponentials of input elements.
This is a numerically stable version of log(sum(exp(x))).
Args:
x: Input tensor
axis: Axis along which to perform the operation
keepdims: Whether to keep the reduced dimensions
Returns:
Result tensor
"""
# Define logsumexp function
class LogSumExp(Function):
@staticmethod
def apply(ctx, a, axis, keepdims):
a_max = np.max(a, axis=axis, keepdims=True)
ctx["a_max"] = a_max
ctx["input"] = a
ctx["axis"] = axis
ctx["keepdims"] = keepdims
exp_a = np.exp(a - a_max)
sum_exp = np.sum(exp_a, axis=axis, keepdims=keepdims)
result = (
np.log(sum_exp) + a_max.reshape(sum_exp.shape)
if not keepdims
else np.log(sum_exp) + a_max
)
return result
@staticmethod
def backward(ctx, grad_output):
input_data = ctx["input"]
a_max = ctx["a_max"]
axis = ctx["axis"]
keepdims = ctx["keepdims"]
# Reshape grad_output for broadcasting if needed
if not keepdims and axis is not None:
grad_output = np.expand_dims(grad_output, axis=axis)
# Compute softmax
exp_a = np.exp(input_data - a_max)
sum_exp = np.sum(exp_a, axis=axis, keepdims=True)
softmax = exp_a / sum_exp
# Gradient is grad_output * softmax
grad_input = grad_output * softmax
return grad_input, None, None
# Call the function
return LogSumExp.forward(x, axis, keepdims)
# Loss Functions
[docs]
def mse_loss(predictions: Tensor, targets: Tensor, reduction: str = "mean") -> Tensor:
"""
Compute Mean Squared Error loss.
Args:
predictions: Predicted values
targets: Target values
reduction: Reduction method ('mean' or 'sum')
Returns:
Loss tensor
"""
diff = predictions - targets
squared_diff = diff * diff
if reduction == "mean":
return squared_diff.mean()
elif reduction == "sum":
return squared_diff.sum()
else:
return squared_diff
[docs]
def binary_cross_entropy(
predictions: Tensor, targets: Tensor, reduction: str = "mean"
) -> Tensor:
"""
Compute Binary Cross Entropy loss.
Args:
predictions: Predicted probabilities (between 0 and 1)
targets: Binary target values (0 or 1)
reduction: Reduction method ('mean' or 'sum')
Returns:
Loss tensor
"""
# Add small epsilon for numerical stability
epsilon = 1e-12
predictions = predictions.data.clip(epsilon, 1 - epsilon)
# Define BCE function
class BCE(Function):
@staticmethod
def apply(ctx, pred, target, reduction):
ctx["pred"] = pred
ctx["target"] = target
ctx["reduction"] = reduction
# Compute BCE: -t * log(p) - (1 - t) * log(1 - p)
loss = -target * np.log(pred) - (1 - target) * np.log(1 - pred)
if reduction == "mean":
return np.mean(loss)
elif reduction == "sum":
return np.sum(loss)
else:
return loss
@staticmethod
def backward(ctx, grad_output):
pred = ctx["pred"]
target = ctx["target"]
reduction = ctx["reduction"]
# Gradient: (p - t) / (p * (1 - p))
grad = (pred - target) / (pred * (1 - pred))
if reduction == "mean":
grad = grad * grad_output / pred.size
elif reduction == "sum":
grad = grad * grad_output
else:
# Reshape grad_output for broadcasting if needed
if grad_output.ndim < grad.ndim:
grad_output = np.expand_dims(
grad_output, axis=tuple(range(1, grad.ndim))
)
grad = grad * grad_output
return grad, None, None
# Call the function
return BCE.forward(predictions.data, targets.data, reduction)
[docs]
def softmax_cross_entropy(
logits: Tensor, targets: Tensor, reduction: str = "mean"
) -> Tensor:
"""
Compute Softmax Cross Entropy loss.
Args:
logits: Unnormalized predictions (logits)
targets: Target class indices or one-hot encoded targets
reduction: Reduction method ('mean' or 'sum')
Returns:
Loss tensor
"""
# Define SCE function
class SoftmaxCrossEntropy(Function):
@staticmethod
def apply(ctx, logits, targets, reduction):
# For numerical stability
logits = logits - np.max(logits, axis=1, keepdims=True)
ctx["logits"] = logits
# Convert targets to one-hot if they are class indices
if targets.ndim == 1:
batch_size, num_classes = logits.shape
one_hot = np.zeros((batch_size, num_classes))
one_hot[np.arange(batch_size), targets] = 1
targets = one_hot
ctx["targets"] = targets
ctx["reduction"] = reduction
# Compute softmax probabilities
exp_logits = np.exp(logits)
probs = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
# Compute cross entropy: -sum(targets * log(probs))
log_probs = np.log(probs)
loss = -np.sum(targets * log_probs, axis=1)
if reduction == "mean":
return np.mean(loss)
elif reduction == "sum":
return np.sum(loss)
else:
return loss
@staticmethod
def backward(ctx, grad_output):
logits = ctx["logits"]
targets = ctx["targets"]
reduction = ctx["reduction"]
# Softmax probabilities
exp_logits = np.exp(logits)
probs = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
# Gradient: p - t
batch_size = logits.shape[0]
grad = probs - targets
if reduction == "mean":
grad = grad * grad_output / batch_size
elif reduction == "sum":
grad = grad * grad_output
else:
# Reshape grad_output for broadcasting
grad = grad * grad_output.reshape(-1, 1)
return grad, None, None
# Call the function
return SoftmaxCrossEntropy.forward(logits.data, targets.data, reduction)
# Distance and Similarity Functions
[docs]
def cosine_similarity(
x1: Tensor, x2: Tensor, dim: int = 1, eps: float = 1e-8
) -> Tensor:
"""
Compute cosine similarity between tensors along specified dimension.
Args:
x1: First tensor
x2: Second tensor
dim: Dimension along which to compute similarity
eps: Small value to avoid division by zero
Returns:
Similarity tensor
"""
# Define cosine similarity function
class CosineSimilarity(Function):
@staticmethod
def apply(ctx, a, b, dim, eps):
ctx["a"] = a
ctx["b"] = b
ctx["dim"] = dim
ctx["eps"] = eps
# Compute the dot product and norms
dot_product = np.sum(a * b, axis=dim)
a_norm = np.sqrt(np.sum(a * a, axis=dim))
b_norm = np.sqrt(np.sum(b * b, axis=dim))
# Compute cosine similarity
similarity = dot_product / np.maximum(a_norm * b_norm, eps)
# Store intermediate results for backward pass
ctx["a_norm"] = a_norm
ctx["b_norm"] = b_norm
ctx["similarity"] = similarity
return similarity
@staticmethod
def backward(ctx, grad_output):
a = ctx["a"]
b = ctx["b"]
dim = ctx["dim"]
eps = ctx["eps"]
a_norm = ctx["a_norm"]
b_norm = ctx["b_norm"]
similarity = ctx["similarity"]
# Reshape for broadcasting
reshape_dim = lambda x: np.expand_dims(x, dim)
grad_output_reshaped = reshape_dim(grad_output)
a_norm_reshaped = reshape_dim(a_norm)
b_norm_reshaped = reshape_dim(b_norm)
similarity_reshaped = reshape_dim(similarity)
# Compute gradients
ab_norm = np.maximum(a_norm * b_norm, eps)
# Gradient for a
grad_a = grad_output_reshaped * (
b / ab_norm - similarity_reshaped * a / a_norm_reshaped**2
)
# Gradient for b
grad_b = grad_output_reshaped * (
a / ab_norm - similarity_reshaped * b / b_norm_reshaped**2
)
return grad_a, grad_b, None, None
# Call the function
return CosineSimilarity.forward(x1.data, x2.data, dim, eps)
[docs]
def pairwise_distance(
x1: Tensor, x2: Tensor, p: float = 2.0, eps: float = 1e-6
) -> Tensor:
"""
Compute pairwise distance between tensors.
Args:
x1: First tensor (batch_size x D)
x2: Second tensor (batch_size x D)
p: p-norm to use for distance calculation
eps: Small value to avoid division by zero
Returns:
Distance tensor (batch_size)
"""
# Define pairwise distance function
class PairwiseDistance(Function):
@staticmethod
def apply(ctx, a, b, p, eps):
ctx["a"] = a
ctx["b"] = b
ctx["p"] = p
ctx["eps"] = eps
# Compute p-norm
diff = a - b
abs_diff = np.abs(diff)
if p == 1:
# L1 distance
return np.sum(abs_diff, axis=1)
elif p == 2:
# L2 distance
return np.sqrt(np.sum(diff * diff, axis=1) + eps)
else:
# General p-norm
p_diff = abs_diff**p
return np.sum(p_diff, axis=1) ** (1 / p)
@staticmethod
def backward(ctx, grad_output):
a = ctx["a"]
b = ctx["b"]
p = ctx["p"]
eps = ctx["eps"]
# Compute gradient
diff = a - b
abs_diff = np.abs(diff)
if p == 1:
# L1 gradient: sign(a - b)
grad = np.sign(diff)
elif p == 2:
# L2 gradient: (a - b) / ||a - b||_2
norm = np.sqrt(np.sum(diff * diff, axis=1, keepdims=True) + eps)
grad = diff / norm
else:
# General p-norm gradient
norm = np.sum(abs_diff**p, axis=1, keepdims=True) ** (1 / p - 1)
grad = p * (abs_diff ** (p - 1)) * np.sign(diff) * norm
# Multiply by upstream gradient
grad_output_reshaped = np.expand_dims(grad_output, 1)
grad_a = grad * grad_output_reshaped
grad_b = -grad * grad_output_reshaped
return grad_a, grad_b, None, None
# Call the function
return PairwiseDistance.forward(x1.data, x2.data, p, eps)