Merge remote-tracking branch 'origin/master'

This commit is contained in:
Vladimir Buzalka
2026-04-24 06:46:57 +02:00
4 changed files with 309 additions and 3 deletions
+80
View File
@@ -0,0 +1,80 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Porovná pojišťovnu z VZP odpovědi (MySQL) vs. pojišťovnu v Medicusu (Firebird)
pro nejvyšší k_datu v tabulce vzp_stav_pojisteni.
"""
import sys
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from Knihovny.mysql_db import connect_mysql
from Knihovny.medicus_db import MedicusDB
HOST = "192.168.1.10"
DB_PATH = r"M:\Medicus\Data\Medicus.fdb"
# ── připojení ──────────────────────────────────────────────────────────────
mysql = connect_mysql()
db = MedicusDB(HOST, DB_PATH)
# ── nejvyšší k_datu ────────────────────────────────────────────────────────
with mysql.cursor() as cur:
cur.execute("SELECT MAX(k_datu) FROM vzp_stav_pojisteni")
max_datum = cur.fetchone()[0]
if max_datum is None:
print("Tabulka vzp_stav_pojisteni je prázdná.")
sys.exit(1)
print(f"Porovnávám k datu: {max_datum}\n")
# ── načtení VZP výsledků pro max_datum ────────────────────────────────────
with mysql.cursor() as cur:
cur.execute(
"SELECT rc, kod_pojistovny FROM vzp_stav_pojisteni WHERE k_datu = %s",
(max_datum,)
)
vzp_data = {row[0]: row[1] for row in cur.fetchall()}
# ── načtení registrovaných pacientů z Medicusu ────────────────────────────
pacienti = db.get_active_registered_patients()
medicus_data = {rc.strip(): str(poj).strip() for rc, _, _, poj in pacienti if rc}
# ── porovnání ──────────────────────────────────────────────────────────────
shoda = []
rozdil = []
chybi_vzp = []
for rc, med_poj in medicus_data.items():
if rc not in vzp_data:
chybi_vzp.append((rc, med_poj))
continue
vzp_poj = (vzp_data[rc] or "").strip()
if med_poj == vzp_poj:
shoda.append(rc)
else:
rozdil.append((rc, med_poj, vzp_poj))
# ── výsledek ───────────────────────────────────────────────────────────────
print(f"Shoduje se: {len(shoda)}")
print(f"Liší se: {len(rozdil)}")
print(f"Chybí ve VZP: {len(chybi_vzp)}")
if rozdil:
print("\n--- ROZDÍLY (rc | Medicus | VZP) ---")
for rc, med, vzp in rozdil:
print(f" {rc:15s} Medicus={med:5s} VZP={vzp}")
if chybi_vzp:
print("\n--- CHYBÍ VE VZP (nebylo kontrolováno k tomuto datu) ---")
for rc, poj in chybi_vzp[:20]:
print(f" {rc:15s} poj={poj}")
if len(chybi_vzp) > 20:
print(f" ... a dalších {len(chybi_vzp) - 20}")
db.close()
mysql.close()
+111
View File
@@ -0,0 +1,111 @@
"""
EmailMessagingGraph.py
----------------------
Private Microsoft Graph mail sender
Application permissions, shared mailbox
"""
import base64
import msal
import requests
from functools import lru_cache
from pathlib import Path
from typing import Union, List
# =========================
# PRIVATE CONFIG (ONLY YOU)
# =========================
TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9"
CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f"
CLIENT_SECRET = "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
SENDER = "reports@buzalka.cz"
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
SCOPE = ["https://graph.microsoft.com/.default"]
@lru_cache(maxsize=1)
def _get_token() -> str:
app = msal.ConfidentialClientApplication(
CLIENT_ID,
authority=AUTHORITY,
client_credential=CLIENT_SECRET,
)
token = app.acquire_token_for_client(scopes=SCOPE)
if "access_token" not in token:
raise RuntimeError(f"Graph auth failed: {token}")
return token["access_token"]
def send_mail(
to: Union[str, List[str]],
subject: str,
body: str = "",
*,
html: bool = False,
attachments: Union[str, Path, List[Union[str, Path]], None] = None,
):
"""
Send email via Microsoft Graph.
:param to: email or list of emails
:param subject: subject
:param body: email body (default empty)
:param html: True = HTML, False = plain text
:param attachments: file path or list of file paths to attach
"""
if isinstance(to, str):
to = [to]
if attachments is None:
attachments = []
elif isinstance(attachments, (str, Path)):
attachments = [attachments]
attachment_payloads = []
for path in attachments:
path = Path(path)
attachment_payloads.append({
"@odata.type": "#microsoft.graph.fileAttachment",
"name": path.name,
"contentType": "application/octet-stream",
"contentBytes": base64.b64encode(path.read_bytes()).decode(),
})
payload = {
"message": {
"subject": subject,
"body": {
"contentType": "HTML" if html else "Text",
"content": body,
},
"toRecipients": [
{"emailAddress": {"address": addr}} for addr in to
],
**({"attachments": attachment_payloads} if attachment_payloads else {}),
},
"saveToSentItems": "true",
}
headers = {
"Authorization": f"Bearer {_get_token()}",
"Content-Type": "application/json",
}
r = requests.post(
f"https://graph.microsoft.com/v1.0/users/{SENDER}/sendMail",
headers=headers,
json=payload,
timeout=30,
)
if r.status_code != 202:
raise RuntimeError(
f"sendMail failed [{r.status_code}]: {r.text}"
)
+9 -3
View File
@@ -1,6 +1,12 @@
import socket
import sys
import pymysql
def _print(msg):
print(msg, file=sys.stdout, flush=True) if sys.stdout.encoding and sys.stdout.encoding.lower() in ("utf-8", "utf8") \
else print(msg.encode("utf-8", errors="replace").decode("ascii", errors="replace"), flush=True)
_LOCAL_HOSTS = {"lekar", "sestra", "lenovo"}
@@ -21,10 +27,10 @@ def connect_mysql(user="root", password="Vlado9674+", database="medevio",
for host in candidates:
try:
conn = pymysql.connect(host=host, **params)
print(f"[mysql_db] Připojeno přes {host} (hostname: {hostname})")
_print(f"[mysql_db] Pripojeno pres {host} (hostname: {hostname})")
return conn
except Exception as e:
print(f"[mysql_db] {host} selhal: {e}")
_print(f"[mysql_db] {host} selhal: {e}")
last_error = e
raise RuntimeError(f"MySQL nedostupné na žádné adrese. Poslední chyba: {last_error}")
raise RuntimeError(f"MySQL nedostupne na zadne adrese. Posledni chyba: {last_error}")
@@ -0,0 +1,109 @@
"""
Stáhne daily Str8ts puzzle jako PDF ze solitaire.org a uloží do stejné složky.
Název souboru: yyyy-mm-dd Daily Str8ts puzzle.pdf
"""
import asyncio
import sys
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
from datetime import date
from pathlib import Path
from playwright.async_api import async_playwright
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "Knihovny"))
from EmailMessagingGraph import send_mail
from najdi_dropbox import get_dropbox_root
OUTPUT_DIR = Path(get_dropbox_root()) / "!!!Days" / "Downloads Z230"
URL = "https://www.solitaire.org/daily-str8ts/"
RECIPIENT = ["vladimir.buzalka@buzalka.cz", "alica.buzalkova@buzalka.cz"]
EMAIL_BODY = """Str8ts — pravidla
Hrací pole 9×9, každá buňka je buď bílá nebo černá.
1. Číslice 19 — do bílých buněk piš čísla 19.
2. Žádné opakování v řádku/sloupci — stejné číslo se nesmí opakovat v celém řádku ani sloupci (jako Sudoku).
3. Straights (sekvence) — bílé buňky oddělené černými tvoří skupiny. Čísla v každé skupině musí tvořit sadu po sobě jdoucích čísel (v libovolném pořadí). Např. skupinka tří buněk může obsahovat {3,4,5} nebo {7,8,9}, ale ne {1,3,5}.
4. Délka sekvence — skupina o délce n musí obsahovat právě n různých čísel jdoucích za sebou.
5. Černé buňky s číslem — někdy mají předvyplněné číslo jako nápovědu; toto číslo se nepočítá do sekvencí, ale blokuje opakování v řádku/sloupci.
Rozdíl od Sudoku: nemusíš vyplnit 19 do každé skupiny — jen zajistit, že čísla v každé skupině tvoří „straight" (jako v pokeru).
"""
async def main():
today = date.today().strftime("%Y-%m-%d")
output_path = OUTPUT_DIR / f"{today} Daily Str8ts puzzle.pdf"
if output_path.exists():
print(f"Soubor již existuje: {output_path}")
return
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(
viewport={"width": 1280, "height": 900},
)
page = await context.new_page()
print(f"Načítám {URL} ...")
await page.goto(URL, wait_until="networkidle", timeout=60_000)
# Najdi iframe s hrou (solitaire.org vkládá hru do iframe)
game_frame = None
for frame in page.frames:
if frame.url != page.url and frame.url.strip() not in ("", "about:blank"):
game_frame = frame
print(f" Nalezen iframe: {frame.url}")
break
target = game_frame if game_frame else page
# Zkus kliknout na Print tlačítko (různé možné selektory)
print_selectors = [
"text=Print",
"button:has-text('Print')",
"[title*='print' i]",
"[aria-label*='print' i]",
".print-button",
"#print",
]
clicked = False
for sel in print_selectors:
try:
await target.click(sel, timeout=3_000)
clicked = True
print(f" Kliknuto na Print ({sel})")
await page.wait_for_timeout(1_500)
break
except Exception:
pass
if not clicked:
print(" Tlačítko Print nenalezeno — ukládám celou stránku jako PDF.")
# Uložit jako PDF
await page.pdf(
path=str(output_path),
format="A4",
print_background=True,
margin={"top": "10mm", "bottom": "10mm", "left": "10mm", "right": "10mm"},
)
print(f"PDF uloženo: {output_path}")
await browser.close()
send_mail(
to=RECIPIENT,
subject="Posílám dnešní Str8ts puzzle v příloze",
body=EMAIL_BODY,
attachments=output_path,
)
print(f"Email odeslán na {RECIPIENT}")
if __name__ == "__main__":
asyncio.run(main())