spamBERT/server.py
Carsten Abele f05320a8cb Add project README and reason/quote fields to classifier response
- README.md: full project overview with setup, training, API, and RSpamd integration docs
- server.py: add reason (human-readable explanation) and quote (suspicious snippet) to response
- spamllm.lua: pass reason and quote through to RSpamd symbol description for logs/UI

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 22:32:54 +01:00

186 lines
6.1 KiB
Python

"""
FastAPI Service für Spam-Klassifikation mit Spracherkennung.
Stellt einen HTTP-Endpunkt bereit, den RSpamd als external_service aufrufen kann.
Mails in nicht-erwarteten Sprachen (nicht DE/EN) bekommen einen Spam-Bonus.
"""
import logging
from contextlib import asynccontextmanager
from pathlib import Path
import torch
from fastapi import FastAPI
from langdetect import DetectorFactory, detect_langs
from pydantic import BaseModel
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# Deterministic language detection (langdetect is randomized by default).
DetectorFactory.seed = 0
# Expected languages - anything else gets a spam-score bonus.
EXPECTED_LANGUAGES = {"de", "en"}
# Score bonus for unexpected languages (0-5 extra points).
FOREIGN_LANG_BONUS = 4.0
logger = logging.getLogger("spamllm")
logging.basicConfig(level=logging.INFO)
# Directory containing the fine-tuned HuggingFace model and tokenizer.
MODEL_PATH = Path("./model/final")
# Global model state, populated once by the lifespan handler at startup.
model = None
tokenizer = None
device = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the classifier once at startup and keep it in module globals.

    Runs before the app starts serving requests; the code after ``yield``
    (none here) would run at shutdown.
    """
    global model, tokenizer, device
    # Prefer GPU when available; inference falls back to CPU otherwise.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info(f"Loading model from {MODEL_PATH} on {device}")
    tokenizer = AutoTokenizer.from_pretrained(str(MODEL_PATH))
    model = AutoModelForSequenceClassification.from_pretrained(str(MODEL_PATH))
    model.to(device)
    # eval() disables dropout etc. for deterministic inference.
    model.eval()
    logger.info("Model loaded successfully")
    yield
app = FastAPI(title="SpamLLM Classifier", lifespan=lifespan)
class ClassifyRequest(BaseModel):
    """Mail fields posted by RSpamd's external_service hook."""
    subject: str = ""
    body: str = ""
    from_addr: str = ""
class ClassifyResponse(BaseModel):
    """Classification verdict returned to RSpamd."""
    is_spam: bool
    confidence: float
    score: float  # RSpamd-compatible score (0-15)
    language: str  # detected language
    foreign_lang_bonus: float  # extra score for a foreign language
    reason: str  # human-readable explanation
    quote: str  # most suspicious text snippet
def detect_language(text: str) -> tuple[str, bool]:
    """Detect the dominant language of *text*.

    Returns ``(language_code, is_foreign)`` where *is_foreign* is True when
    the detected language is not one of EXPECTED_LANGUAGES. Texts shorter
    than 20 non-whitespace-stripped characters are reported as
    ``("unknown", False)`` because detection is unreliable on them.
    """
    # Guard: too little signal for reliable detection.
    if not text or len(text.strip()) < 20:
        return "unknown", False
    try:
        code = detect_langs(text)[0].lang
    except Exception:
        # langdetect raises on degenerate input (e.g. digits only);
        # treat any failure as "unknown" rather than crashing the request.
        return "unknown", False
    return code, code not in EXPECTED_LANGUAGES
