"""test_import_msg.py -- experimental import of a .msg file into a mailbox via Graph API.

Parses an Outlook .msg file and re-creates it as a message in the
``Inbox/<TARGET_FOLDER>`` folder of the target mailbox.

Usage:
    GRAPH_CLIENT_SECRET=... python test_import_msg.py [path/to/file.msg]
"""

import base64
import os
import sys
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path

import extract_msg
import msal
import requests

# === CONFIG ===
TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9"
CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f"
# The client secret must never live in source control; it is supplied through
# the environment. Any secret that was previously committed here should be
# treated as leaked and rotated.
CLIENT_SECRET = os.environ.get("GRAPH_CLIENT_SECRET", "")
MAILBOX = "vladimir.buzalka@buzalka.cz"
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
SCOPE = ["https://graph.microsoft.com/.default"]
GRAPH_URL = "https://graph.microsoft.com/v1.0"
TARGET_FOLDER = "JNJ"  # subfolder under Inbox

# === MSG FILE ===
MSG_PATH = Path(__file__).parent / "FC130007ACFE5DCB0000.msg"


def get_token():
    """Acquire an app-only Graph access token via the client-credentials flow.

    Returns:
        The bearer access token string.

    Raises:
        RuntimeError: if the client secret is not configured or MSAL does not
            return an access token.
    """
    if not CLIENT_SECRET:
        raise RuntimeError(
            "Auth failed: GRAPH_CLIENT_SECRET environment variable is not set"
        )
    app = msal.ConfidentialClientApplication(
        CLIENT_ID, authority=AUTHORITY, client_credential=CLIENT_SECRET
    )
    token = app.acquire_token_for_client(scopes=SCOPE)
    if "access_token" not in token:
        raise RuntimeError(f"Auth failed: {token}")
    return token["access_token"]


def parse_msg(path):
    """Parse a .msg file and return a dict with the message properties.

    Args:
        path: location of the .msg file (anything accepted by ``str()``).

    Returns:
        A dict with the keys ``subject``, ``body_html``, ``body_text``,
        ``sender_email``, ``sender_name``, ``to``, ``cc``, ``received``
        (string or ``None``) and ``attachments`` (a list of Graph
        ``fileAttachment`` dicts with base64-encoded content).
    """
    msg = extract_msg.Message(str(path))
    try:
        # Read every property while the file is open; ``finally`` guarantees
        # the handle is released even if one of the reads raises.
        subject = msg.subject or "(no subject)"
        body_html = msg.htmlBody
        if isinstance(body_html, bytes):
            body_html = body_html.decode("utf-8", errors="replace")
        body_text = msg.body or ""
        sender_email = msg.sender or ""
        sender_name = getattr(msg, "senderName", None) or sender_email
        to_raw = msg.to or ""
        cc_raw = msg.cc or ""
        date_raw = msg.date

        att_list = []
        for att in msg.attachments:
            # Only raw byte payloads can be base64-encoded; anything else
            # (e.g. a nested message object) is skipped instead of crashing.
            if not isinstance(att.data, (bytes, bytearray)):
                continue
            if att.data and att.longFilename:
                att_list.append({
                    "@odata.type": "#microsoft.graph.fileAttachment",
                    "name": att.longFilename,
                    "contentType": getattr(att, "mimetype", None)
                    or "application/octet-stream",
                    "contentBytes": base64.b64encode(att.data).decode(),
                })
    finally:
        msg.close()

    # Pure string processing; no longer needs the open file.
    to_list = [a.strip() for a in to_raw.split(";") if a.strip()]
    cc_list = [a.strip() for a in cc_raw.split(";") if a.strip()]
    received = str(date_raw) if date_raw else None

    return {
        "subject": subject,
        "body_html": body_html,
        "body_text": body_text,
        "sender_email": sender_email,
        "sender_name": sender_name,
        "to": to_list,
        "cc": cc_list,
        "received": received,
        "attachments": att_list,
    }


def make_recipient(addr):
    """Create a Graph API recipient object from an e-mail address string.

    Accepts either a bare address (``a@b.c``) or the ``Name <email>`` form.
    For a bare address the address doubles as the display name.

    Args:
        addr: the address string.

    Returns:
        ``{"emailAddress": {"name": ..., "address": ...}}``
    """
    # Handle 'Name <email>' format: first '<' and the first '>' after it.
    start = addr.find("<")
    end = addr.find(">", start + 1) if start != -1 else -1
    if start != -1 and end != -1:
        name = addr[:start].strip().strip('"')
        email = addr[start + 1:end].strip()
    else:
        name = email = addr
    # '<a@b.c>' without a display name: fall back to the address itself.
    return {"emailAddress": {"name": name or email, "address": email}}


