Merge remote-tracking branch 'origin/master'

This commit is contained in:
2026-06-16 19:38:58 +02:00
147 changed files with 32915 additions and 15 deletions
@@ -0,0 +1,619 @@
# app.py | v2.3 | 2026-06-10
# FastAPI server pro příjem .msg a .db souborů, upload do Dropboxu a import do Graph API.
# Endpointy: /upload (.msg/.emsg → /msgs + Graph import),
# /upload-db (.db NEBO .db.xz.enc → Fernet desifruj + lzma rozbal → /msgs/db),
# /upload-dropbox (→ Dropbox /!!!Days/Downloads Z230),
# /message-delete, /message-update (sync: smazání, přečtení, přesun složky),
# /mirror-plan (diff manifestu z JNJ vůči schránce → smaže přebytky, vrátí to_add),
# /status (seznam souborů k odeslání na JNJ — jména zašifrována Fernetem),
# /item/{enc_filename} (stažení souboru — enc_filename je Fernet token;
# Accept: application/json → {"data": fernet_b64}, jinak binárka).
from fastapi import FastAPI, Request, UploadFile, File, Form, Header, HTTPException, Response
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import base64
import hashlib
import logging
import lzma
from pathlib import Path
from typing import Optional
from urllib.parse import quote
import os
import dropbox
import msal
import requests as http_requests
import extract_msg
from dateutil import parser as dtparser
from datetime import timezone
from dotenv import load_dotenv
from cryptography.fernet import Fernet
# Load optional overrides from the .env file that sits next to this module.
load_dotenv(Path(__file__).parent / ".env")
app = FastAPI()
log = logging.getLogger("msgreceiver")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# SECURITY NOTE(review): the bearer token and the Graph client secret used to be
# hard-coded only. They can now be supplied through the environment / .env file
# (same mechanism as the Dropbox settings below). The literal fallbacks are kept
# solely so existing deployments keep working; rotate these values, move them to
# .env and then delete the fallbacks.
TOKEN = os.getenv("MSGRECEIVER_TOKEN", "13e1bb01-9fd5-44a8-8ce9-4ee27133d340")
# Encryption key derived from TOKEN (Fernet = AES-128 CBC + HMAC). Clients derive
# the same key, so TOKEN must match on both sides.
_FERNET = Fernet(base64.urlsafe_b64encode(hashlib.sha256(TOKEN.encode()).digest()))

# Local storage for received .msg files and the uploaded SQLite database.
SAVE_DIR = Path("/msgs")
DB_DIR = Path("/msgs/db")
SAVE_DIR.mkdir(parents=True, exist_ok=True)
DB_DIR.mkdir(parents=True, exist_ok=True)

DROPBOX_APP_KEY = os.getenv("DROPBOX_APP_KEY", "")
DROPBOX_APP_SECRET = os.getenv("DROPBOX_APP_SECRET", "")
DROPBOX_REFRESH_TOKEN = os.getenv("DROPBOX_APP_REFRESH_TOKEN", "")

# --- Graph API config ---
GRAPH_TENANT_ID = os.getenv("GRAPH_TENANT_ID", "7d269944-37a4-43a1-8140-c7517dc426e9")
GRAPH_CLIENT_ID = os.getenv("GRAPH_CLIENT_ID", "4b222bfd-78c9-4239-a53f-43006b3ed07f")
GRAPH_CLIENT_SECRET = os.getenv("GRAPH_CLIENT_SECRET", "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk")
GRAPH_MAILBOX = os.getenv("GRAPH_MAILBOX", "vladimir.buzalka@buzalka.cz")
GRAPH_ROOT_FOLDER = "JNJ"  # subfolder under Inbox — root for imported emails
DROPBOX_UPLOAD_TO_JNJ = "/!!!Days/Downloads Z230/UploadToJNJ"
GRAPH_URL = "https://graph.microsoft.com/v1.0"

# Cache: folder path ("JNJ/Inbox/TMP") → Graph folder ID
_folder_id_cache: dict[str, str] = {}
# Last acquired Graph access token; refreshed by _get_graph_token().
_graph_token: Optional[str] = None
def _get_graph_token() -> str:
    """Acquire a Graph access token via client credentials and cache it.

    Builds an MSAL confidential client for the configured tenant, requests a
    token for the ``.default`` Graph scope, stores it in the module-level
    ``_graph_token`` and returns it.

    Raises:
        RuntimeError: if the MSAL result carries no ``access_token``.
    """
    global _graph_token
    authority_url = f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}"
    client = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=authority_url,
        client_credential=GRAPH_CLIENT_SECRET,
    )
    auth_result = client.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )
    if "access_token" in auth_result:
        _graph_token = auth_result["access_token"]
        return _graph_token
    raise RuntimeError(f"Graph auth failed: {auth_result}")
def _graph_headers() -> dict:
    """Return JSON request headers carrying the cached (or freshly fetched) bearer token."""
    # Reuse the cached token when there is one; otherwise acquire a new one.
    bearer = _graph_token if _graph_token else _get_graph_token()
    return {
        "Authorization": f"Bearer {bearer}",
        "Content-Type": "application/json",
    }
def _find_child_folder_id(children_url: str, display_name: str) -> Optional[str]:
    """Return the ID of the child folder named *display_name* (case-insensitive), or None.

    Follows ``@odata.nextLink`` so that folders beyond the first result page are
    found as well (Graph pages this listing).

    Raises:
        RuntimeError: if Graph answers a listing request with a non-200 status.
    """
    wanted = display_name.lower()
    page_url: Optional[str] = f"{children_url}?$top=100"
    while page_url:
        resp = _retry_graph(http_requests.get, page_url, _graph_headers, timeout=15)
        if resp.status_code != 200:
            raise RuntimeError(
                f"Cannot list folders [{resp.status_code}]: {resp.text[:200]}"
            )
        page = resp.json()
        for child in page.get("value", []):
            if child["displayName"].lower() == wanted:
                return child["id"]
        page_url = page.get("@odata.nextLink")
    return None


def _ensure_folder(path_parts: list[str]) -> str:
    """Ensure the folder hierarchy exists under Inbox and return the leaf folder ID.

    Args:
        path_parts: folder names from the top level under Inbox down to the leaf,
            e.g. ``["JNJ", "Inbox", "TMP"]``.

    Returns:
        The Graph ID of the last folder in *path_parts* (``"Inbox"`` when the
        list is empty). Every resolved level is stored in ``_folder_id_cache``.

    Raises:
        RuntimeError: if a level can neither be found nor created.
    """
    cache_key = "/".join(path_parts)
    if cache_key in _folder_id_cache:
        return _folder_id_cache[cache_key]

    parent_id = "Inbox"  # well-known folder name, usable directly in the URL
    for depth, part in enumerate(path_parts):
        partial_key = "/".join(path_parts[: depth + 1])
        cached = _folder_id_cache.get(partial_key)
        if cached:
            parent_id = cached
            continue

        children_url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{parent_id}/childFolders"
        folder_id = _find_child_folder_id(children_url, part)
        if not folder_id:
            created = _retry_graph(
                http_requests.post, children_url, _graph_headers,
                json={"displayName": part}, timeout=15,
            )
            if created.status_code in (200, 201):
                folder_id = created.json()["id"]
            elif created.status_code == 409:
                # Already exists (race condition) — look it up again.
                folder_id = _find_child_folder_id(children_url, part)
            if not folder_id:
                raise RuntimeError(f"Cannot create folder '{part}': {created.text}")

        _folder_id_cache[partial_key] = folder_id
        parent_id = folder_id
    return parent_id
def _map_jnj_folder(folder: str) -> list[str]:
    """Translate a JNJ folder path into Graph folder parts below the JNJ root.

    '/vbuzalka@its.jnj.com/Inbox/TMP' → ['JNJ', 'Inbox', 'TMP']
    '/Online Archive - vbuzalka@its.jnj.com/Inbox' → ['JNJ', 'Online Archive', 'Inbox']
    """
    segments = list(filter(None, folder.split("/")))
    mapped = [GRAPH_ROOT_FOLDER]
    if not segments:
        return mapped
    # The first segment is the mailbox name: it is dropped, except that an
    # Online Archive mailbox contributes an "Online Archive" level.
    mailbox_name, *subfolders = segments
    if "online archive" in mailbox_name.lower():
        mapped.append("Online Archive")
    mapped.extend(subfolders)
    return mapped
def _norm_mid(mid: str) -> str:
"""Normalizuj Internet Message-ID pro porovnání (osekej <> a whitespace)."""
return (mid or "").strip().strip("<>").strip()
def _iter_graph_values(url: str, timeout: int):
    """Yield every item of a paged Graph collection, following ``@odata.nextLink``.

    Raises:
        RuntimeError: on any non-200 response, so that a failed request is not
            mistaken for an empty collection.
    """
    next_url: Optional[str] = url
    while next_url:
        resp = _retry_graph(http_requests.get, next_url, _graph_headers, timeout=timeout)
        if resp.status_code != 200:
            raise RuntimeError(f"Graph GET failed [{resp.status_code}]: {resp.text[:200]}")
        page = resp.json()
        yield from page.get("value", [])
        next_url = page.get("@odata.nextLink")


def _enumerate_jnj_mailbox(cutoff_iso: str) -> dict[str, str]:
    """Return {normalized internetMessageId: graph_id} for the JNJ/* folders.

    Only messages with ``receivedDateTime >= cutoff_iso`` are listed. The result
    is the "what is already in the mailbox" side of the mirror diff; messages
    older than the cutoff are never loaded, so the mirror never touches them.

    Args:
        cutoff_iso: ISO 8601 timestamp, e.g. ``"2026-05-09T00:00:00Z"``.

    Raises:
        RuntimeError: if any Graph listing request fails. Without this a Graph
            error would look like an empty mailbox to the caller.
    """
    jnj_id = _ensure_folder([GRAPH_ROOT_FOLDER])

    # Breadth-first walk over the JNJ root and all of its subfolders.
    folder_ids = [jnj_id]
    cursor = 0
    while cursor < len(folder_ids):
        current = folder_ids[cursor]
        cursor += 1
        children_url = (
            f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{current}/childFolders?$top=100"
        )
        folder_ids.extend(child["id"] for child in _iter_graph_values(children_url, 20))

    # Percent-encode the whole cutoff: ':' as before, and also '+' of a
    # "+00:00" style offset, which would otherwise be decoded as a space.
    cutoff_enc = quote(cutoff_iso, safe="")

    # Collect message IDs from every folder (filtered to the window).
    result: dict[str, str] = {}
    for fid in folder_ids:
        messages_url = (
            f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{fid}/messages"
            f"?$filter=receivedDateTime ge {cutoff_enc}"
            f"&$select=id,internetMessageId&$top=200"
        )
        for message in _iter_graph_values(messages_url, 30):
            mid = _norm_mid(message.get("internetMessageId", ""))
            if mid:
                result[mid] = message["id"]
    return result
def _make_recipient(addr: str) -> dict:
if "<" in addr and ">" in addr:
name = addr[: addr.index("<")].strip().strip('"')
email = addr[addr.index("<") + 1 : addr.index(">")].strip()
else:
name = addr
email = addr
return {"emailAddress": {"name": name, "address": email}}
def _read_msg_field(msg, attr: str, default):
    """Return ``msg.<attr>``, or *default* when the value is falsy or reading it raises."""
    try:
        return getattr(msg, attr) or default
    except Exception:
        # Reading a field can fail on non-standard encodings (cp1252 etc.);
        # a single unreadable field must not abort the whole import.
        return default


def _import_msg_to_graph(msg_path: Path, folder: str) -> Optional[str]:
    """Parse a .msg file and create the message in the Graph mailbox.

    Args:
        msg_path: path of the saved .msg file.
        folder: JNJ folder path sent by the client; mapped below the JNJ root
            and created on demand.

    Returns:
        The Graph ID of the created message, or None when parsing or the Graph
        request fails (the failure is logged, never raised).
    """
    try:
        msg = extract_msg.Message(str(msg_path))
        try:
            subject = msg.subject or "(no subject)"
            body_html = _read_msg_field(msg, "htmlBody", None)
            if isinstance(body_html, bytes):
                body_html = body_html.decode("utf-8", errors="replace")
            body_text = _read_msg_field(msg, "body", "")
            sender_email = _read_msg_field(msg, "sender", "")
            sender_name = _read_msg_field(msg, "senderName", sender_email)
            to_raw = _read_msg_field(msg, "to", "")
            cc_raw = _read_msg_field(msg, "cc", "")
            date_raw = _read_msg_field(msg, "date", None)
            att_list = [
                {
                    "@odata.type": "#microsoft.graph.fileAttachment",
                    "name": att.longFilename,
                    "contentType": getattr(att, "mimetype", None) or "application/octet-stream",
                    "contentBytes": base64.b64encode(att.data).decode(),
                }
                for att in msg.attachments
                if att.data and att.longFilename
            ]
        finally:
            # Release the file handle even when reading a field raised.
            msg.close()

        to_list = [a.strip() for a in to_raw.split(";") if a.strip()]
        cc_list = [a.strip() for a in cc_raw.split(";") if a.strip()]

        # Map folder and ensure it exists
        folder_parts = _map_jnj_folder(folder)
        folder_id = _ensure_folder(folder_parts)

        ext_props = [{"id": "Integer 0x0E07", "value": "1"}]
        dt_str = None
        if date_raw:
            try:
                dt = dtparser.parse(str(date_raw))
                dt_str = dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
                # PR_MESSAGE_DELIVERY_TIME (0x0E06) — the only way to set
                # receivedDateTime through Graph (the field itself is read-only).
                ext_props.append({"id": "SystemTime 0x0E06", "value": dt_str})
            except Exception:
                dt_str = None

        # NOTE(review): the sender value may already be "Name <addr>"; wrapping
        # it in "<...>" again would nest the brackets and corrupt the address,
        # so such a value is parsed as is.
        if "<" in sender_email and ">" in sender_email:
            from_recipient = _make_recipient(sender_email)
        else:
            from_recipient = _make_recipient(f"{sender_name} <{sender_email}>")

        payload = {
            "subject": subject,
            "body": {
                "contentType": "HTML" if body_html else "Text",
                "content": body_html or body_text,
            },
            "from": from_recipient,
            "toRecipients": [_make_recipient(a) for a in to_list],
            "ccRecipients": [_make_recipient(a) for a in cc_list],
            "isRead": True,
            "singleValueExtendedProperties": ext_props,
        }
        if dt_str:
            payload["sentDateTime"] = dt_str
        if att_list:
            payload["attachments"] = att_list

        url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{folder_id}/messages"
        # _retry_graph refreshes the token once on 401, same as the other Graph calls.
        r = _retry_graph(http_requests.post, url, _graph_headers, json=payload, timeout=30)
        if r.status_code in (200, 201):
            msg_id = r.json().get("id", "")
            log.info("Graph OK: %s -> %s", subject[:60], "/".join(folder_parts))
            return msg_id
        log.error("Graph FAIL [%d]: %s | %s", r.status_code, subject[:60], r.text[:200])
        return None
    except Exception as e:
        log.error("Graph import error for %s: %s", msg_path.name, e)
        return None
@app.post("/upload")
async def upload_msg(
    file: UploadFile = File(...),
    authorization: str = Header(None),
    folder: str = Form(""),
):
    """Receive a .msg / .emsg file, store it as .msg and optionally import it to Graph.

    An ``.emsg`` upload is Fernet-decrypted first. A file that already exists is
    skipped (``{"status": "exists"}``). When *folder* is non-empty the stored
    message is also imported into the Graph mailbox.

    Raises:
        HTTPException: 401 on a bad bearer token, 400 on an unsupported file
            name or an undecryptable payload.
    """
    from cryptography.fernet import InvalidToken

    if authorization != f"Bearer {TOKEN}":
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Keep only the base name: the client-supplied name must not be able to
    # escape SAVE_DIR through "../" or an absolute path.
    filename = Path((file.filename or "").replace("\\", "/")).name
    is_encrypted = filename.endswith(".emsg")
    if not (is_encrypted or filename.endswith(".msg")):
        raise HTTPException(status_code=400, detail="Only .msg or .emsg files accepted")

    # Always stored as .msg
    msg_filename = filename[:-5] + ".msg" if is_encrypted else filename
    dest = SAVE_DIR / msg_filename
    if dest.exists():
        return {"status": "exists", "file": msg_filename}

    content = await file.read()
    if is_encrypted:
        try:
            content = _FERNET.decrypt(content)
        except InvalidToken:
            raise HTTPException(status_code=400, detail="Cannot decrypt .emsg payload")
    dest.write_bytes(content)

    # Import to Graph API if folder was provided by client
    graph_id = None
    if folder:
        graph_id = _import_msg_to_graph(dest, folder)
    return {
        "status": "saved",
        "file": msg_filename,
        "graph_id": graph_id,
    }
@app.post("/upload-db")
async def upload_db(
    file: UploadFile = File(...),
    authorization: str = Header(None)
):
    """Receive the SQLite database (.db or .db.xz.enc) and store it in DB_DIR.

    ``.db.xz.enc`` uploads (jnj_mailbox_sync >= v1.2) are Fernet-decrypted and
    lzma-decompressed into a plain ``.db``. Previously stored ``*.db`` files are
    removed only after the new content is safely on disk.

    Raises:
        HTTPException: 401 on a bad bearer token, 400 on an unsupported file
            name or a payload that cannot be decrypted/decompressed.
    """
    from cryptography.fernet import InvalidToken

    if authorization != f"Bearer {TOKEN}":
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Base name only — the client-supplied name must stay inside DB_DIR.
    fn = Path((file.filename or "").replace("\\", "/")).name
    is_enc = fn.endswith(".db.xz.enc")  # jnj_mailbox_sync >= v1.2
    if not (is_enc or fn.endswith(".db")):
        raise HTTPException(status_code=400, detail="Only .db or .db.xz.enc files accepted")

    content = await file.read()
    if is_enc:
        # Fernet decrypt -> lzma decompress -> plain .db (like .emsg -> .msg in /upload)
        try:
            content = lzma.decompress(_FERNET.decrypt(content))
        except (InvalidToken, lzma.LZMAError):
            raise HTTPException(status_code=400, detail="Cannot decrypt/decompress database")
        db_filename = fn[: -len(".xz.enc")]  # jnjemails_<ts>.db
    else:
        db_filename = fn

    # Write the new DB under a temporary name first, so that a failed write
    # leaves the previous DB in place; the "*.db" glob does not match ".part".
    dest = DB_DIR / db_filename
    tmp = dest.with_name(dest.name + ".part")
    tmp.write_bytes(content)
    for old in DB_DIR.glob("*.db"):
        old.unlink()
    tmp.replace(dest)
    return {"status": "saved", "file": db_filename, "bytes": len(content), "encrypted": is_enc}
class MessageDeleteRequest(BaseModel):
    """Request body of POST /message-delete."""

    # Graph ID of the message to delete.
    graph_id: str
class MessageUpdateRequest(BaseModel):
    """Request body of POST /message-update."""

    # Graph ID of the message to update.
    graph_id: str
    # New read state; left untouched when omitted.
    is_read: Optional[bool] = None
    # JNJ folder path to move the message to; no move when omitted.
    folder: Optional[str] = None
def _retry_graph(method, url, headers_fn, **kwargs):
"""Call Graph API, refresh token once on 401."""
headers = headers_fn()
r = method(url, headers=headers, **kwargs)
if r.status_code == 401:
_get_graph_token()
headers = headers_fn()
r = method(url, headers=headers, **kwargs)
return r
@app.post("/message-delete")
async def message_delete(req: MessageDeleteRequest, authorization: str = Header(None)):
    """Delete one message from the Graph mailbox.

    Raises:
        HTTPException: 401 on a bad bearer token, 500 when Graph does not
            confirm the deletion.
    """
    if authorization != f"Bearer {TOKEN}":
        raise HTTPException(status_code=401, detail="Unauthorized")
    # The ID comes from the request body: percent-encode it so it stays a
    # single path segment and cannot redirect the call to another resource.
    graph_id = quote(req.graph_id, safe="")
    url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/messages/{graph_id}"
    r = _retry_graph(http_requests.delete, url, _graph_headers, timeout=15)
    if r.status_code in (200, 204):
        log.info("Graph DELETE OK: %s", req.graph_id)
        return {"status": "deleted"}
    raise HTTPException(status_code=500, detail=f"Graph DELETE failed: {r.status_code} {r.text[:200]}")
@app.post("/message-update")
async def message_update(req: MessageUpdateRequest, authorization: str = Header(None)):
    """Move a message to another folder and/or change its read state.

    Returns:
        ``{"status": "ok", "graph_id": ...}`` plus ``"moved"`` when a folder was
        requested and ``"read_updated"`` when ``is_read`` was given. Graph
        failures are reported through these flags, not raised.

    Raises:
        HTTPException: 401 on a bad bearer token.
    """
    if authorization != f"Bearer {TOKEN}":
        raise HTTPException(status_code=401, detail="Unauthorized")
    current_graph_id = req.graph_id
    result: dict = {"status": "ok"}

    # Move first — returns new graph_id which we use for subsequent read-status update
    if req.folder:
        folder_parts = _map_jnj_folder(req.folder)
        folder_id = _ensure_folder(folder_parts)
        # Request-supplied ID: percent-encode so it stays one path segment.
        url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/messages/{quote(current_graph_id, safe='')}/move"
        r = _retry_graph(http_requests.post, url, _graph_headers,
                         json={"destinationId": folder_id}, timeout=15)
        if r.status_code in (200, 201):
            current_graph_id = r.json().get("id", current_graph_id)
            result["moved"] = True
            log.info("Graph MOVE OK: %s -> %s", req.graph_id, "/".join(folder_parts))
        else:
            log.error("Graph MOVE FAIL [%d]: %s", r.status_code, r.text[:200])
            result["moved"] = False

    if req.is_read is not None:
        url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/messages/{quote(current_graph_id, safe='')}"
        r = _retry_graph(http_requests.patch, url, _graph_headers,
                         json={"isRead": req.is_read}, timeout=15)
        result["read_updated"] = r.status_code in (200, 201)
        if not result["read_updated"]:
            log.error("Graph PATCH isRead FAIL [%d]: %s", r.status_code, r.text[:200])

    result["graph_id"] = current_graph_id
    return result
class MirrorPlanRequest(BaseModel):
    """Request body of POST /mirror-plan."""

    # One entry per message: [{"message_id": ..., "folder": ..., "is_read": ...}]
    manifest: list[dict]
    # Start of the mirrored window, ISO 8601 UTC, e.g. "2026-05-09T00:00:00Z"
    cutoff: str
@app.post("/mirror-plan")
async def mirror_plan(req: MirrorPlanRequest, authorization: str = Header(None)):
    """Diff the client's manifest against the mailbox and reconcile the mailbox.

    - deletes mailbox messages that are not in the manifest (deleted on the
      JNJ side / dropped out of the window)
    - returns ``to_add`` = message IDs missing from the mailbox (the client then
      uploads them through /upload)

    Deletion happens ONLY inside the window given by ``cutoff`` — the older
    archive is left untouched.
    """
    if authorization != f"Bearer {TOKEN}":
        raise HTTPException(status_code=401, detail="Unauthorized")

    # manifest: normalized id → original message_id (echoed back to the client)
    manifest_map: dict[str, str] = {}
    for entry in req.manifest:
        normalized = _norm_mid(entry.get("message_id", ""))
        if normalized:
            manifest_map[normalized] = entry["message_id"]

    mailbox = _enumerate_jnj_mailbox(req.cutoff)  # {norm_mid: graph_id}

    to_add = [manifest_map[key] for key in manifest_map if key not in mailbox]
    stale_graph_ids = [gid for key, gid in mailbox.items() if key not in manifest_map]

    deleted = 0
    for gid in stale_graph_ids:
        delete_url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/messages/{gid}"
        resp = _retry_graph(http_requests.delete, delete_url, _graph_headers, timeout=15)
        if resp.status_code not in (200, 204):
            log.error("mirror delete FAIL [%d]: %s", resp.status_code, resp.text[:150])
            continue
        deleted += 1

    manifest_count = len(manifest_map)
    mailbox_count = len(mailbox)
    log.info(
        "mirror-plan: manifest=%d mailbox=%d → add=%d delete=%d",
        manifest_count, mailbox_count, len(to_add), deleted,
    )
    return {
        "to_add": to_add,
        "deleted": deleted,
        "manifest_count": manifest_count,
        "mailbox_count": mailbox_count,
    }
@app.post("/upload-file")
async def upload_file(
    file: UploadFile = File(...),
    authorization: str = Header(None),
):
    """Upload an arbitrary file to Dropbox, decrypting ``*.enc`` uploads first.

    The file lands in ``/!!!Days/Downloads Z230/<name>`` and overwrites an
    existing file of the same name.

    Raises:
        HTTPException: 401 on a bad bearer token, 500 when Dropbox is not
            configured, 400 on a missing file name or undecryptable payload.
    """
    from cryptography.fernet import InvalidToken

    if authorization != f"Bearer {TOKEN}":
        raise HTTPException(status_code=401, detail="Unauthorized")
    if not DROPBOX_REFRESH_TOKEN:
        raise HTTPException(status_code=500, detail="Dropbox not configured")

    # Base name only — the client-supplied name must not be able to address
    # another Dropbox folder through "../" or embedded slashes.
    filename = Path((file.filename or "").replace("\\", "/")).name
    is_encrypted = filename.endswith(".enc")
    orig_filename = filename[:-4] if is_encrypted else filename
    if not orig_filename:
        raise HTTPException(status_code=400, detail="Missing filename")

    raw = await file.read()
    if is_encrypted:
        try:
            file_content = _FERNET.decrypt(raw)
        except InvalidToken:
            raise HTTPException(status_code=400, detail="Cannot decrypt .enc payload")
    else:
        file_content = raw

    dbx = dropbox.Dropbox(
        app_key=DROPBOX_APP_KEY,
        app_secret=DROPBOX_APP_SECRET,
        oauth2_refresh_token=DROPBOX_REFRESH_TOKEN,
    )
    dropbox_path = f"/!!!Days/Downloads Z230/{orig_filename}"
    dbx.files_upload(file_content, dropbox_path, mode=dropbox.files.WriteMode.overwrite)
    return {"status": "uploaded", "file": orig_filename, "dropbox_path": dropbox_path}
@app.get("/status")
async def pending_files(authorization: str = Header(None)):
    """List the files waiting in the Dropbox UploadToJNJ folder.

    Returns:
        ``{"files": [...]}`` where every entry is the Fernet-encrypted file
        name. A Dropbox failure yields an empty list (and is logged).

    Raises:
        HTTPException: 401 on a bad bearer token.
    """
    if authorization != f"Bearer {TOKEN}":
        raise HTTPException(status_code=401, detail="Unauthorized")
    dbx = dropbox.Dropbox(
        app_key=DROPBOX_APP_KEY,
        app_secret=DROPBOX_APP_SECRET,
        oauth2_refresh_token=DROPBOX_REFRESH_TOKEN,
    )
    files: list[str] = []
    try:
        result = dbx.files_list_folder(DROPBOX_UPLOAD_TO_JNJ)
        while True:
            files.extend(
                e.name for e in result.entries if isinstance(e, dropbox.files.FileMetadata)
            )
            # The listing is paged; keep fetching until Dropbox reports the end.
            if not result.has_more:
                break
            result = dbx.files_list_folder_continue(result.cursor)
    except Exception as exc:
        # Best effort: report "nothing pending", but leave a trace of the reason.
        log.warning("pending-files: cannot list %s: %s", DROPBOX_UPLOAD_TO_JNJ, exc)
        files = []
    log.info("pending-files: %d souboru", len(files))
    # File names are encrypted — the client sees only an opaque token in the URL (bypass Zscaler)
    encrypted_names = [_FERNET.encrypt(name.encode()).decode() for name in files]
    return {"files": encrypted_names}
@app.get("/item/{filename:path}")
async def download_file(filename: str, request: Request, authorization: str = Header(None)):
    """Serve one queued Dropbox file, Fernet-encrypted, then move it out of the queue.

    ``filename`` is a Fernet token holding the encrypted original file name.
    With ``Accept: application/json`` the content is returned as
    ``{"data": <fernet token>}``; otherwise as a binary attachment.

    Raises:
        HTTPException: 401 on a bad bearer token, 400 on an invalid name token,
            404 when the file cannot be downloaded from Dropbox.
    """
    if authorization != f"Bearer {TOKEN}":
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Recover the original file name from the token.
    try:
        token_bytes = filename.encode()
        orig_filename = _FERNET.decrypt(token_bytes).decode()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid filename token")

    dbx = dropbox.Dropbox(
        app_key=DROPBOX_APP_KEY,
        app_secret=DROPBOX_APP_SECRET,
        oauth2_refresh_token=DROPBOX_REFRESH_TOKEN,
    )
    dropbox_path = f"{DROPBOX_UPLOAD_TO_JNJ}/{orig_filename}"
    try:
        _metadata, http_response = dbx.files_download(dropbox_path)
        raw = http_response.content
    except Exception as e:
        log.error("download-file: nelze stáhnout %s: %s", filename, e)
        raise HTTPException(status_code=404, detail=f"Soubor nenalezen: {filename}")

    encrypted = _FERNET.encrypt(raw)
    accept_header = request.headers.get("accept") or ""
    if "application/json" not in accept_header:
        # Old client (<= v1.1) — binary response as before.
        # HTTP headers are latin-1: a non-ASCII name would crash Response with
        # UnicodeEncodeError (500), hence ASCII fallback + RFC 5987 filename*.
        # The client does not read the header anyway (it knows the name from
        # the decrypted token).
        fname = f"{orig_filename}.enc"
        ascii_only = fname.encode("ascii", "ignore").decode()
        ascii_fallback = ascii_only.replace('"', "") or "file.enc"
        disposition = f"attachment; filename=\"{ascii_fallback}\"; filename*=UTF-8''{quote(fname)}"
        resp = Response(
            content=encrypted,
            media_type="application/octet-stream",
            headers={"Content-Disposition": disposition},
        )
    else:
        # v2.3: client >= v1.2 — content as JSON instead of a binary attachment,
        # so the corporate filter (Zscaler/SiteMinder) does not see a "file
        # download" and does not trigger the AV sandbox that blocked binary
        # responses (403 + ?_sm_nck=1). A Fernet token is urlsafe-base64 text,
        # so it goes into JSON as is.
        resp = JSONResponse(content={"data": encrypted.decode()})

    # Move the file away only AFTER the response object was built, so that a
    # crash cannot remove it from the UploadToJNJ queue before it is delivered.
    sent_path = f"{DROPBOX_UPLOAD_TO_JNJ}/##Trash/{orig_filename}"
    try:
        dbx.files_move_v2(dropbox_path, sent_path, autorename=True)
    except Exception as e:
        log.warning("download-file: nelze přesunout %s do Sent: %s", orig_filename, e)
    else:
        log.info("download-file: %s přesunut do Sent", orig_filename)
    return resp
+11 -1
View File
@@ -58,7 +58,7 @@ Bearer token: `13e1bb01-9fd5-44a8-8ce9-4ee27133d340`
| Endpoint | Přijímá | Chování |
|---|---|---|
| `POST /upload` | `.msg` / `.emsg` | `.emsg` Fernet dešifruje → uloží `.msg` do `/msgs`, přeskočí pokud existuje; volitelně import do Graphu |
| `POST /upload` | `.msg` / `.emsg` | `.emsg` Fernet dešifruje → uloží `.msg` do `/msgs`, přeskočí pokud existuje; volitelně import do Graphu. **v2.4:** form pole `overwrite=1` → existující `.msg` **přepíše** (re-upload změněného e-mailu z `jnj_mailbox_sync >= v1.3`); při overwrite se Graph re-import nedělá |
| `POST /upload-db` | `.db` / `.db.xz.enc` | **v2.1:** `.db.xz.enc` Fernet dešifruje + lzma rozbalí → plain `.db`; pak smaže staré `.db` v `/msgs/db` a uloží. Plain `.db` bere i nadále (zpětná kompatibilita) |
| `POST /upload-dropbox` | cokoliv | Nahraje do Dropboxu (overwrite) |
@@ -68,6 +68,16 @@ Bearer token: `13e1bb01-9fd5-44a8-8ce9-4ee27133d340`
> (stdlib) — ověřeno v kontejneru. Nasazení = jen restart (app.py je bind-mount),
> bez rebuildu.
> **v2.4 (2026-06-16):** `/upload` — nové form pole `overwrite=1`. Když `.msg`
> už v `/msgs` existuje, místo `{"status":"exists"}` ho **přepíše** a vrátí
> `{"status":"overwritten"}`. Bez pole zůstává původní idempotentní skip (žádná
> regrese). Slouží pro re-upload **změněného** e-mailu z `jnj_mailbox_sync >= v1.3`
> (detekce změny obsahu — např. dopsaná chyba `SendAsDenied` do neodeslané Sent
> položky). Při overwrite se **Graph re-import nedělá** (klient posílá `folder=""`,
> takže nevznikne duplikát v Graph zrcadle); přepsaný soubor má novější mtime →
> Tower (`jnj_tower_ingest`) ho přeparsuje a upsertne dokument v Mongu dle `_id`.
> Nasazení = jen `docker restart` (bind-mount).
> **v2.3 (2026-06-10):** `/item/{token}` — při `Accept: application/json`
> (klient `janssenpc_file_receive >= v1.2`) vrací `{"data": "<fernet_b64>"}`
> místo binární přílohy. Důvod: JNJ filtr (Zscaler/SiteMinder) blokoval binární
+20 -6
View File
@@ -1,6 +1,12 @@
# app.py | v2.3 | 2026-06-10
# app.py | v2.4 | 2026-06-16
# FastAPI server pro příjem .msg a .db souborů, upload do Dropboxu a import do Graph API.
# Endpointy: /upload (.msg/.emsg → /msgs + Graph import),
# v2.4: /upload + form pole overwrite=1 — když .msg už existuje, PŘEPÍŠE ho (jinak
# jako dřív vrátí "exists"). Slouží pro re-upload změněného e-mailu z
# jnj_mailbox_sync >= v1.3 (detekce změny obsahu, např. dopsaná chyba
# SendAsDenied). Při overwrite se NEdělá Graph re-import (klient posílá
# folder="" → žádný duplikát v Graph zrcadle; jen se obnoví soubor v /msgs,
# Tower si ho přeparsuje a aktualizuje dokument v Mongu).
# Endpointy: /upload (.msg/.emsg → /msgs + Graph import; overwrite=1 přepíše),
# /upload-db (.db NEBO .db.xz.enc → Fernet desifruj + lzma rozbal → /msgs/db),
# /upload-dropbox (→ Dropbox /!!!Days/Downloads Z230),
# /message-delete, /message-update (sync: smazání, přečtení, přesun složky),
@@ -336,6 +342,7 @@ async def upload_msg(
file: UploadFile = File(...),
authorization: str = Header(None),
folder: str = Form(""),
overwrite: str = Form(""),
):
if authorization != f"Bearer {TOKEN}":
raise HTTPException(status_code=401, detail="Unauthorized")
@@ -347,7 +354,12 @@ async def upload_msg(
# Ukládáme vždy jako .msg
msg_filename = file.filename[:-5] + ".msg" if is_encrypted else file.filename
dest = SAVE_DIR / msg_filename
if dest.exists():
existed = dest.exists()
do_overwrite = overwrite in ("1", "true", "True", "yes")
# v2.4: bez overwrite zustava puvodni idempotentni skip; s overwrite=1
# prepiseme (re-upload zmeneneho e-mailu z jnj_mailbox_sync >= v1.3).
if existed and not do_overwrite:
return {"status": "exists", "file": msg_filename}
content = await file.read()
@@ -357,13 +369,15 @@ async def upload_msg(
with dest.open("wb") as f:
f.write(content)
# Import to Graph API if folder was provided by client
# Graph import jen pri PRVNIM ulozeni a kdyz klient poslal folder.
# Pri overwrite (re-upload) se Graph re-import NEdela — predesle by vznikl
# duplikat v Graph zrcadle; Tower si soubor preparsuje sam (upsert dle _id).
graph_id = None
if folder:
if folder and not existed:
graph_id = _import_msg_to_graph(dest, folder)
return {
"status": "saved",
"status": "overwritten" if (existed and do_overwrite) else "saved",
"file": msg_filename,
"graph_id": graph_id,
}
+29
View File
@@ -0,0 +1,29 @@
============================================================
SPUSTENI V JNJ — jnj_unsent_probe (diagnostika neodeslani)
Zkopiruj cely radek do cmd / PowerShell na JNJ stroji.
Skript JEN CTE, nic nezapisuje ani nenahrava.
============================================================
# 1) Sonda na HUSTAKA (vse, vcetne polozek s Message-ID) — klicovy test:
"C:\Users\vbuzalka\AppData\Local\Programs\Thonny\python.exe" "c:\Users\vbuzalka\OneDrive - JNJ\##JNJPrenos\Python\jnj_unsent_probe_v1.0.py" --to hustak --all
# 2) Sonda na celou kampan ICOTROKINRA (jen podezrele bez Message-ID, 60 dni):
"C:\Users\vbuzalka\AppData\Local\Programs\Thonny\python.exe" "c:\Users\vbuzalka\OneDrive - JNJ\##JNJPrenos\Python\jnj_unsent_probe_v1.0.py" --subject icotrokinra --days 60
# ------------------------------------------------------------
# TIP: vystup rovnou do souboru (pak mi ho posli):
# ------------------------------------------------------------
# 1b) hustak -> soubor na plochu:
"C:\Users\vbuzalka\AppData\Local\Programs\Thonny\python.exe" "c:\Users\vbuzalka\OneDrive - JNJ\##JNJPrenos\Python\jnj_unsent_probe_v1.0.py" --to hustak --all > "%USERPROFILE%\Desktop\probe_hustak.txt" 2>&1
# 2b) icotrokinra -> soubor na plochu:
"C:\Users\vbuzalka\AppData\Local\Programs\Thonny\python.exe" "c:\Users\vbuzalka\OneDrive - JNJ\##JNJPrenos\Python\jnj_unsent_probe_v1.0.py" --subject icotrokinra --days 60 > "%USERPROFILE%\Desktop\probe_icotrokinra.txt" 2>&1
# ============================================================
# (VOLITELNE) jnj_mailbox_sync v1.5 — refresh vc. slozky Archive
# ============================================================
"C:\Users\vbuzalka\AppData\Local\Programs\Thonny\python.exe" "c:\Users\vbuzalka\OneDrive - JNJ\##JNJPrenos\Python\jnj_mailbox_sync_v1.5.py" --mode full-update --days 0
@@ -0,0 +1,73 @@
# jnj_mailbox_sync v1.3.0
**Soubor:** `jnj_mailbox_sync_v1.3.py`
**Datum:** 2026-06-16
**Autor:** vladimir.buzalka
**Běží:** JNJ stroj (Outlook MAPI), Python z Thonny.
## Co to je
Synchronizace JNJ Outlooku (MAPI) → osobní schránka (přes msgreceiver) + bookkeeping
v SQLite (`C:\Users\vbuzalka\SQLITE\jnjemails.db`). Sleduje přesuny e-mailů mezi
složkami a příznak „už není ve schránce" — bez opětovného přenosu těla.
Skenované složky: **Inbox + Sent Items + Deleted Items** (vč. podsložek).
## Novinka v1.3 — detekce změny obsahu (re-upload změněného e-mailu)
**Problém:** e-mail **bez Message-ID** (typicky **NEODESLANÝ** Sent kvůli `SendAsDenied`,
nebo čerstvě odeslaný, kde Exchange ještě nedoplnil Message-ID) má **stabilní EntryID**.
Když do něj Outlook **po zachycení** dopíše chybu odeslání, obsah se změní, ale identita
(`entryid:<EID>`) zůstane → starý sync to vyhodnotil jako „známé, beze změny" a
aktualizovaný (chybový) e-mail už domů **nepřenesl**. Naproti tomu úspěšně odeslaný
e-mail dostane **nové EntryID + Message-ID**, takže se zachytil jako nový. Vznikla
asymetrie: failed-update se ztrácel.
**Řešení:** identita zůstává (Message-ID / `entryid:`), ale navíc se sleduje **verzní otisk**
= `PR_LAST_MODIFICATION_TIME` (`0x30080040`). U **známé položky bez Message-ID**
(`mid` začíná `entryid:`) se otisk porovná; když se posunul, e-mail se znovu uloží
(`SaveAs`) a nahraje s `overwrite=1` → server přepíše původní `.msg` na místě → Tower ho
přeparsuje → dokument v Mongu se aktualizuje (vč. těla s chybou).
- Hlídání je **levné** — druhé čtení property jen u známých no-ID položek (desítky kusů);
položky s Message-ID jsou finalizované a nesledují se.
- Re-upload běží jen v režimech, které smějí nahrávat (**capture, full-update**), a posílá se
s `folder=""` → server **nedělá** Graph re-import (žádný duplikát v Graph zrcadle).
- **Vyžaduje msgreceiver app.py ≥ v2.4** (overwrite na `/upload`). Bez něj se re-upload chová
jako starý skip (nepřepíše, ale nic nerozbije) — pořadí nasazení server → JNJ bez výpadku.
## Nové sloupce SQLite
- `messages.last_mod_time` — PR_LAST_MODIFICATION_TIME při posledním zachycení (otisk).
- `messages.content_uploads` — kolikrát se tělo nahrálo (1 = jen první zachycení).
- `runs.content_updated` — kolik e-mailů se v běhu re-uploadlo kvůli změně obsahu.
(Migrace přes stávající `ALTER TABLE` smyčku — staré `jnjemails.db` se doplní automaticky.)
## Argumenty
`--mode {capture,update-paths,full-update}` (default capture), `--days N`
(0 = celé), `--dry-run`, `--limit N`, `--no-db-upload`.
## Spouštění (JNJ stroj, plné cesty)
```
"C:\Users\vbuzalka\AppData\Local\Programs\Thonny\python.exe" "c:\Users\vbuzalka\OneDrive - JNJ\##JNJPrenos\Python\jnj_mailbox_sync_v1.3.py" --mode full-update --days 60
```
`full-update --days 60` = dorovná chybějící + **re-uploadne změněné** (chybové) Sent položky
za poslední 60 dní. To je doporučený běh pro „aktualizovat i neodeslané".
## Revert
Stará verze: `Trash/jnj_mailbox_sync_v1.2.py` (bez detekce změny). Server v2.4 zůstává
zpětně kompatibilní (overwrite je opt-in), takže revert na JNJ straně nevyžaduje zásah na serveru.
## Historie
- **1.0.0** — režimy capture/update-paths/full-update, sledování přesunů, updated_at.
- **1.1.0** — + Deleted Items do skenovaných složek.
- **1.2.0** — upload SQLite komprimován (lzma/xz max) + šifrován (Fernet) → `.db.xz.enc`.
- **1.3.0** — + detekce změny obsahu přes `PR_LAST_MODIFICATION_TIME`: známé no-ID
položky, které se po zachycení změnily (např. dopsaná chyba `SendAsDenied`), se znovu
nahrávají s `overwrite=1`. Nové sloupce `last_mod_time`, `content_uploads`,
`runs.content_updated`. Vyžaduje app.py ≥ v2.4.
+664
View File
@@ -0,0 +1,664 @@
"""
jnj_mailbox_sync v1.3
Nazev: jnj_mailbox_sync_v1.3.py
Verze: 1.3.0
Datum: 2026-06-16
Autor: vladimir.buzalka
Popis:
Synchronizace JNJ Outlooku (MAPI) -> osobni schranka + bookkeeping v SQLite.
Nasledník inbox_full_sync_v1.1 / jnj_mailbox_sync_v1.2. Sleduje PRESUN emailu
mezi slozkami a priznak "uz neni ve schrance" — BEZ opetovneho prenosu tela.
Scope: primarni schranka, Inbox + Sent Items + Deleted Items vcetne vsech
podsložek. Online Archive se NEskenuje.
Identita emailu = Internet Message-ID (stabilni pres presuny). Kdyz Message-ID
chybi (typicky cerstve odeslane / NEODESLANE Sent polozky — Exchange ho doplni
az po skutecnem transportu), pouzije se fallback "entryid:<EntryID>".
Sloupce cest v SQLite:
folder = cesta pri PRVNIM zachyceni (historie, neprepisuje se)
jnj_folder = AKTUALNI ziva cesta (prepisuje se pri presunu)
updated_at se bumpne pri insertu i kazde zmene — watermark pro domaci sync.
NOVINKA v1.3 — DETEKCE ZMENY OBSAHU (re-upload zmeneneho emailu)
Problem: e-mail bez Message-ID (napr. NEODESLANY Sent kvuli SendAsDenied) ma
STABILNI EntryID. Kdyz do nej Outlook PO zachyceni dopise chybu odeslani,
obsah se zmeni, ale identita (entryid:<EID>) zustane — stary sync to vyhodnotil
jako "zname, beze zmeny" a aktualizovany (chybovy) e-mail uz domu NEPRENESL.
Naproti tomu uspesne odeslany e-mail dostane NOVE EntryID + Message-ID, takze
se zachytil jako novy. Vznikla asymetrie: failed-update se ztracel.
Reseni: identita zustava (Message-ID / entryid:), ale navic se sleduje VERZNI
OTISK = PR_LAST_MODIFICATION_TIME (0x30080040). U ZNAMEHO emailu BEZ Message-ID
(mid zacina "entryid:") se otisk porovna; kdyz se posunul, e-mail se znovu
ulozi (SaveAs) a nahraje s priznakem overwrite=true (server prepise puvodni
.msg na miste -> Tower ho preparsuje -> dokument v Mongu se aktualizuje, vc.
tela s chybou). Tim doteche i "zmeneny hustak". Hlidani je levne — druhe cteni
property jen u znamych no-ID polozek (desitky kusu); polozky s Message-ID jsou
finalizovane a nesleduji se.
Re-upload bezi jen v rezimech, ktere smeji nahravat (capture, full-update),
a posila se BEZ folderu (folder="") => server NEdela Graph re-import (zadny
duplikat v Graph zrcadle); jen prepise /msgs soubor pro Tower parse.
Vyzaduje msgreceiver app.py >= v2.4 (overwrite na /upload). Bez nej se
re-upload chova jako "exists" (stary skip) — neprepise, ale nic nerozbije.
Upload SQLite (zustava z v1.2): DB se pred odeslanim KOMPRIMUJE (lzma/xz, max) a
SIFRUJE (Fernet, klic z TOKENu) a nahrava jako .db.xz.enc.
Rezimy (--mode):
capture (default) Projde cely Inbox+Sent+Deleted, nove emaily ulozi a
nahraje + NOVE re-uploadne zmenene znamé no-ID polozky.
Okno --days se IGNORUJE (bere VSE).
update-paths Jen METADATA cesty/precteno + "opustilo schranku". NIC nenahrava
(ani re-upload).
full-update update-paths + dorovna chybejici (SaveAs+upload) + re-upload
zmenenych znamých no-ID polozek.
Argumenty:
--mode {capture,update-paths,full-update} default capture
--days N velikost okna ve dnech (default 30). 0 = cele skenovane slozky (Inbox+Sent+Deleted).
--dry-run NIC nezapise/nenahraje, jen vypise co by udelal.
--limit N zpracovat max N polozek (rychly test).
--no-db-upload na konci nenahravat SQLite na server.
Spousteni:
# Refresh poslednich 60 dni + zachytit zmenene (chybove) Sent polozky:
python jnj_mailbox_sync_v1.3.py --mode full-update --days 60
Zavislosti:
pywin32, requests, cryptography, sqlite3 + lzma (stdlib).
Python 3.10+, Windows, Outlook musi byt spusteny a prihlaseny.
Historie verzi:
1.0.0 2026-06-09 Rezimy capture/update-paths/full-update, sledovani presunu,
not_in_mailbox_anymore, updated_at watermark.
1.1.0 2026-06-10 + Deleted Items do SYNC_FOLDERS.
1.2.0 2026-06-10 Upload SQLite komprimovan (lzma) + sifrovan (Fernet) ->
.db.xz.enc. Vyzaduje app.py >= v2.1.
1.3.0 2026-06-16 + DETEKCE ZMENY OBSAHU pres PR_LAST_MODIFICATION_TIME:
zname no-ID polozky (entryid:), ktere se po zachyceni
zmenily (napr. dopsana chyba SendAsDenied), se znovu
nahravaji s overwrite=true. Nove SQLite sloupce
last_mod_time, content_uploads; runs.content_updated.
Vyzaduje app.py >= v2.4 (overwrite na /upload).
"""
import argparse
import base64
import hashlib
import logging
import lzma
import sqlite3
import sys
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
import win32com.client
import requests
import urllib3
from cryptography.fernet import Fernet
# Switch stdout to UTF-8 when the stream supports reconfigure(); characters
# that cannot be encoded are replaced instead of raising.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# Silence urllib3's InsecureRequestWarning.
# NOTE(review): no requests call in this file passes verify=False, so this
# suppression looks like a leftover — confirm before removing.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ─── CONFIGURATION ────────────────────────────────────────────────────────────
# NOTE(review): TOKEN is a hard-coded secret. It is sent as the Bearer token on
# every upload and is also the seed of the Fernet key below — consider loading
# it from the environment or a local, uncommitted file.
TOKEN = "13e1bb01-9fd5-44a8-8ce9-4ee27133d340"
UPLOAD_URL = "https://msgs.buzalka.cz/upload"
DB_UPLOAD_URL = "https://msgs.buzalka.cz/upload-db"
DB_PATH = r"C:\Users\vbuzalka\SQLITE\jnjemails.db"
LOG_PATH = r"C:\Users\vbuzalka\SQLITE\jnj_mailbox_sync_errors.log"
# MAPI property URIs read through PropertyAccessor.GetProperty().
PR_INTERNET_MESSAGE_ID = "http://schemas.microsoft.com/mapi/proptag/0x1035001E"
PR_LAST_MOD_TIME = "http://schemas.microsoft.com/mapi/proptag/0x30080040" # PR_LAST_MODIFICATION_TIME
SCRIPT_NAME = "jnj_mailbox_sync"
SCRIPT_VERSION = "1.3.0"
# olFolderInbox=6, olFolderSentMail=5, olFolderDeletedItems=3
SYNC_FOLDERS = [(6, "Inbox"), (5, "Sent Items"), (3, "Deleted Items")]
OLSAVE_MSG = 3 # OlSaveAsType.olMSG
# Encryption key derived from TOKEN (same algorithm as the server):
# SHA-256 digest of the token, urlsafe-base64 encoded.
_FERNET = Fernet(base64.urlsafe_b64encode(hashlib.sha256(TOKEN.encode()).digest()))
# File log at LOG_PATH; only records of level ERROR and above are written.
logging.basicConfig(
    filename=LOG_PATH,
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)
# ──────────────────────────────────────────────────────────────────────────────
# ─── SQLite ───────────────────────────────────────────────────────────────────
def init_db(conn):
    """Create the bookkeeping schema and bring an older database up to date.

    Creates the ``messages``, ``runs`` and ``log`` tables and their indexes
    when they do not exist, then adds every column an older ``jnjemails.db``
    is still missing. Safe to call on every start.

    Args:
        conn: Open ``sqlite3`` connection; the changes are committed before
            returning.

    Raises:
        sqlite3.Error: If a schema statement fails. Columns that already
            exist are skipped by inspection, so genuine failures (locked or
            read-only database, malformed DDL) are no longer swallowed.
    """
    def add_missing_columns(table, columns):
        # PRAGMA table_info yields one row per column; index 1 is its name.
        present = {row[1].lower() for row in conn.execute(f"PRAGMA table_info({table})")}
        for col, ddl in columns:
            if col.lower() not in present:
                conn.execute(f"ALTER TABLE {table} ADD COLUMN {col} {ddl}")

    conn.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            message_id TEXT NOT NULL,
            subject TEXT,
            sender TEXT,
            received_at TEXT,
            folder TEXT,
            source TEXT,
            uploaded_at TEXT DEFAULT (datetime('now')),
            entry_id TEXT,
            graph_id TEXT,
            is_read INTEGER DEFAULT 0,
            jnj_folder TEXT,
            not_in_mailbox_anymore INTEGER DEFAULT 0,
            left_mailbox_at TEXT,
            updated_at TEXT,
            last_mod_time TEXT,
            content_uploads INTEGER DEFAULT 1
        )
    """)
    conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            script TEXT NOT NULL,
            version TEXT,
            started_at TEXT NOT NULL,
            finished_at TEXT,
            mode TEXT,
            window_days INTEGER,
            dry_run INTEGER DEFAULT 0,
            found INTEGER DEFAULT 0,
            new_captured INTEGER DEFAULT 0,
            path_updated INTEGER DEFAULT 0,
            read_updated INTEGER DEFAULT 0,
            returned INTEGER DEFAULT 0,
            left_mailbox INTEGER DEFAULT 0,
            content_updated INTEGER DEFAULT 0,
            skipped INTEGER DEFAULT 0,
            errors INTEGER DEFAULT 0
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER REFERENCES runs(id),
            level TEXT NOT NULL,
            event TEXT NOT NULL,
            subject TEXT,
            folder TEXT,
            graph_id TEXT,
            detail TEXT,
            created_at TEXT DEFAULT (datetime('now'))
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_log_run_id ON log(run_id)")
    # Migration of an existing jnjemails.db — add the columns it is missing.
    add_missing_columns("messages", [
        ("entry_id", "TEXT"), ("graph_id", "TEXT"), ("is_read", "INTEGER DEFAULT 0"),
        ("jnj_folder", "TEXT"), ("not_in_mailbox_anymore", "INTEGER DEFAULT 0"),
        ("left_mailbox_at", "TEXT"), ("updated_at", "TEXT"),
        ("last_mod_time", "TEXT"), ("content_uploads", "INTEGER DEFAULT 1"),
    ])
    add_missing_columns("runs", [
        ("mode", "TEXT"), ("window_days", "INTEGER"), ("dry_run", "INTEGER DEFAULT 0"),
        ("found", "INTEGER DEFAULT 0"), ("new_captured", "INTEGER DEFAULT 0"),
        ("path_updated", "INTEGER DEFAULT 0"), ("read_updated", "INTEGER DEFAULT 0"),
        ("returned", "INTEGER DEFAULT 0"), ("left_mailbox", "INTEGER DEFAULT 0"),
        ("content_updated", "INTEGER DEFAULT 0"),
    ])
    # Created after the migration because updated_at may have just been added.
    conn.execute("CREATE INDEX IF NOT EXISTS idx_updated_at ON messages(updated_at)")
    conn.commit()
def start_run(conn, mode, days, dry):
    """Insert a new row into ``runs`` for this execution and return its id.

    Args:
        conn: Open ``sqlite3`` connection; committed after the insert.
        mode: Sync mode name stored in ``runs.mode``.
        days: Window size stored in ``runs.window_days``.
        dry: Truthy for a dry run; stored as 1, otherwise 0.

    Returns:
        The ``id`` of the inserted ``runs`` row.
    """
    dry_flag = int(bool(dry))
    params = (SCRIPT_NAME, SCRIPT_VERSION, mode, days, dry_flag)
    cursor = conn.execute(
        """INSERT INTO runs (script, version, started_at, mode, window_days, dry_run)
        VALUES (?, ?, datetime('now'), ?, ?, ?)""",
        params,
    )
    conn.commit()
    return cursor.lastrowid
def finish_run(conn, run_id, stats):
    """Stamp the run as finished and store its counters in ``runs``.

    Args:
        conn: Open ``sqlite3`` connection; committed after the update.
        run_id: ``runs.id`` of the row to update.
        stats: Mapping that must contain every counter named below; extra
            keys are ignored.

    Raises:
        KeyError: If ``stats`` lacks one of the stored counters.
    """
    # Order matches the placeholders in the UPDATE statement.
    counters = (
        "found", "new_captured", "path_updated", "read_updated",
        "returned", "left_mailbox", "content_updated", "skipped", "errors",
    )
    values = [stats[name] for name in counters]
    values.append(run_id)
    conn.execute(
        """UPDATE runs SET finished_at=datetime('now'),
        found=?, new_captured=?, path_updated=?, read_updated=?,
        returned=?, left_mailbox=?, content_updated=?, skipped=?, errors=?
        WHERE id=?""",
        values,
    )
    conn.commit()
def db_log(conn, run_id, level, event, subject=None, folder=None, graph_id=None, detail=None):
    """Append one record to the ``log`` table and commit it.

    Args:
        conn: Open ``sqlite3`` connection.
        run_id: ``runs.id`` the record belongs to.
        level: Severity label stored as-is (the callers here pass "INFO"
            or "ERROR").
        event: Short event name.
        subject, folder, graph_id, detail: Optional context columns.
    """
    record = (run_id, level, event, subject, folder, graph_id, detail)
    conn.execute(
        """INSERT INTO log (run_id, level, event, subject, folder, graph_id, detail)
        VALUES (?, ?, ?, ?, ?, ?, ?)""",
        record,
    )
    conn.commit()
def info(conn, run_id, event, **kw):
    """Record an INFO-level event in the ``log`` table via ``db_log``.

    ``kw`` is forwarded unchanged (subject, folder, graph_id, detail).
    """
    level = "INFO"
    db_log(conn, run_id, level, event, **kw)
def error(conn, run_id, event, **kw):
    """Record an ERROR-level event in the error log file and the ``log`` table.

    The module configures ``logging`` to write ERROR records to LOG_PATH, but
    nothing wrote to that logger, so the file stayed empty. This function now
    feeds it, in addition to the database record.

    Args:
        conn: Open ``sqlite3`` connection.
        run_id: ``runs.id`` the record belongs to.
        event: Short event name.
        **kw: Optional context forwarded to ``db_log`` (subject, folder,
            graph_id, detail).
    """
    # Write the file log first so the failure is still recorded even if the
    # database insert below raises.
    logging.error(
        "run=%s | %s | subject=%s | folder=%s | %s",
        run_id, event, kw.get("subject"), kw.get("folder"), kw.get("detail"),
    )
    db_log(conn, run_id, "ERROR", event, **kw)
def db_get(conn, mid):
    """Fetch the tracked state of one message.

    Args:
        conn: Open ``sqlite3`` connection.
        mid: Value of ``messages.message_id`` to look up.

    Returns:
        A dict with the keys listed in ``fields`` below, or ``None`` when no
        row matches.
    """
    # Same order as the SELECT column list.
    fields = ("message_id", "folder", "jnj_folder", "is_read",
              "not_in_mailbox_anymore", "last_mod_time", "content_uploads")
    record = conn.execute(
        """SELECT message_id, folder, jnj_folder, is_read, not_in_mailbox_anymore,
        last_mod_time, content_uploads
        FROM messages WHERE message_id=?""", (mid,)).fetchone()
    return dict(zip(fields, record)) if record else None
def apply_update(conn, mid, changes):
    """Write the given column changes to one message and bump ``updated_at``.

    Args:
        conn: Open ``sqlite3`` connection; committed after the update.
        mid: ``messages.message_id`` of the row to update.
        changes: Mapping of column name to new value. Column names are
            interpolated into the SQL text, so they must come from trusted
            code, never from external input. An empty mapping still
            refreshes ``updated_at``.
    """
    assignments = [f"{column}=?" for column in changes]
    assignments.append("updated_at=datetime('now')")
    params = [*changes.values(), mid]
    conn.execute(f"UPDATE messages SET {', '.join(assignments)} WHERE message_id=?", params)
    conn.commit()
# ─── Outlook / prenos ────────────────────────────────────────────────────────
def get_mid(item) -> str:
    """Return the identity key of an Outlook item.

    Uses the Internet Message-ID property when it can be read and is
    non-empty; otherwise falls back to ``"entryid:<EntryID>"``.
    """
    try:
        message_id = item.PropertyAccessor.GetProperty(PR_INTERNET_MESSAGE_ID)
    except Exception:
        # Property missing or unreadable — fall through to the EntryID key.
        message_id = None
    if message_id:
        return message_id
    return f"entryid:{item.EntryID}"
def get_lastmod(item):
    """Return PR_LAST_MODIFICATION_TIME as a string version fingerprint.

    Returns the value's ``isoformat()`` when available, otherwise its
    ``str()``; ``None`` when the property is absent, empty, or any step fails.
    """
    try:
        stamp = item.PropertyAccessor.GetProperty(PR_LAST_MOD_TIME)
    except Exception:
        return None
    if stamp is None:
        return None
    try:
        return stamp.isoformat()
    except Exception:
        pass
    # Not a datetime-like value: fall back to its plain string form.
    try:
        return str(stamp)
    except Exception:
        return None
def upload_msg(msg_path, filename, folder="", overwrite=False):
    """Encrypt a saved .msg file and POST it to UPLOAD_URL.

    Args:
        msg_path: Path of the .msg file on disk.
        filename: Name whose stem becomes the uploaded ``<stem>.emsg`` name.
        folder: Value of the ``folder`` form field.
        overwrite: When truthy, adds the form field ``overwrite="1"``.

    Returns:
        The decoded JSON body of the server response.

    Raises:
        requests.HTTPError: If the response status is not OK; the message
            carries the status, reason and the first 200 characters of the body.
    """
    payload = _FERNET.encrypt(Path(msg_path).read_bytes())
    upload_name = Path(filename).stem + ".emsg"
    form = {"folder": folder, **({"overwrite": "1"} if overwrite else {})}
    response = requests.post(
        UPLOAD_URL,
        headers={"Authorization": f"Bearer {TOKEN}"},
        files={"file": (upload_name, payload, "application/octet-stream")},
        data=form,
        timeout=60,
    )
    if response.ok:
        return response.json()
    raise requests.HTTPError(f"{response.status_code} {response.reason} | {response.text[:200]}")
def save_and_upload(item, folder="", overwrite=False):
    """Save the item as .msg into a temporary directory and upload it encrypted.

    The file name is the last 20 characters of the item's EntryID plus
    ``.msg``. The temporary directory is removed when the upload returns.

    Returns:
        Tuple ``(filename, server_json)``.
    """
    name = f"{item.EntryID[-20:]}.msg"
    with tempfile.TemporaryDirectory() as workdir:
        target = Path(workdir) / name
        item.SaveAs(str(target), OLSAVE_MSG)
        server_reply = upload_msg(target, name, folder, overwrite=overwrite)
    return name, server_reply
def capture_new(conn, run_id, item, mid, current, is_read, subject, stats):
    """Capture a new email: SaveAs -> upload -> insert a ``messages`` row.

    Args:
        conn: Open ``sqlite3`` connection.
        run_id: Current ``runs.id`` for the log record.
        item: Outlook mail item.
        mid: Identity key of the item (``messages.message_id``).
        current: Folder path, stored as both ``folder`` and ``jnj_folder``.
        is_read: 1 when read, 0 when unread.
        subject: Subject text.
        stats: Run counters; not modified here (the caller counts success).

    Returns:
        ``True`` on success. Upload or database failures propagate as
        exceptions.
    """
    _, server_reply = save_and_upload(item, current, overwrite=False)
    graph_id = server_reply.get("graph_id")
    last_mod = get_lastmod(item)
    # Both properties can fail on some items; store a neutral value instead.
    try:
        received_at = item.ReceivedTime.isoformat() if item.ReceivedTime else None
    except Exception:
        received_at = None
    try:
        sender_addr = item.SenderEmailAddress or ""
    except Exception:
        sender_addr = ""
    row_values = (
        mid, subject, sender_addr, received_at, current, SCRIPT_NAME,
        item.EntryID, graph_id, is_read, current, last_mod,
    )
    conn.execute(
        """INSERT OR IGNORE INTO messages
        (message_id, subject, sender, received_at, folder, source,
        entry_id, graph_id, is_read, jnj_folder,
        not_in_mailbox_anymore, updated_at, last_mod_time, content_uploads)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, datetime('now'), ?, 1)""",
        row_values,
    )
    conn.commit()
    info(conn, run_id, "captured", subject=subject, folder=current, graph_id=graph_id)
    print(f" NEW | {subject[:70]}")
    return True
def reupload_changed(item, current):
    """Upload a changed, already-known email again with overwrite enabled.

    The upload is sent with an empty folder; per the original note this makes
    the server skip the Graph re-import and only rewrite the stored /msgs
    file. ``current`` is accepted for call-site symmetry and is not used.
    """
    save_and_upload(item, "", overwrite=True)
def process_item(conn, run_id, item, current, stats, seen, mode, dry):
    """Reconcile one Outlook item with its row in the ``messages`` table.

    An unknown item is captured (modes ``capture`` / ``full-update``) or only
    counted (``update-paths``). For a known item the folder path, read flag and
    "left the mailbox" flag are compared with the stored row, and for items
    keyed by ``entryid:`` the last-modification fingerprint is checked to
    trigger a content re-upload.

    Args:
        conn: Open ``sqlite3`` connection.
        run_id: Current ``runs.id`` used for log records.
        item: Outlook mail item.
        current: Folder path the item was found in.
        stats: Run counters, updated in place.
        seen: Set of identity keys seen in this run, updated in place.
        mode: ``"capture"``, ``"update-paths"`` or ``"full-update"``.
        dry: When truthy nothing is uploaded and no message row is changed;
            counters and console output are still produced.
    """
    try:
        mid = get_mid(item)
    except Exception:
        # Without an identity key the item cannot be tracked — skip it silently.
        return
    seen.add(mid)
    stats["found"] += 1
    try:
        is_read = 0 if item.UnRead else 1
    except Exception:
        is_read = 0
    subject = str(getattr(item, "Subject", "") or "")
    row = db_get(conn, mid)
    # ── New email (not in DB) ─────────────────────────────────────────────
    if row is None:
        if mode in ("capture", "full-update"):
            if dry:
                stats["new_captured"] += 1
                print(f" NEW* | {subject[:70]}")
            else:
                try:
                    if capture_new(conn, run_id, item, mid, current, is_read, subject, stats):
                        stats["new_captured"] += 1
                except Exception as e:
                    stats["errors"] += 1
                    error(conn, run_id, "capture_error", subject=subject, folder=current, detail=str(e))
                    print(f" CHYBA NEW | {subject[:50]} | {e}")
        else: # update-paths — the body is not available, cannot backfill
            stats["new_uncaptured"] += 1
        return
    # ── Known email — compare changes ─────────────────────────────────────
    changes = {}
    # jnj_folder is the live path; fall back to the first-capture folder.
    current_known = row.get("jnj_folder") or row.get("folder")
    if current_known != current:
        changes["jnj_folder"] = current
        stats["path_updated"] += 1
    if row.get("is_read") != is_read:
        changes["is_read"] = is_read
        stats["read_updated"] += 1
    if row.get("not_in_mailbox_anymore"):
        # The item was flagged as gone earlier but is visible again.
        changes["not_in_mailbox_anymore"] = 0
        changes["left_mailbox_at"] = None
        stats["returned"] += 1
    # ── CONTENT CHANGE DETECTION (v1.3) ───────────────────────────────────
    # Only for known items WITHOUT a Message-ID (mid starts with "entryid:") —
    # their EntryID is stable and the content can change under the same
    # identity (e.g. an appended SendAsDenied error). Items with a Message-ID
    # are treated as finalized.
    # Re-upload only in modes that are allowed to upload, and not in dry-run.
    if (mode in ("capture", "full-update") and mid.startswith("entryid:")):
        cur_lm = get_lastmod(item)
        if cur_lm and cur_lm != row.get("last_mod_time"):
            stats["content_updated"] += 1
            if dry:
                # DRY-RUN: only count and show it, upload nothing (preview before a real run)
                print(f" REUP* | {subject[:55]} | obsah zmenen -> by se re-uploadl")
            else:
                try:
                    reupload_changed(item, current)
                    changes["last_mod_time"] = cur_lm
                    changes["content_uploads"] = (row.get("content_uploads") or 1) + 1
                    print(f" REUP | {subject[:55]} | obsah zmenen -> re-upload")
                    info(conn, run_id, "content_reupload", subject=subject, folder=current,
                         detail=f"last_mod {row.get('last_mod_time')} -> {cur_lm}")
                except Exception as e:
                    # Undo the optimistic count: the re-upload did not happen.
                    stats["content_updated"] -= 1
                    stats["errors"] += 1
                    error(conn, run_id, "reupload_error", subject=subject, folder=current, detail=str(e))
                    print(f" CHYBA REUP | {subject[:50]} | {e}")
    if changes:
        if not dry:
            apply_update(conn, mid, changes)
        # Human-readable summary of what changed, for console and log.
        what = []
        if "jnj_folder" in changes:
            what.append(f"-> {current}")
        if "is_read" in changes:
            what.append("precteno" if is_read else "neprecteno")
        if "not_in_mailbox_anymore" in changes:
            what.append("vraceno do schranky")
        if "last_mod_time" in changes:
            what.append("obsah aktualizovan")
        marker = "*" if dry else " "
        print(f" UPD{marker} | {subject[:55]} | {', '.join(what)}")
        # Logged in dry-run as well (the log table is written even then).
        info(conn, run_id, "path_update", subject=subject, folder=current, detail="; ".join(what))
    else:
        stats["skipped"] += 1
def walk(conn, run_id, folder, folder_path, cutoff_local, stats, seen, mode, dry, limit):
    """Process the mail items of a folder, then recurse into its subfolders.

    Args:
        conn: Open ``sqlite3`` connection.
        run_id: Current ``runs.id`` used for log records.
        folder: Outlook folder object to scan.
        folder_path: Path of the parent; this folder's path is
            ``folder_path/<folder.Name>``.
        cutoff_local: When not ``None``, only items received at or after this
            datetime are scanned.
        stats, seen, mode, dry: Passed through to ``process_item``.
        limit: Stop once ``stats["found"]`` reaches this number; 0 disables
            the limit.
    """
    def limit_reached():
        return bool(limit) and stats["found"] >= limit

    current = f"{folder_path}/{folder.Name}"
    try:
        items = folder.Items
        if cutoff_local is not None:
            since = cutoff_local.strftime("%Y/%m/%d %H:%M:%S")
            restrict = "@SQL=\"urn:schemas:httpmail:datereceived\" >= '%s'" % since
            items = items.Restrict(restrict)
        items.Sort("[ReceivedTime]", True)  # newest first
    except Exception as e:
        print(f" CHYBA slozka {current}: {e}")
        error(conn, run_id, "folder_error", folder=current, detail=str(e))
        return

    processed = 0
    for item in items:
        if limit_reached():
            break
        # Only mail items (message class IPM.NOTE*); unreadable items are skipped.
        try:
            message_class = str(getattr(item, "MessageClass", "")).upper()
            is_mail = message_class.startswith("IPM.NOTE")
        except Exception:
            is_mail = False
        if not is_mail:
            continue
        process_item(conn, run_id, item, current, stats, seen, mode, dry)
        processed += 1
    print(f" {current}: {processed} polozek")
    info(conn, run_id, "folder_done", folder=current, detail=str(processed))

    try:
        children = list(folder.Folders)
    except Exception:
        children = []
    for child in children:
        if limit_reached():
            break
        walk(conn, run_id, child, current, cutoff_local, stats, seen, mode, dry, limit)
def _parse_dt(s):
    """Parse an ISO-8601 string into a naive datetime.

    A timezone-aware value is converted to the local timezone and its tzinfo
    is dropped; a naive value is returned unchanged. Returns ``None`` for an
    empty input or a string that cannot be parsed.
    """
    if not s:
        return None
    try:
        parsed = datetime.fromisoformat(s)
        return parsed.astimezone().replace(tzinfo=None) if parsed.tzinfo else parsed
    except Exception:
        return None
def flag_left_mailbox(conn, run_id, cutoff_local, seen, scanned_roots, stats, dry):
    """Flag tracked emails that were not seen in the scanned part of the mailbox.

    A row is flagged when it was not seen in this run, is not flagged yet, its
    last known path lies in one of the scanned roots, and its ``received_at``
    falls inside the window. The last known path is kept; only
    ``not_in_mailbox_anymore``, ``left_mailbox_at`` and ``updated_at`` change.

    Args:
        conn: Open ``sqlite3`` connection.
        run_id: Current ``runs.id`` used for the log record.
        cutoff_local: Naive local datetime; older rows are ignored. Must not
            be ``None`` (the caller only invokes this with a window).
        seen: Identity keys encountered during the scan.
        scanned_roots: Root folder paths that were scanned.
        stats: Run counters; ``left_mailbox`` is incremented per flagged row.
        dry: When truthy, rows are only counted and printed, not updated.
    """
    roots = set(scanned_roots)
    # Match on a folder boundary: a plain startswith(root) would also accept
    # sibling folders such as "/mb/Inbox2" for the root "/mb/Inbox".
    root_prefixes = tuple(f"{root}/" for root in roots)
    cur = conn.execute(
        """SELECT message_id, received_at, jnj_folder, folder, not_in_mailbox_anymore
        FROM messages""")
    to_flag = []
    for mid, received_at, jnjf, fld, flag in cur.fetchall():
        if mid in seen or flag:
            continue
        path = jnjf or fld or ""
        if path not in roots and not path.startswith(root_prefixes):
            continue
        rec = _parse_dt(received_at)
        # Rows with an unknown date or outside the window were not scanned.
        if rec is None or rec < cutoff_local:
            continue
        to_flag.append((mid, path))
    for mid, path in to_flag:
        if not dry:
            conn.execute(
                """UPDATE messages SET not_in_mailbox_anymore=1,
                left_mailbox_at=datetime('now'), updated_at=datetime('now')
                WHERE message_id=?""", (mid,))
        stats["left_mailbox"] += 1
        print(f" GONE{'*' if dry else ' '} | {path}")
    if not dry and to_flag:
        conn.commit()
    info(conn, run_id, "left_mailbox", detail=str(len(to_flag)))
# ─── MAIN ─────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: parse arguments, scan the mailbox, report.

    Opens the SQLite database, registers a run, walks every folder listed in
    SYNC_FOLDERS through Outlook MAPI, optionally flags emails that left the
    mailbox, stores the run counters, prints a summary and, unless disabled
    or in dry-run, uploads the database.
    """
    ap = argparse.ArgumentParser(description=f"jnj_mailbox_sync v{SCRIPT_VERSION}")
    ap.add_argument("--mode", choices=["capture", "update-paths", "full-update"],
                    default="capture")
    ap.add_argument("--days", type=int, default=30,
                    help="Okno ve dnech pro update-paths/full-update (0 = vse)")
    ap.add_argument("--dry-run", action="store_true",
                    help="Nic nezapise/nenahraje, jen vypise co by udelal")
    ap.add_argument("--limit", type=int, default=0, help="Max N polozek (test)")
    ap.add_argument("--no-db-upload", action="store_true")
    args = ap.parse_args()
    mode, dry = args.mode, args.dry_run
    # capture always scans everything; the other modes honour --days (0 = no window).
    if mode == "capture":
        cutoff_local = None
    else:
        cutoff_local = None if args.days == 0 else (datetime.now() - timedelta(days=args.days))
    win = "vse" if cutoff_local is None else f"{args.days} dni (od {cutoff_local:%Y-%m-%d %H:%M})"
    print(f"=== jnj_mailbox_sync v{SCRIPT_VERSION} ===")
    print(f"Start: {datetime.now():%Y-%m-%d %H:%M:%S}")
    print(f"Rezim: {mode} Okno: {win} {'[DRY-RUN — nic se nemeni]' if dry else ''}")
    print(f"DB: {DB_PATH}")
    conn = sqlite3.connect(DB_PATH)
    init_db(conn)
    # NOTE: the runs row (and later log rows) are written even in dry-run.
    run_id = start_run(conn, mode, args.days, dry)
    outlook = win32com.client.Dispatch("Outlook.Application")
    ns = outlook.GetNamespace("MAPI")
    stats = {"found": 0, "new_captured": 0, "new_uncaptured": 0, "path_updated": 0,
             "read_updated": 0, "returned": 0, "left_mailbox": 0, "content_updated": 0,
             "skipped": 0, "errors": 0}
    seen = set()
    # Paths of the scanned root folders, used by flag_left_mailbox.
    scanned_roots = set()
    for fid, label in SYNC_FOLDERS:
        root = ns.GetDefaultFolder(fid)
        mailbox = root.Parent.Name
        scanned_roots.add(f"/{mailbox}/{root.Name}")
        print(f"\n=== {label} ({mailbox}) ===")
        walk(conn, run_id, root, f"/{mailbox}", cutoff_local, stats, seen, mode, dry, args.limit)
    # The "left the mailbox" check needs a complete scan of the window, so it
    # is skipped when --limit truncated the scan or no window is set.
    if mode in ("update-paths", "full-update") and cutoff_local is not None and not (args.limit):
        print("\n--- Kontrola 'opustilo schranku' (v okne, Inbox/Sent/Deleted) ---")
        flag_left_mailbox(conn, run_id, cutoff_local, seen, scanned_roots, stats, dry)
    elif args.limit:
        print("\n(--limit aktivni -> detekce 'opustilo schranku' preskocena)")
    finish_run(conn, run_id, stats)
    # ── Summary ────────────────────────────────────────────────────────────
    print(f"\n{'='*60}")
    print(f"SOUHRN [{mode}{' / DRY-RUN' if dry else ''}]")
    print(f" Nalezeno ve schrance: {stats['found']}")
    if mode in ("capture", "full-update"):
        lbl = "by se nahralo" if dry else "nahrano"
        print(f" Nove zachyceno ({lbl}): {stats['new_captured']}")
    else:
        print(f" Nove (bez tela, nedorovnano):{stats['new_uncaptured']}")
    print(f" Aktualizovana cesta: {stats['path_updated']}")
    print(f" Zmena precteno/neprecteno: {stats['read_updated']}")
    print(f" Vraceno do schranky: {stats['returned']}")
    print(f" Obsah zmenen (re-upload): {stats['content_updated']}")
    print(f" Opustilo schranku (GONE): {stats['left_mailbox']}")
    print(f" Beze zmeny (skip): {stats['skipped']}")
    print(f" Chyby: {stats['errors']}")
    print(f"{'='*60}")
    if dry:
        print("DRY-RUN: SQLite ani server se NEMENILY.")
    elif not args.no_db_upload:
        print("\nUpload SQLite na server...")
        upload_db(DB_PATH)
    print(f"\nKonec: {datetime.now():%Y-%m-%d %H:%M:%S}")
    if stats["errors"]:
        print(f"Chyby logovany do: {LOG_PATH}")
    # NOTE(review): not reached if anything above raises — consider try/finally.
    conn.close()
def upload_db(db_path):
    """Compress (lzma/xz, max), Fernet-encrypt and upload the SQLite file.

    The file is uploaded to DB_UPLOAD_URL as ``jnjemails_<timestamp>.db.xz.enc``.
    This is best-effort: every failure, including a non-OK HTTP status, is
    printed and swallowed so it never aborts the sync run.

    Args:
        db_path: Path of the SQLite database file to upload.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"jnjemails_{ts}.db"
    try:
        with open(db_path, "rb") as f:
            raw = f.read()
        compressed = lzma.compress(raw, preset=9 | lzma.PRESET_EXTREME)
        encrypted = _FERNET.encrypt(compressed)
        enc_filename = filename + ".xz.enc"
        resp = requests.post(
            DB_UPLOAD_URL,
            headers={"Authorization": f"Bearer {TOKEN}"},
            files={"file": (enc_filename, encrypted, "application/octet-stream")},
            timeout=300,
        )
        # A rejected upload must be reported as an error, not printed as a
        # successful result (or fail later on a non-JSON error body).
        if not resp.ok:
            raise requests.HTTPError(f"{resp.status_code} {resp.reason} | {resp.text[:200]}")
        mb_raw, mb_xz, mb_enc = (len(raw) / 1048576,
                                 len(compressed) / 1048576,
                                 len(encrypted) / 1048576)
        print(f" DB upload: {resp.json()} "
              f"({mb_raw:.1f} MB -> xz {mb_xz:.1f} MB -> enc {mb_enc:.1f} MB)")
    except Exception as e:
        print(f" DB upload CHYBA: {e}")
# Run the sync only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
@@ -0,0 +1,85 @@
# jnj_mailbox_sync v1.4.0
**Soubor:** `jnj_mailbox_sync_v1.4.py`
**Datum:** 2026-06-16
**Autor:** vladimir.buzalka
**Běží:** JNJ stroj (Outlook MAPI), Python z Thonny.
## Co to je
Synchronizace JNJ Outlooku (MAPI) → osobní schránka (přes msgreceiver) + bookkeeping
v SQLite (`C:\Users\vbuzalka\SQLITE\jnjemails.db`). Sleduje přesuny e-mailů mezi
složkami a příznak „už není ve schránce" — bez opětovného přenosu těla.
Skenované složky: **Inbox + Sent Items + Deleted Items + Archive** (vč. podsložek).
## Novinka v1.4 — skenování složky Archive (primární schránka)
Přidána složka **Archive** (jednoklikové archivování v Outlooku) v **primární** schránce.
Archive **není** default folder, takže se hledá podle jména `"Archive"` pod kořenem
primární schránky (`Inbox.Parent`) a přidává se do `scanned_roots` (aby se její položky
nehodnotily jako „opustilo schránku"). **Online Archive** (samostatný store) se i nadále
**neskenuje**. Řeší případy, kdy odeslaná kopie skončila v Archive (jinak chyběla domácímu
přehledu i párování dvojčat).
## Novinka v1.3 — detekce změny obsahu (re-upload změněného e-mailu)
**Problém:** e-mail **bez Message-ID** (typicky **NEODESLANÝ** Sent kvůli `SendAsDenied`,
nebo čerstvě odeslaný, kde Exchange ještě nedoplnil Message-ID) má **stabilní EntryID**.
Když do něj Outlook **po zachycení** dopíše chybu odeslání, obsah se změní, ale identita
(`entryid:<EID>`) zůstane → starý sync to vyhodnotil jako „známé, beze změny" a
aktualizovaný (chybový) e-mail už domů **nepřenesl**. Naproti tomu úspěšně odeslaný
e-mail dostane **nové EntryID + Message-ID**, takže se zachytil jako nový. Vznikla
asymetrie: failed-update se ztrácel.
**Řešení:** identita zůstává (Message-ID / `entryid:`), ale navíc se sleduje **verzní otisk**
= `PR_LAST_MODIFICATION_TIME` (`0x30080040`). U **známé položky bez Message-ID**
(`mid` začíná `entryid:`) se otisk porovná; když se posunul, e-mail se znovu uloží
(`SaveAs`) a nahraje s `overwrite=1` → server přepíše původní `.msg` na místě → Tower ho
přeparsuje → dokument v Mongu se aktualizuje (vč. těla s chybou).
- Hlídání je **levné** — druhé čtení property jen u známých no-ID položek (desítky kusů);
položky s Message-ID jsou finalizované a nesledují se.
- Re-upload běží jen v režimech, které smějí nahrávat (**capture, full-update**), a posílá se
s `folder=""` → server **nedělá** Graph re-import (žádný duplikát v Graph zrcadle).
- **Vyžaduje msgreceiver app.py ≥ v2.4** (overwrite na `/upload`). Bez něj se re-upload chová
jako starý skip (nepřepíše, ale nic nerozbije) — pořadí nasazení server → JNJ bez výpadku.
## Nové sloupce SQLite
- `messages.last_mod_time` — PR_LAST_MODIFICATION_TIME při posledním zachycení (otisk).
- `messages.content_uploads` — kolikrát se tělo nahrálo (1 = jen první zachycení).
- `runs.content_updated` — kolik e-mailů se v běhu re-uploadlo kvůli změně obsahu.
(Migrace přes stávající `ALTER TABLE` smyčku — staré `jnjemails.db` se doplní automaticky.)
## Argumenty
`--mode {capture,update-paths,full-update}` (default capture), `--days N`
(0 = celé), `--dry-run`, `--limit N`, `--no-db-upload`.
## Spouštění (JNJ stroj, plné cesty)
```
"C:\Users\vbuzalka\AppData\Local\Programs\Thonny\python.exe" "c:\Users\vbuzalka\OneDrive - JNJ\##JNJPrenos\Python\jnj_mailbox_sync_v1.4.py" --mode full-update --days 60
```
`full-update --days 60` = dorovná chybějící + **re-uploadne změněné** (chybové) Sent položky
za poslední 60 dní. To je doporučený běh pro „aktualizovat i neodeslané".
## Revert
Stará verze: `Trash/jnj_mailbox_sync_v1.3.py` (bez skenování Archive), `Trash/…_v1.2.py`
(bez detekce změny). Server v2.4 zůstává zpětně kompatibilní (overwrite je opt-in),
takže revert na JNJ straně nevyžaduje zásah na serveru.
## Historie
- **1.0.0** — režimy capture/update-paths/full-update, sledování přesunů, updated_at.
- **1.1.0** — + Deleted Items do skenovaných složek.
- **1.2.0** — upload SQLite komprimován (lzma/xz max) + šifrován (Fernet) → `.db.xz.enc`.
- **1.3.0** — + detekce změny obsahu přes `PR_LAST_MODIFICATION_TIME`: známé no-ID
položky, které se po zachycení změnily (např. dopsaná chyba `SendAsDenied`), se znovu
nahrávají s `overwrite=1`. Nové sloupce `last_mod_time`, `content_uploads`,
`runs.content_updated`. Vyžaduje app.py ≥ v2.4.
- **1.4.0** — + skenování složky **Archive** v primární schránce (hledá se podle jména
pod kořenem schránky, ne přes default folder; Online Archive se neskenuje).
+695
View File
@@ -0,0 +1,695 @@
"""
jnj_mailbox_sync v1.4
Nazev: jnj_mailbox_sync_v1.4.py
Verze: 1.4.0
Datum: 2026-06-16
Autor: vladimir.buzalka
Popis:
Synchronizace JNJ Outlooku (MAPI) -> osobni schranka + bookkeeping v SQLite.
Nasledník inbox_full_sync_v1.1 / jnj_mailbox_sync_v1.2. Sleduje PRESUN emailu
mezi slozkami a priznak "uz neni ve schrance" — BEZ opetovneho prenosu tela.
Scope: primarni schranka, Inbox + Sent Items + Deleted Items + Archive
vcetne vsech podsložek. Slozka Archive (jednoklikove archivovani v Outlooku)
NENI default folder — hleda se podle jmena pod korenem primarni schranky.
Online Archive (samostatny store) se i nadale NEskenuje.
Identita emailu = Internet Message-ID (stabilni pres presuny). Kdyz Message-ID
chybi (typicky cerstve odeslane / NEODESLANE Sent polozky — Exchange ho doplni
az po skutecnem transportu), pouzije se fallback "entryid:<EntryID>".
Sloupce cest v SQLite:
folder = cesta pri PRVNIM zachyceni (historie, neprepisuje se)
jnj_folder = AKTUALNI ziva cesta (prepisuje se pri presunu)
updated_at se bumpne pri insertu i kazde zmene — watermark pro domaci sync.
NOVINKA v1.3 — DETEKCE ZMENY OBSAHU (re-upload zmeneneho emailu)
Problem: e-mail bez Message-ID (napr. NEODESLANY Sent kvuli SendAsDenied) ma
STABILNI EntryID. Kdyz do nej Outlook PO zachyceni dopise chybu odeslani,
obsah se zmeni, ale identita (entryid:<EID>) zustane — stary sync to vyhodnotil
jako "zname, beze zmeny" a aktualizovany (chybovy) e-mail uz domu NEPRENESL.
Naproti tomu uspesne odeslany e-mail dostane NOVE EntryID + Message-ID, takze
se zachytil jako novy. Vznikla asymetrie: failed-update se ztracel.
Reseni: identita zustava (Message-ID / entryid:), ale navic se sleduje VERZNI
OTISK = PR_LAST_MODIFICATION_TIME (0x30080040). U ZNAMEHO emailu BEZ Message-ID
(mid zacina "entryid:") se otisk porovna; kdyz se posunul, e-mail se znovu
ulozi (SaveAs) a nahraje s priznakem overwrite=true (server prepise puvodni
.msg na miste -> Tower ho preparsuje -> dokument v Mongu se aktualizuje, vc.
tela s chybou). Tim doteche i "zmeneny hustak". Hlidani je levne — druhe cteni
property jen u znamych no-ID polozek (desitky kusu); polozky s Message-ID jsou
finalizovane a nesleduji se.
Re-upload bezi jen v rezimech, ktere smeji nahravat (capture, full-update),
a posila se BEZ folderu (folder="") => server NEdela Graph re-import (zadny
duplikat v Graph zrcadle); jen prepise /msgs soubor pro Tower parse.
Vyzaduje msgreceiver app.py >= v2.4 (overwrite na /upload). Bez nej se
re-upload chova jako "exists" (stary skip) — neprepise, ale nic nerozbije.
Upload SQLite (zustava z v1.2): DB se pred odeslanim KOMPRIMUJE (lzma/xz, max) a
SIFRUJE (Fernet, klic z TOKENu) a nahrava jako .db.xz.enc.
Rezimy (--mode):
capture (default) Projde cely Inbox+Sent+Deleted+Archive, nove emaily ulozi a
nahraje + NOVE re-uploadne zmenene znamé no-ID polozky.
Okno --days se IGNORUJE (bere VSE).
update-paths Jen METADATA cesty/precteno + "opustilo schranku". NIC nenahrava
(ani re-upload).
full-update update-paths + dorovna chybejici (SaveAs+upload) + re-upload
zmenenych znamých no-ID polozek.
Argumenty:
--mode {capture,update-paths,full-update} default capture
--days N velikost okna ve dnech (default 30). 0 = bez casoveho omezeni (vsechny skenovane slozky).
--dry-run NIC nezapise/nenahraje, jen vypise co by udelal.
--limit N zpracovat max N polozek (rychly test).
--no-db-upload na konci nenahravat SQLite na server.
Spousteni:
# Refresh poslednich 60 dni + zachytit zmenene (chybove) Sent polozky:
python jnj_mailbox_sync_v1.4.py --mode full-update --days 60
Zavislosti:
pywin32, requests, cryptography, sqlite3 + lzma (stdlib).
Python 3.10+, Windows, Outlook musi byt spusteny a prihlaseny.
Historie verzi:
1.0.0 2026-06-09 Rezimy capture/update-paths/full-update, sledovani presunu,
not_in_mailbox_anymore, updated_at watermark.
1.1.0 2026-06-10 + Deleted Items do SYNC_FOLDERS.
1.2.0 2026-06-10 Upload SQLite komprimovan (lzma) + sifrovan (Fernet) ->
.db.xz.enc. Vyzaduje app.py >= v2.1.
1.3.0 2026-06-16 + DETEKCE ZMENY OBSAHU pres PR_LAST_MODIFICATION_TIME:
zname no-ID polozky (entryid:), ktere se po zachyceni
zmenily (napr. dopsana chyba SendAsDenied), se znovu
nahravaji s overwrite=true. Nove SQLite sloupce
last_mod_time, content_uploads; runs.content_updated.
Vyzaduje app.py >= v2.4 (overwrite na /upload).
1.4.0 2026-06-16 + skenovani slozky Archive v PRIMARNI schrance (ne Online
Archive). Archive neni default folder -> hleda se podle
jmena ("Archive") pod korenem primarni schranky a pridava
se do scanned_roots (aby se jeji polozky nehodnotily jako
GONE). Resi pripady, kdy odeslana kopie skoncila v Archive.
"""
import argparse
import base64
import hashlib
import logging
import lzma
import sqlite3
import sys
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
import win32com.client
import requests
import urllib3
from cryptography.fernet import Fernet
# Switch stdout to UTF-8 when the stream supports reconfigure(); characters
# that cannot be encoded are replaced instead of raising.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# Silence urllib3's InsecureRequestWarning.
# NOTE(review): the upload call visible here does not pass verify=False, so
# this suppression may be a leftover — confirm against the rest of the file.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ─── CONFIGURATION ────────────────────────────────────────────────────────────
# NOTE(review): TOKEN is a hard-coded secret. It is sent as the Bearer token on
# uploads and is also the seed of the Fernet key below — consider loading it
# from the environment or a local, uncommitted file.
TOKEN = "13e1bb01-9fd5-44a8-8ce9-4ee27133d340"
UPLOAD_URL = "https://msgs.buzalka.cz/upload"
DB_UPLOAD_URL = "https://msgs.buzalka.cz/upload-db"
DB_PATH = r"C:\Users\vbuzalka\SQLITE\jnjemails.db"
LOG_PATH = r"C:\Users\vbuzalka\SQLITE\jnj_mailbox_sync_errors.log"
# MAPI property URIs read through PropertyAccessor.GetProperty().
PR_INTERNET_MESSAGE_ID = "http://schemas.microsoft.com/mapi/proptag/0x1035001E"
PR_LAST_MOD_TIME = "http://schemas.microsoft.com/mapi/proptag/0x30080040" # PR_LAST_MODIFICATION_TIME
SCRIPT_NAME = "jnj_mailbox_sync"
SCRIPT_VERSION = "1.4.0"
# olFolderInbox=6, olFolderSentMail=5, olFolderDeletedItems=3
# Archive is not in this list: per the module docstring it is not a default
# folder and is looked up by name elsewhere in the file.
SYNC_FOLDERS = [(6, "Inbox"), (5, "Sent Items"), (3, "Deleted Items")]
OLSAVE_MSG = 3 # OlSaveAsType.olMSG
# Encryption key derived from TOKEN (same algorithm as the server):
# SHA-256 digest of the token, urlsafe-base64 encoded.
_FERNET = Fernet(base64.urlsafe_b64encode(hashlib.sha256(TOKEN.encode()).digest()))
# File log at LOG_PATH; only records of level ERROR and above are written.
logging.basicConfig(
    filename=LOG_PATH,
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)
# ──────────────────────────────────────────────────────────────────────────────
# ─── SQLite ───────────────────────────────────────────────────────────────────
def init_db(conn):
    """Create the bookkeeping schema and bring an older database up to date.

    Creates the ``messages``, ``runs`` and ``log`` tables and their indexes
    when they do not exist, then adds every column an older ``jnjemails.db``
    is still missing. Safe to call on every start.

    Args:
        conn: Open ``sqlite3`` connection; the changes are committed before
            returning.

    Raises:
        sqlite3.Error: If a schema statement fails. Columns that already
            exist are skipped by inspection, so genuine failures (locked or
            read-only database, malformed DDL) are no longer swallowed.
    """
    def add_missing_columns(table, columns):
        # PRAGMA table_info yields one row per column; index 1 is its name.
        present = {row[1].lower() for row in conn.execute(f"PRAGMA table_info({table})")}
        for col, ddl in columns:
            if col.lower() not in present:
                conn.execute(f"ALTER TABLE {table} ADD COLUMN {col} {ddl}")

    conn.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            message_id TEXT NOT NULL,
            subject TEXT,
            sender TEXT,
            received_at TEXT,
            folder TEXT,
            source TEXT,
            uploaded_at TEXT DEFAULT (datetime('now')),
            entry_id TEXT,
            graph_id TEXT,
            is_read INTEGER DEFAULT 0,
            jnj_folder TEXT,
            not_in_mailbox_anymore INTEGER DEFAULT 0,
            left_mailbox_at TEXT,
            updated_at TEXT,
            last_mod_time TEXT,
            content_uploads INTEGER DEFAULT 1
        )
    """)
    conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            script TEXT NOT NULL,
            version TEXT,
            started_at TEXT NOT NULL,
            finished_at TEXT,
            mode TEXT,
            window_days INTEGER,
            dry_run INTEGER DEFAULT 0,
            found INTEGER DEFAULT 0,
            new_captured INTEGER DEFAULT 0,
            path_updated INTEGER DEFAULT 0,
            read_updated INTEGER DEFAULT 0,
            returned INTEGER DEFAULT 0,
            left_mailbox INTEGER DEFAULT 0,
            content_updated INTEGER DEFAULT 0,
            skipped INTEGER DEFAULT 0,
            errors INTEGER DEFAULT 0
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER REFERENCES runs(id),
            level TEXT NOT NULL,
            event TEXT NOT NULL,
            subject TEXT,
            folder TEXT,
            graph_id TEXT,
            detail TEXT,
            created_at TEXT DEFAULT (datetime('now'))
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_log_run_id ON log(run_id)")
    # Migration of an existing jnjemails.db — add the columns it is missing.
    add_missing_columns("messages", [
        ("entry_id", "TEXT"), ("graph_id", "TEXT"), ("is_read", "INTEGER DEFAULT 0"),
        ("jnj_folder", "TEXT"), ("not_in_mailbox_anymore", "INTEGER DEFAULT 0"),
        ("left_mailbox_at", "TEXT"), ("updated_at", "TEXT"),
        ("last_mod_time", "TEXT"), ("content_uploads", "INTEGER DEFAULT 1"),
    ])
    add_missing_columns("runs", [
        ("mode", "TEXT"), ("window_days", "INTEGER"), ("dry_run", "INTEGER DEFAULT 0"),
        ("found", "INTEGER DEFAULT 0"), ("new_captured", "INTEGER DEFAULT 0"),
        ("path_updated", "INTEGER DEFAULT 0"), ("read_updated", "INTEGER DEFAULT 0"),
        ("returned", "INTEGER DEFAULT 0"), ("left_mailbox", "INTEGER DEFAULT 0"),
        ("content_updated", "INTEGER DEFAULT 0"),
    ])
    # Created after the migration because updated_at may have just been added.
    conn.execute("CREATE INDEX IF NOT EXISTS idx_updated_at ON messages(updated_at)")
    conn.commit()
def start_run(conn, mode, days, dry):
    """Insert a new row into ``runs`` for this execution and return its id.

    Args:
        conn: Open ``sqlite3`` connection; committed after the insert.
        mode: Sync mode name stored in ``runs.mode``.
        days: Window size stored in ``runs.window_days``.
        dry: Truthy for a dry run; stored as 1, otherwise 0.

    Returns:
        The ``id`` of the inserted ``runs`` row.
    """
    dry_flag = int(bool(dry))
    params = (SCRIPT_NAME, SCRIPT_VERSION, mode, days, dry_flag)
    cursor = conn.execute(
        """INSERT INTO runs (script, version, started_at, mode, window_days, dry_run)
        VALUES (?, ?, datetime('now'), ?, ?, ?)""",
        params,
    )
    conn.commit()
    return cursor.lastrowid
def finish_run(conn, run_id, stats):
    """Stamp the run as finished and store its counters in ``runs``.

    Args:
        conn: Open ``sqlite3`` connection; committed after the update.
        run_id: ``runs.id`` of the row to update.
        stats: Mapping that must contain every counter named below; extra
            keys are ignored.

    Raises:
        KeyError: If ``stats`` lacks one of the stored counters.
    """
    # Order matches the placeholders in the UPDATE statement.
    counters = (
        "found", "new_captured", "path_updated", "read_updated",
        "returned", "left_mailbox", "content_updated", "skipped", "errors",
    )
    values = [stats[name] for name in counters]
    values.append(run_id)
    conn.execute(
        """UPDATE runs SET finished_at=datetime('now'),
        found=?, new_captured=?, path_updated=?, read_updated=?,
        returned=?, left_mailbox=?, content_updated=?, skipped=?, errors=?
        WHERE id=?""",
        values,
    )
    conn.commit()
def db_log(conn, run_id, level, event, subject=None, folder=None, graph_id=None, detail=None):
    """Append one record to the ``log`` table and commit it.

    Args:
        conn: Open ``sqlite3`` connection.
        run_id: ``runs.id`` the record belongs to.
        level: Severity label stored as-is (the callers here pass "INFO"
            or "ERROR").
        event: Short event name.
        subject, folder, graph_id, detail: Optional context columns.
    """
    record = (run_id, level, event, subject, folder, graph_id, detail)
    conn.execute(
        """INSERT INTO log (run_id, level, event, subject, folder, graph_id, detail)
        VALUES (?, ?, ?, ?, ?, ?, ?)""",
        record,
    )
    conn.commit()
def info(conn, run_id, event, **kw):
    """Write *event* to the DB log at level ``INFO``; *kw* is passed to ``db_log``."""
    level = "INFO"
    db_log(conn, run_id, level, event, **kw)
def error(conn, run_id, event, **kw):
    """Write *event* to the DB log at level ``ERROR``; *kw* is passed to ``db_log``."""
    level = "ERROR"
    db_log(conn, run_id, level, event, **kw)
def db_get(conn, mid):
    """Return the tracked state of message *mid* as a dict, or None if unknown.

    The dict carries the keys ``message_id``, ``folder``, ``jnj_folder``,
    ``is_read``, ``not_in_mailbox_anymore``, ``last_mod_time`` and
    ``content_uploads``, read from the ``messages`` table.
    """
    field_names = (
        "message_id", "folder", "jnj_folder", "is_read",
        "not_in_mailbox_anymore", "last_mod_time", "content_uploads",
    )
    record = conn.execute(
        """SELECT message_id, folder, jnj_folder, is_read, not_in_mailbox_anymore,
        last_mod_time, content_uploads
        FROM messages WHERE message_id=?""", (mid,)).fetchone()
    if not record:
        return None
    return dict(zip(field_names, record))
def apply_update(conn, mid, changes):
    """Apply column changes to message *mid* and bump ``updated_at``.

    *changes* maps column name -> new value. Column names are interpolated
    into the SQL text, so they must be trusted identifiers; values are bound
    as parameters. ``updated_at`` is always set, even for an empty mapping.
    The update is committed immediately.
    """
    assignments = [f"{column}=?" for column in changes]
    assignments.append("updated_at=datetime('now')")
    params = list(changes.values())
    params.append(mid)
    conn.execute(f"UPDATE messages SET {', '.join(assignments)} WHERE message_id=?", params)
    conn.commit()
# ─── Outlook / prenos ────────────────────────────────────────────────────────
def get_mid(item) -> str:
    """Return the identity key of an Outlook item.

    Uses the Internet Message-ID property when it can be read and is
    non-empty; otherwise falls back to ``"entryid:<EntryID>"``.
    """
    try:
        message_id = item.PropertyAccessor.GetProperty(PR_INTERNET_MESSAGE_ID)
    except Exception:
        # Property missing or unreadable -> use the EntryID fallback below.
        message_id = None
    if message_id:
        return message_id
    return f"entryid:{item.EntryID}"
def get_lastmod(item):
    """Return PR_LAST_MODIFICATION_TIME as a string fingerprint, or None.

    The value is rendered with ``isoformat()`` when available and with
    ``str()`` otherwise. Any failure to read or render it yields None.
    """
    try:
        stamp = item.PropertyAccessor.GetProperty(PR_LAST_MOD_TIME)
    except Exception:
        return None
    if stamp is None:
        return None
    try:
        return stamp.isoformat()
    except Exception:
        pass  # not a datetime-like value; fall back to its string form
    try:
        return str(stamp)
    except Exception:
        return None
def upload_msg(msg_path, filename, folder="", overwrite=False):
    """Encrypt a saved .msg file and POST it to ``UPLOAD_URL``.

    The file content is Fernet-encrypted and sent under the stem of
    *filename* with an ``.emsg`` suffix. *folder* is sent as a form field;
    ``overwrite="1"`` is added only when *overwrite* is true.

    Returns the decoded JSON reply. Raises ``requests.HTTPError`` carrying
    the status, reason and the first 200 characters of the body when the
    response is not OK.
    """
    with open(msg_path, "rb") as src:
        payload = _FERNET.encrypt(src.read())
    remote_name = Path(filename).stem + ".emsg"
    form = {"folder": folder}
    if overwrite:
        form["overwrite"] = "1"
    response = requests.post(
        UPLOAD_URL,
        headers={"Authorization": f"Bearer {TOKEN}"},
        files={"file": (remote_name, payload, "application/octet-stream")},
        data=form,
        timeout=60,
    )
    if response.ok:
        return response.json()
    raise requests.HTTPError(f"{response.status_code} {response.reason} | {response.text[:200]}")
def save_and_upload(item, folder="", overwrite=False):
    """Save *item* as .msg into a temp directory and upload it encrypted.

    The file name is the last 20 characters of the item's EntryID plus
    ``.msg``. Returns ``(filename, server_json)``.
    """
    safe = f"{item.EntryID[-20:]}.msg"
    with tempfile.TemporaryDirectory() as workdir:
        target = Path(workdir) / safe
        item.SaveAs(str(target), OLSAVE_MSG)
        # Upload while the temp directory (and the file in it) still exists.
        server_json = upload_msg(target, safe, folder, overwrite=overwrite)
    return safe, server_json
def capture_new(conn, run_id, item, mid, current, is_read, subject, stats):
    """Capture a new e-mail: SaveAs -> upload -> insert into ``messages``.

    *current* is used both as the first-capture ``folder`` and as the live
    ``jnj_folder``. Upload errors propagate to the caller. Returns True once
    the row has been written and the capture logged. *stats* is accepted
    but not modified here.
    """
    _, server_reply = save_and_upload(item, current, overwrite=False)
    graph_id = server_reply.get("graph_id")
    last_mod = get_lastmod(item)

    # Both properties are read defensively; a failing read degrades to a
    # neutral value instead of aborting the capture.
    try:
        received_time = item.ReceivedTime
        received = received_time.isoformat() if received_time else None
    except Exception:
        received = None
    try:
        sender = item.SenderEmailAddress or ""
    except Exception:
        sender = ""

    row = (mid, subject, sender, received, current, SCRIPT_NAME,
           item.EntryID, graph_id, is_read, current, last_mod)
    conn.execute(
        """INSERT OR IGNORE INTO messages
        (message_id, subject, sender, received_at, folder, source,
        entry_id, graph_id, is_read, jnj_folder,
        not_in_mailbox_anymore, updated_at, last_mod_time, content_uploads)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, datetime('now'), ?, 1)""",
        row,
    )
    conn.commit()
    info(conn, run_id, "captured", subject=subject, folder=current, graph_id=graph_id)
    print(f" NEW | {subject[:70]}")
    return True
def reupload_changed(item, current):
    """Re-upload a changed, already known e-mail with overwrite enabled.

    The upload is sent with an empty folder; per the original note this
    means the server only overwrites the stored file and does not re-import
    into Graph. *current* is accepted but not used.
    """
    no_folder = ""
    save_and_upload(item, folder=no_folder, overwrite=True)
def process_item(conn, run_id, item, current, stats, seen, mode, dry):
    """Reconcile one Outlook item with its row in the ``messages`` table.

    Args:
        conn: open SQLite connection.
        run_id: id of the current row in ``runs`` (used for DB logging).
        item: Outlook item being examined.
        current: folder path the item was found in.
        stats: counter dict, mutated in place.
        seen: set of message ids seen in this run, mutated in place.
        mode: "capture", "update-paths" or "full-update".
        dry: when true, nothing is uploaded and ``messages`` is not updated.

    Unknown items are captured (capture / full-update) or only counted
    (update-paths). Known items have folder, read flag and the
    "left mailbox" flag compared; no-Message-ID items may be re-uploaded
    when their last-modification fingerprint moved.
    """
    try:
        mid = get_mid(item)
    except Exception:
        # No usable identity -> the item is ignored entirely (not counted).
        return
    seen.add(mid)
    stats["found"] += 1
    try:
        is_read = 0 if item.UnRead else 1
    except Exception:
        is_read = 0
    subject = str(getattr(item, "Subject", "") or "")
    row = db_get(conn, mid)
    # ── New e-mail (not in DB) ────────────────────────────────────────────
    if row is None:
        if mode in ("capture", "full-update"):
            if dry:
                # Dry run: count what would be captured, upload nothing.
                stats["new_captured"] += 1
                print(f" NEW* | {subject[:70]}")
            else:
                try:
                    if capture_new(conn, run_id, item, mid, current, is_read, subject, stats):
                        stats["new_captured"] += 1
                except Exception as e:
                    stats["errors"] += 1
                    error(conn, run_id, "capture_error", subject=subject, folder=current, detail=str(e))
                    print(f" CHYBA NEW | {subject[:50]} | {e}")
        else:  # update-paths — body is not transferred, cannot be backfilled
            stats["new_uncaptured"] += 1
        return
    # ── Known e-mail — compare changes ────────────────────────────────────
    changes = {}
    # The live path wins; fall back to the first-capture folder if unset.
    current_known = row.get("jnj_folder") or row.get("folder")
    if current_known != current:
        changes["jnj_folder"] = current
        stats["path_updated"] += 1
    if row.get("is_read") != is_read:
        changes["is_read"] = is_read
        stats["read_updated"] += 1
    if row.get("not_in_mailbox_anymore"):
        # Item was flagged as gone but is visible again -> clear the flag.
        changes["not_in_mailbox_anymore"] = 0
        changes["left_mailbox_at"] = None
        stats["returned"] += 1
    # ── CONTENT CHANGE DETECTION (v1.3) ───────────────────────────────────
    # Only for known items WITHOUT a Message-ID (mid starts with "entryid:"):
    # their EntryID is stable, so content can change under the same identity
    # (e.g. an appended SendAsDenied error). Items with a Message-ID are
    # treated as finalized. Re-upload happens only in modes that are allowed
    # to upload, and never in dry-run.
    if (mode in ("capture", "full-update") and mid.startswith("entryid:")):
        cur_lm = get_lastmod(item)
        if cur_lm and cur_lm != row.get("last_mod_time"):
            stats["content_updated"] += 1
            if dry:
                # DRY-RUN: count and show only, upload NOTHING (preview before a real run)
                print(f" REUP* | {subject[:55]} | obsah zmenen -> by se re-uploadl")
            else:
                try:
                    reupload_changed(item, current)
                    changes["last_mod_time"] = cur_lm
                    changes["content_uploads"] = (row.get("content_uploads") or 1) + 1
                    print(f" REUP | {subject[:55]} | obsah zmenen -> re-upload")
                    info(conn, run_id, "content_reupload", subject=subject, folder=current,
                         detail=f"last_mod {row.get('last_mod_time')} -> {cur_lm}")
                except Exception as e:
                    # Undo the optimistic count; the fingerprint stays
                    # unchanged in `changes`, so the next run retries.
                    stats["content_updated"] -= 1
                    stats["errors"] += 1
                    error(conn, run_id, "reupload_error", subject=subject, folder=current, detail=str(e))
                    print(f" CHYBA REUP | {subject[:50]} | {e}")
    if changes:
        if not dry:
            apply_update(conn, mid, changes)
        # Build a human-readable summary of what changed.
        what = []
        if "jnj_folder" in changes:
            what.append(f"-> {current}")
        if "is_read" in changes:
            what.append("precteno" if is_read else "neprecteno")
        if "not_in_mailbox_anymore" in changes:
            what.append("vraceno do schranky")
        if "last_mod_time" in changes:
            what.append("obsah aktualizovan")
        marker = "*" if dry else " "
        print(f" UPD{marker} | {subject[:55]} | {', '.join(what)}")
        # NOTE(review): this log row is written in dry-run as well — confirm intended.
        info(conn, run_id, "path_update", subject=subject, folder=current, detail="; ".join(what))
    else:
        stats["skipped"] += 1
def walk(conn, run_id, folder, folder_path, cutoff_local, stats, seen, mode, dry, limit):
    """Recursively process the mail items of *folder* and its subfolders.

    Args:
        conn, run_id: SQLite connection and current run id (for logging).
        folder: Outlook folder object to scan.
        folder_path: path of the parent; this folder becomes
            ``f"{folder_path}/{folder.Name}"``.
        cutoff_local: datetime in LOCAL time (naive or aware), or None for
            no window. Only items received at or after it are scanned.
        stats, seen: counters / seen-id set, mutated by ``process_item``.
        mode, dry: passed through to ``process_item``.
        limit: stop once ``stats["found"]`` reaches this value (0 = no limit).

    A folder whose items cannot be listed is logged and skipped (including
    its subfolders). Only items whose MessageClass starts with ``IPM.NOTE``
    are processed.
    """
    from datetime import timezone  # local import: the file imports only datetime/timedelta

    current = f"{folder_path}/{folder.Name}"
    try:
        items = folder.Items
        if cutoff_local is not None:
            # DASL ("@SQL=") filters compare date-times in UTC, so the
            # local cutoff must be converted before it is formatted.
            # Passing the local value shifts the window by the UTC offset
            # and hides items that flag_left_mailbox still expects to see.
            # astimezone() treats a naive datetime as local time.
            cutoff_utc = cutoff_local.astimezone(timezone.utc)
            restrict = ("@SQL=\"urn:schemas:httpmail:datereceived\" >= '%s'"
                        % cutoff_utc.strftime("%Y/%m/%d %H:%M:%S"))
            items = items.Restrict(restrict)
        items.Sort("[ReceivedTime]", True)  # newest first
    except Exception as e:
        print(f" CHYBA slozka {current}: {e}")
        error(conn, run_id, "folder_error", folder=current, detail=str(e))
        return

    n = 0
    for item in items:
        if limit and stats["found"] >= limit:
            break
        try:
            is_mail = str(getattr(item, "MessageClass", "")).upper().startswith("IPM.NOTE")
        except Exception:
            # Unreadable item -> skip it rather than abort the folder.
            continue
        if not is_mail:
            continue
        process_item(conn, run_id, item, current, stats, seen, mode, dry)
        n += 1
    print(f" {current}: {n} polozek")
    info(conn, run_id, "folder_done", folder=current, detail=str(n))

    try:
        subs = list(folder.Folders)
    except Exception:
        subs = []
    for sub in subs:
        if limit and stats["found"] >= limit:
            break
        walk(conn, run_id, sub, current, cutoff_local, stats, seen, mode, dry, limit)
def _parse_dt(s):
if not s:
return None
try:
dt = datetime.fromisoformat(s)
if dt.tzinfo:
dt = dt.astimezone().replace(tzinfo=None)
return dt
except Exception:
return None
def flag_left_mailbox(conn, run_id, cutoff_local, seen, scanned_roots, stats, dry):
    """Flag messages that were expected in the scan but were not seen.

    A row of ``messages`` is flagged (``not_in_mailbox_anymore=1``,
    ``left_mailbox_at`` set) when all of the following hold:

    * its id is not in *seen* and it is not flagged already,
    * its last known path (``jnj_folder``, else ``folder``) lies in one of
      *scanned_roots* — the root itself or a descendant folder,
    * its ``received_at`` parses and is not older than *cutoff_local*.

    The last known path is left untouched. In dry-run nothing is written to
    ``messages``; counters and output are produced either way.
    """
    def in_scanned_part(path):
        # Match on a folder boundary: "/mbox/Inbox" must cover
        # "/mbox/Inbox/Sub" but not the unscanned sibling "/mbox/Inbox-old".
        for root in scanned_roots:
            base = root.rstrip("/")
            if path == base or path.startswith(base + "/"):
                return True
        return False

    cur = conn.execute(
        """SELECT message_id, received_at, jnj_folder, folder, not_in_mailbox_anymore
        FROM messages""")
    to_flag = []
    for mid, received_at, jnjf, fld, flag in cur.fetchall():
        if mid in seen or flag:
            continue
        path = jnjf or fld or ""
        if not in_scanned_part(path):
            continue
        rec = _parse_dt(received_at)
        # Rows without a usable date or outside the window were never
        # candidates for this scan, so their absence proves nothing.
        if rec is None or rec < cutoff_local:
            continue
        to_flag.append((mid, path))

    for mid, path in to_flag:
        if not dry:
            conn.execute(
                """UPDATE messages SET not_in_mailbox_anymore=1,
                left_mailbox_at=datetime('now'), updated_at=datetime('now')
                WHERE message_id=?""", (mid,))
        stats["left_mailbox"] += 1
        print(f" GONE{'*' if dry else ' '} | {path}")
    if not dry and to_flag:
        conn.commit()
    info(conn, run_id, "left_mailbox", detail=str(len(to_flag)))
# ─── MAIN ─────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: scan the mailbox and reconcile it with SQLite.

    Parses the arguments, opens the database, walks the default folders in
    SYNC_FOLDERS plus a root-level folder named "Archive", optionally flags
    messages that left the mailbox, stores the run statistics, prints a
    summary and (unless dry-run or --no-db-upload) uploads the database.
    """
    ap = argparse.ArgumentParser(description=f"jnj_mailbox_sync v{SCRIPT_VERSION}")
    ap.add_argument("--mode", choices=["capture", "update-paths", "full-update"],
                    default="capture")
    ap.add_argument("--days", type=int, default=30,
                    help="Okno ve dnech pro update-paths/full-update (0 = vse)")
    ap.add_argument("--dry-run", action="store_true",
                    help="Nic nezapise/nenahraje, jen vypise co by udelal")
    ap.add_argument("--limit", type=int, default=0, help="Max N polozek (test)")
    ap.add_argument("--no-db-upload", action="store_true")
    args = ap.parse_args()
    mode, dry = args.mode, args.dry_run
    # capture always scans everything; the --days window applies only to
    # the other two modes (and 0 means "no window" there as well).
    if mode == "capture":
        cutoff_local = None
    else:
        cutoff_local = None if args.days == 0 else (datetime.now() - timedelta(days=args.days))
    win = "vse" if cutoff_local is None else f"{args.days} dni (od {cutoff_local:%Y-%m-%d %H:%M})"
    print(f"=== jnj_mailbox_sync v{SCRIPT_VERSION} ===")
    print(f"Start: {datetime.now():%Y-%m-%d %H:%M:%S}")
    print(f"Rezim: {mode} Okno: {win} {'[DRY-RUN — nic se nemeni]' if dry else ''}")
    print(f"DB: {DB_PATH}")
    # NOTE(review): the connection is closed only on the normal path at the
    # end; an exception in between leaves it to interpreter shutdown.
    conn = sqlite3.connect(DB_PATH)
    init_db(conn)
    # The run row is written even in dry-run mode.
    run_id = start_run(conn, mode, args.days, dry)
    outlook = win32com.client.Dispatch("Outlook.Application")
    ns = outlook.GetNamespace("MAPI")
    stats = {"found": 0, "new_captured": 0, "new_uncaptured": 0, "path_updated": 0,
             "read_updated": 0, "returned": 0, "left_mailbox": 0, "content_updated": 0,
             "skipped": 0, "errors": 0}
    seen = set()
    # Root paths that were actually scanned; used by flag_left_mailbox.
    scanned_roots = set()
    for fid, label in SYNC_FOLDERS:
        root = ns.GetDefaultFolder(fid)
        mailbox = root.Parent.Name
        scanned_roots.add(f"/{mailbox}/{root.Name}")
        print(f"\n=== {label} ({mailbox}) ===")
        walk(conn, run_id, root, f"/{mailbox}", cutoff_local, stats, seen, mode, dry, args.limit)
    # ── Archive in the PRIMARY mailbox (v1.4) ──────────────────────────────
    # Archive (one-click archiving) is NOT a default folder -> it is looked
    # up by name under the root of the primary mailbox (inbox.Parent = root
    # of that same mailbox, so Online Archive = a different store is not
    # picked up here).
    try:
        mbox_root = ns.GetDefaultFolder(6).Parent
        mailbox = mbox_root.Name
        archive = None
        for f in mbox_root.Folders:
            try:
                if str(f.Name).strip().lower() == "archive":
                    archive = f
                    break
            except Exception:
                continue
        if archive is not None:
            scanned_roots.add(f"/{mailbox}/{archive.Name}")
            print(f"\n=== Archive ({mailbox}) ===")
            walk(conn, run_id, archive, f"/{mailbox}", cutoff_local, stats, seen, mode, dry, args.limit)
        else:
            print("\n(Archive slozka v primarni schrance nenalezena -> preskakuji)")
    except Exception as e:
        # Best effort: a failing Archive scan must not abort the run.
        print(f"\n(Archive scan preskocen: {e})")
    # "Left mailbox" detection needs a complete scan of a bounded window,
    # so it is skipped without a cutoff and whenever --limit is active.
    if mode in ("update-paths", "full-update") and cutoff_local is not None and not (args.limit):
        print("\n--- Kontrola 'opustilo schranku' (v okne, Inbox/Sent/Deleted) ---")
        flag_left_mailbox(conn, run_id, cutoff_local, seen, scanned_roots, stats, dry)
    elif args.limit:
        print("\n(--limit aktivni -> detekce 'opustilo schranku' preskocena)")
    finish_run(conn, run_id, stats)
    # ── Summary ────────────────────────────────────────────────────────────
    print(f"\n{'='*60}")
    print(f"SOUHRN [{mode}{' / DRY-RUN' if dry else ''}]")
    print(f" Nalezeno ve schrance: {stats['found']}")
    if mode in ("capture", "full-update"):
        lbl = "by se nahralo" if dry else "nahrano"
        print(f" Nove zachyceno ({lbl}): {stats['new_captured']}")
    else:
        print(f" Nove (bez tela, nedorovnano):{stats['new_uncaptured']}")
    print(f" Aktualizovana cesta: {stats['path_updated']}")
    print(f" Zmena precteno/neprecteno: {stats['read_updated']}")
    print(f" Vraceno do schranky: {stats['returned']}")
    print(f" Obsah zmenen (re-upload): {stats['content_updated']}")
    print(f" Opustilo schranku (GONE): {stats['left_mailbox']}")
    print(f" Beze zmeny (skip): {stats['skipped']}")
    print(f" Chyby: {stats['errors']}")
    print(f"{'='*60}")
    if dry:
        print("DRY-RUN: SQLite ani server se NEMENILY.")
    elif not args.no_db_upload:
        print("\nUpload SQLite na server...")
        upload_db(DB_PATH)
    print(f"\nKonec: {datetime.now():%Y-%m-%d %H:%M:%S}")
    if stats["errors"]:
        # NOTE(review): the visible code records errors in the SQLite `log`
        # table; confirm that LOG_PATH is really where they end up.
        print(f"Chyby logovany do: {LOG_PATH}")
    conn.close()
def upload_db(db_path):
    """Compress, encrypt and upload the SQLite database file.

    The file is compressed with lzma/xz (preset 9, extreme), encrypted with
    Fernet and POSTed to ``DB_UPLOAD_URL`` as
    ``jnjemails_<timestamp>.db.xz.enc``.

    This is best effort: any failure, including a non-OK HTTP status, is
    printed as an error and never raised to the caller. Returns None.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"jnjemails_{ts}.db"
    try:
        with open(db_path, "rb") as f:
            raw = f.read()
        compressed = lzma.compress(raw, preset=9 | lzma.PRESET_EXTREME)
        encrypted = _FERNET.encrypt(compressed)
        enc_filename = filename + ".xz.enc"
        resp = requests.post(
            DB_UPLOAD_URL,
            headers={"Authorization": f"Bearer {TOKEN}"},
            files={"file": (enc_filename, encrypted, "application/octet-stream")},
            timeout=300,
        )
        # Same check as upload_msg: without it a 4xx/5xx reply carrying a
        # JSON body would be printed below as a successful upload.
        if not resp.ok:
            raise requests.HTTPError(f"{resp.status_code} {resp.reason} | {resp.text[:200]}")
        mb_raw, mb_xz, mb_enc = (len(raw) / 1048576,
                                 len(compressed) / 1048576,
                                 len(encrypted) / 1048576)
        print(f" DB upload: {resp.json()} "
              f"({mb_raw:.1f} MB -> xz {mb_xz:.1f} MB -> enc {mb_enc:.1f} MB)")
    except Exception as e:
        # Deliberately broad: a failed DB upload must not abort the run.
        print(f" DB upload CHYBA: {e}")
# Script entry point: run the sync only when executed directly, not on import.
if __name__ == "__main__":
    main()
+102
View File
@@ -0,0 +1,102 @@
# jnj_mailbox_sync v1.5.0
**Soubor:** `jnj_mailbox_sync_v1.5.py`
**Datum:** 2026-06-16
**Autor:** vladimir.buzalka
**Běží:** JNJ stroj (Outlook MAPI), Python z Thonny.
## Co to je
Synchronizace JNJ Outlooku (MAPI) → osobní schránka (přes msgreceiver) + bookkeeping
v SQLite (`C:\Users\vbuzalka\SQLITE\jnjemails.db`). Sleduje přesuny e-mailů mezi
složkami a příznak „už není ve schránce" — bez opětovného přenosu těla.
Skenované složky: **Inbox + Sent Items + Deleted Items + Archive** (vč. podsložek).
## Novinka v1.5 — provenance verze skriptu na úrovni entry
Do tabulky `messages` přidány dva sloupce (jen pro náhled, **Tower je nezpracovává** —
nejsou v mirroru do `jnj_messages`):
| Sloupec | Význam |
|---|---|
| `captured_by_version` | verze skriptu, která entry **poprvé zachytila/odeslala** (set při INSERT) |
| `last_upload_version` | verze, která naposledy **re-uploadla tělo** (set při INSERT i při re-uploadu) |
Smysl: kdykoliv se podívat (`jnjemails` SQL), kterou verzí byl daný e-mail přenesen.
**Pravidlo:** při jakékoliv změně skriptu vždy bumpni verzi (`SCRIPT_VERSION`) — jinak
tahle stopa ztrácí smysl. Migrace přes `ALTER TABLE` (staré řádky = NULL).
## Novinka v1.4 — skenování složky Archive (primární schránka)
Přidána složka **Archive** (jednoklikové archivování v Outlooku) v **primární** schránce.
Archive **není** default folder, takže se hledá podle jména `"Archive"` pod kořenem
primární schránky (`Inbox.Parent`) a přidává se do `scanned_roots` (aby se její položky
nehodnotily jako „opustilo schránku"). **Online Archive** (samostatný store) se i nadále
**neskenuje**. Řeší případy, kdy odeslaná kopie skončila v Archive (jinak chyběla domácímu
přehledu i párování dvojčat).
## Novinka v1.3 — detekce změny obsahu (re-upload změněného e-mailu)
**Problém:** e-mail **bez Message-ID** (typicky **NEODESLANÝ** Sent kvůli `SendAsDenied`,
nebo čerstvě odeslaný, kde Exchange ještě nedoplnil Message-ID) má **stabilní EntryID**.
Když do něj Outlook **po zachycení** dopíše chybu odeslání, obsah se změní, ale identita
(`entryid:<EID>`) zůstane → starý sync to vyhodnotil jako „známé, beze změny" a
aktualizovaný (chybový) e-mail už domů **nepřenesl**. Naproti tomu úspěšně odeslaný
e-mail dostane **nové EntryID + Message-ID**, takže se zachytil jako nový. Vznikla
asymetrie: failed-update se ztrácel.
**Řešení:** identita zůstává (Message-ID / `entryid:`), ale navíc se sleduje **verzní otisk**
= `PR_LAST_MODIFICATION_TIME` (`0x30080040`). U **známé položky bez Message-ID**
(`mid` začíná `entryid:`) se otisk porovná; když se posunul, e-mail se znovu uloží
(`SaveAs`) a nahraje s `overwrite=1` → server přepíše původní `.msg` na místě → Tower ho
přeparsuje → dokument v Mongu se aktualizuje (vč. těla s chybou).
- Hlídání je **levné** — druhé čtení property jen u známých no-ID položek (desítky kusů);
položky s Message-ID jsou finalizované a nesledují se.
- Re-upload běží jen v režimech, které smějí nahrávat (**capture, full-update**), a posílá se
s `folder=""` → server **nedělá** Graph re-import (žádný duplikát v Graph zrcadle).
- **Vyžaduje msgreceiver app.py ≥ v2.4** (overwrite na `/upload`). Bez něj se re-upload chová
jako starý skip (nepřepíše, ale nic nerozbije) — pořadí nasazení server → JNJ bez výpadku.
## Nové sloupce SQLite
- `messages.last_mod_time` — PR_LAST_MODIFICATION_TIME při posledním zachycení (otisk).
- `messages.content_uploads` — kolikrát se tělo nahrálo (1 = jen první zachycení).
- `runs.content_updated` — kolik e-mailů se v běhu re-uploadlo kvůli změně obsahu.
(Migrace přes stávající `ALTER TABLE` smyčku — staré `jnjemails.db` se doplní automaticky.)
## Argumenty
`--mode {capture,update-paths,full-update}` (default capture), `--days N`
(0 = celé), `--dry-run`, `--limit N`, `--no-db-upload`.
## Spouštění (JNJ stroj, plné cesty)
```
"C:\Users\vbuzalka\AppData\Local\Programs\Thonny\python.exe" "c:\Users\vbuzalka\OneDrive - JNJ\##JNJPrenos\Python\jnj_mailbox_sync_v1.5.py" --mode full-update --days 60
```
`full-update --days 60` = dorovná chybějící + **re-uploadne změněné** (chybové) Sent položky
za poslední 60 dní. To je doporučený běh pro „aktualizovat i neodeslané".
## Revert
Stará verze: `Trash/jnj_mailbox_sync_v1.4.py` (bez provenance sloupců),
`…_v1.3.py` (bez skenování Archive), `…_v1.2.py` (bez detekce změny). Server v2.4
zůstává zpětně kompatibilní (overwrite je opt-in), takže revert na JNJ straně
nevyžaduje zásah na serveru.
## Historie
- **1.0.0** — režimy capture/update-paths/full-update, sledování přesunů, updated_at.
- **1.1.0** — + Deleted Items do skenovaných složek.
- **1.2.0** — upload SQLite komprimován (lzma/xz max) + šifrován (Fernet) → `.db.xz.enc`.
- **1.3.0** — + detekce změny obsahu přes `PR_LAST_MODIFICATION_TIME`: známé no-ID
položky, které se po zachycení změnily (např. dopsaná chyba `SendAsDenied`), se znovu
nahrávají s `overwrite=1`. Nové sloupce `last_mod_time`, `content_uploads`,
`runs.content_updated`. Vyžaduje app.py ≥ v2.4.
- **1.4.0** — + skenování složky **Archive** v primární schránce (hledá se podle jména
pod kořenem schránky, ne přes default folder; Online Archive se neskenuje).
- **1.5.0** — + provenance verze na úrovni entry: sloupce `captured_by_version`
a `last_upload_version` (jen náhled, Tower nezpracovává).
+707
View File
@@ -0,0 +1,707 @@
"""
jnj_mailbox_sync v1.5
Nazev: jnj_mailbox_sync_v1.5.py
Verze: 1.5.0
Datum: 2026-06-16
Autor: vladimir.buzalka
Popis:
Synchronizace JNJ Outlooku (MAPI) -> osobni schranka + bookkeeping v SQLite.
Nasledník inbox_full_sync_v1.1 / jnj_mailbox_sync_v1.2. Sleduje PRESUN emailu
mezi slozkami a priznak "uz neni ve schrance" — BEZ opetovneho prenosu tela.
Scope: primarni schranka, Inbox + Sent Items + Deleted Items + Archive
vcetne vsech podsložek. Slozka Archive (jednoklikove archivovani v Outlooku)
NENI default folder — hleda se podle jmena pod korenem primarni schranky.
Online Archive (samostatny store) se i nadale NEskenuje.
Identita emailu = Internet Message-ID (stabilni pres presuny). Kdyz Message-ID
chybi (typicky cerstve odeslane / NEODESLANE Sent polozky — Exchange ho doplni
az po skutecnem transportu), pouzije se fallback "entryid:<EntryID>".
Sloupce cest v SQLite:
folder = cesta pri PRVNIM zachyceni (historie, neprepisuje se)
jnj_folder = AKTUALNI ziva cesta (prepisuje se pri presunu)
updated_at se bumpne pri insertu i kazde zmene — watermark pro domaci sync.
NOVINKA v1.3 — DETEKCE ZMENY OBSAHU (re-upload zmeneneho emailu)
Problem: e-mail bez Message-ID (napr. NEODESLANY Sent kvuli SendAsDenied) ma
STABILNI EntryID. Kdyz do nej Outlook PO zachyceni dopise chybu odeslani,
obsah se zmeni, ale identita (entryid:<EID>) zustane — stary sync to vyhodnotil
jako "zname, beze zmeny" a aktualizovany (chybovy) e-mail uz domu NEPRENESL.
Naproti tomu uspesne odeslany e-mail dostane NOVE EntryID + Message-ID, takze
se zachytil jako novy. Vznikla asymetrie: failed-update se ztracel.
Reseni: identita zustava (Message-ID / entryid:), ale navic se sleduje VERZNI
OTISK = PR_LAST_MODIFICATION_TIME (0x30080040). U ZNAMEHO emailu BEZ Message-ID
(mid zacina "entryid:") se otisk porovna; kdyz se posunul, e-mail se znovu
ulozi (SaveAs) a nahraje s priznakem overwrite=true (server prepise puvodni
.msg na miste -> Tower ho preparsuje -> dokument v Mongu se aktualizuje, vc.
tela s chybou). Tim doteche i "zmeneny hustak". Hlidani je levne — druhe cteni
property jen u znamych no-ID polozek (desitky kusu); polozky s Message-ID jsou
finalizovane a nesleduji se.
Re-upload bezi jen v rezimech, ktere smeji nahravat (capture, full-update),
a posila se BEZ folderu (folder="") => server NEdela Graph re-import (zadny
duplikat v Graph zrcadle); jen prepise /msgs soubor pro Tower parse.
Vyzaduje msgreceiver app.py >= v2.4 (overwrite na /upload). Bez nej se
re-upload chova jako "exists" (stary skip) — neprepise, ale nic nerozbije.
Upload SQLite (zustava z v1.2): DB se pred odeslanim KOMPRIMUJE (lzma/xz, max) a
SIFRUJE (Fernet, klic z TOKENu) a nahrava jako .db.xz.enc.
Rezimy (--mode):
capture (default) Projde cely Inbox+Sent+Deleted, nove emaily ulozi a
nahraje + NOVE re-uploadne zmenene znamé no-ID polozky.
Okno --days se IGNORUJE (bere VSE).
update-paths Jen METADATA cesty/precteno + "opustilo schranku". NIC nenahrava
(ani re-upload).
full-update update-paths + dorovna chybejici (SaveAs+upload) + re-upload
zmenenych znamých no-ID polozek.
Argumenty:
--mode {capture,update-paths,full-update} default capture
--days N velikost okna ve dnech (default 30). 0 = cely Inbox+Sent.
--dry-run NIC nezapise/nenahraje, jen vypise co by udelal.
--limit N zpracovat max N polozek (rychly test).
--no-db-upload na konci nenahravat SQLite na server.
Spousteni:
# Refresh poslednich 60 dni + zachytit zmenene (chybove) Sent polozky:
python jnj_mailbox_sync_v1.5.py --mode full-update --days 60
Zavislosti:
pywin32, requests, cryptography, sqlite3 + lzma (stdlib).
Python 3.10+, Windows, Outlook musi byt spusteny a prihlaseny.
Historie verzi:
1.0.0 2026-06-09 Rezimy capture/update-paths/full-update, sledovani presunu,
not_in_mailbox_anymore, updated_at watermark.
1.1.0 2026-06-10 + Deleted Items do SYNC_FOLDERS.
1.2.0 2026-06-10 Upload SQLite komprimovan (lzma) + sifrovan (Fernet) ->
.db.xz.enc. Vyzaduje app.py >= v2.1.
1.3.0 2026-06-16 + DETEKCE ZMENY OBSAHU pres PR_LAST_MODIFICATION_TIME:
zname no-ID polozky (entryid:), ktere se po zachyceni
zmenily (napr. dopsana chyba SendAsDenied), se znovu
nahravaji s overwrite=true. Nove SQLite sloupce
last_mod_time, content_uploads; runs.content_updated.
Vyzaduje app.py >= v2.4 (overwrite na /upload).
1.4.0 2026-06-16 + skenovani slozky Archive v PRIMARNI schrance (ne Online
Archive). Archive neni default folder -> hleda se podle
jmena ("Archive") pod korenem primarni schranky a pridava
se do scanned_roots (aby se jeji polozky nehodnotily jako
GONE). Resi pripady, kdy odeslana kopie skoncila v Archive.
1.5.0 2026-06-16 + provenance verze skriptu na urovni entry: nove SQLite
sloupce captured_by_version (verze, ktera entry POPRVE
zachytila) a last_upload_version (verze, ktera naposledy
re-uploadla telo). JEN pro nahled — Tower je NEzpracovava
(nejsou v mirroru do jnj_messages). Pravidlo: pri kazde
zmene skriptu verzovat, aby tahle stopa byla uzitecna.
"""
import argparse
import base64
import hashlib
import logging
import lzma
import sqlite3
import sys
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
import win32com.client
import requests
import urllib3
from cryptography.fernet import Fernet
# Force UTF-8 console output where the stream supports reconfiguration;
# characters that cannot be encoded are replaced instead of raising.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# Silence urllib3's InsecureRequestWarning.
# NOTE(review): no request visible here passes verify=False — confirm this is still needed.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ─── CONFIGURATION ────────────────────────────────────────────────────────────
# Bearer token for the upload endpoints; also the seed of the Fernet key below.
# NOTE(review): the secret is committed in source — consider loading it from
# the environment or a local config file instead.
TOKEN = "13e1bb01-9fd5-44a8-8ce9-4ee27133d340"
UPLOAD_URL = "https://msgs.buzalka.cz/upload"
DB_UPLOAD_URL = "https://msgs.buzalka.cz/upload-db"
DB_PATH = r"C:\Users\vbuzalka\SQLITE\jnjemails.db"
LOG_PATH = r"C:\Users\vbuzalka\SQLITE\jnj_mailbox_sync_errors.log"
# MAPI property URIs read through PropertyAccessor.GetProperty.
PR_INTERNET_MESSAGE_ID = "http://schemas.microsoft.com/mapi/proptag/0x1035001E"
PR_LAST_MOD_TIME = "http://schemas.microsoft.com/mapi/proptag/0x30080040" # PR_LAST_MODIFICATION_TIME
SCRIPT_NAME = "jnj_mailbox_sync"
SCRIPT_VERSION = "1.5.0"
# olFolderInbox=6, olFolderSentMail=5, olFolderDeletedItems=3
SYNC_FOLDERS = [(6, "Inbox"), (5, "Sent Items"), (3, "Deleted Items")]
OLSAVE_MSG = 3 # OlSaveAsType.olMSG
# Encryption key derived from TOKEN (same algorithm as the server):
# SHA-256 digest of the token, urlsafe-base64 encoded.
_FERNET = Fernet(base64.urlsafe_b64encode(hashlib.sha256(TOKEN.encode()).digest()))
# File logger for errors only (level ERROR), written to LOG_PATH as UTF-8.
logging.basicConfig(
    filename=LOG_PATH,
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)
# ──────────────────────────────────────────────────────────────────────────────
# ─── SQLite ───────────────────────────────────────────────────────────────────
def _add_missing_columns(conn, table, columns):
    """Add each ``(name, ddl)`` column to *table* unless it already exists."""
    # PRAGMA table_info yields one row per column; index 1 is the name.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    for name, ddl in columns:
        if name not in existing:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {name} {ddl}")


def init_db(conn):
    """Create the schema if needed and migrate an existing database.

    Creates the ``messages``, ``runs`` and ``log`` tables and their indexes
    when missing, then adds any column introduced by later script versions
    to already existing ``messages`` / ``runs`` tables. Safe to call
    repeatedly. Commits before returning.

    Missing columns are detected explicitly instead of attempting every
    ``ALTER TABLE`` and ignoring all errors, so a real failure (locked
    database, I/O error) raises ``sqlite3.Error`` here rather than much
    later on the first insert.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            message_id TEXT NOT NULL,
            subject TEXT,
            sender TEXT,
            received_at TEXT,
            folder TEXT,
            source TEXT,
            uploaded_at TEXT DEFAULT (datetime('now')),
            entry_id TEXT,
            graph_id TEXT,
            is_read INTEGER DEFAULT 0,
            jnj_folder TEXT,
            not_in_mailbox_anymore INTEGER DEFAULT 0,
            left_mailbox_at TEXT,
            updated_at TEXT,
            last_mod_time TEXT,
            content_uploads INTEGER DEFAULT 1,
            captured_by_version TEXT,
            last_upload_version TEXT
        )
    """)
    conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            script TEXT NOT NULL,
            version TEXT,
            started_at TEXT NOT NULL,
            finished_at TEXT,
            mode TEXT,
            window_days INTEGER,
            dry_run INTEGER DEFAULT 0,
            found INTEGER DEFAULT 0,
            new_captured INTEGER DEFAULT 0,
            path_updated INTEGER DEFAULT 0,
            read_updated INTEGER DEFAULT 0,
            returned INTEGER DEFAULT 0,
            left_mailbox INTEGER DEFAULT 0,
            content_updated INTEGER DEFAULT 0,
            skipped INTEGER DEFAULT 0,
            errors INTEGER DEFAULT 0
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER REFERENCES runs(id),
            level TEXT NOT NULL,
            event TEXT NOT NULL,
            subject TEXT,
            folder TEXT,
            graph_id TEXT,
            detail TEXT,
            created_at TEXT DEFAULT (datetime('now'))
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_log_run_id ON log(run_id)")

    # Migration of an existing jnjemails.db — add the columns it lacks.
    _add_missing_columns(conn, "messages", [
        ("entry_id", "TEXT"), ("graph_id", "TEXT"), ("is_read", "INTEGER DEFAULT 0"),
        ("jnj_folder", "TEXT"), ("not_in_mailbox_anymore", "INTEGER DEFAULT 0"),
        ("left_mailbox_at", "TEXT"), ("updated_at", "TEXT"),
        ("last_mod_time", "TEXT"), ("content_uploads", "INTEGER DEFAULT 1"),
        ("captured_by_version", "TEXT"), ("last_upload_version", "TEXT"),
    ])
    _add_missing_columns(conn, "runs", [
        ("mode", "TEXT"), ("window_days", "INTEGER"), ("dry_run", "INTEGER DEFAULT 0"),
        ("found", "INTEGER DEFAULT 0"), ("new_captured", "INTEGER DEFAULT 0"),
        ("path_updated", "INTEGER DEFAULT 0"), ("read_updated", "INTEGER DEFAULT 0"),
        ("returned", "INTEGER DEFAULT 0"), ("left_mailbox", "INTEGER DEFAULT 0"),
        ("content_updated", "INTEGER DEFAULT 0"),
    ])
    # Created after the migration: updated_at may only just have been added.
    conn.execute("CREATE INDEX IF NOT EXISTS idx_updated_at ON messages(updated_at)")
    conn.commit()
def start_run(conn, mode, days, dry):
    """Open a bookkeeping row in ``runs`` and return its row id.

    Stores the script name and version, the start time (SQLite
    ``datetime('now')``), the selected mode, the window size in days and the
    dry-run flag as 0/1. The insert is committed before returning.
    """
    dry_flag = 1 if dry else 0
    params = (SCRIPT_NAME, SCRIPT_VERSION, mode, days, dry_flag)
    cursor = conn.execute(
        """INSERT INTO runs (script, version, started_at, mode, window_days, dry_run)
        VALUES (?, ?, datetime('now'), ?, ?, ?)""",
        params,
    )
    conn.commit()
    return cursor.lastrowid
def finish_run(conn, run_id, stats):
    """Close the ``runs`` row *run_id*: stamp ``finished_at`` and store counters.

    *stats* must contain every counter named below; the values are written
    to the columns of the same name. The update is committed immediately.
    """
    counter_names = (
        "found", "new_captured", "path_updated", "read_updated",
        "returned", "left_mailbox", "content_updated", "skipped", "errors",
    )
    params = tuple(stats[name] for name in counter_names) + (run_id,)
    conn.execute(
        """UPDATE runs SET finished_at=datetime('now'),
        found=?, new_captured=?, path_updated=?, read_updated=?,
        returned=?, left_mailbox=?, content_updated=?, skipped=?, errors=?
        WHERE id=?""",
        params,
    )
    conn.commit()
def db_log(conn, run_id, level, event, subject=None, folder=None, graph_id=None, detail=None):
    """Append one row to the ``log`` table and commit.

    *level* and *event* are stored as given; the optional fields default to
    NULL when not supplied.
    """
    record = (run_id, level, event, subject, folder, graph_id, detail)
    insert_sql = """INSERT INTO log (run_id, level, event, subject, folder, graph_id, detail)
        VALUES (?, ?, ?, ?, ?, ?, ?)"""
    conn.execute(insert_sql, record)
    conn.commit()
def info(conn, run_id, event, **kw):
    """Write *event* to the DB log at level ``INFO``; *kw* is passed to ``db_log``."""
    level = "INFO"
    db_log(conn, run_id, level, event, **kw)
def error(conn, run_id, event, **kw):
    """Write *event* to the DB log at level ``ERROR``; *kw* is passed to ``db_log``."""
    level = "ERROR"
    db_log(conn, run_id, level, event, **kw)
def db_get(conn, mid):
    """Return the tracked state of message *mid* as a dict, or None if unknown.

    The dict carries the keys ``message_id``, ``folder``, ``jnj_folder``,
    ``is_read``, ``not_in_mailbox_anymore``, ``last_mod_time`` and
    ``content_uploads``, read from the ``messages`` table.
    """
    field_names = (
        "message_id", "folder", "jnj_folder", "is_read",
        "not_in_mailbox_anymore", "last_mod_time", "content_uploads",
    )
    record = conn.execute(
        """SELECT message_id, folder, jnj_folder, is_read, not_in_mailbox_anymore,
        last_mod_time, content_uploads
        FROM messages WHERE message_id=?""", (mid,)).fetchone()
    if not record:
        return None
    return dict(zip(field_names, record))
def apply_update(conn, mid, changes):
    """Apply column changes to message *mid* and bump ``updated_at``.

    *changes* maps column name -> new value. Column names are interpolated
    into the SQL text, so they must be trusted identifiers; values are bound
    as parameters. ``updated_at`` is always set, even for an empty mapping.
    The update is committed immediately.
    """
    assignments = [f"{column}=?" for column in changes]
    assignments.append("updated_at=datetime('now')")
    params = list(changes.values())
    params.append(mid)
    conn.execute(f"UPDATE messages SET {', '.join(assignments)} WHERE message_id=?", params)
    conn.commit()
# ─── Outlook / prenos ────────────────────────────────────────────────────────
def get_mid(item) -> str:
    """Return the identity key of an Outlook item.

    Uses the Internet Message-ID property when it can be read and is
    non-empty; otherwise falls back to ``"entryid:<EntryID>"``.
    """
    try:
        message_id = item.PropertyAccessor.GetProperty(PR_INTERNET_MESSAGE_ID)
    except Exception:
        # Property missing or unreadable -> use the EntryID fallback below.
        message_id = None
    if message_id:
        return message_id
    return f"entryid:{item.EntryID}"
def get_lastmod(item):
    """Return PR_LAST_MODIFICATION_TIME as a string fingerprint, or None.

    The value is rendered with ``isoformat()`` when available and with
    ``str()`` otherwise. Any failure to read or render it yields None.
    """
    try:
        stamp = item.PropertyAccessor.GetProperty(PR_LAST_MOD_TIME)
    except Exception:
        return None
    if stamp is None:
        return None
    try:
        return stamp.isoformat()
    except Exception:
        pass  # not a datetime-like value; fall back to its string form
    try:
        return str(stamp)
    except Exception:
        return None
def upload_msg(msg_path, filename, folder="", overwrite=False):
    """Encrypt a saved .msg file and POST it to ``UPLOAD_URL``.

    The file content is Fernet-encrypted and sent under the stem of
    *filename* with an ``.emsg`` suffix. *folder* is sent as a form field;
    ``overwrite="1"`` is added only when *overwrite* is true.

    Returns the decoded JSON reply. Raises ``requests.HTTPError`` carrying
    the status, reason and the first 200 characters of the body when the
    response is not OK.
    """
    with open(msg_path, "rb") as src:
        payload = _FERNET.encrypt(src.read())
    remote_name = Path(filename).stem + ".emsg"
    form = {"folder": folder}
    if overwrite:
        form["overwrite"] = "1"
    response = requests.post(
        UPLOAD_URL,
        headers={"Authorization": f"Bearer {TOKEN}"},
        files={"file": (remote_name, payload, "application/octet-stream")},
        data=form,
        timeout=60,
    )
    if response.ok:
        return response.json()
    raise requests.HTTPError(f"{response.status_code} {response.reason} | {response.text[:200]}")
def save_and_upload(item, folder="", overwrite=False):
    """Save *item* as .msg into a temp directory and upload it encrypted.

    The file name is the last 20 characters of the item's EntryID plus
    ``.msg``. Returns ``(filename, server_json)``.
    """
    safe = f"{item.EntryID[-20:]}.msg"
    with tempfile.TemporaryDirectory() as workdir:
        target = Path(workdir) / safe
        item.SaveAs(str(target), OLSAVE_MSG)
        # Upload while the temp directory (and the file in it) still exists.
        server_json = upload_msg(target, safe, folder, overwrite=overwrite)
    return safe, server_json
def capture_new(conn, run_id, item, mid, current, is_read, subject, stats):
    """Capture a new e-mail: SaveAs -> upload -> insert into ``messages``.

    *current* is used both as the first-capture ``folder`` and as the live
    ``jnj_folder``. Both provenance columns (``captured_by_version``,
    ``last_upload_version``) are set to the running SCRIPT_VERSION. Upload
    errors propagate to the caller. Returns True once the row has been
    written and the capture logged. *stats* is accepted but not modified.
    """
    _, server_reply = save_and_upload(item, current, overwrite=False)
    graph_id = server_reply.get("graph_id")
    last_mod = get_lastmod(item)

    # Both properties are read defensively; a failing read degrades to a
    # neutral value instead of aborting the capture.
    try:
        received_time = item.ReceivedTime
        received = received_time.isoformat() if received_time else None
    except Exception:
        received = None
    try:
        sender = item.SenderEmailAddress or ""
    except Exception:
        sender = ""

    row = (mid, subject, sender, received, current, SCRIPT_NAME,
           item.EntryID, graph_id, is_read, current, last_mod,
           SCRIPT_VERSION, SCRIPT_VERSION)
    conn.execute(
        """INSERT OR IGNORE INTO messages
        (message_id, subject, sender, received_at, folder, source,
        entry_id, graph_id, is_read, jnj_folder,
        not_in_mailbox_anymore, updated_at, last_mod_time, content_uploads,
        captured_by_version, last_upload_version)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, datetime('now'), ?, 1, ?, ?)""",
        row,
    )
    conn.commit()
    info(conn, run_id, "captured", subject=subject, folder=current, graph_id=graph_id)
    print(f" NEW | {subject[:70]}")
    return True
def reupload_changed(item, current):
    """Upload a changed (already known) e-mail again with overwrite enabled.

    The upload is sent with an empty folder; per the original author's note
    this makes the server skip the Graph re-import and only overwrite the
    stored /msgs file.  ``current`` is accepted but not used.
    """
    save_and_upload(item, "", overwrite=True)
def process_item(conn, run_id, item, current, stats, seen, mode, dry):
    """Reconcile one Outlook item with its row in the local ``messages`` table.

    Parameters:
        conn, run_id: DB connection and run identifier passed to the helpers.
        item:    the Outlook item being processed.
        current: folder path the item was found in.
        stats:   dict of counters, updated in place.
        seen:    set collecting the message ids met during this run.
        mode:    "capture", "update-paths" or "full-update".
        dry:     when true nothing is uploaded and no row is updated; the
                 counters and the printed lines show what would happen.

    An item whose id cannot be read is ignored silently.  An item is counted
    as ``skipped`` only when nothing about it changed: a content change that
    was previewed (dry run) or whose re-upload failed is no longer counted
    as skipped as well.
    """
    try:
        mid = get_mid(item)
    except Exception:
        # Without an id the item cannot be matched against the DB.
        return
    seen.add(mid)
    stats["found"] += 1

    try:
        is_read = 0 if item.UnRead else 1
    except Exception:
        is_read = 0
    subject = str(getattr(item, "Subject", "") or "")
    row = db_get(conn, mid)
    may_upload = mode in ("capture", "full-update")

    # -- New e-mail (not in DB) ---------------------------------------------
    if row is None:
        if not may_upload:
            # update-paths: the body is not available, cannot be back-filled.
            stats["new_uncaptured"] += 1
        elif dry:
            stats["new_captured"] += 1
            print(f" NEW* | {subject[:70]}")
        else:
            try:
                if capture_new(conn, run_id, item, mid, current, is_read, subject, stats):
                    stats["new_captured"] += 1
            except Exception as e:
                stats["errors"] += 1
                error(conn, run_id, "capture_error", subject=subject, folder=current, detail=str(e))
                print(f" CHYBA NEW | {subject[:50]} | {e}")
        return

    # -- Known e-mail: compare metadata -------------------------------------
    changes = {}
    current_known = row.get("jnj_folder") or row.get("folder")
    if current_known != current:
        changes["jnj_folder"] = current
        stats["path_updated"] += 1
    if row.get("is_read") != is_read:
        changes["is_read"] = is_read
        stats["read_updated"] += 1
    if row.get("not_in_mailbox_anymore"):
        changes["not_in_mailbox_anymore"] = 0
        changes["left_mailbox_at"] = None
        stats["returned"] += 1

    # -- Content change detection (v1.3) ------------------------------------
    # Only for known items WITHOUT a Message-ID (mid starts with "entryid:"):
    # per the original note their content can change under the same identity
    # (e.g. an appended SendAsDenied error).  Re-upload only in modes that are
    # allowed to upload, and never in a dry run.
    content_changed = False
    if may_upload and mid.startswith("entryid:"):
        cur_lm = get_lastmod(item)
        if cur_lm and cur_lm != row.get("last_mod_time"):
            content_changed = True
            stats["content_updated"] += 1
            if dry:
                # Dry run: count and show only, upload nothing.
                print(f" REUP* | {subject[:55]} | obsah zmenen -> by se re-uploadl")
            else:
                try:
                    reupload_changed(item, current)
                    changes["last_mod_time"] = cur_lm
                    changes["content_uploads"] = (row.get("content_uploads") or 1) + 1
                    changes["last_upload_version"] = SCRIPT_VERSION
                    print(f" REUP | {subject[:55]} | obsah zmenen -> re-upload")
                    info(conn, run_id, "content_reupload", subject=subject, folder=current,
                         detail=f"last_mod {row.get('last_mod_time')} -> {cur_lm}")
                except Exception as e:
                    # The re-upload did not happen: take the count back.
                    stats["content_updated"] -= 1
                    stats["errors"] += 1
                    error(conn, run_id, "reupload_error", subject=subject, folder=current, detail=str(e))
                    print(f" CHYBA REUP | {subject[:50]} | {e}")

    if changes:
        if not dry:
            apply_update(conn, mid, changes)
        what = []
        if "jnj_folder" in changes:
            what.append(f"-> {current}")
        if "is_read" in changes:
            what.append("precteno" if is_read else "neprecteno")
        if "not_in_mailbox_anymore" in changes:
            what.append("vraceno do schranky")
        if "last_mod_time" in changes:
            what.append("obsah aktualizovan")
        marker = "*" if dry else " "
        print(f" UPD{marker} | {subject[:55]} | {', '.join(what)}")
        info(conn, run_id, "path_update", subject=subject, folder=current, detail="; ".join(what))
    elif not content_changed:
        # Truly unchanged; items with a detected content change were already
        # counted above (as content_updated or as an error).
        stats["skipped"] += 1
def walk(conn, run_id, folder, folder_path, cutoff_local, stats, seen, mode, dry, limit):
    """Process the mail items of ``folder`` and then recurse into its subfolders.

    When ``cutoff_local`` is given, items are restricted to those whose
    ``datereceived`` is at or after it.  Items are sorted newest first and
    only message classes starting with ``IPM.NOTE`` are processed.  A non-zero
    ``limit`` stops the walk once ``stats["found"]`` reaches it.  A folder
    whose items cannot be obtained is logged and skipped, subfolders included.
    """
    current = f"{folder_path}/{folder.Name}"

    def limit_reached():
        return limit and stats["found"] >= limit

    try:
        items = folder.Items
        if cutoff_local is not None:
            cutoff_text = cutoff_local.strftime("%Y/%m/%d %H:%M:%S")
            restrict = ("@SQL=\"urn:schemas:httpmail:datereceived\" >= '%s'"
                        % cutoff_text)
            items = items.Restrict(restrict)
        items.Sort("[ReceivedTime]", True)  # newest first
    except Exception as e:
        print(f" CHYBA slozka {current}: {e}")
        error(conn, run_id, "folder_error", folder=current, detail=str(e))
        return

    processed = 0
    for item in items:
        if limit_reached():
            break
        try:
            message_class = str(getattr(item, "MessageClass", "")).upper()
        except Exception:
            continue
        if not message_class.startswith("IPM.NOTE"):
            continue
        process_item(conn, run_id, item, current, stats, seen, mode, dry)
        processed += 1

    print(f" {current}: {processed} polozek")
    info(conn, run_id, "folder_done", folder=current, detail=str(processed))

    try:
        children = list(folder.Folders)
    except Exception:
        children = []
    for child in children:
        if limit_reached():
            break
        walk(conn, run_id, child, current, cutoff_local, stats, seen, mode, dry, limit)
def _parse_dt(s):
    """Parse an ISO-8601 string into a naive datetime.

    A timezone-aware value is converted to the local timezone and its tzinfo
    is dropped; a naive value is returned unchanged.  Empty input or anything
    that cannot be parsed yields None.
    """
    if not s:
        return None
    try:
        parsed = datetime.fromisoformat(s)
        return parsed.astimezone().replace(tzinfo=None) if parsed.tzinfo else parsed
    except Exception:
        return None
def _is_under_root(path, root):
    """Return True when ``path`` is ``root`` itself or a folder below it."""
    # A bare startswith() would also match siblings ("/mbox/Inbox2" for the
    # root "/mbox/Inbox"), so the match is anchored on a "/" boundary.
    return path == root or path.startswith(root.rstrip("/") + "/")


def flag_left_mailbox(conn, run_id, cutoff_local, seen, scanned_roots, stats, dry):
    """Flag DB messages that were not seen in the scanned part of the mailbox.

    A row is flagged when its message id is not in ``seen``, it is not flagged
    already, its last known path (``jnj_folder``, else ``folder``) lies under
    one of ``scanned_roots`` and its ``received_at`` parses to a time at or
    after ``cutoff_local``.  The last known path is kept; only
    ``not_in_mailbox_anymore`` and the timestamps are set.

    In a dry run nothing is written to ``messages``; the counter
    ``stats["left_mailbox"]`` and the printed lines are produced either way.
    ``cutoff_local`` must not be None.
    """
    cur = conn.execute(
        """SELECT message_id, received_at, jnj_folder, folder, not_in_mailbox_anymore
        FROM messages""")
    to_flag = []
    for mid, received_at, jnjf, fld, flag in cur.fetchall():
        if mid in seen or flag:
            continue
        path = jnjf or fld or ""
        if not any(_is_under_root(path, root) for root in scanned_roots):
            continue
        rec = _parse_dt(received_at)
        if rec is None or rec < cutoff_local:
            continue
        to_flag.append((mid, path))

    for mid, path in to_flag:
        if not dry:
            conn.execute(
                """UPDATE messages SET not_in_mailbox_anymore=1,
                left_mailbox_at=datetime('now'), updated_at=datetime('now')
                WHERE message_id=?""", (mid,))
        stats["left_mailbox"] += 1
        print(f" GONE{'*' if dry else ' '} | {path}")

    if not dry and to_flag:
        conn.commit()
    info(conn, run_id, "left_mailbox", detail=str(len(to_flag)))
# ─── MAIN ─────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point of the mailbox sync.

    Parses the arguments, walks the folders listed in ``SYNC_FOLDERS`` plus an
    "Archive" folder found by name under the root of the primary mailbox,
    optionally flags messages that left the mailbox, prints a summary and,
    unless disabled or in a dry run, uploads the SQLite file.

    The DB connection is closed in a ``finally`` block, i.e. also when the run
    fails, and before the DB file is read for the upload.
    """
    ap = argparse.ArgumentParser(description=f"jnj_mailbox_sync v{SCRIPT_VERSION}")
    ap.add_argument("--mode", choices=["capture", "update-paths", "full-update"],
                    default="capture")
    ap.add_argument("--days", type=int, default=30,
                    help="Okno ve dnech pro update-paths/full-update (0 = vse)")
    ap.add_argument("--dry-run", action="store_true",
                    help="Nic nezapise/nenahraje, jen vypise co by udelal")
    ap.add_argument("--limit", type=int, default=0, help="Max N polozek (test)")
    ap.add_argument("--no-db-upload", action="store_true")
    args = ap.parse_args()
    mode, dry = args.mode, args.dry_run

    # Capture mode never uses a time window; the other modes use --days (0 = all).
    if mode == "capture":
        cutoff_local = None
    else:
        cutoff_local = None if args.days == 0 else (datetime.now() - timedelta(days=args.days))
    win = "vse" if cutoff_local is None else f"{args.days} dni (od {cutoff_local:%Y-%m-%d %H:%M})"

    print(f"=== jnj_mailbox_sync v{SCRIPT_VERSION} ===")
    print(f"Start: {datetime.now():%Y-%m-%d %H:%M:%S}")
    print(f"Rezim: {mode} Okno: {win} {'[DRY-RUN — nic se nemeni]' if dry else ''}")
    print(f"DB: {DB_PATH}")

    conn = sqlite3.connect(DB_PATH)
    try:
        init_db(conn)
        run_id = start_run(conn, mode, args.days, dry)
        outlook = win32com.client.Dispatch("Outlook.Application")
        ns = outlook.GetNamespace("MAPI")
        stats = {"found": 0, "new_captured": 0, "new_uncaptured": 0, "path_updated": 0,
                 "read_updated": 0, "returned": 0, "left_mailbox": 0, "content_updated": 0,
                 "skipped": 0, "errors": 0}
        seen = set()
        scanned_roots = set()

        for fid, label in SYNC_FOLDERS:
            root = ns.GetDefaultFolder(fid)
            mailbox = root.Parent.Name
            scanned_roots.add(f"/{mailbox}/{root.Name}")
            print(f"\n=== {label} ({mailbox}) ===")
            walk(conn, run_id, root, f"/{mailbox}", cutoff_local, stats, seen, mode, dry, args.limit)

        # -- Archive in the PRIMARY mailbox (v1.4) ---------------------------
        # Archive is not a default folder, so it is looked up by name under
        # the root of the primary mailbox (parent of default folder 6); per
        # the original note this keeps the Online Archive store out.
        try:
            mbox_root = ns.GetDefaultFolder(6).Parent
            mailbox = mbox_root.Name
            archive = None
            for f in mbox_root.Folders:
                try:
                    if str(f.Name).strip().lower() == "archive":
                        archive = f
                        break
                except Exception:
                    continue
            if archive is not None:
                scanned_roots.add(f"/{mailbox}/{archive.Name}")
                print(f"\n=== Archive ({mailbox}) ===")
                walk(conn, run_id, archive, f"/{mailbox}", cutoff_local, stats, seen, mode, dry, args.limit)
            else:
                print("\n(Archive slozka v primarni schrance nenalezena -> preskakuji)")
        except Exception as e:
            print(f"\n(Archive scan preskocen: {e})")

        # The "left mailbox" check needs a complete scan of the window, so it
        # is skipped under --limit; the notice is shown only when the check
        # would otherwise have run.
        gone_check_applies = mode in ("update-paths", "full-update") and cutoff_local is not None
        if gone_check_applies and not args.limit:
            print("\n--- Kontrola 'opustilo schranku' (v okne, Inbox/Sent/Deleted) ---")
            flag_left_mailbox(conn, run_id, cutoff_local, seen, scanned_roots, stats, dry)
        elif gone_check_applies:
            print("\n(--limit aktivni -> detekce 'opustilo schranku' preskocena)")

        finish_run(conn, run_id, stats)
    finally:
        # Release the connection even on failure, and before the file is read
        # for the upload below.
        conn.close()

    # -- Summary -------------------------------------------------------------
    print(f"\n{'='*60}")
    print(f"SOUHRN [{mode}{' / DRY-RUN' if dry else ''}]")
    print(f" Nalezeno ve schrance: {stats['found']}")
    if mode in ("capture", "full-update"):
        lbl = "by se nahralo" if dry else "nahrano"
        print(f" Nove zachyceno ({lbl}): {stats['new_captured']}")
    else:
        print(f" Nove (bez tela, nedorovnano):{stats['new_uncaptured']}")
    print(f" Aktualizovana cesta: {stats['path_updated']}")
    print(f" Zmena precteno/neprecteno: {stats['read_updated']}")
    print(f" Vraceno do schranky: {stats['returned']}")
    print(f" Obsah zmenen (re-upload): {stats['content_updated']}")
    print(f" Opustilo schranku (GONE): {stats['left_mailbox']}")
    print(f" Beze zmeny (skip): {stats['skipped']}")
    print(f" Chyby: {stats['errors']}")
    print(f"{'='*60}")

    if dry:
        print("DRY-RUN: SQLite ani server se NEMENILY.")
    elif not args.no_db_upload:
        print("\nUpload SQLite na server...")
        upload_db(DB_PATH)
    print(f"\nKonec: {datetime.now():%Y-%m-%d %H:%M:%S}")
    if stats["errors"]:
        print(f"Chyby logovany do: {LOG_PATH}")
def upload_db(db_path):
    """Compress (lzma/xz, extreme preset), Fernet-encrypt and upload the DB file.

    The file is sent as ``jnjemails_<timestamp>.db.xz.enc``.  The function is
    best effort: every failure, including a non-OK HTTP status, is printed as
    an error line and never raised to the caller.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"jnjemails_{ts}.db"
    try:
        with open(db_path, "rb") as f:
            raw = f.read()
        compressed = lzma.compress(raw, preset=9 | lzma.PRESET_EXTREME)
        encrypted = _FERNET.encrypt(compressed)
        enc_filename = filename + ".xz.enc"
        resp = requests.post(
            DB_UPLOAD_URL,
            headers={"Authorization": f"Bearer {TOKEN}"},
            files={"file": (enc_filename, encrypted, "application/octet-stream")},
            timeout=300,
        )
        # Same status check as upload_msg: a rejected upload whose body
        # happens to be JSON must not be reported as a success.
        if not resp.ok:
            raise requests.HTTPError(f"{resp.status_code} {resp.reason} | {resp.text[:200]}")
        mb_raw, mb_xz, mb_enc = (len(raw) / 1048576,
                                 len(compressed) / 1048576,
                                 len(encrypted) / 1048576)
        print(f" DB upload: {resp.json()} "
              f"({mb_raw:.1f} MB -> xz {mb_xz:.1f} MB -> enc {mb_enc:.1f} MB)")
    except Exception as e:
        print(f" DB upload CHYBA: {e}")
# Run the sync only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
+83
View File
@@ -0,0 +1,83 @@
# jnj_tower_ingest v1.4.0
**Soubor:** `jnj_tower_ingest_v1.4.py`
**Datum:** 2026-06-16
**Autor:** vladimir.buzalka
**Běží:** Docker kontejner `python-runner` na Unraid Tower (192.168.1.76), u MongoDB.
## Co to je
Sjednocený **Tower-side ingest** JNJ e-mailů — fáze v jednom běhu (cron `*/5`):
| Fáze | Co dělá |
|---|---|
| **1. PARSE** | `.msg` z `/mnt/JNJEMAILS` → tělo do Mongo `emaily."vbuzalka@its.jnj.com"`. Inkrementálně přes mtime watermark. Přílohy do SeaweedFS (v1.3). **+ v1.4: detekce neodeslaného e-mailu.** |
| **2. SYNC** | nejnovější SQLite (read-only) → zrcadlo `jnj_messages` + `jnj_folder`/stav do `emaily`. NULL-safe. |
| **RECONCILE** (volitelně `--reconcile`) | **v1.4:** smaže provizorní no-ID Sent duplikáty, ke kterým existuje dvojče s reálným Message-ID. |
| **3. ENRICH** | sdílený `5_enrich_fulltext_emails --mailbox` → PG fulltext. Jen při nových dokumentech. |
Pořadí **parse → sync → (reconcile) → enrich**. Klíč = Internet Message-ID = Mongo `_id`.
## Novinka v1.4 (a) — detekce NEODESLANÉHO e-mailu
PARSE při čtení těla hledá stopy chyby odeslání (`SendAsDenied`, „could not be sent“,
`TransportSend operation has failed`, `MapiExceptionSendAsDenied`). Když je najde,
dokument dostane:
- `send_failed: true`
- `send_error: "SendAsDenied (ec=1244) 0x80070005-…"` (vytažený kód, pokud je)
Dotaz na neodeslané: `{ send_failed: true }`.
> Pozn.: chybové tělo se v `.msg` objeví **až** poté, co ho Outlook do položky dopíše;
> na Tower ho přinese **re-upload z `jnj_mailbox_sync v1.3`** (+ overwrite na app.py v2.4).
> Archivní kopie zachycená před selháním chybu nenese.
## Novinka v1.4 (b) — fáze RECONCILE (smaž provizorní duplikáty)
Sent položka **bez Message-ID** (`_id` začíná `filename:`/`entryid:`) je jen **přechodný
snímek** (zachycený dřív, než Exchange doplnil Message-ID). Když k ní existuje **dvojče
s reálným Message-ID** — stejní `to` příjemci + stejný `normalized_subject` + `received_at`
do **24 h** — je provizorní kopie redundantní a **smaže se**. **Neodeslané** (bez dvojčete)
**zůstanou** (a mají `send_failed`).
- Match je na **stabilním obsahu** (e-mailové adresy + normalizovaný předmět + čas),
**ne na EntryID** (ten se mezi provizorní a finální kopií liší).
- Běží **jen s `--reconcile`** (default vypnuto — bezpečné pro cron).
- S `--dry-run` jen **vypíše plán** (nic nemaže). Bez `--dry-run` + s `--reconcile` **maže**.
## Argumenty
`--dry-run`, `--full`, `--limit N`, `--reindex`, `--force`,
`--parse-only` / `--sync-only` / `--enrich-only`, `--no-enrich`, `--enrich-always`,
**`--reconcile`** (nově).
## Spouštění
```bash
# Běžný inkrementální běh (cron) — reconcile NEběží:
docker exec python-runner python3 /scripts/jnj_tower_ingest_v1.4.py
# RECONCILE — nejdřív plán (nic nemaže):
docker exec -it python-runner python3 /scripts/jnj_tower_ingest_v1.4.py --reconcile --dry-run --sync-only
# RECONCILE — ostře (po kontrole plánu):
docker exec -it python-runner python3 /scripts/jnj_tower_ingest_v1.4.py --reconcile --sync-only
```
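(`--sync-only --reconcile` = jen sync + úklid duplikátů, bez parse/enrich; reconcile
potřebuje `jnj_folder`, který doplňuje fáze sync. `--reconcile` lze použít i bez
`--sync-only` — úklid pak proběhne v rámci běžného běhu v pořadí
parse → sync → reconcile → enrich.)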
## Revert
`jnj_tower_ingest_v1.3.py` (bez send_failed + reconcile), starší v `Trash/`.
## Historie verzí
- **1.0.0** — sjednocení parse + sync (mtime watermark).
- **1.1.0** — + fáze ENRICH.
- **1.2.0** — SYNC NULL-safe.
- **1.3.0** — PARSE: přílohy do SeaweedFS.
- **1.4.0** — (a) PARSE detekuje neodeslaný e-mail → `send_failed` + `send_error`.
(b) Fáze RECONCILE (`--reconcile`): smaže provizorní no-ID Sent kopie s ID-dvojčetem
(match to+předmět+čas, ne EntryID); neodeslané ponechá.
File diff suppressed because it is too large Load Diff
+272
View File
@@ -0,0 +1,272 @@
"""
jnj_unsent_probe v1.1
Nazev: jnj_unsent_probe_v1.0.py (verze 1.1.0 — bohatsi vypis)
Verze: 1.1.0
Datum: 2026-06-16
Autor: vladimir.buzalka
Bezi: JNJ stroj (Outlook MAPI), Python z Thonny. JEN CTE, nic nezapisuje/nenahrava.
UCEL (diagnostika):
Cte e-maily PRIMO z ziveho Outlooku (MAPI) a vypisuje "identifikatory
neodeslani", ktere se pri exportu do .msg ztraci nebo nejsou spolehlive.
Slouzi k OVERENI, ktery zivy priznak spolehlive oznaci NEODESLANY e-mail
(napr. hustakova nabidka, kterou Exchange odmitl SendAsDenied).
Pro kazdou nalezenou polozku vypise vedle sebe:
- folder, subject, prijemce
- item.Sent (object model bool — odeslano?)
- PR_MESSAGE_FLAGS + dekodovane bity UNSENT / SUBMIT / READ
- ma Internet Message-ID? (PR_0x1035)
- ma PR_CLIENT_SUBMIT_TIME? (0x0039)
- PR_LAST_VERB_EXECUTED (0x1081)
- body_has_error (zive item.Body obsahuje SendAsDenied / could not be sent?)
- pokud ano -> vypise i snippet chyby
DULEZITE: tohle je SONDA. Z jejiho vystupu se rozhodne, ktery priznak je
spolehlivy detektor, a teprve pak se z toho udela produkcni flagovani.
Filtry (argumenty):
--to SUBSTR jen polozky, jejichz prijemce obsahuje SUBSTR (napr. hustak)
--subject SUBSTR jen polozky s SUBSTR v predmetu (napr. icotrokinra)
--days N okno poslednich N dni dle ReceivedTime (default 90; 0 = vse)
--all vypsat VSE (jinak jen "podezrele" = bez Internet Message-ID)
--limit N max N vypsanych polozek (default 60)
--folders LIST carkou oddelene: inbox,sent,drafts,deleted,outbox,archive
(default vse uvedene)
Priklady:
python jnj_unsent_probe_v1.0.py --to hustak --all
python jnj_unsent_probe_v1.0.py --subject icotrokinra --days 60
"""
import argparse
import sys
from datetime import datetime, timedelta
import win32com.client
# Switch stdout to UTF-8 where the stream supports it; characters that cannot
# be encoded are replaced instead of raising.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# MAPI property tags, written as the schema URLs passed to
# PropertyAccessor.GetProperty (see prop()).
PR_MESSAGE_FLAGS = "http://schemas.microsoft.com/mapi/proptag/0x0E070003"
PR_INTERNET_MSG_ID = "http://schemas.microsoft.com/mapi/proptag/0x1035001E"
PR_CLIENT_SUBMIT_TIME = "http://schemas.microsoft.com/mapi/proptag/0x00390040"
PR_LAST_VERB = "http://schemas.microsoft.com/mapi/proptag/0x10810003"
# MSGFLAG bits tested against the PR_MESSAGE_FLAGS value
MSGFLAG_READ = 0x1
MSGFLAG_UNSENT = 0x8
MSGFLAG_SUBMIT = 0x4
# Default folder IDs (OlDefaultFolders), keyed by the names accepted by --folders
DEFAULT_FOLDERS = {
    "inbox": 6, "sent": 5, "drafts": 16, "deleted": 3, "outbox": 4,
}
# Substrings searched for in the live body to spot a send failure
# (matched case-sensitively by body_error_snippet()).
ERR_MARKERS = ("SendAsDenied", "could not be sent", "TransportSend",
               "MapiExceptionSendAs", "nemáte oprávnění", "on behalf of")
def prop(item, tag, default=None):
    """Read a MAPI property through the item's PropertyAccessor.

    Returns ``default`` when the read fails for any reason or when the
    property value is None.
    """
    try:
        value = item.PropertyAccessor.GetProperty(tag)
    except Exception:
        return default
    if value is None:
        return default
    return value
def get_to(item):
    """Return the item's ``To`` value, or "" when it is empty or unreadable."""
    try:
        recipients = item.To
    except Exception:
        return ""
    return recipients or ""
def body_error_snippet(item):
    """Return a short excerpt of the live body around a send-error marker.

    The markers in ``ERR_MARKERS`` are tried in their listed order and the
    first one present in ``item.Body`` wins.  The excerpt starts up to 10
    characters before the marker and ends 90 characters after its start, with
    CR and LF replaced by spaces.  Returns None when the body cannot be read
    or contains no marker.
    """
    try:
        body = item.Body or ""
    except Exception:
        return None
    for marker in ERR_MARKERS:
        pos = body.find(marker)
        if pos < 0:
            continue
        excerpt = body[max(0, pos - 10):pos + 90]
        return excerpt.replace("\r", " ").replace("\n", " ")
    return None
def describe(item):
    """Collect the "was it sent?" indicators of one item into a dict.

    Keys: subject (first 42 chars), to (first 32 chars), sent, flags, the
    decoded unsent/submit/read bits, has_mid, mid_val (first 60 chars or "-"),
    submit_time (bool), last_verb, err (body snippet or None), rdate and eid
    (last 20 chars of the EntryID).  Values that cannot be read fall back to
    None or "?".
    """
    subject = str(getattr(item, "Subject", "") or "")[:42]
    recipients = get_to(item)[:32]
    try:
        sent = bool(item.Sent)
    except Exception:
        sent = None

    flags = prop(item, PR_MESSAGE_FLAGS, 0) or 0

    # Fall back to the ...001F variant of the tag (marked "unicode" in the
    # original) when the ...001E one yields nothing.
    message_id = (prop(item, PR_INTERNET_MSG_ID)
                  or prop(item, "http://schemas.microsoft.com/mapi/proptag/0x1035001F"))
    submit_time = prop(item, PR_CLIENT_SUBMIT_TIME)
    last_verb = prop(item, PR_LAST_VERB)
    err = body_error_snippet(item)

    try:
        received = item.ReceivedTime
        rdate = received.strftime("%Y-%m-%d %H:%M") if received else "?"
    except Exception:
        rdate = "?"
    try:
        eid = str(item.EntryID)[-20:]
    except Exception:
        eid = "?"

    return {
        "subject": subject,
        "to": recipients,
        "sent": sent,
        "flags": flags,
        "unsent": bool(flags & MSGFLAG_UNSENT),
        "submit": bool(flags & MSGFLAG_SUBMIT),
        "read": bool(flags & MSGFLAG_READ),
        "has_mid": bool(message_id),
        "mid_val": str(message_id)[:60] if message_id else "-",
        "submit_time": bool(submit_time),
        "last_verb": last_verb,
        "err": err,
        "rdate": rdate,
        "eid": eid,
    }
def matches(item, args):
    """Tell whether ``item`` passes the ``--to`` and ``--subject`` filters.

    Both filters are case-insensitive substring tests and an empty filter
    always passes.  For ``--to`` the ``To`` text is tried first and, when it
    does not match, the addresses/names of ``item.Recipients``.
    """
    if args.to:
        wanted_to = args.to.lower()
        if wanted_to not in get_to(item).lower():
            try:
                # Second chance: the recipient collection itself.
                joined = "; ".join(str(r.Address or r.Name or "") for r in item.Recipients)
            except Exception:
                joined = ""
            if wanted_to not in joined.lower():
                return False
    if args.subject:
        subject_text = str(getattr(item, "Subject", "") or "").lower()
        if args.subject.lower() not in subject_text:
            return False
    return True
def walk(folder, path, args, cutoff, out, counters):
    """Append descriptions of matching mail items of ``folder`` and its subfolders to ``out``.

    Only message classes starting with ``IPM.NOTE`` are considered.  Items
    received before ``cutoff`` (when given) and items failing ``matches`` are
    skipped.  Every item passing the filters bumps ``counters["seen"]``;
    unless ``args.all`` is set, only items without a Message-ID are appended.
    Collection stops as soon as ``out`` holds ``args.limit`` entries.
    """
    here = f"{path}/{folder.Name}"
    try:
        items = folder.Items
    except Exception:
        return
    try:
        items.Sort("[ReceivedTime]", True)
    except Exception:
        pass  # sorting is best effort only

    for item in items:
        if len(out) >= args.limit:
            return
        try:
            message_class = str(getattr(item, "MessageClass", "")).upper()
        except Exception:
            continue
        if not message_class.startswith("IPM.NOTE"):
            continue

        if cutoff is not None:
            try:
                received = item.ReceivedTime
                too_old = received is not None and received.replace(tzinfo=None) < cutoff
            except Exception:
                too_old = False  # unreadable time: keep the item
            if too_old:
                continue

        if not matches(item, args):
            continue
        counters["seen"] += 1
        entry = describe(item)
        if entry["has_mid"] and not args.all:
            continue  # has a Message-ID -> not suspicious (unless --all)
        entry["folder"] = here
        out.append(entry)

    try:
        children = list(folder.Folders)
    except Exception:
        children = []
    for child in children:
        if len(out) >= args.limit:
            return
        walk(child, here, args, cutoff, out, counters)
def find_archive(ns):
    """Find the folder named "archive" under the root of the primary mailbox.

    The root is the parent of default folder 6; the name comparison ignores
    case and surrounding whitespace.  Returns ``(folder, root_name)`` or
    ``(None, None)`` when nothing is found or the lookup fails.
    """
    try:
        mailbox_root = ns.GetDefaultFolder(6).Parent
        for candidate in mailbox_root.Folders:
            try:
                if str(candidate.Name).strip().lower() != "archive":
                    continue
                return candidate, mailbox_root.Name
            except Exception:
                continue
    except Exception:
        pass
    return None, None
def main():
    """Command-line entry point of the read-only probe.

    Walks the requested folders, collects up to ``--limit`` item descriptions
    and prints them followed by a one-line summary.  Compared with the
    previous output, the banner shows the version from the file header
    (v1.1), the READ bit and the last-verb value listed in the module
    docstring are printed, and unknown ``--folders`` names are reported
    instead of being ignored silently.
    """
    ap = argparse.ArgumentParser(description="jnj_unsent_probe v1.1 (diagnostika)")
    ap.add_argument("--to", default="")
    ap.add_argument("--subject", default="")
    ap.add_argument("--days", type=int, default=90)
    ap.add_argument("--all", action="store_true")
    ap.add_argument("--limit", type=int, default=60)
    ap.add_argument("--folders", default="inbox,sent,drafts,deleted,outbox,archive")
    args = ap.parse_args()

    cutoff = None if args.days == 0 else (datetime.now() - timedelta(days=args.days))
    want = [x.strip().lower() for x in args.folders.split(",") if x.strip()]
    print("=== jnj_unsent_probe v1.1 ===")
    print(f"Filtr: to~'{args.to}' subject~'{args.subject}' okno={'vse' if cutoff is None else str(args.days)+'d'} "
          f"| {'VSE' if args.all else 'jen bez Message-ID'} | slozky={want}")

    outlook = win32com.client.Dispatch("Outlook.Application")
    ns = outlook.GetNamespace("MAPI")
    out = []
    counters = {"seen": 0}
    for name in want:
        if len(out) >= args.limit:
            break
        if name == "archive":
            # Archive is not a default folder; it is looked up by name.
            arch, mbox = find_archive(ns)
            if arch is not None:
                walk(arch, f"/{mbox}", args, cutoff, out, counters)
            else:
                print(" (Archive nenalezena)")
            continue
        fid = DEFAULT_FOLDERS.get(name)
        if not fid:
            print(f" ({name}: neznama slozka -> preskakuji)")
            continue
        try:
            root = ns.GetDefaultFolder(fid)
        except Exception as e:
            print(f" ({name} nedostupna: {e})")
            continue
        walk(root, f"/{root.Parent.Name}", args, cutoff, out, counters)

    print(f"\nProsmatrovano polozek: {counters['seen']} vypsano: {len(out)}\n")
    n_unsent = n_noid = n_err = 0
    for i, d in enumerate(out, 1):
        if d["unsent"]:
            n_unsent += 1
        if not d["has_mid"]:
            n_noid += 1
        if d["err"]:
            n_err += 1
        print(f"[{i}] {d['folder']} ({d['rdate']})")
        print(f" subject : {d['subject']}")
        print(f" to : {d['to']}")
        print(f" Sent={d['sent']} UNSENT={d['unsent']} SUBMIT={d['submit']} "
              f"has_MsgID={d['has_mid']} submit_time={d['submit_time']} ERR={'YES' if d['err'] else '-'}")
        # Collected by describe() and listed in the module docstring.
        print(f" READ={d['read']} last_verb={d['last_verb']}")
        print(f" MsgID : {d['mid_val']}")
        print(f" EntryID[-20:] (=jmeno .msg): {d['eid']}")
        if d["err"]:
            print(f" ERR : ...{d['err']}...")
        print()
    print(f"SOUHRN: vypsano={len(out)} UNSENT-flag={n_unsent} bez-MsgID={n_noid} s-chybou-v-tele={n_err}")
# Run the probe only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()