🎭 ADVANCED SENTIMENT ANALYZER WITH TRANSFORMERS

📝 INTRODUCTION

This is an enterprise-grade sentiment analysis system that leverages state-of-the-art transformer models (BERT, RoBERTa, XLNet) alongside traditional ML approaches. It includes advanced features like aspect-based sentiment analysis, emotion detection, sarcasm detection, and multi-language support.

✨ FEATURES

  1. Multi-Model Architecture: BERT, RoBERTa, XLNet, DistilBERT, ALBERT
  2. Aspect-Based Sentiment Analysis: Extract sentiment for specific aspects (price, quality, service)
  3. Emotion Detection: Detect 7 emotions (joy, sadness, anger, fear, surprise, disgust, love)
  4. Sarcasm Detection: Identify sarcastic reviews
  5. Multi-Language Support: 50+ languages using XLM-RoBERTa
  6. Explainable AI: SHAP and LIME explanations for predictions
  7. Active Learning: Continuously improve with human feedback
  8. Real-time Streaming: Kafka integration for real-time analysis
  9. Model Versioning: MLflow integration for experiment tracking
  10. Automated Retraining: CI/CD pipeline for model updates

📁 PROJECT STRUCTURE

advanced-sentiment-analyzer/
│
├── src/
│   ├── models/
│   │   ├── base_model.py
│   │   ├── bert_model.py
│   │   ├── roberta_model.py
│   │   ├── xlnet_model.py
│   │   ├── ensemble_model.py
│   │   ├── aspect_model.py
│   │   ├── emotion_model.py
│   │   └── sarcasm_model.py
│   │
│   ├── training/
│   │   ├── trainer.py
│   │   ├── data_preprocessor.py
│   │   ├── active_learner.py
│   │   └── hyperparameter_tuner.py
│   │
│   ├── inference/
│   │   ├── predictor.py
│   │   ├── batch_processor.py
│   │   └── streaming_processor.py
│   │
│   ├── explainability/
│   │   ├── shap_explainer.py
│   │   └── lime_explainer.py
│   │
│   └── utils/
│       ├── text_processor.py
│       ├── language_detector.py
│       └── metrics_calculator.py
│
├── api/
│   ├── rest_api.py
│   ├── grpc_api.py
│   └── websocket_api.py
│
├── data/
│   ├── raw/
│   ├── processed/
│   └── augmented/
│
├── config/
│   ├── model_config.yaml
│   └── training_config.yaml
│
├── experiments/
│   └── mlflow/
│
├── docker/
│   ├── Dockerfile.api
│   ├── Dockerfile.training
│   └── docker-compose.yml
│
├── kafka/
│   ├── producer.py
│   └── consumer.py
│
├── monitoring/
│   ├── prometheus_config.yml
│   └── grafana_dashboards/
│
├── tests/
│   ├── unit/
│   ├── integration/
│   └── performance/
│
├── requirements.txt
├── requirements-dev.txt
└── README.md

🚀 COMPLETE CODE

1. requirements.txt

# Core ML/DL
torch==2.0.1
transformers==4.35.0
datasets==2.14.0
accelerate==0.24.0
sentencepiece==0.1.99
protobuf==3.20.3
# Traditional ML
scikit-learn==1.3.0
xgboost==1.7.6
lightgbm==4.0.0
# Data Processing
pandas==2.0.3
numpy==1.24.3
polars==0.19.0
dask==2023.9.0
# Explainability
shap==0.42.1
lime==0.2.0.1
eli5==0.13.0
# API & Services
fastapi==0.104.0
uvicorn==0.24.0
grpcio==1.59.0
grpcio-tools==1.59.0
websockets==12.0
kafka-python==2.0.2
# Database
pymongo==4.5.0
redis==5.0.1
elasticsearch==8.11.0
# Monitoring
prometheus-client==0.18.0
mlflow==2.6.0
wandb==0.15.0
# Visualization
plotly==5.17.0
dash==2.14.0
streamlit==1.27.0
# Testing & Quality
pytest==7.4.0
pytest-cov==4.1.0
black==23.9.0
flake8==6.1.0
mypy==1.5.0
# Utils
tqdm==4.66.0
python-dotenv==1.0.0
pyyaml==6.0.1
joblib==1.3.2

