spamBERT/export_rspamd_data.py
Carsten Abele 38efd20b4d Initial commit: SpamLLM - DistilBERT spam classifier for RSpamd
Multilingual spam classifier (DE/EN) with language detection.
Non-DE/EN mails receive an additional spam score bonus.

- train.py: Fine-tune distilbert-base-multilingual-cased on spam/ham data
- server.py: FastAPI service with langdetect integration
- rspamd/: Lua plugin and config for RSpamd integration
- export_rspamd_data.py: Export Maildir folders to CSV training data
- test_classify.py: Local model validation with DE/EN/foreign test cases

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 22:27:05 +01:00

136 lines
4.4 KiB
Python

"""
Export mails from Maildir folders as training data for SpamLLM.

Expects a typical Maildir structure:
- spam folder: e.g. ~/.spam/ or /var/vmail/user/.Junk/
- ham folder: e.g. ~/Maildir/ or /var/vmail/user/.INBOX/

Produces: data/train.csv with columns: text, labels (0=ham, 1=spam)
"""
import argparse
import csv
import email
import email.policy
import os
import random
from pathlib import Path
def extract_text_from_email(filepath: str) -> dict | None:
"""Extrahiert Subject und Body aus einer E-Mail-Datei."""
try:
with open(filepath, "rb") as f:
msg = email.message_from_binary_file(f, policy=email.policy.default)
subject = msg.get("Subject", "")
from_addr = msg.get("From", "")
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
content = part.get_content()
if isinstance(content, str):
body += content
else:
content = msg.get_content()
if isinstance(content, str):
body = content
# Auf sinnvolle Länge begrenzen
body = body[:4096]
if not subject and not body:
return None
text = f"From: {from_addr}\nSubject: {subject}\n\n{body}"
return {"text": text}
except Exception as e:
print(f" Skipping {filepath}: {e}")
return None
def collect_mails(directory: str, label: int, max_count: int = 0) -> list[dict]:
    """Collect mails from a Maildir directory.

    Scans the directory itself plus its cur/ and new/ subfolders,
    optionally downsamples to max_count random files, and returns a
    list of {"text": ..., "labels": label} dicts.
    """
    collected: list[dict] = []
    root = Path(directory)
    if not root.exists():
        print(f"WARNING: {directory} existiert nicht!")
        return collected
    # A Maildir typically keeps messages under cur/, new/ and tmp/.
    folders = [root] + [root / name for name in ("cur", "new") if (root / name).exists()]
    files = [
        entry
        for folder in folders
        for entry in folder.iterdir()
        if entry.is_file() and not entry.name.startswith(".")
    ]
    if 0 < max_count < len(files):
        # Random downsample when a per-class cap is requested.
        random.shuffle(files)
        files = files[:max_count]
    label_name = "spam" if label == 1 else "ham"
    print(f"Processing {len(files)} {label_name} mails from {directory}...")
    for path in files:
        record = extract_text_from_email(str(path))
        if record:
            record["labels"] = label
            collected.append(record)
    return collected
def main():
parser = argparse.ArgumentParser(description="Export Maildir to CSV training data")
parser.add_argument("--spam-dir", required=True, help="Path to spam Maildir")
parser.add_argument("--ham-dir", required=True, help="Path to ham Maildir")
parser.add_argument("--output", default="data/train.csv", help="Output CSV path")
parser.add_argument("--max-per-class", type=int, default=0, help="Max mails per class (0=all)")
parser.add_argument("--test-split", type=float, default=0.2, help="Test set ratio")
args = parser.parse_args()
spam_mails = collect_mails(args.spam_dir, label=1, max_count=args.max_per_class)
ham_mails = collect_mails(args.ham_dir, label=0, max_count=args.max_per_class)
all_mails = spam_mails + ham_mails
random.shuffle(all_mails)
print(f"\nTotal: {len(all_mails)} mails ({len(spam_mails)} spam, {len(ham_mails)} ham)")
# Train/Test Split
split_idx = int(len(all_mails) * (1 - args.test_split))
train_data = all_mails[:split_idx]
test_data = all_mails[split_idx:]
# Verzeichnis erstellen
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
# Train CSV
with open(output_path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=["text", "labels"])
writer.writeheader()
writer.writerows(train_data)
print(f"Train set: {len(train_data)} mails -> {output_path}")
# Test CSV
test_path = output_path.parent / "test.csv"
with open(test_path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=["text", "labels"])
writer.writeheader()
writer.writerows(test_data)
print(f"Test set: {len(test_data)} mails -> {test_path}")
if __name__ == "__main__":
main()