""" FastAPI Service für Spam-Klassifikation mit Spracherkennung. Stellt einen HTTP-Endpunkt bereit, den RSpamd als external_service aufrufen kann. Mails in nicht-erwarteten Sprachen (nicht DE/EN) bekommen einen Spam-Bonus. """ import logging from contextlib import asynccontextmanager from pathlib import Path import torch from fastapi import FastAPI from langdetect import DetectorFactory, detect_langs from pydantic import BaseModel from transformers import AutoModelForSequenceClassification, AutoTokenizer # Deterministische Spracherkennung DetectorFactory.seed = 0 # Erwartete Sprachen - alles andere bekommt einen Spam-Score-Bonus EXPECTED_LANGUAGES = {"de", "en"} # Score-Bonus für unerwartete Sprachen (0-5 Punkte extra) FOREIGN_LANG_BONUS = 4.0 logger = logging.getLogger("spamllm") logging.basicConfig(level=logging.INFO) MODEL_PATH = Path("./model/final") # Global model state model = None tokenizer = None device = None @asynccontextmanager async def lifespan(app: FastAPI): global model, tokenizer, device device = torch.device("cuda" if torch.cuda.is_available() else "cpu") logger.info(f"Loading model from {MODEL_PATH} on {device}") tokenizer = AutoTokenizer.from_pretrained(str(MODEL_PATH)) model = AutoModelForSequenceClassification.from_pretrained(str(MODEL_PATH)) model.to(device) model.eval() logger.info("Model loaded successfully") yield app = FastAPI(title="SpamLLM Classifier", lifespan=lifespan) class ClassifyRequest(BaseModel): subject: str = "" body: str = "" from_addr: str = "" class ClassifyResponse(BaseModel): is_spam: bool confidence: float score: float # RSpamd-kompatibler Score (0-15) language: str # Erkannte Sprache foreign_lang_bonus: float # Zusätzlicher Score für Fremdsprache reason: str # Menschenlesbare Begründung quote: str # Verdächtigster Textausschnitt def detect_language(text: str) -> tuple[str, bool]: """Erkennt die Sprache und ob sie erwartet ist.""" if not text or len(text.strip()) < 20: return "unknown", False try: langs = detect_langs(text) top_lang = langs[0] lang_code = top_lang.lang is_foreign = lang_code not in EXPECTED_LANGUAGES return lang_code, is_foreign except Exception: return "unknown", False # Spam-Signalwörter für Quote-Extraktion (DE + EN) SPAM_PATTERNS = [ "click here", "klicken sie", "jetzt bestellen", "order now", "act now", "sofort", "dringend", "urgent", "verify your", "bestätigen sie", "gewonnen", "you won", "congratulations", "herzlichen glückwunsch", "free", "gratis", "kostenlos", "100%", "guarantee", "garantie", "limited time", "nur heute", "unsubscribe", "abmelden", "no risk", "kein risiko", "bank details", "bankdaten", "password", "passwort", "account suspended", "konto gesperrt", "credit card", "kreditkarte", "viagra", "cialis", "pharmacy", "apotheke", "discount", "rabatt", "million", "prize", "preis", "winner", "gewinner", ] def find_spam_quote(subject: str, body: str) -> str: """Findet den verdächtigsten Textausschnitt in der Mail.""" full_text = f"{subject} {body}".lower() for pattern in SPAM_PATTERNS: pos = full_text.find(pattern) if pos != -1: # Kontext um das Match herum extrahieren (max 120 Zeichen) original = f"{subject} {body}" start = max(0, pos - 30) end = min(len(original), pos + len(pattern) + 60) snippet = original[start:end].strip() if start > 0: snippet = "..." + snippet if end < len(original): snippet = snippet + "..." return snippet # Kein Pattern gefunden -> ersten Satz des Bodys als Fallback if body: first_sentence = body.split(".")[0].strip() return first_sentence[:120] + ("..." if len(first_sentence) > 120 else "") return subject[:120] if subject else "" def build_reason(spam_prob: float, is_foreign: bool, language: str) -> str: """Baut eine menschenlesbare Begründung zusammen.""" reasons = [] if spam_prob > 0.8: reasons.append(f"High spam confidence ({spam_prob:.0%})") elif spam_prob > 0.5: reasons.append(f"Moderate spam confidence ({spam_prob:.0%})") elif spam_prob > 0.3: reasons.append(f"Low spam confidence ({spam_prob:.0%})") else: reasons.append(f"Likely ham ({1 - spam_prob:.0%} confidence)") if is_foreign: reasons.append(f"Unexpected language: {language} (not DE/EN)") return "; ".join(reasons) @app.post("/classify", response_model=ClassifyResponse) async def classify(request: ClassifyRequest): # Kombiniere Mail-Felder zu einem Text text = f"From: {request.from_addr}\nSubject: {request.subject}\n\n{request.body}" # Spracherkennung auf dem Body (Subject ist oft zu kurz) lang_text = request.body if request.body else request.subject language, is_foreign = detect_language(lang_text) inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512, padding=True) inputs = {k: v.to(device) for k, v in inputs.items()} with torch.no_grad(): outputs = model(**inputs) probs = torch.softmax(outputs.logits, dim=-1) spam_prob = probs[0][1].item() # Konvertiere Wahrscheinlichkeit zu RSpamd-Score (0-15 Skala) rspamd_score = spam_prob * 15.0 # Fremdsprachen-Bonus: Nicht DE/EN bekommt extra Punkte lang_bonus = FOREIGN_LANG_BONUS if is_foreign else 0.0 rspamd_score = min(rspamd_score + lang_bonus, 15.0) # Spam-Schwelle nach Bonus neu bewerten effective_spam = spam_prob > 0.5 or (is_foreign and spam_prob > 0.3) reason = build_reason(spam_prob, is_foreign, language) quote = find_spam_quote(request.subject, request.body) if effective_spam else "" return ClassifyResponse( is_spam=effective_spam, confidence=spam_prob, score=round(rspamd_score, 2), language=language, foreign_lang_bonus=lang_bonus, reason=reason, quote=quote, ) @app.get("/health") async def health(): return {"status": "ok", "model_loaded": model is not None}