""" FastAPI Service für Spam-Klassifikation mit Spracherkennung. Stellt einen HTTP-Endpunkt bereit, den RSpamd als external_service aufrufen kann. Mails in nicht-erwarteten Sprachen (nicht DE/EN) bekommen einen Spam-Bonus. """ import logging from contextlib import asynccontextmanager from pathlib import Path import torch from fastapi import FastAPI from langdetect import DetectorFactory, detect_langs from pydantic import BaseModel from transformers import AutoModelForSequenceClassification, AutoTokenizer # Deterministische Spracherkennung DetectorFactory.seed = 0 # Erwartete Sprachen - alles andere bekommt einen Spam-Score-Bonus EXPECTED_LANGUAGES = {"de", "en"} # Score-Bonus für unerwartete Sprachen (0-5 Punkte extra) FOREIGN_LANG_BONUS = 4.0 logger = logging.getLogger("spamllm") logging.basicConfig(level=logging.INFO) MODEL_PATH = Path("./model/final") # Global model state model = None tokenizer = None device = None @asynccontextmanager async def lifespan(app: FastAPI): global model, tokenizer, device device = torch.device("cuda" if torch.cuda.is_available() else "cpu") logger.info(f"Loading model from {MODEL_PATH} on {device}") tokenizer = AutoTokenizer.from_pretrained(str(MODEL_PATH)) model = AutoModelForSequenceClassification.from_pretrained(str(MODEL_PATH)) model.to(device) model.eval() logger.info("Model loaded successfully") yield app = FastAPI(title="SpamLLM Classifier", lifespan=lifespan) class ClassifyRequest(BaseModel): subject: str = "" body: str = "" from_addr: str = "" class ClassifyResponse(BaseModel): is_spam: bool confidence: float score: float # RSpamd-kompatibler Score (0-15) language: str # Erkannte Sprache foreign_lang_bonus: float # Zusätzlicher Score für Fremdsprache def detect_language(text: str) -> tuple[str, bool]: """Erkennt die Sprache und ob sie erwartet ist.""" if not text or len(text.strip()) < 20: return "unknown", False try: langs = detect_langs(text) top_lang = langs[0] lang_code = top_lang.lang is_foreign = lang_code not in EXPECTED_LANGUAGES return lang_code, is_foreign except Exception: return "unknown", False @app.post("/classify", response_model=ClassifyResponse) async def classify(request: ClassifyRequest): # Kombiniere Mail-Felder zu einem Text text = f"From: {request.from_addr}\nSubject: {request.subject}\n\n{request.body}" # Spracherkennung auf dem Body (Subject ist oft zu kurz) lang_text = request.body if request.body else request.subject language, is_foreign = detect_language(lang_text) inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512, padding=True) inputs = {k: v.to(device) for k, v in inputs.items()} with torch.no_grad(): outputs = model(**inputs) probs = torch.softmax(outputs.logits, dim=-1) spam_prob = probs[0][1].item() # Konvertiere Wahrscheinlichkeit zu RSpamd-Score (0-15 Skala) rspamd_score = spam_prob * 15.0 # Fremdsprachen-Bonus: Nicht DE/EN bekommt extra Punkte lang_bonus = FOREIGN_LANG_BONUS if is_foreign else 0.0 rspamd_score = min(rspamd_score + lang_bonus, 15.0) # Spam-Schwelle nach Bonus neu bewerten effective_spam = spam_prob > 0.5 or (is_foreign and spam_prob > 0.3) return ClassifyResponse( is_spam=effective_spam, confidence=spam_prob, score=round(rspamd_score, 2), language=language, foreign_lang_bonus=lang_bonus, ) @app.get("/health") async def health(): return {"status": "ok", "model_loaded": model is not None}