""" Exportiert Mails aus Maildir-Ordnern als Trainingsdaten für SpamLLM. Erwartet eine typische Maildir-Struktur: - Spam-Ordner: z.B. ~/.spam/ oder /var/vmail/user/.Junk/ - Ham-Ordner: z.B. ~/Maildir/ oder /var/vmail/user/.INBOX/ Erzeugt: data/train.csv mit Spalten: text, labels (0=ham, 1=spam) """ import argparse import csv import email import email.policy import os import random from pathlib import Path def extract_text_from_email(filepath: str) -> dict | None: """Extrahiert Subject und Body aus einer E-Mail-Datei.""" try: with open(filepath, "rb") as f: msg = email.message_from_binary_file(f, policy=email.policy.default) subject = msg.get("Subject", "") from_addr = msg.get("From", "") body = "" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": content = part.get_content() if isinstance(content, str): body += content else: content = msg.get_content() if isinstance(content, str): body = content # Auf sinnvolle Länge begrenzen body = body[:4096] if not subject and not body: return None text = f"From: {from_addr}\nSubject: {subject}\n\n{body}" return {"text": text} except Exception as e: print(f" Skipping {filepath}: {e}") return None def collect_mails(directory: str, label: int, max_count: int = 0) -> list[dict]: """Sammelt Mails aus einem Maildir-Verzeichnis.""" results = [] mail_dir = Path(directory) if not mail_dir.exists(): print(f"WARNING: {directory} existiert nicht!") return results # Maildir hat typischerweise cur/, new/, tmp/ Unterordner search_dirs = [mail_dir] for subdir in ["cur", "new"]: sub = mail_dir / subdir if sub.exists(): search_dirs.append(sub) files = [] for search_dir in search_dirs: for f in search_dir.iterdir(): if f.is_file() and not f.name.startswith("."): files.append(f) if max_count > 0 and len(files) > max_count: random.shuffle(files) files = files[:max_count] label_name = "spam" if label == 1 else "ham" print(f"Processing {len(files)} {label_name} mails from {directory}...") for filepath in files: extracted = extract_text_from_email(str(filepath)) if extracted: extracted["labels"] = label results.append(extracted) return results def main(): parser = argparse.ArgumentParser(description="Export Maildir to CSV training data") parser.add_argument("--spam-dir", required=True, help="Path to spam Maildir") parser.add_argument("--ham-dir", required=True, help="Path to ham Maildir") parser.add_argument("--output", default="data/train.csv", help="Output CSV path") parser.add_argument("--max-per-class", type=int, default=0, help="Max mails per class (0=all)") parser.add_argument("--test-split", type=float, default=0.2, help="Test set ratio") args = parser.parse_args() spam_mails = collect_mails(args.spam_dir, label=1, max_count=args.max_per_class) ham_mails = collect_mails(args.ham_dir, label=0, max_count=args.max_per_class) all_mails = spam_mails + ham_mails random.shuffle(all_mails) print(f"\nTotal: {len(all_mails)} mails ({len(spam_mails)} spam, {len(ham_mails)} ham)") # Train/Test Split split_idx = int(len(all_mails) * (1 - args.test_split)) train_data = all_mails[:split_idx] test_data = all_mails[split_idx:] # Verzeichnis erstellen output_path = Path(args.output) output_path.parent.mkdir(parents=True, exist_ok=True) # Train CSV with open(output_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=["text", "labels"]) writer.writeheader() writer.writerows(train_data) print(f"Train set: {len(train_data)} mails -> {output_path}") # Test CSV test_path = output_path.parent / "test.csv" with open(test_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=["text", "labels"]) writer.writeheader() writer.writerows(test_data) print(f"Test set: {len(test_data)} mails -> {test_path}") if __name__ == "__main__": main()