- README.md: full project overview with setup, training, API, and RSpamd integration docs - server.py: add reason (human-readable explanation) and quote (suspicious snippet) to response - spamllm.lua: pass reason and quote through to RSpamd symbol description for logs/UI Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
186 lines
6.1 KiB
Python
186 lines
6.1 KiB
Python
"""
|
|
FastAPI Service für Spam-Klassifikation mit Spracherkennung.
|
|
|
|
Stellt einen HTTP-Endpunkt bereit, den RSpamd als external_service aufrufen kann.
|
|
Mails in nicht-erwarteten Sprachen (nicht DE/EN) bekommen einen Spam-Bonus.
|
|
"""
|
|
|
|
import logging
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
|
|
import torch
|
|
from fastapi import FastAPI
|
|
from langdetect import DetectorFactory, detect_langs
|
|
from pydantic import BaseModel
|
|
from transformers import AutoModelForSequenceClassification, AutoTokenizer
|
|
|
|
# Fixed seed so language detection is deterministic.
DetectorFactory.seed = 0

# Languages we expect to receive; everything else earns a spam-score bonus.
EXPECTED_LANGUAGES = {"de", "en"}
# Score bonus for unexpected languages (0-5 extra points).
FOREIGN_LANG_BONUS = 4.0

logger = logging.getLogger("spamllm")
logging.basicConfig(level=logging.INFO)

MODEL_PATH = Path("./model/final")