def _to_graph_utc(value):
    """Convert a date string to the ISO 8601 UTC form Graph expects.

    Tries ISO 8601 first, then RFC 2822, and only then ``dateutil`` (optional
    dependency). Raises ValueError/TypeError/OverflowError/ImportError when
    the value cannot be converted.
    """
    try:
        dt = datetime.fromisoformat(value)
    except ValueError:
        try:
            dt = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            from dateutil import parser as dtparser

            dt = dtparser.parse(value)
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _build_payload(data):
    """Build the Graph message JSON payload from a ``parse_msg`` result."""
    payload = {
        "subject": data["subject"],
        "body": {
            "contentType": "HTML" if data["body_html"] else "Text",
            "content": data["body_html"] or data["body_text"],
        },
        "from": make_recipient(
            f"{data['sender_name']} <{data['sender_email']}>"
        ),
        "toRecipients": [make_recipient(a) for a in data["to"]],
        "ccRecipients": [make_recipient(a) for a in data["cc"]],
        "isRead": True,
        # PR_MESSAGE_FLAGS (0x0E07) = 1 -> read, NOT draft
        # (without MSGFLAG_UNSENT=0x08)
        "singleValueExtendedProperties": [
            {
                "id": "Integer 0x0E07",
                "value": "1",
            }
        ],
    }

    if data["received"]:
        # Graph API expects ISO 8601 UTC format. A bad date is not fatal:
        # the message is still imported, just without receivedDateTime.
        try:
            payload["receivedDateTime"] = _to_graph_utc(data["received"])
        except (ValueError, TypeError, OverflowError, ImportError) as e:
            print(f" Warning: cannot parse date '{data['received']}': {e}")

    if data["attachments"]:
        # NOTE(review): inline attachments are presumably subject to Graph's
        # request size limit; large files may need an upload session -- confirm.
        payload["attachments"] = data["attachments"]

    return payload


def _find_or_create_folder(headers):
    """Return the id of Inbox/TARGET_FOLDER, creating the folder if missing.

    Raises:
        requests.HTTPError: if Graph answers with an error status.
    """
    folder_url = f"{GRAPH_URL}/users/{MAILBOX}/mailFolders/Inbox/childFolders"
    wanted = TARGET_FOLDER.lower()

    # The listing is paged; follow @odata.nextLink so a folder beyond the
    # first page is still found instead of triggering a failing create.
    url, params = folder_url, {"$top": 100}
    while url:
        resp = requests.get(url, headers=headers, params=params, timeout=15)
        resp.raise_for_status()
        page = resp.json()
        for folder in page.get("value", []):
            if folder.get("displayName", "").lower() == wanted:
                return folder["id"]
        # The next link already carries the query options.
        url, params = page.get("@odata.nextLink"), None

    # Create the folder if it doesn't exist
    resp = requests.post(
        folder_url, headers=headers, json={"displayName": TARGET_FOLDER}, timeout=15
    )
    resp.raise_for_status()
    folder_id = resp.json()["id"]
    print(f" Created folder '{TARGET_FOLDER}'")
    return folder_id


def import_msg(msg_path):
    """Import one .msg file into Inbox/TARGET_FOLDER of the target mailbox.

    Args:
        msg_path: path of the .msg file to import.

    Returns:
        The created message as returned by Graph (dict), or ``None`` when a
        Graph request was rejected.

    Raises:
        RuntimeError: if authentication fails (see ``get_token``).
    """
    token = get_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    print(f"Parsing: {msg_path}")
    data = parse_msg(msg_path)
    print(f" Subject: {data['subject']}")
    print(f" From: {data['sender_name']} <{data['sender_email']}>")
    print(f" To: {data['to']}")
    print(f" Date: {data['received']}")
    print(f" Attachments: {len(data['attachments'])}")

    # 1. Build the message payload
    payload = _build_payload(data)

    # 2. Find (or create) the target folder (Inbox/JNJ)
    try:
        folder_id = _find_or_create_folder(headers)
    except requests.HTTPError as e:
        resp = e.response
        status = resp.status_code if resp is not None else "?"
        text = resp.text[:500] if resp is not None else str(e)
        print(f" FAILED [{status}]: {text}")
        return None

    # 3. Create the message in the target folder
    url = f"{GRAPH_URL}/users/{MAILBOX}/mailFolders/{folder_id}/messages"
    print(f"\nPOST -> Inbox/{TARGET_FOLDER}")
    r = requests.post(url, headers=headers, json=payload, timeout=30)

    if r.status_code in (200, 201):
        created = r.json()
        msg_id = created.get("id", "?")
        print(f" OK! Message created, id={msg_id[:40]}...")
        return created

    print(f" FAILED [{r.status_code}]: {r.text[:500]}")
    return None


def main():
    """Script entry point; returns the process exit code (0 on success)."""
    path = sys.argv[1] if len(sys.argv) > 1 else MSG_PATH
    result = import_msg(Path(path))
    return 0 if result is not None else 1


if __name__ == "__main__":
    sys.exit(main())