Files

181 lines
5.8 KiB
Python

# test_import_msg.py — experimental import of a .msg file into a mailbox via the Graph API.
# Parses a .msg file and creates a message in the TARGET_FOLDER sub-folder of the target mailbox's Inbox.
import base64
import os
import sys
from pathlib import Path

import extract_msg
import msal
import requests
# === CONFIG ===
# Credentials and the target mailbox can be overridden through environment
# variables so they do not have to live in source control. The literals are
# kept only as fallbacks so existing invocations keep working.
# SECURITY NOTE(review): the fallback client secret below is committed in this
# file. Rotate it and remove the fallback once GRAPH_CLIENT_SECRET is supplied
# by every environment that runs this script.
TENANT_ID = os.environ.get("GRAPH_TENANT_ID", "7d269944-37a4-43a1-8140-c7517dc426e9")
CLIENT_ID = os.environ.get("GRAPH_CLIENT_ID", "4b222bfd-78c9-4239-a53f-43006b3ed07f")
CLIENT_SECRET = os.environ.get(
    "GRAPH_CLIENT_SECRET", "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
)
MAILBOX = os.environ.get("GRAPH_MAILBOX", "vladimir.buzalka@buzalka.cz")
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
SCOPE = ["https://graph.microsoft.com/.default"]
GRAPH_URL = "https://graph.microsoft.com/v1.0"
TARGET_FOLDER = "JNJ"  # subfolder under Inbox

# === MSG FILE ===
# Default .msg file, expected next to this script; used when no CLI argument
# is given.
MSG_PATH = Path(__file__).parent / "FC130007ACFE5DCB0000.msg"
def get_token():
    """Acquire an app-only access token for the configured client.

    Builds an MSAL confidential client from CLIENT_ID / AUTHORITY /
    CLIENT_SECRET and requests a token for SCOPE.

    Returns:
        The value stored under ``"access_token"`` in the MSAL result.

    Raises:
        RuntimeError: If the MSAL result carries no ``"access_token"`` key;
            the message embeds the whole result for diagnosis.
    """
    client = msal.ConfidentialClientApplication(
        CLIENT_ID,
        authority=AUTHORITY,
        client_credential=CLIENT_SECRET,
    )
    result = client.acquire_token_for_client(scopes=SCOPE)
    if "access_token" in result:
        return result["access_token"]
    raise RuntimeError(f"Auth failed: {result}")
def parse_msg(path):
    """Parse a .msg file and return its properties as a plain dict.

    Args:
        path: Location of the .msg file. It is converted with ``str()``
            before being handed to ``extract_msg.Message``.

    Returns:
        A dict with the keys ``subject``, ``body_html``, ``body_text``,
        ``sender_email``, ``sender_name``, ``to``, ``cc``, ``received`` and
        ``attachments``. ``to`` / ``cc`` are lists of the non-empty,
        stripped pieces of the raw ``;``-separated header. ``received`` is
        ``str(msg.date)`` or ``None``. ``attachments`` is a list of Graph
        ``fileAttachment`` dicts with base64-encoded content.
    """
    msg = extract_msg.Message(str(path))
    try:
        # Everything must be read while the file is open.
        subject = msg.subject or "(no subject)"
        body_html = msg.htmlBody
        if isinstance(body_html, bytes):
            # NOTE(review): assumes the HTML body is UTF-8; undecodable
            # bytes are replaced rather than raising.
            body_html = body_html.decode("utf-8", errors="replace")
        body_text = msg.body or ""
        sender_email = msg.sender or ""
        sender_name = getattr(msg, "senderName", None) or sender_email
        to_raw = msg.to or ""
        cc_raw = msg.cc or ""
        date_raw = msg.date

        att_list = []
        for att in msg.attachments:
            # Read the payload once; attachments without data or without a
            # long filename are skipped.
            content = att.data
            if content and att.longFilename:
                att_list.append({
                    "@odata.type": "#microsoft.graph.fileAttachment",
                    "name": att.longFilename,
                    "contentType": getattr(att, "mimetype", None) or "application/octet-stream",
                    "contentBytes": base64.b64encode(content).decode(),
                })
    finally:
        # Release the file handle even when reading a property or encoding
        # an attachment fails.
        msg.close()

    # Pure string processing; safe to do after the file is closed.
    to_list = [a.strip() for a in to_raw.split(";") if a.strip()]
    cc_list = [a.strip() for a in cc_raw.split(";") if a.strip()]
    received = str(date_raw) if date_raw else None
    return {
        "subject": subject,
        "body_html": body_html,
        "body_text": body_text,
        "sender_email": sender_email,
        "sender_name": sender_name,
        "to": to_list,
        "cc": cc_list,
        "received": received,
        "attachments": att_list,
    }
def make_recipient(addr):
    """Create a Graph API recipient object from an address string.

    Accepts either a bare address (``user@example.com``) or the
    ``Name <email>`` form. Surrounding whitespace is ignored, and double
    quotes around the display name are removed.

    Args:
        addr: The address string to convert.

    Returns:
        ``{"emailAddress": {"name": ..., "address": ...}}``. When no display
        name can be extracted, the address doubles as the name.
    """
    addr = addr.strip()
    start = addr.find("<")
    # Look for the closing bracket only *after* the opening one, so a stray
    # '>' earlier in the string cannot produce an empty address.
    end = addr.find(">", start + 1) if start != -1 else -1
    if start != -1 and end != -1:
        email = addr[start + 1:end].strip()
        # "<email>" carries no display name; fall back to the address.
        name = addr[:start].strip().strip('"') or email
    else:
        name = email = addr
    return {"emailAddress": {"name": name, "address": email}}