# Global model state; all three are populated by lifespan() at startup.
model = None
tokenizer = None
device = None
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the tokenizer and classification model before the app serves requests.

    Fills the module-level ``model``, ``tokenizer`` and ``device`` globals.
    CUDA is used when ``torch.cuda.is_available()`` reports it, otherwise
    the CPU. Nothing is released after the ``yield``.
    """
    global model, tokenizer, device

    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    logger.info(f"Loading model from {MODEL_PATH} on {device}")

    model_dir = str(MODEL_PATH)
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    model = AutoModelForSequenceClassification.from_pretrained(model_dir)
    model.to(device)
    # Inference only: switch the model to evaluation mode.
    model.eval()
    logger.info("Model loaded successfully")

    yield


app = FastAPI(title="SpamLLM Classifier", lifespan=lifespan)
|
|
|
|
|
|
# Request payload for the /classify endpoint.
# Every field is optional and defaults to an empty string.
class ClassifyRequest(BaseModel):
    subject: str = ""  # mail subject line
    body: str = ""  # mail body text
    from_addr: str = ""  # sender address
|
|
|
|
|
|
# Response payload of the /classify endpoint.
class ClassifyResponse(BaseModel):
    is_spam: bool
    confidence: float
    score: float  # RSpamd-compatible score (0-15)
    language: str  # detected language
    foreign_lang_bonus: float  # additional score for a foreign language
    reason: str  # human-readable explanation
    quote: str  # most suspicious text snippet
|
|
|
|
|
|
def detect_language(text: str) -> tuple[str, bool]:
    """Detect the language of *text* and whether it is an unexpected one.

    Args:
        text: Text to analyse.

    Returns:
        A ``(language_code, is_foreign)`` tuple. ``is_foreign`` is True when
        the top-ranked detected language is not in ``EXPECTED_LANGUAGES``.
        Text that is empty or shorter than 20 characters after stripping,
        and any detection failure, yields ``("unknown", False)``.
    """
    # Too little text to detect anything: report "unknown", never "foreign".
    if not text or len(text.strip()) < 20:
        return "unknown", False

    try:
        candidates = detect_langs(text)
        lang_code = candidates[0].lang
    except Exception as exc:
        # Deliberately best-effort: a detector failure must not fail the
        # classification request. Log it instead of swallowing it silently.
        logger.debug("Language detection failed: %s", exc)
        return "unknown", False

    return lang_code, lang_code not in EXPECTED_LANGUAGES
|
|
|
|
|
|
# Spam signal phrases for quote extraction (DE + EN), all lowercase.
# Order matters: the first phrase in this list that occurs in the mail wins.
SPAM_PATTERNS = [
    "click here", "klicken sie", "jetzt bestellen", "order now",
    "act now", "sofort", "dringend", "urgent", "verify your",
    "bestätigen sie", "gewonnen", "you won", "congratulations",
    "herzlichen glückwunsch", "free", "gratis", "kostenlos",
    "100%", "guarantee", "garantie", "limited time", "nur heute",
    "unsubscribe", "abmelden", "no risk", "kein risiko",
    "bank details", "bankdaten", "password", "passwort",
    "account suspended", "konto gesperrt", "credit card", "kreditkarte",
    "viagra", "cialis", "pharmacy", "apotheke", "discount", "rabatt",
    "million", "prize", "preis", "winner", "gewinner",
]


def _lowercase_same_length(text: str) -> str:
    """Lowercase *text* while keeping every character at its original index."""
    lowered = text.lower()
    if len(lowered) == len(text):
        return lowered
    # Some characters (e.g. "İ") lowercase to more than one code point, which
    # would shift every later index. Keep such characters unchanged so that
    # positions found in the result are valid in the original text.
    return "".join(
        low if len(low) == 1 else ch
        for ch, low in zip(text, map(str.lower, text))
    )


def find_spam_quote(subject: str, body: str) -> str:
    """Return the most suspicious snippet of the mail.

    The subject and body are joined with a single space and searched
    case-insensitively for the phrases in ``SPAM_PATTERNS``, in list order.

    Args:
        subject: Mail subject.
        body: Mail body.

    Returns:
        For the first phrase found: the phrase with up to 30 characters of
        context before and 60 after it, stripped, with ``"..."`` added on
        each side where text was cut off. If no phrase is found: the text of
        the body before its first ``"."`` (at most 120 characters plus
        ``"..."`` when truncated), or, for an empty body, the first 120
        characters of the subject.
    """
    original = f"{subject} {body}"
    # Search a lowercase copy whose indices line up with `original`, so the
    # match position can be used to slice the original text.
    haystack = _lowercase_same_length(original)

    for pattern in SPAM_PATTERNS:
        pos = haystack.find(pattern)
        if pos == -1:
            continue

        start = max(0, pos - 30)
        end = min(len(original), pos + len(pattern) + 60)
        snippet = original[start:end].strip()
        if start > 0:
            snippet = "..." + snippet
        if end < len(original):
            snippet = snippet + "..."
        return snippet

    # No phrase found -> fall back to the first sentence of the body.
    if body:
        first_sentence = body.split(".")[0].strip()
        return first_sentence[:120] + ("..." if len(first_sentence) > 120 else "")
    return subject[:120] if subject else ""
|
|
|
|
|
|
def build_reason(spam_prob: float, is_foreign: bool, language: str) -> str:
    """Compose a human-readable explanation for a classification result.

    Args:
        spam_prob: Spam probability used to pick the confidence wording.
        is_foreign: Whether the detected language is an unexpected one.
        language: Detected language code, mentioned when ``is_foreign``.

    Returns:
        The confidence statement, followed by ``"; "`` and a note about the
        unexpected language when ``is_foreign`` is true.
    """
    if spam_prob > 0.8:
        verdict = f"High spam confidence ({spam_prob:.0%})"
    elif spam_prob > 0.5:
        verdict = f"Moderate spam confidence ({spam_prob:.0%})"
    elif spam_prob > 0.3:
        verdict = f"Low spam confidence ({spam_prob:.0%})"
    else:
        # At or below 0.3 the wording flips to the ham probability.
        verdict = f"Likely ham ({1 - spam_prob:.0%} confidence)"

    if not is_foreign:
        return verdict

    language_note = f"Unexpected language: {language} (not DE/EN)"
    return "; ".join((verdict, language_note))
|
|
|
|
|
|
@app.post("/classify", response_model=ClassifyResponse)
async def classify(request: ClassifyRequest):
    """Classify one mail and return a verdict with an RSpamd-style score.

    The sender, subject and body are merged into one text and run through
    the loaded model. The probability read from class index 1 is treated as
    the spam probability and scaled to a 0-15 score. Mail in an unexpected
    language gets ``FOREIGN_LANG_BONUS`` added (capped at 15) and is flagged
    as spam from a probability above 0.3 instead of above 0.5.
    """
    # Merge the mail fields into a single text for the model.
    text = f"From: {request.from_addr}\nSubject: {request.subject}\n\n{request.body}"

    # Detect the language on the body (the subject is often too short);
    # the subject is only used when there is no body.
    language, is_foreign = detect_language(request.body or request.subject)

    encoded = tokenizer(text, return_tensors="pt", truncation=True, max_length=512, padding=True)
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}

    with torch.no_grad():
        outputs = model(**encoded)

    class_probs = torch.softmax(outputs.logits, dim=-1)
    spam_prob = class_probs[0][1].item()

    # Foreign-language bonus: anything other than DE/EN gets extra points.
    if is_foreign:
        lang_bonus = FOREIGN_LANG_BONUS
    else:
        lang_bonus = 0.0

    # Convert the probability to the RSpamd scale (0-15), bonus included.
    rspamd_score = min(spam_prob * 15.0 + lang_bonus, 15.0)

    # Re-evaluate the spam threshold in light of the bonus: foreign-language
    # mail is already flagged above 0.3, everything else above 0.5.
    spam_threshold = 0.3 if is_foreign else 0.5
    effective_spam = spam_prob > spam_threshold

    reason = build_reason(spam_prob, is_foreign, language)

    # A quote is only extracted for mail that is flagged as spam.
    quote = ""
    if effective_spam:
        quote = find_spam_quote(request.subject, request.body)

    return ClassifyResponse(
        is_spam=effective_spam,
        confidence=spam_prob,
        score=round(rspamd_score, 2),
        language=language,
        foreign_lang_bonus=lang_bonus,
        reason=reason,
        quote=quote,
    )
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Report service status and whether the global model has been set."""
    model_loaded = model is not None
    return {"status": "ok", "model_loaded": model_loaded}
|