Multilingual spam classifier (DE/EN) with language detection. Non-DE/EN mails receive an additional spam score bonus. - train.py: Fine-tune distilbert-base-multilingual-cased on spam/ham data - server.py: FastAPI service with langdetect integration - rspamd/: Lua plugin and config for RSpamd integration - export_rspamd_data.py: Export Maildir folders to CSV training data - test_classify.py: Local model validation with DE/EN/foreign test cases Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
136 lines
4.4 KiB
Python
136 lines
4.4 KiB
Python
"""
|
|
Export mails from Maildir folders as training data for SpamLLM.

Expects a typical Maildir layout:
- Spam folder: e.g. ~/.spam/ or /var/vmail/user/.Junk/
- Ham folder: e.g. ~/Maildir/ or /var/vmail/user/.INBOX/

Produces: data/train.csv with the columns: text, labels (0=ham, 1=spam),
plus a test.csv with the held-out split in the same directory.
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import email
|
|
import email.policy
|
|
import os
|
|
import random
|
|
from pathlib import Path
|
|
|
|
|
|
def extract_text_from_email(filepath: str) -> dict | None:
|
|
"""Extrahiert Subject und Body aus einer E-Mail-Datei."""
|
|
try:
|
|
with open(filepath, "rb") as f:
|
|
msg = email.message_from_binary_file(f, policy=email.policy.default)
|
|
|
|
subject = msg.get("Subject", "")
|
|
from_addr = msg.get("From", "")
|
|
|
|
body = ""
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
if part.get_content_type() == "text/plain":
|
|
content = part.get_content()
|
|
if isinstance(content, str):
|
|
body += content
|
|
else:
|
|
content = msg.get_content()
|
|
if isinstance(content, str):
|
|
body = content
|
|
|
|
# Auf sinnvolle Länge begrenzen
|
|
body = body[:4096]
|
|
|
|
if not subject and not body:
|
|
return None
|
|
|
|
text = f"From: {from_addr}\nSubject: {subject}\n\n{body}"
|
|
return {"text": text}
|
|
|
|
except Exception as e:
|
|
print(f" Skipping {filepath}: {e}")
|
|
return None
|
|
|
|
|
|
def collect_mails(directory: str, label: int, max_count: int = 0) -> list[dict]:
    """Collect labelled mail texts from a Maildir (or flat) directory.

    When ``cur/`` and/or ``new/`` exist, only those sub-folders are scanned:
    a Maildir root typically holds server bookkeeping files (for example
    Dovecot's ``dovecot-uidlist``) rather than messages, and parsing them
    would add junk samples. Without those sub-folders the directory itself
    is treated as a flat folder of message files.

    Args:
        directory: Path of the mail folder.
        label: Value stored under ``"labels"`` for every mail (1 is reported
            as spam, anything else as ham).
        max_count: If greater than 0, at most this many files are processed,
            chosen at random; 0 processes all files.

    Returns:
        One dict per successfully extracted mail with the keys ``"text"`` and
        ``"labels"``. Empty when *directory* is not a directory.
    """
    mail_dir = Path(directory)

    # is_dir() also rejects a plain file, which would make iterdir() raise.
    if not mail_dir.is_dir():
        print(f"WARNING: {directory} existiert nicht!")
        return []

    # A Maildir usually has cur/, new/ and tmp/; tmp/ holds incomplete deliveries.
    search_dirs = []
    for subdir in ("cur", "new"):
        sub = mail_dir / subdir
        if sub.is_dir():
            search_dirs.append(sub)
    if not search_dirs:
        search_dirs = [mail_dir]

    files = []
    for search_dir in search_dirs:
        for entry in search_dir.iterdir():
            if entry.is_file() and not entry.name.startswith("."):
                files.append(entry)
    # iterdir() order is arbitrary; sorting keeps runs comparable.
    files.sort()

    if max_count > 0 and len(files) > max_count:
        files = random.sample(files, max_count)

    label_name = "spam" if label == 1 else "ham"
    print(f"Processing {len(files)} {label_name} mails from {directory}...")

    results = []
    for filepath in files:
        extracted = extract_text_from_email(str(filepath))
        if extracted:
            extracted["labels"] = label
            results.append(extracted)

    return results
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Export Maildir to CSV training data")
|
|
parser.add_argument("--spam-dir", required=True, help="Path to spam Maildir")
|
|
parser.add_argument("--ham-dir", required=True, help="Path to ham Maildir")
|
|
parser.add_argument("--output", default="data/train.csv", help="Output CSV path")
|
|
parser.add_argument("--max-per-class", type=int, default=0, help="Max mails per class (0=all)")
|
|
parser.add_argument("--test-split", type=float, default=0.2, help="Test set ratio")
|
|
args = parser.parse_args()
|
|
|
|
spam_mails = collect_mails(args.spam_dir, label=1, max_count=args.max_per_class)
|
|
ham_mails = collect_mails(args.ham_dir, label=0, max_count=args.max_per_class)
|
|
|
|
all_mails = spam_mails + ham_mails
|
|
random.shuffle(all_mails)
|
|
|
|
print(f"\nTotal: {len(all_mails)} mails ({len(spam_mails)} spam, {len(ham_mails)} ham)")
|
|
|
|
# Train/Test Split
|
|
split_idx = int(len(all_mails) * (1 - args.test_split))
|
|
train_data = all_mails[:split_idx]
|
|
test_data = all_mails[split_idx:]
|
|
|
|
# Verzeichnis erstellen
|
|
output_path = Path(args.output)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Train CSV
|
|
with open(output_path, "w", newline="", encoding="utf-8") as f:
|
|
writer = csv.DictWriter(f, fieldnames=["text", "labels"])
|
|
writer.writeheader()
|
|
writer.writerows(train_data)
|
|
print(f"Train set: {len(train_data)} mails -> {output_path}")
|
|
|
|
# Test CSV
|
|
test_path = output_path.parent / "test.csv"
|
|
with open(test_path, "w", newline="", encoding="utf-8") as f:
|
|
writer = csv.DictWriter(f, fieldnames=["text", "labels"])
|
|
writer.writeheader()
|
|
writer.writerows(test_data)
|
|
print(f"Test set: {len(test_data)} mails -> {test_path}")
|
|
|
|
|
|
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|