def _build_payload(data):
    """Translate the dict produced by ``parse_msg`` into a Graph message body."""
    html = data["body_html"]
    payload = {
        "subject": data["subject"],
        "body": {
            "contentType": "HTML" if html else "Text",
            "content": html or data["body_text"],
        },
        "from": make_recipient(
            f"{data['sender_name']} <{data['sender_email']}>"
        ),
        "toRecipients": [make_recipient(a) for a in data["to"]],
        "ccRecipients": [make_recipient(a) for a in data["cc"]],
        "isRead": True,
        # PR_MESSAGE_FLAGS (0x0E07) = 1 -> read, NOT draft (without MSGFLAG_UNSENT=0x08)
        "singleValueExtendedProperties": [
            {
                "id": "Integer 0x0E07",
                "value": "1",
            }
        ],
    }
    if data["received"]:
        # Graph API expects ISO 8601 UTC format.
        from datetime import timezone
        try:
            from dateutil import parser as dtparser
            dt = dtparser.parse(data["received"])
            payload["receivedDateTime"] = dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        except Exception as e:
            # Deliberately best-effort: a missing dateutil or an unparsable
            # date must not block the import, so only warn.
            print(f"  Warning: cannot parse date '{data['received']}': {e}")
    if data["attachments"]:
        # NOTE(review): attachments are sent inline with the message; Graph
        # limits the request size, so large files may need an upload
        # session instead — confirm against the Graph documentation.
        payload["attachments"] = data["attachments"]
    return payload


def _find_or_create_target_folder(headers):
    """Return the id of Inbox/TARGET_FOLDER, creating it if absent; None on HTTP failure."""
    folder_url = f"{GRAPH_URL}/users/{MAILBOX}/mailFolders/Inbox/childFolders"
    wanted = TARGET_FOLDER.lower()

    # Graph returns this listing in pages; follow @odata.nextLink so a folder
    # that is not on the first page is still found instead of triggering a
    # creation attempt for a folder that already exists.
    page_url = folder_url
    while page_url:
        r_folders = requests.get(page_url, headers=headers, timeout=15)
        if not r_folders.ok:
            print(f"  FAILED to list folders [{r_folders.status_code}]: {r_folders.text[:500]}")
            return None
        page = r_folders.json()
        for f in page.get("value", []):
            if f["displayName"].lower() == wanted:
                return f["id"]
        page_url = page.get("@odata.nextLink")

    # Create the folder if it doesn't exist.
    r_create = requests.post(
        folder_url, headers=headers,
        json={"displayName": TARGET_FOLDER}, timeout=15
    )
    if not r_create.ok:
        print(f"  FAILED to create folder '{TARGET_FOLDER}' [{r_create.status_code}]: {r_create.text[:500]}")
        return None
    folder_id = r_create.json()["id"]
    print(f"  Created folder '{TARGET_FOLDER}'")
    return folder_id


def import_msg(msg_path):
    """Import one .msg file into Inbox/TARGET_FOLDER of MAILBOX via Graph.

    Acquires a token, parses the file with ``parse_msg``, prints a short
    summary, locates (or creates) the target folder and POSTs the message
    into it.

    Args:
        msg_path: Path of the .msg file to import.

    Returns:
        The decoded JSON of the created message on HTTP 200/201, or ``None``
        when listing/creating the folder or creating the message fails (the
        failure is printed).

    Raises:
        RuntimeError: Propagated from ``get_token`` when authentication fails.
    """
    token = get_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    print(f"Parsing: {msg_path}")
    data = parse_msg(msg_path)
    print(f"  Subject: {data['subject']}")
    print(f"  From: {data['sender_name']} <{data['sender_email']}>")
    print(f"  To: {data['to']}")
    print(f"  Date: {data['received']}")
    print(f"  Attachments: {len(data['attachments'])}")

    payload = _build_payload(data)

    folder_id = _find_or_create_target_folder(headers)
    if not folder_id:
        return None

    url = f"{GRAPH_URL}/users/{MAILBOX}/mailFolders/{folder_id}/messages"
    print(f"\nPOST -> Inbox/{TARGET_FOLDER}")
    r = requests.post(url, headers=headers, json=payload, timeout=30)
    if r.status_code in (200, 201):
        created = r.json()
        msg_id = created.get("id", "?")
        print(f"  OK! Message created, id={msg_id[:40]}...")
        return created
    print(f"  FAILED [{r.status_code}]: {r.text[:500]}")
    return None
if __name__ == "__main__":
    # An optional first CLI argument overrides the default .msg file.
    path = sys.argv[1] if len(sys.argv) > 1 else MSG_PATH
    # import_msg() returns None on failure; surface that as a non-zero exit
    # status so shells and schedulers can detect a failed import.
    sys.exit(0 if import_msg(Path(path)) is not None else 1)