# Spam signal words for quote extraction (DE + EN). Order matters: the
# first pattern in this list that occurs anywhere in the mail wins.
SPAM_PATTERNS = [
    "click here", "klicken sie", "jetzt bestellen", "order now",
    "act now", "sofort", "dringend", "urgent", "verify your",
    "bestätigen sie", "gewonnen", "you won", "congratulations",
    "herzlichen glückwunsch", "free", "gratis", "kostenlos",
    "100%", "guarantee", "garantie", "limited time", "nur heute",
    "unsubscribe", "abmelden", "no risk", "kein risiko",
    "bank details", "bankdaten", "password", "passwort",
    "account suspended", "konto gesperrt", "credit card", "kreditkarte",
    "viagra", "cialis", "pharmacy", "apotheke", "discount", "rabatt",
    "million", "prize", "preis", "winner", "gewinner",
]
def find_spam_quote(subject: str, body: str) -> str:
    """Return the most suspicious snippet of the mail.

    Scans subject+body (case-insensitively) for known spam phrases and
    returns up to ~120 characters of context around the first pattern
    that matches. Falls back to the first sentence of the body, then to
    the subject, when no pattern is found.
    """
    combined = f"{subject} {body}"
    haystack = combined.lower()
    for marker in SPAM_PATTERNS:
        hit = haystack.find(marker)
        if hit < 0:
            continue
        # Extract context around the match (roughly 120 chars max),
        # slicing the original text at the same offsets as the lowered one.
        left = max(0, hit - 30)
        right = min(len(combined), hit + len(marker) + 60)
        excerpt = combined[left:right].strip()
        prefix = "..." if left > 0 else ""
        suffix = "..." if right < len(combined) else ""
        return f"{prefix}{excerpt}{suffix}"
    # No pattern matched -> first sentence of the body as a fallback.
    if body:
        lead = body.split(".")[0].strip()
        return lead[:120] + ("..." if len(lead) > 120 else "")
    return subject[:120] if subject else ""
def build_reason(spam_prob: float, is_foreign: bool, language: str) -> str:
    """Compose a human-readable explanation of the classification.

    Buckets the spam probability into a confidence verdict and appends a
    note when the mail's language is outside the expected DE/EN set.
    """
    if spam_prob > 0.8:
        verdict = f"High spam confidence ({spam_prob:.0%})"
    elif spam_prob > 0.5:
        verdict = f"Moderate spam confidence ({spam_prob:.0%})"
    elif spam_prob > 0.3:
        verdict = f"Low spam confidence ({spam_prob:.0%})"
    else:
        verdict = f"Likely ham ({1 - spam_prob:.0%} confidence)"
    parts = [verdict]
    if is_foreign:
        parts.append(f"Unexpected language: {language} (not DE/EN)")
    return "; ".join(parts)
@app.post("/classify", response_model=ClassifyResponse)
async def classify(request: ClassifyRequest):
    """Classify one mail and return an RSpamd-compatible verdict.

    NOTE(review): the torch forward pass is synchronous inside an async
    handler, so inference blocks the event loop for its duration.
    """
    # Model input: headers and body combined into one text.
    mail_text = f"From: {request.from_addr}\nSubject: {request.subject}\n\n{request.body}"
    # Language detection on the body (the subject is often too short).
    language, is_foreign = detect_language(request.body if request.body else request.subject)
    encoded = tokenizer(mail_text, return_tensors="pt", truncation=True, max_length=512, padding=True)
    encoded = {key: tensor.to(device) for key, tensor in encoded.items()}
    with torch.no_grad():
        logits = model(**encoded).logits
    # Index 1 is the "spam" class probability.
    spam_prob = torch.softmax(logits, dim=-1)[0][1].item()
    # Foreign-language bonus: non-DE/EN mails get extra points.
    lang_bonus = FOREIGN_LANG_BONUS if is_foreign else 0.0
    # Map the probability onto RSpamd's 0-15 scale and cap after the bonus.
    rspamd_score = min(spam_prob * 15.0 + lang_bonus, 15.0)
    # Foreign-language mails are flagged at a lower probability threshold.
    effective_spam = spam_prob > 0.5 or (is_foreign and spam_prob > 0.3)
    quote = find_spam_quote(request.subject, request.body) if effective_spam else ""
    return ClassifyResponse(
        is_spam=effective_spam,
        confidence=spam_prob,
        score=round(rspamd_score, 2),
        language=language,
        foreign_lang_bonus=lang_bonus,
        reason=build_reason(spam_prob, is_foreign, language),
        quote=quote,
    )
@app.get("/health")
async def health():
    """Liveness probe: reports whether the model has finished loading."""
    payload = {"status": "ok"}
    payload["model_loaded"] = model is not None
    return payload