# Source code for fit.nn.modules.transformer
"""
Transformer Blocks & Complete Transformer Architecture
This module implements the complete Transformer architecture including:
- Transformer Encoder/Decoder Blocks
- Positional Encoding
- Layer Normalization
- Feed-Forward Networks
- Complete Transformer models
Built on top of the attention mechanisms, this creates the full power
of modern Transformer architectures.
"""
import numpy as np
import math
from typing import Optional, Tuple, List
from fit.core.tensor import Tensor
from fit.nn.modules.base import Layer
from fit.nn.modules.linear import Linear
from fit.nn.modules.activation import ReLU, GELU, Dropout
from fit.nn.modules.normalization import LayerNorm
from fit.nn.modules.attention import (
MultiHeadAttention,
SelfAttention,
CrossAttention,
CausalSelfAttention,
)
# Sinusoidal positional encoding added to token embeddings
class PositionalEncoding(Layer):
    """
    Positional Encoding: adds position information to embeddings.

    Since Transformers have no inherent notion of sequence order,
    a precomputed sinusoidal table is added to the embeddings to give
    the model information about token positions.
    """

    def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1):
        """
        Initialize positional encoding.

        Args:
            d_model: Model dimension
            max_len: Maximum sequence length to precompute
            dropout: Dropout probability; no dropout layer is created
                when this is not greater than 0
        """
        super().__init__()
        self.d_model = d_model
        self.dropout = Dropout(dropout) if dropout > 0 else None
        # Register the dropout layer as a child, as the other layers in this
        # module do for theirs.
        # NOTE(review): presumably Layer propagates train/eval state through
        # its children - confirm against fit.nn.modules.base.
        if self.dropout is not None:
            self.add_child(self.dropout)
        # Precompute the (max_len, d_model) table once; forward only slices it
        self.pe = self._create_positional_encoding(max_len, d_model)

    def _create_positional_encoding(self, max_len: int, d_model: int) -> np.ndarray:
        """
        Create sinusoidal positional encodings.

        Args:
            max_len: Number of positions (rows) to generate
            d_model: Encoding width (columns)

        Returns:
            Array of shape (max_len, d_model) with sin in the even columns
            and cos in the odd columns
        """
        pe = np.zeros((max_len, d_model))
        position = np.arange(0, max_len, dtype=np.float32).reshape(-1, 1)
        # One frequency per even column index: exp(-2i * ln(10000) / d_model)
        div_term = np.exp(np.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        angles = position * div_term
        # Even columns take the sine of every frequency
        pe[:, 0::2] = np.sin(angles)
        # There are d_model // 2 odd columns; for an odd d_model that is one
        # fewer than the number of frequencies, so the last one is dropped
        pe[:, 1::2] = np.cos(angles[:, : d_model // 2])
        return pe

    def forward(self, x: Tensor) -> Tensor:
        """
        Add positional encoding to input embeddings.

        Args:
            x: Input embeddings (batch_size, seq_len, d_model)

        Returns:
            Embeddings with positional encoding added (and dropout applied
            when a dropout layer was created)

        Raises:
            ValueError: If seq_len exceeds the precomputed table length or
                the input width exceeds the table width
        """
        _, seq_len, d_model = x.data.shape
        max_len, table_dim = self.pe.shape
        # Fail loudly instead of relying on a broadcast error (or, for a
        # one-row table, on a silent broadcast of position 0 everywhere)
        if seq_len > max_len:
            raise ValueError(
                f"Sequence length {seq_len} exceeds maximum length {max_len} "
                "of the precomputed positional encoding"
            )
        if d_model > table_dim:
            raise ValueError(
                f"Input dimension {d_model} exceeds positional encoding "
                f"dimension {table_dim}"
            )
        # Slice the table to this input; it broadcasts over the batch axis
        pos_tensor = Tensor(self.pe[:seq_len, :d_model], requires_grad=False)
        result = x + pos_tensor
        if self.dropout is not None:
            result = self.dropout(result)
        return result
# GELU activation (tanh approximation)
class GELU(Layer):
    """
    Gaussian Error Linear Unit: smooth activation function used in Transformers.

    GELU(x) = x * Φ(x), where Φ is the cumulative distribution function of
    the standard normal distribution. This layer evaluates the tanh
    approximation and attaches a matching backward closure to the output.

    NOTE(review): this definition shadows the GELU imported from
    fit.nn.modules.activation at the top of this module.
    """

    def forward(self, x: Tensor) -> Tensor:
        """
        Apply GELU activation.

        Args:
            x: Input tensor

        Returns:
            Tensor with the same shape as x holding the approximate GELU
        """
        # GELU(x) ≈ 0.5 * x * (1 + tanh(√(2/π) * (x + 0.044715 * x³)))
        values = x.data
        root_two_over_pi = math.sqrt(2.0 / math.pi)
        cube = values * values * values
        tanh_val = np.tanh(root_two_over_pi * (values + 0.044715 * cube))
        output = Tensor(0.5 * values * (1 + tanh_val), requires_grad=x.requires_grad)

        def _backward():
            # Only propagate when there is an upstream gradient and the
            # input actually tracks gradients
            if output.grad is not None and x.requires_grad:
                # sech²(u) = 1 - tanh²(u)
                sech_sq = 1 - tanh_val * tanh_val
                inner_slope = 1 + 3 * 0.044715 * x.data * x.data
                local_grad = (
                    0.5 * (1 + tanh_val)
                    + 0.5 * x.data * sech_sq * root_two_over_pi * inner_slope
                )
                contribution = output.grad * local_grad
                if x.grad is None:
                    x.grad = contribution
                else:
                    x.grad = x.grad + contribution

        output._backward = _backward
        output._prev = {x}
        return output
# Position-wise feed-forward network
class FeedForward(Layer):
    """
    Position-wise Feed-Forward Network: applies the same FFN to each position.

    FFN(x) = act(xW₁ + b₁)W₂ + b₂, where act is ReLU or GELU.

    This adds non-linearity and lets the model process information
    within each position independently.
    """

    def __init__(
        self, d_model: int, d_ff: int, activation: str = "gelu", dropout: float = 0.1
    ):
        """
        Initialize feed-forward network.

        Args:
            d_model: Model dimension
            d_ff: Feed-forward dimension (usually 4 * d_model)
            activation: Activation function ('relu', 'gelu')
            dropout: Dropout probability; no dropout layer is created
                when this is not greater than 0

        Raises:
            ValueError: If activation is neither 'relu' nor 'gelu'
        """
        super().__init__()
        # Expand to d_ff, then project back to d_model
        self.linear1 = Linear(d_model, d_ff)
        self.linear2 = Linear(d_ff, d_model)

        if activation == "gelu":
            self.activation = GELU()
        elif activation == "relu":
            self.activation = ReLU()
        else:
            raise ValueError(f"Unknown activation: {activation}")

        if dropout > 0:
            self.dropout = Dropout(dropout)
        else:
            self.dropout = None

        # Register sub-layers as children
        for child in (self.linear1, self.linear2, self.activation):
            self.add_child(child)
        if self.dropout:
            self.add_child(self.dropout)

    def forward(self, x: Tensor) -> Tensor:
        """
        Apply feed-forward network.

        Args:
            x: Input tensor (batch_size, seq_len, d_model)

        Returns:
            Output tensor (batch_size, seq_len, d_model)
        """
        # Expand and apply the non-linearity
        hidden = self.activation(self.linear1(x))
        # Dropout sits between the two projections
        if self.dropout is not None:
            hidden = self.dropout(hidden)
        # Project back to the model dimension
        return self.linear2(hidden)
# Encoder block
class TransformerEncoderBlock(Layer):
    """
    Transformer Encoder Block: the core building block of the Transformer encoder.

    Structure (post-norm):
        1. Multi-Head Self-Attention
        2. Residual connection + Layer Norm
        3. Feed-Forward Network
        4. Residual connection + Layer Norm
    """

    def __init__(
        self,
        d_model: int,
        num_heads: int,
        d_ff: int,
        dropout: float = 0.1,
        activation: str = "gelu",
    ):
        """
        Initialize transformer encoder block.

        Args:
            d_model: Model dimension
            num_heads: Number of attention heads
            d_ff: Feed-forward dimension
            dropout: Dropout probability
            activation: Activation function in FFN
        """
        super().__init__()
        # Sub-layers: attention followed by the position-wise FFN
        self.self_attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = FeedForward(d_model, d_ff, activation, dropout)
        # One layer norm after each residual connection
        self.norm1 = LayerNorm(d_model)
        self.norm2 = LayerNorm(d_model)
        # A single dropout layer is shared by both residual branches
        if dropout > 0:
            self.dropout = Dropout(dropout)
        else:
            self.dropout = None

        # Register sub-layers as children
        for child in (self.self_attention, self.feed_forward, self.norm1, self.norm2):
            self.add_child(child)
        if self.dropout:
            self.add_child(self.dropout)

    def forward(self, x: Tensor, mask: Optional[Tensor] = None) -> Tensor:
        """
        Forward pass through encoder block.

        Args:
            x: Input tensor (batch_size, seq_len, d_model)
            mask: Optional attention mask, forwarded to the attention layer

        Returns:
            Output tensor (batch_size, seq_len, d_model)
        """
        # Self-attention: the input serves as query, key and value
        attended = self.self_attention(x, x, x, mask=mask)
        if self.dropout is not None:
            attended = self.dropout(attended)
        # First residual connection, then layer norm
        normed = self.norm1(x + attended)

        # Feed-forward branch on the normalized activations
        transformed = self.feed_forward(normed)
        if self.dropout is not None:
            transformed = self.dropout(transformed)
        # Second residual connection, then layer norm
        return self.norm2(normed + transformed)
# Decoder block
class TransformerDecoderBlock(Layer):
    """
    Transformer Decoder Block: core building block of the Transformer decoder.

    Structure (post-norm):
        1. Masked Multi-Head Self-Attention
        2. Residual connection + Layer Norm
        3. Multi-Head Cross-Attention (encoder-decoder attention)
        4. Residual connection + Layer Norm
        5. Feed-Forward Network
        6. Residual connection + Layer Norm
    """

    def __init__(
        self,
        d_model: int,
        num_heads: int,
        d_ff: int,
        dropout: float = 0.1,
        activation: str = "gelu",
    ):
        """
        Initialize transformer decoder block.

        Args:
            d_model: Model dimension
            num_heads: Number of attention heads
            d_ff: Feed-forward dimension
            dropout: Dropout probability
            activation: Activation function in FFN
        """
        super().__init__()
        # Sub-layers in the order they are applied
        self.self_attention = CausalSelfAttention(d_model, num_heads, dropout)
        self.cross_attention = CrossAttention(d_model, num_heads, dropout)
        self.feed_forward = FeedForward(d_model, d_ff, activation, dropout)
        # One layer norm after each of the three residual connections
        self.norm1 = LayerNorm(d_model)
        self.norm2 = LayerNorm(d_model)
        self.norm3 = LayerNorm(d_model)
        # A single dropout layer is shared by all residual branches
        if dropout > 0:
            self.dropout = Dropout(dropout)
        else:
            self.dropout = None

        # Register sub-layers as children
        for child in (
            self.self_attention,
            self.cross_attention,
            self.feed_forward,
            self.norm1,
            self.norm2,
            self.norm3,
        ):
            self.add_child(child)
        if self.dropout:
            self.add_child(self.dropout)

    def forward(
        self,
        x: Tensor,
        encoder_output: Tensor,
        self_attn_mask: Optional[Tensor] = None,
        cross_attn_mask: Optional[Tensor] = None,
    ) -> Tensor:
        """
        Forward pass through decoder block.

        Args:
            x: Decoder input (batch_size, target_seq_len, d_model)
            encoder_output: Encoder output (batch_size, source_seq_len, d_model)
            self_attn_mask: Mask for self-attention
            cross_attn_mask: Mask for cross-attention

        Returns:
            Output tensor (batch_size, target_seq_len, d_model)
        """
        # Masked self-attention over the decoder input.
        # NOTE(review): self_attn_mask is accepted but never forwarded; the
        # self-attention layer is called with x only. Confirm whether
        # CausalSelfAttention should also receive it (e.g. for padding).
        attended = self.self_attention(x)
        if self.dropout is not None:
            attended = self.dropout(attended)
        # First residual connection and layer norm
        state = self.norm1(x + attended)

        # Cross-attention: decoder state queries the encoder output
        context = self.cross_attention(
            query=state, key_value=encoder_output, mask=cross_attn_mask
        )
        if self.dropout is not None:
            context = self.dropout(context)
        # Second residual connection and layer norm
        state = self.norm2(state + context)

        # Feed-forward branch
        transformed = self.feed_forward(state)
        if self.dropout is not None:
            transformed = self.dropout(transformed)
        # Third residual connection and layer norm
        return self.norm3(state + transformed)
# Encoder stack with embeddings
class TransformerEncoder(Layer):
    """
    Complete Transformer Encoder: token embedding, positional encoding and
    a stack of encoder blocks.
    """

    def __init__(
        self,
        vocab_size: int,
        d_model: int,
        num_heads: int,
        num_layers: int,
        d_ff: int,
        max_len: int = 5000,
        dropout: float = 0.1,
        activation: str = "gelu",
    ):
        """
        Initialize transformer encoder.

        Args:
            vocab_size: Size of vocabulary
            d_model: Model dimension
            num_heads: Number of attention heads
            num_layers: Number of encoder layers
            d_ff: Feed-forward dimension
            max_len: Maximum sequence length
            dropout: Dropout probability
            activation: Activation function
        """
        super().__init__()
        # Input pipeline: token lookup followed by positional encoding
        self.embedding = Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len, dropout)

        # Identically configured encoder blocks, applied in list order
        self.layers = [
            TransformerEncoderBlock(d_model, num_heads, d_ff, dropout, activation)
            for _ in range(num_layers)
        ]

        # Children are registered blocks first, then the input pipeline
        for block in self.layers:
            self.add_child(block)
        self.add_child(self.embedding)
        self.add_child(self.pos_encoding)

    def forward(self, x: Tensor, mask: Optional[Tensor] = None) -> Tensor:
        """
        Forward pass through transformer encoder.

        Args:
            x: Input token indices (batch_size, seq_len)
            mask: Optional attention mask, passed to every encoder block

        Returns:
            Encoded representations (batch_size, seq_len, d_model)
        """
        # Token embedding + positional encoding
        hidden = self.pos_encoding(self.embedding(x))
        # Run the stack sequentially
        for block in self.layers:
            hidden = block(hidden, mask=mask)
        return hidden
# Decoder stack with embeddings
class TransformerDecoder(Layer):
    """
    Complete Transformer Decoder: token embedding, positional encoding and
    a stack of decoder blocks.
    """

    def __init__(
        self,
        vocab_size: int,
        d_model: int,
        num_heads: int,
        num_layers: int,
        d_ff: int,
        max_len: int = 5000,
        dropout: float = 0.1,
        activation: str = "gelu",
    ):
        """
        Initialize transformer decoder.

        Args:
            vocab_size: Size of vocabulary
            d_model: Model dimension
            num_heads: Number of attention heads
            num_layers: Number of decoder layers
            d_ff: Feed-forward dimension
            max_len: Maximum sequence length
            dropout: Dropout probability
            activation: Activation function
        """
        super().__init__()
        # Input pipeline: token lookup followed by positional encoding
        self.embedding = Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len, dropout)

        # Identically configured decoder blocks, applied in list order
        self.layers = [
            TransformerDecoderBlock(d_model, num_heads, d_ff, dropout, activation)
            for _ in range(num_layers)
        ]

        # Children are registered blocks first, then the input pipeline
        for block in self.layers:
            self.add_child(block)
        self.add_child(self.embedding)
        self.add_child(self.pos_encoding)

    def forward(
        self,
        x: Tensor,
        encoder_output: Tensor,
        self_attn_mask: Optional[Tensor] = None,
        cross_attn_mask: Optional[Tensor] = None,
    ) -> Tensor:
        """
        Forward pass through transformer decoder.

        Args:
            x: Target token indices (batch_size, target_seq_len)
            encoder_output: Encoder output (batch_size, source_seq_len, d_model)
            self_attn_mask: Mask for self-attention
            cross_attn_mask: Mask for cross-attention

        Returns:
            Decoded representations (batch_size, target_seq_len, d_model)
        """
        # Token embedding + positional encoding
        hidden = self.pos_encoding(self.embedding(x))
        # Every block attends to the same encoder output
        for block in self.layers:
            hidden = block(hidden, encoder_output, self_attn_mask, cross_attn_mask)
        return hidden
# Token embedding layer
class Embedding(Layer):
    """
    Token embedding layer that converts token indices to dense vectors.

    The lookup result is scaled by sqrt(d_model), and a backward closure
    scatters the upstream gradient back onto the rows that were looked up.
    """

    def __init__(self, vocab_size: int, d_model: int):
        """
        Initialize embedding layer.

        Args:
            vocab_size: Size of vocabulary
            d_model: Embedding dimension
        """
        super().__init__()
        self.vocab_size = vocab_size
        self.d_model = d_model
        # Scaled random initialization: N(0, 1/d_model)
        scale = math.sqrt(1.0 / d_model)
        self.weight = Tensor(
            np.random.normal(0, scale, (vocab_size, d_model)), requires_grad=True
        )
        self.add_parameter(self.weight)

    def forward(self, x: Tensor) -> Tensor:
        """
        Look up embeddings for input tokens.

        Args:
            x: Token indices, typically (batch_size, seq_len); any index
                shape is accepted

        Returns:
            Embeddings with one trailing d_model axis appended to the index
            shape, e.g. (batch_size, seq_len, d_model)
        """
        indices = x.data.astype(int)
        # Scale embeddings by sqrt(d_model) as in the original paper
        scale = math.sqrt(self.d_model)
        embeddings = self.weight.data[indices] * scale
        output = Tensor(embeddings, requires_grad=self.weight.requires_grad)

        def _backward():
            if output.grad is None or not self.weight.requires_grad:
                return
            weight_grad = np.zeros_like(self.weight.data)
            # Unbuffered scatter-add: a token id that occurs several times
            # accumulates every one of its gradient rows (plain fancy-index
            # assignment would keep only the last one)
            np.add.at(weight_grad, indices, output.grad * scale)
            self.weight.grad = (
                weight_grad
                if self.weight.grad is None
                else self.weight.grad + weight_grad
            )

        output._backward = _backward
        output._prev = {self.weight}
        return output
# Complete Transformer model for sequence-to-sequence tasks
class Transformer(Layer):
    """
    Complete Transformer model for sequence-to-sequence tasks.

    This is the full encoder-decoder Transformer as described in
    "Attention Is All You Need", followed by a linear projection onto the
    target vocabulary.
    """

    def __init__(
        self,
        src_vocab_size: int,
        tgt_vocab_size: int,
        d_model: int = 512,
        num_heads: int = 8,
        num_encoder_layers: int = 6,
        num_decoder_layers: int = 6,
        d_ff: int = 2048,
        max_len: int = 5000,
        dropout: float = 0.1,
        activation: str = "gelu",
    ):
        """
        Initialize complete Transformer model.

        Args:
            src_vocab_size: Source vocabulary size
            tgt_vocab_size: Target vocabulary size
            d_model: Model dimension
            num_heads: Number of attention heads
            num_encoder_layers: Number of encoder layers
            num_decoder_layers: Number of decoder layers
            d_ff: Feed-forward dimension
            max_len: Maximum sequence length
            dropout: Dropout probability
            activation: Activation function
        """
        super().__init__()
        # Encoder over the source vocabulary
        self.encoder = TransformerEncoder(
            vocab_size=src_vocab_size,
            d_model=d_model,
            num_heads=num_heads,
            num_layers=num_encoder_layers,
            d_ff=d_ff,
            max_len=max_len,
            dropout=dropout,
            activation=activation,
        )
        # Decoder over the target vocabulary
        self.decoder = TransformerDecoder(
            vocab_size=tgt_vocab_size,
            d_model=d_model,
            num_heads=num_heads,
            num_layers=num_decoder_layers,
            d_ff=d_ff,
            max_len=max_len,
            dropout=dropout,
            activation=activation,
        )
        # Maps decoder states to target-vocabulary logits
        self.output_projection = Linear(d_model, tgt_vocab_size)

        # Register sub-modules as children
        for child in (self.encoder, self.decoder, self.output_projection):
            self.add_child(child)

    def forward(
        self,
        src: Tensor,
        tgt: Tensor,
        src_mask: Optional[Tensor] = None,
        tgt_mask: Optional[Tensor] = None,
    ) -> Tensor:
        """
        Forward pass through complete Transformer.

        Args:
            src: Source sequences (batch_size, src_seq_len)
            tgt: Target sequences (batch_size, tgt_seq_len)
            src_mask: Source attention mask; used by the encoder and as the
                decoder's cross-attention mask
            tgt_mask: Target attention mask; passed to the decoder as its
                self-attention mask

        Returns:
            Output logits (batch_size, tgt_seq_len, tgt_vocab_size)
        """
        # Encode the source sequence
        memory = self.encoder(src, mask=src_mask)
        # Decode the target sequence against the encoded source
        decoded = self.decoder(
            tgt, memory, self_attn_mask=tgt_mask, cross_attn_mask=src_mask
        )
        # Project to vocabulary
        return self.output_projection(decoded)
# Demonstration / smoke test
def demonstrate_transformer():
    """
    Demonstrate transformer components with simple examples.

    Runs each building block and the complete models on small random
    inputs, printing input/output shapes. It then back-propagates the sum
    of the Transformer logits and reports how many parameters received a
    gradient, followed by the parameter count and approximate size.

    Side effects:
        Prints to stdout and reseeds NumPy's global RNG with 42.
    """
    print("🤖 Transformer Architecture Demonstration")
    print("=" * 60)
    # Test parameters
    vocab_size = 1000
    d_model = 128
    seq_len = 10
    batch_size = 2
    # Create sample data (fixed seed so repeated runs print the same shapes
    # and values)
    np.random.seed(42)
    src_tokens = Tensor(np.random.randint(0, vocab_size, (batch_size, seq_len)))
    tgt_tokens = Tensor(np.random.randint(0, vocab_size, (batch_size, seq_len)))
    print(f"Input shapes: src={src_tokens.data.shape}, tgt={tgt_tokens.data.shape}")
    print()

    # Test individual components
    print("🧩 Testing Individual Components:")
    print("-" * 40)
    # 1. Positional Encoding
    pos_enc = PositionalEncoding(d_model, max_len=100)
    dummy_embeddings = Tensor(np.random.randn(batch_size, seq_len, d_model))
    pos_output = pos_enc(dummy_embeddings)
    print(
        f"✅ Positional Encoding: {dummy_embeddings.data.shape} -> {pos_output.data.shape}"
    )
    # 2. Layer Normalization
    layer_norm = LayerNorm(d_model)
    norm_output = layer_norm(dummy_embeddings)
    print(
        f"✅ Layer Normalization: {dummy_embeddings.data.shape} -> {norm_output.data.shape}"
    )
    # 3. Feed Forward
    ff = FeedForward(d_model, d_ff=512)
    ff_output = ff(dummy_embeddings)
    print(f"✅ Feed Forward: {dummy_embeddings.data.shape} -> {ff_output.data.shape}")
    # 4. Transformer Block
    encoder_block = TransformerEncoderBlock(d_model, num_heads=8, d_ff=512)
    block_output = encoder_block(dummy_embeddings)
    print(
        f"✅ Encoder Block: {dummy_embeddings.data.shape} -> {block_output.data.shape}"
    )
    print()

    # Test complete models
    print("🏗️ Testing Complete Models:")
    print("-" * 30)
    # 1. Encoder only
    encoder = TransformerEncoder(
        vocab_size=vocab_size,
        d_model=d_model,
        num_heads=4,
        num_layers=2,
        d_ff=256,
        max_len=100,
    )
    encoder_output = encoder(src_tokens)
    print(
        f"✅ Transformer Encoder: {src_tokens.data.shape} -> {encoder_output.data.shape}"
    )
    # 2. Complete Transformer
    transformer = Transformer(
        src_vocab_size=vocab_size,
        tgt_vocab_size=vocab_size,
        d_model=d_model,
        num_heads=4,
        num_encoder_layers=2,
        num_decoder_layers=2,
        d_ff=256,
        max_len=100,
    )
    transformer_output = transformer(src_tokens, tgt_tokens)
    print(
        f"✅ Complete Transformer: {src_tokens.data.shape}, {tgt_tokens.data.shape} -> {transformer_output.data.shape}"
    )
    print()

    # Test gradient flow
    print("🔄 Testing Gradient Flow:")
    print("-" * 25)
    loss = transformer_output.sum()
    loss.backward()
    # Materialize the parameters once; they are used for both the gradient
    # check and the size statistics below
    params = list(transformer.parameters())
    param_count = len(params)
    grad_count = sum(1 for param in params if param.grad is not None)
    print(f"Parameters with gradients: {grad_count}/{param_count}")
    # Only claim success when every parameter actually received a gradient
    if grad_count == param_count:
        print("✅ Gradient flow working correctly!")
    else:
        print(
            f"⚠️ Gradient flow incomplete: {param_count - grad_count} "
            "parameter(s) received no gradient"
        )
    print()

    # Parameter count
    total_params = sum(int(np.prod(p.data.shape)) for p in params)
    print("📊 Model Statistics:")
    print(f"Total parameters: {total_params:,}")
    print(f"Model size: ~{total_params * 4 / 1024 / 1024:.1f} MB (float32)")
# Run the demonstration only when this module is executed as a script,
# not when it is imported
if __name__ == "__main__":
    demonstrate_transformer()