2. src/models/base_model.py

import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer, AutoConfig
from abc import ABC, abstractmethod
import logging
from typing import Dict, List, Optional, Union
import numpy as np
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BaseSentimentModel(ABC, nn.Module):
"""Base class for all sentiment models"""
def __init__(self, model_name: str, num_labels: int = 3, device: str = None):
super().__init__()
self.model_name = model_name
self.num_labels = num_labels
self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
# Load model and tokenizer
self.config = AutoConfig.from_pretrained(
model_name,
num_labels=num_labels,
output_hidden_states=True
)
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModel.from_pretrained(model_name, config=self.config)
# Classification head
self.classifier = nn.Sequential(
nn.Linear(self.config.hidden_size, 512),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(512, 256),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(256, num_labels)
)
self.to(self.device)
logger.info(f"Initialized {model_name} on {self.device}")
@abstractmethod
def forward(self, input_ids, attention_mask, labels=None):
"""Forward pass"""
pass
def predict(self, texts: Union[str, List[str]], return_proba: bool = False):
"""Make predictions on input texts"""
self.eval()
if isinstance(texts, str):
texts = [texts]
# Tokenize
inputs = self.tokenizer(
texts,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
outputs = self.forward(**inputs)
if return_proba:
probabilities = torch.softmax(outputs, dim=-1)
return probabilities.cpu().numpy()
else:
predictions = torch.argmax(outputs, dim=-1)
return predictions.cpu().numpy()
def save_model(self, path: str):
"""Save model to disk"""
torch.save({
'model_state_dict': self.state_dict(),
'model_name': self.model_name,
'num_labels': self.num_labels,
'config': self.config
}, path)
logger.info(f"Model saved to {path}")
def load_model(self, path: str):
"""Load model from disk"""
checkpoint = torch.load(path, map_location=self.device)
self.model_name = checkpoint['model_name']
self.num_labels = checkpoint['num_labels']
self.config = checkpoint['config']
self.load_state_dict(checkpoint['model_state_dict'])
logger.info(f"Model loaded from {path}")
class TransformerSentimentModel(BaseSentimentModel):
"""Transformer-based sentiment model"""
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
pooled_output = outputs.last_hidden_state[:, 0, :]  # Use [CLS] token
logits = self.classifier(pooled_output)
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return loss, logits
return logits
class EnsembleSentimentModel(nn.Module):
"""Ensemble of multiple models"""
def __init__(self, models: List[BaseSentimentModel], weights: Optional[List[float]] = None):
super().__init__()
self.models = nn.ModuleList(models)
self.weights = weights or [1.0/len(models)] * len(models)
def forward(self, input_ids, attention_mask):
predictions = []
for model in self.models:
output = model(input_ids, attention_mask)
predictions.append(output)
# Weighted average
ensemble_output = sum(w * pred for w, pred in zip(self.weights, predictions))
return ensemble_output
def predict(self, texts: Union[str, List[str]], return_proba: bool = False):
"""Ensemble prediction"""
all_probs = []
for model in self.models:
probs = model.predict(texts, return_proba=True)
all_probs.append(probs)
# Weighted average of probabilities
ensemble_probs = np.average(all_probs, axis=0, weights=self.weights)
if return_proba:
return ensemble_probs
else:
return np.argmax(ensemble_probs, axis=-1)

3. src/models/bert_model.py

from src.models.base_model import TransformerSentimentModel
import torch
import torch.nn as nn
class BERTSentimentModel(TransformerSentimentModel):
"""BERT-based sentiment model with advanced features"""
def __init__(self, model_name='bert-base-uncased', num_labels=3, use_pooler=True):
super().__init__(model_name, num_labels)
self.use_pooler = use_pooler
# Additional layers for better performance
self.attention = nn.MultiheadAttention(
embed_dim=self.config.hidden_size,
num_heads=8,
dropout=0.1,
batch_first=True
)
self.layer_norm = nn.LayerNorm(self.config.hidden_size)
def forward(self, input_ids, attention_mask, token_type_ids=None, labels=None):
outputs = self.model(
input_ids=input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids
)
if self.use_pooler:
# Use pooler output
pooled_output = outputs.pooler_output
else:
# Use mean of last hidden states
last_hidden = outputs.last_hidden_state
masked_output = last_hidden * attention_mask.unsqueeze(-1)
pooled_output = masked_output.sum(dim=1) / attention_mask.sum(dim=1, keepdim=True)
# Apply attention
attended_output, _ = self.attention(pooled_output.unsqueeze(1), 
pooled_output.unsqueeze(1), 
pooled_output.unsqueeze(1))
attended_output = attended_output.squeeze(1)
# Residual connection and layer norm
attended_output = self.layer_norm(pooled_output + attended_output)
# Classification
logits = self.classifier(attended_output)
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return loss, logits
return logits
def get_attention_weights(self, text):
"""Get attention weights for explainability"""
self.eval()
inputs = self.tokenizer(text, return_tensors="pt").to(self.device)
with torch.no_grad():
outputs = self.model(**inputs, output_attentions=True)
attention = outputs.attentions[-1]  # Last layer attention
return attention.cpu().numpy()
class DistilBERTSentimentModel(BERTSentimentModel):
"""DistilBERT model for faster inference"""
def __init__(self, model_name='distilbert-base-uncased', num_labels=3):
super().__init__(model_name, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.model(
input_ids=input_ids,
attention_mask=attention_mask
)
# DistilBERT doesn't have pooler, use mean of last hidden
last_hidden = outputs.last_hidden_state
masked_output = last_hidden * attention_mask.unsqueeze(-1)
pooled_output = masked_output.sum(dim=1) / attention_mask.sum(dim=1, keepdim=True)
logits = self.classifier(pooled_output)
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return loss, logits
return logits

4. src/models/aspect_model.py

import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
from typing import List, Dict, Tuple
import numpy as np
class AspectSentimentModel(nn.Module):
"""Aspect-based sentiment analysis model"""
def __init__(self, model_name='bert-base-uncased', aspects: List[str] = None):
super().__init__()
self.model_name = model_name
self.aspects = aspects or ['price', 'quality', 'service', 'delivery', 'design']
# Base model
self.base_model = AutoModel.from_pretrained(model_name)
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.config = self.base_model.config
# Aspect encoders
self.aspect_embeddings = nn.Embedding(len(self.aspects), self.config.hidden_size)
# Attention mechanism
self.aspect_attention = nn.MultiheadAttention(
embed_dim=self.config.hidden_size,
num_heads=8,
dropout=0.1,
batch_first=True
)
# Aspect-specific classifiers
self.aspect_classifiers = nn.ModuleList([
nn.Sequential(
nn.Linear(self.config.hidden_size * 2, 256),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(256, 3)  # 3 sentiments per aspect
) for _ in range(len(self.aspects))
])
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.to(self.device)
def forward(self, input_ids, attention_mask, aspect_indices=None):
# Get base model outputs
outputs = self.base_model(
input_ids=input_ids,
attention_mask=attention_mask
)
# Get token embeddings
token_embeddings = outputs.last_hidden_state
# Get aspect embeddings
batch_size = input_ids.shape[0]
aspect_emb = self.aspect_embeddings(
torch.arange(len(self.aspects)).to(self.device)
).unsqueeze(0).expand(batch_size, -1, -1)
# Apply attention between tokens and aspects
attended_tokens, attention_weights = self.aspect_attention(
aspect_emb,  # Query: aspects
token_embeddings,  # Key: tokens
token_embeddings  # Value: tokens
)
# Combine aspect and token information
combined = torch.cat([aspect_emb, attended_tokens], dim=-1)
# Get predictions for each aspect
aspect_logits = []
for i, classifier in enumerate(self.aspect_classifiers):
logits = classifier(combined[:, i, :])
aspect_logits.append(logits)
return torch.stack(aspect_logits, dim=1), attention_weights
def predict(self, text: str) -> Dict[str, Dict[str, float]]:
"""Predict sentiment for each aspect"""
self.eval()
# Tokenize
inputs = self.tokenizer(
text,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
logits, attention_weights = self.forward(**inputs)
probabilities = torch.softmax(logits, dim=-1)
# Convert to predictions
sentiment_map = {0: 'negative', 1: 'neutral', 2: 'positive'}
results = {}
for i, aspect in enumerate(self.aspects):
probs = probabilities[0, i].cpu().numpy()
sentiment = sentiment_map[np.argmax(probs)]
confidence = float(np.max(probs))
results[aspect] = {
'sentiment': sentiment,
'confidence': confidence,
'probabilities': {
'negative': float(probs[0]),
'neutral': float(probs[1]),
'positive': float(probs[2])
}
}
return results
def get_aspect_attention(self, text: str) -> Dict[str, List[Tuple[str, float]]]:
"""Get attention weights for each aspect"""
self.eval()
# Tokenize
inputs = self.tokenizer(
text,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
_, attention_weights = self.forward(**inputs)
# Decode tokens
tokens = self.tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])
# Get attention for each aspect
aspect_attention = {}
attention = attention_weights[0].cpu().numpy()  # [num_aspects, seq_len]
for i, aspect in enumerate(self.aspects):
token_attention = list(zip(tokens, attention[i]))
token_attention = sorted(token_attention, key=lambda x: x[1], reverse=True)
aspect_attention[aspect] = token_attention[:10]  # Top 10 tokens
return aspect_attention

5. src/models/emotion_model.py

import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
from typing import Dict, List
import numpy as np
class EmotionDetectionModel(nn.Module):
"""Multi-label emotion detection model"""
def __init__(self, model_name='roberta-base'):
super().__init__()
self.model_name = model_name
self.emotions = ['joy', 'sadness', 'anger', 'fear', 'surprise', 'disgust', 'love']
self.num_emotions = len(self.emotions)
# Base model
self.base_model = AutoModel.from_pretrained(model_name)
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.config = self.base_model.config
# Multi-label classification head
self.classifier = nn.Sequential(
nn.Linear(self.config.hidden_size, 512),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(512, 256),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(256, self.num_emotions)
)
# Emotion-specific attention
self.emotion_attention = nn.Parameter(
torch.randn(self.num_emotions, self.config.hidden_size)
)
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.to(self.device)
def forward(self, input_ids, attention_mask, labels=None):
# Get base model outputs
outputs = self.base_model(
input_ids=input_ids,
attention_mask=attention_mask
)
# Get token embeddings
token_embeddings = outputs.last_hidden_state
# Apply emotion-specific attention
# emotion_attention: [num_emotions, hidden]
# token_embeddings: [batch, seq_len, hidden]
attention_scores = torch.einsum('eh,bsh->bse', self.emotion_attention, token_embeddings)
attention_weights = torch.softmax(attention_scores, dim=1)
# Weighted sum of token embeddings
emotion_vectors = torch.einsum('bse,bsh->beh', attention_weights, token_embeddings)
# Classification
logits = self.classifier(emotion_vectors)
if labels is not None:
loss_fn = nn.BCEWithLogitsLoss()
loss = loss_fn(logits, labels.float())
return loss, torch.sigmoid(logits)
return torch.sigmoid(logits)
def predict(self, text: str, threshold: float = 0.5) -> Dict[str, float]:
"""Predict emotions in text"""
self.eval()
# Tokenize
inputs = self.tokenizer(
text,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
probabilities = self.forward(**inputs)[0] if len(inputs) > 1 else self.forward(**inputs)
probabilities = probabilities.cpu().numpy()[0]
# Apply threshold
results = {}
for emotion, prob in zip(self.emotions, probabilities):
results[emotion] = float(prob)
results[f"{emotion}_detected"] = bool(prob >= threshold)
# Get primary emotion
primary_idx = np.argmax(probabilities)
results['primary_emotion'] = self.emotions[primary_idx]
results['primary_confidence'] = float(probabilities[primary_idx])
return results
def predict_batch(self, texts: List[str], threshold: float = 0.5) -> List[Dict[str, float]]:
"""Predict emotions for multiple texts"""
self.eval()
# Tokenize
inputs = self.tokenizer(
texts,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
probabilities = self.forward(**inputs)
probabilities = probabilities.cpu().numpy()
results = []
for probs in probabilities:
result = {}
for emotion, prob in zip(self.emotions, probs):
result[emotion] = float(prob)
result[f"{emotion}_detected"] = bool(prob >= threshold)
primary_idx = np.argmax(probs)
result['primary_emotion'] = self.emotions[primary_idx]
result['primary_confidence'] = float(probs[primary_idx])
results.append(result)
return results

6. src/models/sarcasm_model.py

import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
import numpy as np
class SarcasmDetectionModel(nn.Module):
"""Sarcasm detection model using contrastive learning"""
def __init__(self, model_name='roberta-base'):
super().__init__()
self.model_name = model_name
# Base model
self.base_model = AutoModel.from_pretrained(model_name)
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.config = self.base_model.config
# Projection heads
self.projection = nn.Sequential(
nn.Linear(self.config.hidden_size, 512),
nn.ReLU(),
nn.Linear(512, 256)
)
# Contrastive learning head
self.temperature = nn.Parameter(torch.ones(1) * 0.07)
# Classification head
self.classifier = nn.Sequential(
nn.Linear(256, 128),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(128, 2)  # sarcastic or not
)
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.to(self.device)
def forward(self, input_ids, attention_mask, labels=None):
# Get base model outputs
outputs = self.base_model(
input_ids=input_ids,
attention_mask=attention_mask
)
# Use [CLS] token
pooled = outputs.last_hidden_state[:, 0, :]
# Project
projected = self.projection(pooled)
# Classify
logits = self.classifier(projected)
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return loss, logits
return logits
def contrastive_loss(self, anchor, positive, negative):
"""Compute contrastive loss for better representations"""
# Normalize embeddings
anchor = nn.functional.normalize(anchor, dim=-1)
positive = nn.functional.normalize(positive, dim=-1)
negative = nn.functional.normalize(negative, dim=-1)
# Compute similarities
pos_sim = torch.sum(anchor * positive, dim=-1) / self.temperature
neg_sim = torch.sum(anchor * negative, dim=-1) / self.temperature
# Compute loss
pos_exp = torch.exp(pos_sim)
neg_exp = torch.exp(neg_sim)
loss = -torch.log(pos_exp / (pos_exp + neg_exp))
return loss.mean()
def predict(self, text: str) -> Dict[str, any]:
"""Predict if text is sarcastic"""
self.eval()
# Tokenize
inputs = self.tokenizer(
text,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
logits = self.forward(**inputs)
probabilities = torch.softmax(logits, dim=-1)
probs = probabilities.cpu().numpy()[0]
return {
'is_sarcastic': bool(np.argmax(probs) == 1),
'sarcastic_probability': float(probs[1]),
'not_sarcastic_probability': float(probs[0]),
'confidence': float(np.max(probs))
}
def get_sarcasm_features(self, text: str) -> Dict[str, float]:
"""Extract features that indicate sarcasm"""
self.eval()
# Get embeddings
inputs = self.tokenizer(
text,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
outputs = self.base_model(**inputs)
embeddings = outputs.last_hidden_state[0]  # [seq_len, hidden]
# Calculate linguistic features
tokens = self.tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])
# Feature extraction
features = {
'positive_word_ratio': self._calculate_positive_ratio(tokens),
'negative_word_ratio': self._calculate_negative_ratio(tokens),
'exaggeration_score': self._calculate_exaggeration(text),
'contrast_score': self._calculate_contrast(embeddings),
'punctuation_anomaly': self._check_punctuation(text)
}
return features
def _calculate_positive_ratio(self, tokens):
# Simplified - in practice use sentiment lexicon
positive_words = {'good', 'great', 'awesome', 'fantastic', 'amazing'}
token_set = set(t.lower() for t in tokens)
return len(positive_words & token_set) / max(len(tokens), 1)
def _calculate_negative_ratio(self, tokens):
negative_words = {'bad', 'terrible', 'awful', 'horrible', 'worst'}
token_set = set(t.lower() for t in tokens)
return len(negative_words & token_set) / max(len(tokens), 1)
def _calculate_exaggeration(self, text):
# Count exaggerative markers
markers = ['!', '?', 'absolutely', 'literally', 'totally', 'completely']
count = sum(text.count(m) for m in markers)
return min(count / 10, 1.0)
def _calculate_contrast(self, embeddings):
# Measure semantic contrast using cosine similarity
if len(embeddings) < 2:
return 0.0
# Compare first and last sentence embeddings
first_sent = embeddings[:len(embeddings)//2].mean(0)
last_sent = embeddings[len(embeddings)//2:].mean(0)
similarity = nn.functional.cosine_similarity(
first_sent.unsqueeze(0), 
last_sent.unsqueeze(0)
)
return 1 - similarity.item()  # Contrast = 1 - similarity
def _check_punctuation(self, text):
# Check for unusual punctuation patterns
exclamation_count = text.count('!')
question_count = text.count('?')
return (exclamation_count + question_count) > 3

7. src/training/trainer.py

import torch
from torch.utils.data import DataLoader, Dataset
from transformers import get_linear_schedule_with_warmup
from tqdm import tqdm
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, precision_recall_fscore_support
import wandb
import mlflow
from typing import Dict, List, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SentimentDataset(Dataset):
"""Custom dataset for sentiment analysis"""
def __init__(self, texts, labels, tokenizer, max_length=512):
self.texts = texts
self.labels = labels
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = str(self.texts[idx])
label = self.labels[idx]
encoding = self.tokenizer(
text,
truncation=True,
padding='max_length',
max_length=self.max_length,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'labels': torch.tensor(label, dtype=torch.long)
}
class ModelTrainer:
"""Advanced model trainer with experiment tracking"""
def __init__(self, model, config):
self.model = model
self.config = config
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.model.to(self.device)
# Initialize experiment tracking
self._init_experiment_tracking()
def _init_experiment_tracking(self):
"""Initialize wandb and mlflow"""
# Weights & Biases
wandb.init(
project="sentiment-analyzer",
config=self.config,
name=f"run_{self.config.get('model_name', 'experiment')}"
)
# MLflow
mlflow.set_experiment("sentiment_analysis")
mlflow.start_run()
mlflow.log_params(self.config)
def train(self, train_texts, train_labels, val_texts=None, val_labels=None):
"""Train the model"""
# Create datasets
train_dataset = SentimentDataset(
train_texts, train_labels,
self.model.tokenizer,
self.config.get('max_length', 512)
)
train_loader = DataLoader(
train_dataset,
batch_size=self.config.get('batch_size', 16),
shuffle=True,
num_workers=4
)
# Validation loader
val_loader = None
if val_texts and val_labels:
val_dataset = SentimentDataset(
val_texts, val_labels,
self.model.tokenizer,
self.config.get('max_length', 512)
)
val_loader = DataLoader(
val_dataset,
batch_size=self.config.get('batch_size', 16),
shuffle=False,
num_workers=4
)
# Optimizer
optimizer = torch.optim.AdamW(
self.model.parameters(),
lr=self.config.get('learning_rate', 2e-5),
weight_decay=self.config.get('weight_decay', 0.01)
)
# Scheduler
total_steps = len(train_loader) * self.config.get('epochs', 3)
scheduler = get_linear_schedule_with_warmup(
optimizer,
num_warmup_steps=int(total_steps * 0.1),
num_training_steps=total_steps
)
# Training loop
best_val_acc = 0
train_losses = []
val_metrics = []
for epoch in range(self.config.get('epochs', 3)):
# Training
self.model.train()
epoch_loss = 0
train_progress = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{self.config.get("epochs", 3)}')
for batch in train_progress:
# Move to device
input_ids = batch['input_ids'].to(self.device)
attention_mask = batch['attention_mask'].to(self.device)
labels = batch['labels'].to(self.device)
# Forward pass
loss, logits = self.model(input_ids, attention_mask, labels)
# Backward pass
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
optimizer.step()
scheduler.step()
epoch_loss += loss.item()
train_progress.set_postfix({'loss': loss.item()})
# Log to wandb
wandb.log({'train_loss': loss.item()})
avg_train_loss = epoch_loss / len(train_loader)
train_losses.append(avg_train_loss)
# Validation
if val_loader:
val_metrics_epoch = self.evaluate(val_loader)
val_metrics.append(val_metrics_epoch)
# Log validation metrics
wandb.log({
'val_loss': val_metrics_epoch['loss'],
'val_accuracy': val_metrics_epoch['accuracy'],
'val_f1': val_metrics_epoch['f1']
})
mlflow.log_metrics({
f'val_accuracy_epoch_{epoch}': val_metrics_epoch['accuracy'],
f'val_f1_epoch_{epoch}': val_metrics_epoch['f1']
})
# Save best model
if val_metrics_epoch['accuracy'] > best_val_acc:
best_val_acc = val_metrics_epoch['accuracy']
self.save_model('best_model.pt')
logger.info(f"New best model saved with accuracy: {best_val_acc:.4f}")
# Log to mlflow
mlflow.log_metric('train_loss', avg_train_loss, step=epoch)
# Final logging
wandb.finish()
mlflow.end_run()
return {
'train_losses': train_losses,
'val_metrics': val_metrics,
'best_val_accuracy': best_val_acc
}
def evaluate(self, dataloader):
"""Evaluate the model"""
self.model.eval()
total_loss = 0
all_preds = []
all_labels = []
with torch.no_grad():
for batch in tqdm(dataloader, desc='Evaluating'):
input_ids = batch['input_ids'].to(self.device)
attention_mask = batch['attention_mask'].to(self.device)
labels = batch['labels'].to(self.device)
loss, logits = self.model(input_ids, attention_mask, labels)
total_loss += loss.item()
preds = torch.argmax(logits, dim=-1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
# Calculate metrics
accuracy = accuracy_score(all_labels, all_preds)
precision, recall, f1, _ = precision_recall_fscore_support(
all_labels, all_preds, average='weighted'
)
return {
'loss': total_loss / len(dataloader),
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1
}
def save_model(self, path):
"""Save model checkpoint"""
torch.save({
'model_state_dict': self.model.state_dict(),
'config': self.config
}, path)
logger.info(f"Model saved to {path}")
def load_model(self, path):
"""Load model checkpoint"""
checkpoint = torch.load(path, map_location=self.device)
self.model.load_state_dict(checkpoint['model_state_dict'])
self.config = checkpoint['config']
logger.info(f"Model loaded from {path}")
class ActiveLearningTrainer(ModelTrainer):
"""Trainer with active learning capabilities"""
def __init__(self, model, config):
super().__init__(model, config)
self.unlabeled_pool = []
def add_unlabeled_data(self, texts):
"""Add unlabeled data to pool"""
self.unlabeled_pool.extend(texts)
def select_samples_for_labeling(self, n_samples=10, strategy='uncertainty'):
"""Select most informative samples for labeling"""
if strategy == 'uncertainty':
return self._select_by_uncertainty(n_samples)
elif strategy == 'diversity':
return self._select_by_diversity(n_samples)
elif strategy == 'hybrid':
return self._select_hybrid(n_samples)
def _select_by_uncertainty(self, n_samples):
"""Select samples with highest prediction uncertainty"""
self.model.eval()
uncertainties = []
for text in tqdm(self.unlabeled_pool, desc='Calculating uncertainty'):
inputs = self.model.tokenizer(
text,
return_tensors="pt",
truncation=True,
max_length=512
).to(self.device)
with torch.no_grad():
logits = self.model(**inputs)
probs = torch.softmax(logits, dim=-1)
# Entropy as uncertainty measure
entropy = -torch.sum(probs * torch.log(probs + 1e-10))
uncertainties.append(entropy.item())
# Select top uncertain samples
indices = np.argsort(uncertainties)[-n_samples:]
selected = [self.unlabeled_pool[i] for i in indices]
# Remove selected from pool
self.unlabeled_pool = [t for i, t in enumerate(self.unlabeled_pool) 
if i not in indices]
return selected
def _select_by_diversity(self, n_samples):
"""Select diverse samples using clustering"""
# Get embeddings for all samples
embeddings = []
for text in tqdm(self.unlabeled_pool, desc='Getting embeddings'):
inputs = self.model.tokenizer(
text,
return_tensors="pt",
truncation=True,
max_length=512
).to(self.device)
with torch.no_grad():
outputs = self.model.base_model(**inputs)
emb = outputs.last_hidden_state[:, 0, :].cpu().numpy()
embeddings.append(emb[0])
embeddings = np.array(embeddings)
# Cluster using k-means
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=min(n_samples, len(embeddings)))
clusters = kmeans.fit_predict(embeddings)
# Select one sample from each cluster
selected = []
for i in range(min(n_samples, len(np.unique(clusters)))):
cluster_indices = np.where(clusters == i)[0]
selected_idx = np.random.choice(cluster_indices)
selected.append(self.unlabeled_pool[selected_idx])
# Remove selected from pool
self.unlabeled_pool = [t for i, t in enumerate(self.unlabeled_pool) 
if i not in [self.unlabeled_pool.index(s) for s in selected]]
return selected
def _select_hybrid(self, n_samples):
"""Hybrid selection: uncertainty + diversity"""
# First get uncertain samples
uncertain_samples = self._select_by_uncertainty(n_samples * 2)
# Then select diverse among them
self.unlabeled_pool = uncertain_samples + self.unlabeled_pool
return self._select_by_diversity(n_samples)

8. src/explainability/shap_explainer.py

import shap
import torch
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Dict, Any
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SHAPExplainer:
"""SHAP-based model explainer"""
def __init__(self, model, tokenizer, background_texts: List[str]):
self.model = model
self.tokenizer = tokenizer
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Create background dataset
self.background = self._create_background_dataset(background_texts)
# Create explainer
self.explainer = shap.Explainer(
self._model_predict,
self.background,
feature_names=self.tokenizer.convert_ids_to_tokens
)
def _create_background_dataset(self, texts: List[str]):
"""Create background dataset for SHAP"""
encodings = self.tokenizer(
texts,
padding=True,
truncation=True,
max_length=128,
return_tensors="pt"
)
return encodings['input_ids'].numpy()
def _model_predict(self, texts):
"""Wrapper for model prediction"""
self.model.eval()
inputs = self.tokenizer(
texts.tolist() if hasattr(texts, 'tolist') else texts,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
outputs = self.model(**inputs)
if hasattr(outputs, 'logits'):
logits = outputs.logits
else:
logits = outputs
probs = torch.softmax(logits, dim=-1)
return probs.cpu().numpy()
def explain(self, text: str, class_idx: int = None) -> Dict[str, Any]:
"""Generate SHAP explanation for text"""
# Tokenize
tokens = self.tokenizer.tokenize(text)
input_ids = self.tokenizer.encode(text, return_tensors='pt').to(self.device)
# Get SHAP values
shap_values = self.explainer([text])
# Get explanation for specific class
if class_idx is None:
# Get prediction
probs = self._model_predict([text])[0]
class_idx = np.argmax(probs)
# Extract values for the class
values = shap_values.values[0, :, class_idx]
# Create explanation
explanation = {
'text': text,
'tokens': tokens,
'class_idx': int(class_idx),
'class_name': self._get_class_name(class_idx),
'shap_values': values.tolist(),
'base_value': float(shap_values.base_values[0, class_idx]),
'prediction': float(shap_values.data[0, class_idx])
}
# Get top contributing tokens
token_contributions = list(zip(tokens, values))
token_contributions.sort(key=lambda x: abs(x[1]), reverse=True)
explanation['top_positive'] = [
{'token': t, 'contribution': float(v)} 
for t, v in token_contributions if v > 0
][:5]
explanation['top_negative'] = [
{'token': t, 'contribution': float(v)} 
for t, v in token_contributions if v < 0
][:5]
return explanation
def plot_explanation(self, text: str, class_idx: int = None):
"""Plot SHAP explanation"""
explanation = self.explain(text, class_idx)
# Create waterfall plot
shap.waterfall_plot(
shap.Explanation(
values=np.array(explanation['shap_values']),
base_values=explanation['base_value'],
data=explanation['tokens'],
feature_names=explanation['tokens']
),
show=False
)
plt.title(f"SHAP Explanation - Class: {explanation['class_name']}")
plt.tight_layout()
return plt
def _get_class_name(self, class_idx: int) -> str:
"""Get class name from index"""
class_names = {0: 'Negative', 1: 'Neutral', 2: 'Positive'}
return class_names.get(class_idx, f'Class_{class_idx}')
def get_feature_importance(self, texts: List[str]) -> Dict[str, float]:
"""Get global feature importance"""
all_contributions = {}
for text in texts:
explanation = self.explain(text)
for token, value in zip(explanation['tokens'], explanation['shap_values']):
if token not in all_contributions:
all_contributions[token] = []
all_contributions[token].append(abs(value))
# Average contributions
feature_importance = {
token: np.mean(values) 
for token, values in all_contributions.items()
}
# Sort by importance
feature_importance = dict(
sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)
)
return feature_importance

9. src/explainability/lime_explainer.py

```python
from lime.lime_text import LimeTextExplainer
import numpy as np
from typing import List, Dict, Any
import matplotlib.pyplot as plt

class LIM

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper