Compare commits

..

10 Commits

Author SHA1 Message Date
6d4b3bf6d7 Merge branch 'master' of ssh://192.168.1.76:2222/administrator/insurance 2025-12-15 05:58:33 +01:00
d026e5d783 Add .gitignore 2025-12-15 05:53:32 +01:00
33cdd2d0e2 Add proper .gitignore 2025-12-14 21:33:05 +01:00
3fa7996580 z230 2025-12-11 17:41:06 +01:00
3758a423eb notebookVB 2025-12-10 07:41:35 +01:00
7bc330beba vbnotebook 2025-11-18 07:22:17 +01:00
a764c9723e vbnotebook 2025-11-18 06:57:52 +01:00
michaela.buzalkova
746963157a sestra 2025-10-07 09:06:43 +02:00
70f54d1da5 z230 2025-10-06 16:28:48 +02:00
7fbeba5a1d z230 2025-10-03 15:49:08 +02:00
25 changed files with 1469 additions and 5 deletions

14
.gitignore vendored Normal file
View File

@@ -0,0 +1,14 @@
# Virtual environment
.venv/
# Python
__pycache__/
*.pyc
*.log
# IDE
.idea/
# OS
.DS_Store
Thumbs.db

3
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

10
.idea/Torrents.iml generated Normal file
View File

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.12 (Torrents)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/Torrents.iml" filepath="$PROJECT_DIR$/.idea/Torrents.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

@@ -245,18 +245,17 @@ def main():
"Content-Type": "text/xml; charset=utf-8", "Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "process", # Oracle composite usually expects this "SOAPAction": "process", # Oracle composite usually expects this
} }
print(envelope)
# Call service # Call service
resp = session.post(ENDPOINT, data=envelope.encode("utf-8"), resp = session.post(ENDPOINT, data=envelope.encode("utf-8"),
headers=headers, timeout=30, verify=VERIFY) headers=headers, timeout=30, verify=VERIFY)
print("HTTP:", resp.status_code) print("HTTP:", resp.status_code)
# (Optional) Uncomment to see raw XML # (Optional) Uncomment to see raw XML
print(resp.text) # print(resp.text)
# Parse and save # Parse and save
rows, stav = parse_registrace(resp.text) rows, stav = parse_registrace(resp.text)
print(rows,stav)
upsert_rows(RC, K_DATU, rows, stav, resp.text) upsert_rows(RC, K_DATU, rows, stav, resp.text)
time.sleep(random.uniform(1, 5)) time.sleep(random.uniform(1, 5))

308
02.2 Testík.py Normal file
View File

@@ -0,0 +1,308 @@
#kód bude vkládat i řádky pro pacienty bez registrovaného lékař v oboru 001#!/usr/bin/env python3 -*- coding: utf-8 -*- """ Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL), parse the response, upsert rows into medevio.vzp_registrace, and print what is being saved. """ # pip install requests_pkcs12 pymysql from requests_pkcs12 import Pkcs12Adapter import requests import xml.etree.ElementTree as ET from datetime import date import pymysql from pymysql.cursors import DictCursor from pprint import pprint from functions import get_medicus_connection from functions import get_mysql_connection import time, random,socket # ------------------- CONFIG ------------------- ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive PFX_PATH = r"mbcert.pfx" # <-- your .pfx path PFX_PASS = "Vlado7309208104++" # <-- your export password VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem" Patient + query RC = "7309208104" # rodné číslo without slash RC = "280616/091" # rodné
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Call VZP RegistracePojistencePZSB2B for one patient (001 = VPL),
parse the response, upsert rows into medevio.vzp_registrace,
and print what is being saved.
"""
# pip install requests_pkcs12 pymysql
from requests_pkcs12 import Pkcs12Adapter
import requests
import xml.etree.ElementTree as ET
from datetime import date
import pymysql
from pymysql.cursors import DictCursor
from pprint import pprint
from functions import get_medicus_connection
from functions import get_mysql_connection
import time, random,socket

# ------------------- CONFIG -------------------
# Production VZP B2B proxy endpoint; the URL path is case-sensitive.
ENDPOINT = "https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/RegistracePojistencePZSB2B" # case-sensitive
# Client certificate used for mTLS authentication against VZP.
PFX_PATH = r"mbcert.pfx" # <-- your .pfx path
# SECURITY: the certificate export password is hard-coded in source control --
# move it to an environment variable or a secrets store.
PFX_PASS = "Vlado7309208104++" # <-- your export password
VERIFY = True # or path to CA PEM, e.g. r"C:\certs\vzp_ca.pem"

# Patient + query defaults (main() builds its own values per patient).
RC = "7309208104" # birth number (rodne cislo) without slash
# RC = "280616/091" # birth number with slash -- normalize_rc() strips it
K_DATU = date.today().isoformat() # YYYY-MM-DD
ODBORNOSTI = ["001"] # VPL (adult GP)

# MySQL -- per-host connection settings.
# NOTE(review): MYSQL_CFG stays undefined on any other hostname, and it appears
# unused in this module (get_mysql_connection() from functions is used instead)
# -- confirm and consider removing this whole branch.
if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"):
    MYSQL_CFG = dict(
        host="192.168.1.76",
        port=3307,
        user="root",
        password="Vlado9674+",  # SECURITY: hard-coded DB password
        database="medevio",
        cursorclass=DictCursor,  # rows come back as dicts
        autocommit=True, # or False if you prefer manual commit
    )
elif socket.gethostname().strip() == "SESTRA":
    MYSQL_CFG = dict(
        host="127.0.0.1",
        port=3307,
        user="root",
        password="Vlado9674+",  # SECURITY: hard-coded DB password
        database="medevio",
        cursorclass=DictCursor,
        autocommit=True, # or False if you prefer manual commit
    )

# Namespaces (from your response/WSDL); used by build_envelope()/parse_registrace()
NS = {
    "soap": "http://schemas.xmlsoap.org/soap/envelope/",
    "rp": "http://xmlns.gemsystem.cz/B2B/RegistracePojistencePZSB2B/1",
}
# ------------------- HELPERS -------------------
def normalize_rc(rc: str) -> str:
    """Return the birth number *rc* without its slash and surrounding whitespace."""
    without_slash = rc.replace("/", "")
    return without_slash.strip()
def build_envelope(rc: str, k_datu: str, odb_list: list[str]) -> str:
    """Assemble the SOAP request envelope for RegistracePojistencePZSB2B.

    Args:
        rc: birth number (without slash).
        k_datu: ISO date the registration status is queried for.
        odb_list: specialty codes to ask about (e.g. ["001"]).
    """
    specialties_xml = "".join(
        f"<ns1:kodOdbornosti>{kod}</ns1:kodOdbornosti>" for kod in odb_list
    )
    body_xml = f"""
<ns1:registracePojistencePZSB2B xmlns:ns1="{NS['rp']}">
<ns1:cisloPojistence>{rc}</ns1:cisloPojistence>
<ns1:kDatu>{k_datu}</ns1:kDatu>
<ns1:seznamOdbornosti>
{specialties_xml}
</ns1:seznamOdbornosti>
</ns1:registracePojistencePZSB2B>""".strip()
    return f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="{NS['soap']}">
<soap:Body>
{body_xml}
</soap:Body>
</soap:Envelope>"""
def parse_registrace(xml_text: str):
    """
    Parse a RegistracePojistencePZSB2B response.

    Returns (rows, stav_vyrizeni): one dict per <odbornost> item, plus the
    request-processing status reported by VZP.
    """
    root = ET.fromstring(xml_text)

    def opt_text(ctx, tag):
        # None when the element is missing or has no text.
        node = ctx.find(f"rp:{tag}", NS)
        if node is not None and node.text:
            return node.text.strip()
        return None

    def child_text(parent, tag):
        # None when the parent or the child element is missing.
        if parent is None:
            return None
        node = parent.find(f"rp:{tag}", NS)
        return None if node is None else node.text.strip()

    results = []
    for item in root.findall(".//rp:seznamOdbornosti/rp:odbornost", NS):
        insurer = item.find("rp:zdravotniPojistovna", NS)
        specialty = item.find("rp:odbornost", NS)
        results.append(dict(
            icz=opt_text(item, "ICZ"),
            icp=opt_text(item, "ICP"),
            nazev_icp=opt_text(item, "nazevICP"),
            nazev_szz=opt_text(item, "nazevSZZ"),
            poj_kod=child_text(insurer, "kod"),
            poj_zkratka=child_text(insurer, "zkratka"),
            odbornost_kod=child_text(specialty, "kod"),
            odbornost_nazev=child_text(specialty, "nazev"),
            datum_registrace=opt_text(item, "datumRegistrace"),
            datum_zahajeni=opt_text(item, "datumZahajeni"),
            datum_ukonceni=opt_text(item, "datumUkonceni"),
        ))

    status_node = root.find(".//rp:stavVyrizeniPozadavku", NS)
    status = status_node.text.strip() if (status_node is not None and status_node.text) else None
    return results, status
def upsert_rows(
    rc: str,
    query_date: str,
    rows: list[dict],
    stav_vyrizeni: str,
    xml_text: str,
    requested_odb_list: list[str] | None = None,
) -> int:
    """
    Insert/update medevio.vzp_registrace.
    If no <odbornost> items are returned, insert placeholder row(s)
    for each requested specialty (e.g., "001") so the negative result is visible.

    Args:
        rc: birth number; '/' is stripped before saving.
        query_date: ISO date the query was made for; today when falsy.
        rows: parsed registration items from parse_registrace().
        stav_vyrizeni: request-processing status code reported by VZP.
        xml_text: raw response XML, stored with every row for audit.
        requested_odb_list: specialties that were queried; drives placeholders.

    Returns:
        Number of payload rows written.
    """
    # NOTE(review): the upsert presumably relies on a UNIQUE key over
    # (rc, query_date, odbornost_kod) -- confirm against the table DDL.
    sql = """
INSERT INTO vzp_registrace
(rc, query_date, odbornost_kod, odbornost_nazev,
icz, icp, nazev_icp, nazev_szz,
poj_kod, poj_zkratka,
datum_registrace, datum_zahajeni, datum_ukonceni,
stav_vyrizeni, response_xml)
VALUES
(%(rc)s, %(query_date)s, %(odbornost_kod)s, %(odbornost_nazev)s,
%(icz)s, %(icp)s, %(nazev_icp)s, %(nazev_szz)s,
%(poj_kod)s, %(poj_zkratka)s,
%(datum_registrace)s, %(datum_zahajeni)s, %(datum_ukonceni)s,
%(stav_vyrizeni)s, %(response_xml)s)
ON DUPLICATE KEY UPDATE
odbornost_nazev = VALUES(odbornost_nazev),
nazev_icp = VALUES(nazev_icp),
nazev_szz = VALUES(nazev_szz),
poj_kod = VALUES(poj_kod),
poj_zkratka = VALUES(poj_zkratka),
datum_registrace= VALUES(datum_registrace),
datum_zahajeni = VALUES(datum_zahajeni),
datum_ukonceni = VALUES(datum_ukonceni),
stav_vyrizeni = VALUES(stav_vyrizeni),
response_xml = VALUES(response_xml),
updated_at = CURRENT_TIMESTAMP
"""
    rc_norm = normalize_rc(rc)
    qd = query_date or date.today().isoformat()
    payloads: list[dict] = []
    if rows:
        # Positive path: save what came from the API
        for r in rows:
            payloads.append({
                "rc": rc_norm,
                "query_date": qd,
                **r,
                "stav_vyrizeni": stav_vyrizeni,
                "response_xml": xml_text,
            })
    else:
        # Negative path: no registration items -> create placeholders
        if not requested_odb_list:
            requested_odb_list = ["001"]
        for kod in requested_odb_list:
            payloads.append({
                "rc": rc_norm,
                "query_date": qd,
                "odbornost_kod": kod,
                "odbornost_nazev": None,
                "icz": None,
                "icp": None,
                "nazev_icp": None,
                "nazev_szz": None,
                "poj_kod": None,
                "poj_zkratka": None,
                "datum_registrace": None,
                "datum_zahajeni": None,
                "datum_ukonceni": None,
                # Keep what VZP said (e.g., 'X'), and raw XML for audit
                "stav_vyrizeni": stav_vyrizeni,
                "response_xml": xml_text,
            })
    # Print what we're going to save
    print("\n=== Will save the following payload(s) to medevio.vzp_registrace ===")
    for p in payloads:
        pprint(p)
    if not payloads:
        # Defensive: both branches above always append at least one payload.
        print("No payloads prepared (this should not happen).")
        return 0
    connsql = get_mysql_connection()
    try:
        with connsql.cursor() as cur:
            cur.executemany(sql, payloads)
        connsql.commit()
    finally:
        # Always release the connection, even if the executemany fails.
        connsql.close()
    print(f"\nUpserted rows: {len(payloads)}")
    return len(payloads)
def prepare_processed_rcs():
    """Return the set of birth numbers already present in medevio.vzp_registrace.

    Uses a window function to pick one (the latest) row per rc; only the rc
    values are returned.

    Returns:
        set[str]: distinct birth numbers already processed.
    """
    sql = """
WITH ranked AS (
SELECT
vreg.*,
ROW_NUMBER() OVER (
PARTITION BY rc
ORDER BY query_date DESC
) AS rn
FROM vzp_registrace AS vreg
)
SELECT rc
FROM ranked
WHERE rn = 1
"""
    consql = get_mysql_connection()
    try:
        # FIX: the original never closed the connection -- leaked on every call.
        cursql = consql.cursor()
        cursql.execute(sql)
        rows = cursql.fetchall()
    finally:
        consql.close()
    print(f"Pocet jiz zpracovanych rodnych cisel v MYSQL MEDEVIO je {len(rows)}")
    return {row["rc"] for row in rows}
# ------------------- MAIN FLOW -------------------
def main():
    """Query VZP registration for every not-yet-processed Medicus patient.

    Reads candidate patients from the Medicus (Firebird) KAR table, skips birth
    numbers already stored in MySQL, calls the VZP B2B service for each
    remaining patient and upserts the parsed result.
    """
    con = get_medicus_connection()
    cur = con.cursor()
    cur.execute("select rodcis, prijmeni, jmeno from kar where rodcis starting with '1'")
    # cur.execute("select first 2 rodcis, prijmeni, jmeno from kar where rodcis starting with '0'")
    # Birth numbers already stored in MySQL -> these get skipped below.
    rc_set_vzp = prepare_processed_rcs()
    patients = cur.fetchall()
    print(f"Pocet vybranych radku z tabulky KAR je: {len(patients)}")

    # Loop invariants hoisted out of the per-patient loop: one mTLS session,
    # one query date, one specialty list, one header dict.
    k_datu = date.today().isoformat()  # YYYY-MM-DD
    odbornosti = ["001"]
    session = requests.Session()
    session.mount("https://", Pkcs12Adapter(pkcs12_filename=PFX_PATH, pkcs12_password=PFX_PASS))
    headers = {
        "Content-Type": "text/xml; charset=utf-8",
        "SOAPAction": "process",  # Oracle composite usually expects this
    }

    for patient in patients:
        rc = patient[0]
        if rc in rc_set_vzp:
            continue  # already processed
        print(rc, patient[1], patient[2])
        # Build SOAP envelope and call the service.
        envelope = build_envelope(rc, k_datu, odbornosti)
        print(envelope)
        resp = session.post(ENDPOINT, data=envelope.encode("utf-8"),
                            headers=headers, timeout=30, verify=VERIFY)
        print("HTTP:", resp.status_code)
        print(resp.text)
        # Parse and save. FIX: use a distinct name here -- the original rebound
        # `rows`, shadowing the sequence being iterated by this loop.
        reg_rows, stav = parse_registrace(resp.text)
        print(reg_rows, stav)
        upsert_rows(rc, k_datu, reg_rows, stav, resp.text, requested_odb_list=odbornosti)
        # Randomized pause to avoid hammering the service.
        time.sleep(random.uniform(1, 5))

if __name__ == "__main__":
    main()

56
10 LoginOnce.py Normal file
View File

@@ -0,0 +1,56 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
from pathlib import Path
from playwright.sync_api import sync_playwright
COOKIE_FILE = Path("sktorrent_cookies.json")
LOGIN_URL = "https://sktorrent.eu/torrent/torrents_v2.php?active=0"
def save_login_cookies(context):
    """Persist only the sktorrent.eu 'uid' and 'pass' cookies to COOKIE_FILE."""
    selected = []
    for cookie in context.cookies():
        if cookie["domain"] == "sktorrent.eu" and cookie["name"] in ("uid", "pass"):
            selected.append(cookie)
    with open(COOKIE_FILE, "w") as f:
        json.dump(selected, f, indent=2)
    print("✅ Login cookies saved to", COOKIE_FILE)
def load_cookies(context):
    """Restore previously saved cookies into *context*; True when loaded."""
    if not COOKIE_FILE.exists():
        return False
    with open(COOKIE_FILE, "r") as f:
        saved = json.load(f)
    context.add_cookies(saved)
    print("🔄 Loaded saved cookies.")
    return True
with sync_playwright() as p:
    # Headed browser on purpose -- the user may need to log in by hand.
    browser = p.chromium.launch(headless=False)
    context = browser.new_context()
    # NOTE(review): cookies_loaded is never used afterwards -- the login check
    # below relies on the page content instead.
    cookies_loaded = load_cookies(context)
    page = context.new_page()
    page.goto(LOGIN_URL)
    # Check if we are already logged in: the login form exposes
    # input[name="uid"], so its absence means the cookies are still valid.
    if page.locator('input[name="uid"]').count() == 0:
        print("✅ Already logged in using cookies!")
    else:
        # Cookies missing/expired -> manual login, then persist fresh cookies.
        print("\n➡️ Please log in manually in the opened browser.")
        print("➡️ Once logged in and you see your account page, press ENTER here.\n")
        input("Press ENTER when finished... ")
        save_login_cookies(context)
    print("🎉 Done!")
    page.wait_for_timeout(3000)

BIN
10 Tests/MBcert.pfx Normal file

Binary file not shown.

View File

@@ -0,0 +1,67 @@
"""Report patients registered in Medicus (Firebird) but missing from today's
MySQL insurance-status results."""
import pandas as pd
import fdb
import pymysql

# ---------------------------------
# FIREBIRD CONNECTION
# ---------------------------------
# SECURITY: credentials are hard-coded; move to env vars / config.
fb = fdb.connect(
    host="192.168.1.4",
    database=r"z:\Medicus 3\data\MEDICUS.FDB",
    user="SYSDBA",
    password="masterkey",
    charset="WIN1250"
)
cur = fb.cursor()

# Birth numbers of all actively registered patients (flags A/D/V, not cancelled).
sql_fb = """
SELECT kar.rodcis
FROM registr
JOIN kar ON registr.idpac = kar.idpac
WHERE registr.datum_zruseni IS NULL
AND registr.priznak IN ('A','D','V')
"""
cur.execute(sql_fb)
rows_fb = cur.fetchall()
df_fb = pd.DataFrame(rows_fb, columns=["rc"])
print("FB count:", len(df_fb))

# ---------------------------------
# MYSQL CONNECTION
# ---------------------------------
# SECURITY: credentials are hard-coded; move to env vars / config.
mysql = pymysql.connect(
    host="192.168.1.76",
    port=3307,
    user="root",
    password="Vlado9674+",
    database="medevio",
    charset="utf8mb4"
)

# Latest row (MAX(id)) per birth number for today's insurance-status check.
sql_mysql = """
SELECT rc
FROM vzp_stav_pojisteni AS v
WHERE v.k_datu = CURDATE()
AND v.id = (
SELECT MAX(id)
FROM vzp_stav_pojisteni
WHERE rc = v.rc
AND k_datu = CURDATE()
);
"""
# NOTE(review): pandas.read_sql officially supports SQLAlchemy connectables;
# a raw pymysql connection works but emits a UserWarning.
df_mysql = pd.read_sql(sql_mysql, mysql)
print("MySQL count:", len(df_mysql))

# ---------------------------------
# FIND MISSING RC
# ---------------------------------
# Patients present in Medicus but absent from today's MySQL results.
df_missing = df_fb[~df_fb["rc"].isin(df_mysql["rc"])]
print("\nMissing patients:")
print(df_missing)
fb.close()
mysql.close()

42
10 Tests/medicus_db.py Normal file
View File

@@ -0,0 +1,42 @@
import fdb
class MedicusDB:
    """Thin wrapper around a Firebird (fdb) connection to the Medicus database."""

    def __init__(self, host, db_path, user="SYSDBA", password="masterkey", charset="WIN1250"):
        # Connect eagerly; one shared cursor serves every query on this object.
        self.conn = fdb.connect(
            host=host,
            database=db_path,
            user=user,
            password=password,
            charset=charset
        )
        self.cur = self.conn.cursor()

    def query(self, sql, params=None):
        """Execute *sql* and return all result rows as tuples."""
        self.cur.execute(sql, params or ())
        return self.cur.fetchall()

    def query_dict(self, sql, params=None):
        """Execute *sql* and return rows as dicts keyed by lower-cased column names."""
        self.cur.execute(sql, params or ())
        names = [column[0].strip().lower() for column in self.cur.description]
        return [dict(zip(names, values)) for values in self.cur.fetchall()]

    def get_active_registered_patients(self):
        """Return (rodcis, prijmeni, jmeno, poj) tuples for actively registered patients."""
        sql = """
SELECT
kar.rodcis,
kar.prijmeni,
kar.jmeno,
kar.poj
FROM registr
JOIN kar ON registr.idpac = kar.idpac
WHERE registr.datum_zruseni IS NULL
AND registr.priznak IN ('A','D','V')
AND kar.rodcis IS NOT NULL
AND kar.rodcis <> ''
"""
        return self.query(sql)  # or self.query_dict(sql)

    def close(self):
        """Close the underlying Firebird connection."""
        self.conn.close()

79
10 Tests/rozdíl.py Normal file
View File

@@ -0,0 +1,79 @@
"""Compare the insurer recorded in Medicus against today's VZP result in MySQL
and print patients whose insurer differs."""
import pandas as pd
import pymysql
from medicus_db import MedicusDB
import fdb

# FULL OUTPUT SETTINGS -- print DataFrames without truncation.
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 0)
pd.set_option("display.max_colwidth", None)

# ======================================
# FIREBIRD -> load registered patients
# ======================================
db = MedicusDB("192.168.1.4", r"z:\Medicus 3\data\MEDICUS.FDB")
rows_fb = db.get_active_registered_patients() # returns rc, prijmeni, jmeno, poj
db.close()
df_fb = pd.DataFrame(rows_fb, columns=["rc", "prijmeni", "jmeno", "poj_medicus"])
# Normalize the insurer code to a trimmed string for a fair comparison.
df_fb["poj_medicus"] = df_fb["poj_medicus"].astype(str).str.strip()
print("FB count:", len(df_fb))

# ======================================
# MYSQL -> load today's results
# ======================================
# SECURITY: credentials are hard-coded; move to env vars / config.
mysql = pymysql.connect(
    host="192.168.1.76",
    port=3307,
    user="root",
    password="Vlado9674+",
    database="medevio",
    charset="utf8mb4"
)

# Latest row (MAX(id)) per birth number from today's status check.
sql_mysql = """
SELECT rc,
kod_pojistovny AS poj_mysql,
nazev_pojistovny,
stav,
stav_vyrizeni
FROM vzp_stav_pojisteni AS v
WHERE v.k_datu = CURDATE()
AND v.id = (
SELECT MAX(id)
FROM vzp_stav_pojisteni
WHERE rc = v.rc
AND k_datu = CURDATE()
);
"""
# NOTE(review): pandas.read_sql with a raw pymysql connection works but emits
# a UserWarning; pandas officially supports SQLAlchemy connectables.
df_mysql = pd.read_sql(sql_mysql, mysql)
df_mysql["poj_mysql"] = df_mysql["poj_mysql"].astype(str).str.strip()
print("MySQL count:", len(df_mysql))

# ======================================
# LEFT JOIN: Medicus <-> MySQL by RC
# ======================================
df_merge = df_fb.merge(df_mysql, on="rc", how="left")

# ======================================
# Find insurer differences
# ======================================
# NOTE: patients absent from MySQL (NaN poj_mysql) also land in df_diff.
df_diff = df_merge[df_merge["poj_medicus"] != df_merge["poj_mysql"]]
print("\nPacienti s rozdílnou pojišťovnou:")
print(df_diff[["rc", "prijmeni", "jmeno", "poj_medicus", "poj_mysql", "nazev_pojistovny"]])
# To save to Excel:
# df_diff.to_excel("rozdil_pojistoven.xlsx", index=False)

View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Check the VZP insurance status of every registered Medicus patient and
store the results in medevio.vzp_stav_pojisteni."""
import time
import logging
from medicus_db import MedicusDB
from vzpb2b_client import VZPB2BClient
import pymysql
from datetime import date

# ==========================================
# LOGGING SETUP
# ==========================================
# Root logger writes to the log file; a second named logger mirrors messages
# to the console via a StreamHandler.
logging.basicConfig(
    filename="insurance_check.log",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    encoding="utf-8"
)
console = logging.getLogger("console")
console.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
console.addHandler(handler)

def log_info(msg):
    # Log to both the file (root logger) and the console logger.
    logging.info(msg)
    console.info(msg)

def log_error(msg):
    # Error-level twin of log_info().
    logging.error(msg)
    console.error(msg)

# ==========================================
# MYSQL CONNECTION
# ==========================================
# SECURITY: credentials are hard-coded; move to env vars / config.
mysql = pymysql.connect(
    host="192.168.1.76",
    port=3307,
    user="root",
    password="Vlado9674+",
    database="medevio",
    charset="utf8mb4",
    autocommit=True
)

def save_insurance_status(mysql_conn, rc, k_datu, result, xml_text):
    """Insert one insurance-status result (plus raw XML for audit) into
    medevio.vzp_stav_pojisteni; logs the raw XML and re-raises on DB errors."""
    sql = """
INSERT INTO vzp_stav_pojisteni
(rc, k_datu, stav, kod_pojistovny, nazev_pojistovny,
pojisteni_kod, stav_vyrizeni, response_xml)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
    try:
        with mysql_conn.cursor() as cur:
            cur.execute(sql, (
                rc,
                k_datu,
                result["stav"],
                result["kodPojistovny"],
                result["nazevPojistovny"],
                result["pojisteniKod"],
                result["stavVyrizeni"],
                xml_text
            ))
    except Exception as e:
        log_error(f"❌ MYSQL ERROR for RC {rc}: {e}")
        log_error(f"---- RAW XML ----\n{xml_text}\n-----------------")
        raise
# ==========================================
# CONFIGURATION
# ==========================================
HOST = "192.168.1.4"
DB_PATH = r"z:\Medicus 3\data\MEDICUS.FDB"
PFX_PATH = r"MBcert.pfx"
# SECURITY: certificate password hard-coded in source control.
PFX_PASSWORD = "Vlado7309208104++"
ENV = "prod"
ICZ = "00000000"  # NOTE(review): placeholder identifiers -- confirm real ICZ/DIC
DIC = "00000000"

# ==========================================
# INIT CONNECTIONS
# ==========================================
db = MedicusDB(HOST, DB_PATH)
vzp = VZPB2BClient(ENV, PFX_PATH, PFX_PASSWORD, icz=ICZ, dic=DIC)

# ==========================================
# FETCH REGISTERED PATIENTS
# ==========================================
patients = db.get_active_registered_patients()
log_info(f"Checking {len(patients)} registered patients...\n")
k_datu = date.today().isoformat()

# ==========================================
# LOOP ONE PATIENT PER SECOND
# ==========================================
# (Actually throttled to one request roughly every 2 seconds.)
for rodcis, prijmeni, jmeno, poj in patients:
    log_info(f"=== Checking {prijmeni} {jmeno} ({rodcis}) ===")
    xml = vzp.stav_pojisteni(rc=rodcis, k_datu=k_datu)
    # 1) Check if response looks like XML before parsing.
    if not xml.strip().startswith("<"):
        log_error(f"❌ INVALID XML for RC {rodcis}")
        log_error(f"---- RAW RESPONSE ----\n{xml}\n----------------------")
        time.sleep(2)
        continue
    # 2) Try parsing XML; log the raw payload on failure and move on.
    try:
        result = vzp.parse_stav_pojisteni(xml)
    except Exception as e:
        log_error(f"❌ XML PARSE ERROR for RC {rodcis}: {e}")
        log_error(f"---- RAW RESPONSE ----\n{xml}\n----------------------")
        time.sleep(2)
        continue
    log_info(f"Result: {result}")
    # 3) Save into MySQL (save_insurance_status logs details and re-raises).
    try:
        save_insurance_status(mysql, rodcis, k_datu, result, xml)
    except Exception:
        log_error(f"❌ FAILURE inserting to MySQL for {rodcis}")
        continue
    time.sleep(2)

db.close()
log_info("\nDONE.")

View File

@@ -0,0 +1,19 @@
# FIX: the following stray line referenced an undefined `cur` and made the
# script crash with NameError on startup; kept for reference, disabled:
# cur.execute("select rodcis,prijmeni,jmeno from kar where datum_zruseni is null and kar.vyrazen!='A' and kar.rodcis is not null and idicp!=0 order by ockzaz.datum desc")

from vzpb2b_client import VZPB2BClient

# Production B2B client authenticated with the practice's client certificate.
# SECURITY: the PFX password is hard-coded -- move it to an env var / secret store.
client = VZPB2BClient(
    env="production",
    pfx_path="mbcert.pfx",
    pfx_password="Vlado7309208104++",
    icz="00000000",
    dic="00000000"
)

# Query insurance status for one birth number and print the parsed result.
response = client.stav_pojisteni("0308020152")
# print(response)
print(client.parse_stav_pojisteni(response))

12
10 Tests/test.py Normal file
View File

@@ -0,0 +1,12 @@
"""Smoke test: verify one EHIC card number against the VZP SIMU environment."""
from vzpb2b_client import VZPB2BClient

# SECURITY: the PFX password is hard-coded -- move it to a secret store.
client = VZPB2BClient(
    env="simu", # or "prod"
    pfx_path="mbcert.pfx",
    pfx_password="Vlado7309208104++",
    icz="00000000",
    dic="00000000"
)
# Raw response XML is printed as-is; no parsing for this service here.
result_xml = client.over_prukaz_pojistence("80203111194350000001")
print(result_xml)

213
10 Tests/vzpb2b_client.py Normal file
View File

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from requests_pkcs12 import Pkcs12Adapter
import requests
import uuid
from datetime import date
class VZPB2BClient:
    """Minimal SOAP client for VZP B2B services (SIMU and PROD environments).

    Authenticates with a PKCS#12 client certificate (mTLS) and builds SOAP
    envelopes by string formatting; responses come back as raw XML strings
    or parsed dicts.
    """

    def __init__(self, env: str, pfx_path: str, pfx_password: str,
                 icz: str = "00000000", dic: str = "00000000"):
        """Create a client.

        Args:
            env: environment alias ('prod'/'production'/... or 'simu'/'test'/...).
            pfx_path: path to the PKCS#12 client certificate.
            pfx_password: password of the PKCS#12 file.
            icz: facility identifier placed in the SOAP header.
            dic: tax identifier placed in the SOAP header.

        Raises:
            ValueError: when *env* is not a recognized alias.
        """
        # Normalize environment name
        env = env.lower().strip()
        if env in ("prod", "production", "live", "real"):
            self.env = "prod"
        elif env in ("simu", "simulace", "test", "testing"):
            self.env = "simu"
        else:
            raise ValueError(f"Unknown environment '{env}'. Use 'simu' or 'prod'.")
        self.pfx_path = pfx_path
        self.pfx_password = pfx_password
        self.icz = icz
        self.dic = dic
        # Prepare mTLS session -- the adapter presents the client certificate
        # on every https:// request made through this session.
        session = requests.Session()
        session.mount(
            "https://",
            Pkcs12Adapter(pkcs12_filename=pfx_path, pkcs12_password=pfx_password)
        )
        self.session = session

    # --------------------------------------------------------------
    # URL BUILDER
    # --------------------------------------------------------------
    def _build_endpoint(self, service_name: str) -> str:
        """
        Build the per-environment service URL.

        SIMU:
        https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/SIMU<Service>?sluzba=SIMU<Service>
        PROD:
        https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/<Service>
        """
        if self.env == "simu":
            simu_service = f"SIMU{service_name}"
            return (
                f"https://simu.b2b.vzp.cz/B2BProxy/HttpProxy/"
                f"{simu_service}?sluzba={simu_service}"
            )
        # Production
        return (
            f"https://prod.b2b.vzp.cz/B2BProxy/HttpProxy/{service_name}"
        )

    # --------------------------------------------------------------
    # SOAP HEADER BUILDER
    # --------------------------------------------------------------
    def _header(self) -> str:
        """Return the common SOAP header fragment (message id + subject ids)."""
        idZpravy = uuid.uuid4().hex[:12] # must be alphanumeric, max 12 chars
        return f"""
<com:idZpravy>{idZpravy}</com:idZpravy>
<com:idSubjektu>
<com:icz>{self.icz}</com:icz>
<com:dic>{self.dic}</com:dic>
</com:idSubjektu>
"""

    # --------------------------------------------------------------
    # OVERPRUKAZ — EHIC CHECK
    # --------------------------------------------------------------
    def over_prukaz_pojistence(self, cislo_prukazu: str, k_datu: str = None) -> str:
        """
        Calls OverPrukazPojistenceB2B (SIMU or PROD depending on env).
        Returns raw XML string.

        Args:
            cislo_prukazu: insurance card number to verify.
            k_datu: ISO date to verify against; defaults to today.
        """
        service = "OverPrukazPojistenceB2B"
        endpoint = self._build_endpoint(service)
        if not k_datu:
            k_datu = date.today().isoformat()
        soap = f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:vzp="http://xmlns.gemsystem.cz/OverPrukazPojistenceB2B"
xmlns:com="http://xmlns.gemsystem.cz/CommonB2B">
<soap:Header>
{self._header()}
</soap:Header>
<soap:Body>
<vzp:OverPrukazPojistenceB2BPozadavek>
<vzp:cisloPrukazu>{cislo_prukazu}</vzp:cisloPrukazu>
<vzp:kDatu>{k_datu}</vzp:kDatu>
</vzp:OverPrukazPojistenceB2BPozadavek>
</soap:Body>
</soap:Envelope>
"""
        headers = {"Content-Type": "text/xml; charset=utf-8"}
        print(f"Calling: {endpoint}")
        response = self.session.post(
            endpoint,
            data=soap.encode("utf-8"),
            headers=headers,
            timeout=30
        )
        print("HTTP:", response.status_code)
        return response.text

    def stav_pojisteni(self, rc: str, k_datu: str = None, prijmeni: str = None):
        """
        Calls stavPojisteniB2B (SIMU or PROD); returns the raw XML response.

        Args:
            rc: birth number (cislo pojistence).
            k_datu: ISO date to check; defaults to today.
            prijmeni: optional surname element; omitted from the request when None.
        """
        service = "stavPojisteniB2B"
        endpoint = self._build_endpoint(service)
        if not k_datu:
            k_datu = date.today().isoformat()
        prijmeni_xml = f"<vzp:prijmeni>{prijmeni}</vzp:prijmeni>" if prijmeni else ""
        soap = f"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:vzp="http://xmlns.gemsystem.cz/stavPojisteniB2B"
xmlns:com="http://xmlns.gemsystem.cz/CommonB2B">
<soap:Header>
{self._header()}
</soap:Header>
<soap:Body>
<vzp:stavPojisteniB2B>
<vzp:cisloPojistence>{rc}</vzp:cisloPojistence>
{prijmeni_xml}
<vzp:kDatu>{k_datu}</vzp:kDatu>
</vzp:stavPojisteniB2B>
</soap:Body>
</soap:Envelope>
"""
        headers = {
            "Content-Type": "text/xml; charset=utf-8",
            "SOAPAction": "process"
        }
        print(f"Calling: {endpoint}")
        resp = self.session.post(endpoint, data=soap.encode("utf-8"),
                                 headers=headers, timeout=30)
        print("HTTP:", resp.status_code)
        return resp.text

    def parse_stav_pojisteni(self, xml_text: str):
        """
        Parses stavPojisteniB2B SOAP response into a Python dict.
        Returned structure:
        {
        "stavVyrizeni": int,
        "stav": str | None,
        "kodPojistovny": str | None,
        "nazevPojistovny": str | None,
        "pojisteniKod": str | None
        }
        """
        import xml.etree.ElementTree as ET
        NS = {
            "soap": "http://schemas.xmlsoap.org/soap/envelope/",
            "vzp": "http://xmlns.gemsystem.cz/stavPojisteniB2B"
        }
        root = ET.fromstring(xml_text)
        # ---- Extract status ----
        # NOTE(review): int(...) assumes the element carries text; an empty
        # <stavVyrizeniPozadavku/> would raise here -- confirm against the WSDL.
        stav_vyr = root.find(".//vzp:stavVyrizeniPozadavku", NS)
        stav_vyr = int(stav_vyr.text.strip()) if stav_vyr is not None else None
        # ---- If no stavPojisteni element present (e.g. 0 or some errors) ----
        node_stav = root.find(".//vzp:stavPojisteni", NS)
        if node_stav is None:
            return {
                "stavVyrizeni": stav_vyr,
                "stav": None,
                "kodPojistovny": None,
                "nazevPojistovny": None,
                "pojisteniKod": None,
            }
        def get(tag):
            # None when the child is missing or empty.
            el = node_stav.find(f"vzp:{tag}", NS)
            return el.text.strip() if el is not None and el.text else None
        return {
            "stavVyrizeni": stav_vyr,
            "stav": get("stav"),
            "kodPojistovny": get("kodPojistovny"),
            "nazevPojistovny": get("nazevPojistovny"),
            "pojisteniKod": get("pojisteniKod"),
        }

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Check whether previously saved sktorrent.eu cookies still give a logged-in session."""
import json
from pathlib import Path
from playwright.sync_api import sync_playwright

COOKIE_FILE = Path("sktorrent_cookies.json")
LOGIN_URL = "https://sktorrent.eu/torrent/torrents_v2.php?active=0"

with sync_playwright() as p:
    # Headed browser so the outcome is visible to the user.
    browser = p.chromium.launch(headless=False)
    context = browser.new_context()
    # Load saved cookies
    if COOKIE_FILE.exists():
        with open(COOKIE_FILE, "r") as f:
            cookies = json.load(f)
        context.add_cookies(cookies)
        print("🔄 Loaded cookies.")
    else:
        print("❌ Cookie file not found. Run the manual login script first.")
        exit()
    page = context.new_page()
    page.goto(LOGIN_URL)
    page.wait_for_load_state("networkidle")
    # Check if login form is visible -- its absence means the cookies worked.
    if page.locator('input[name="uid"]').count() == 0:
        print("✅ Already logged in using cookies!")
    else:
        print("❌ Cookies invalid or expired. Please re-login manually to refresh cookies.")
    page.wait_for_timeout(3000)

0
30 OpenTextLIsting v5.py Normal file
View File

0
30 OpenTextListing v2.py Normal file
View File

0
30 OpenTextListing v3.py Normal file
View File

0
30 OpenTextListing v4.py Normal file
View File

230
30 OpenTextListing.py Normal file
View File

@@ -0,0 +1,230 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Scrape the sktorrent.eu torrent listing into the `torrents` MySQL database."""
import pymysql
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
import time
import re
import urllib.parse as urlparse
from pathlib import Path
import json

# ============================================================
# 1) MySQL CONNECTION
# ============================================================
# SECURITY: credentials are hard-coded; move to env vars / config.
db = pymysql.connect(
    host="192.168.1.76",
    port=3307,
    user="root",
    password="Vlado9674+",
    database="torrents",
    charset="utf8mb4",
    autocommit=True
)
cursor = db.cursor()

# ============================================================
# 2) Selenium setup
# ============================================================
COOKIE_FILE = Path("sktorrent_cookies.json")
URL = "https://sktorrent.eu/torrent/torrents.php?active=0"

chrome_options = Options()
chrome_options.add_argument("--start-maximized")
chrome_options.add_argument("--disable-notifications")
chrome_options.add_argument("--disable-popup-blocking")
chrome_options.add_argument("--disable-extensions")
driver = webdriver.Chrome(options=chrome_options)

# Visit the domain first -- cookies can only be attached for the current domain.
driver.get("https://sktorrent.eu")
# Load cookies
if COOKIE_FILE.exists():
    with open(COOKIE_FILE, "r") as f:
        cookies = json.load(f)
    for c in cookies:
        driver.add_cookie(c)
    print("🍪 Cookies loaded.")
driver.get(URL)
time.sleep(2)  # crude wait for the listing to render

# Try to close inline popup if present
try:
    close_btn = driver.find_element(By.XPATH, "//a[text()='CLOSE X']")
    close_btn.click()
    print("🧹 Popup closed.")
except:
    # Popup absent -- nothing to do. NOTE(review): bare except kept as-is;
    # narrowing to NoSuchElementException would be safer.
    pass

# ============================================================
# 3) Extract table rows
# ============================================================
rows = driver.find_elements(By.CSS_SELECTOR, "table tr")
print("Total rows found:", len(rows))
real_rows = []
for row in rows:
    cells = row.find_elements(By.TAG_NAME, "td")
    if len(cells) >= 5: # real torrent rows
        real_rows.append(cells)
print("Real data rows:", len(real_rows))
print("")
# ============================================================
# 4) Function to extract all fields from one row
# ============================================================
def parse_row(cells):
    """Extract all fields of one torrent row into a dict for the upsert.

    Parameters:
        cells: list of selenium WebElements — the <td> cells of one row
               (callers guarantee at least 5 cells).

    Returns:
        dict keyed exactly like ``insert_sql``'s named placeholders, or
        None when the row's details link has no ``?id=`` parameter.
    """
    # --------------------------
    # 1) CATEGORY
    # --------------------------
    category = cells[0].text.strip()

    # --------------------------
    # 2) TITLES + DETAILS LINK
    # --------------------------
    a_tag = cells[1].find_element(By.TAG_NAME, "a")
    visible_name = a_tag.text.strip()
    full_title = a_tag.get_attribute("title")
    details_link = a_tag.get_attribute("href")

    # --------------------------
    # 3) TORRENT HASH — the ?id= query parameter of the details link
    # --------------------------
    parsed = urlparse.urlparse(details_link)
    query = urlparse.parse_qs(parsed.query)
    if "id" not in query:
        # Ad / separator rows carry anchors without an id — skip them.
        print("⚠️ Skipping row with no torrent ID →", details_link)
        return None
    torrent_hash = query["id"][0]

    # --------------------------
    # 4) TEXT BLOCK (size + date)
    # --------------------------
    text_block = cells[1].get_attribute("innerText")
    text_block_clean = " ".join(text_block.split())
    size_match = re.search(r"Velkost ([0-9\.]+ ?[KMG]B)", text_block_clean, re.IGNORECASE)
    added_match = re.search(r"Pridany (.+?)(?:\sObrázok|$)", text_block_clean, re.IGNORECASE)
    size_pretty = size_match.group(1) if size_match else None
    added_pretty = added_match.group(1) if added_match else None

    # Convert "18/11/2025 o 07:00" → "2025-11-18 07:00:00".
    # The split/unpack is guarded: an unexpected date format now yields
    # NULL instead of aborting the whole scrape with a ValueError.
    added_mysql = None
    if added_pretty:
        try:
            normalized = re.sub(r" o ", " ", added_pretty)
            day, month, year_time = normalized.split("/")
            year, time_part = year_time.split(" ")
            added_mysql = f"{year}-{month}-{day} {time_part}:00"
        except ValueError:
            print("⚠️ Unparseable date →", added_pretty)

    # --------------------------
    # 5) IMAGE PREVIEW — URL hidden inside the anchor's onmouseover JS
    # --------------------------
    img_link = None
    try:
        image_a = cells[1].find_element(By.XPATH, ".//a[contains(text(),'Obrázok')]")
        mouseover = image_a.get_attribute("onmouseover")
        img_match = re.search(r"src=([^ ]+)", mouseover)
        if img_match:
            img_link = img_match.group(1).replace("'", "").strip()
            if img_link.startswith("//"):
                img_link = "https:" + img_link
    except Exception:  # was a bare except — stay best-effort, but don't trap SystemExit/KeyboardInterrupt
        pass

    # --------------------------
    # 6) SEEDERS
    # --------------------------
    seeders_a = cells[3].find_element(By.TAG_NAME, "a")
    seeders_number = int(seeders_a.text.strip())
    seeders_link = seeders_a.get_attribute("href")

    # --------------------------
    # 7) LEECHERS
    # --------------------------
    leechers_a = cells[4].find_element(By.TAG_NAME, "a")
    leechers_number = int(leechers_a.text.strip())
    leechers_link = leechers_a.get_attribute("href")

    # --------------------------
    # Return dictionary for MySQL
    # --------------------------
    return {
        "torrent_hash": torrent_hash,
        "details_link": details_link,
        "category": category,
        "title_visible": visible_name,
        "title_full": full_title,
        "size_pretty": size_pretty,
        "added_datetime": added_mysql,
        "preview_image": img_link,
        "seeders": seeders_number,
        "seeders_link": seeders_link,
        "leechers": leechers_number,
        "leechers_link": leechers_link,
    }
# ============================================================
# 5) MySQL INSERT
# ============================================================
insert_sql = """
INSERT INTO torrents (
torrent_hash, details_link, category, title_visible, title_full,
size_pretty, added_datetime, preview_image,
seeders, seeders_link, leechers, leechers_link
) VALUES (
%(torrent_hash)s, %(details_link)s, %(category)s, %(title_visible)s, %(title_full)s,
%(size_pretty)s, %(added_datetime)s, %(preview_image)s,
%(seeders)s, %(seeders_link)s, %(leechers)s, %(leechers_link)s
)
ON DUPLICATE KEY UPDATE
details_link = VALUES(details_link),
category = VALUES(category),
title_visible = VALUES(title_visible),
title_full = VALUES(title_full),
size_pretty = VALUES(size_pretty),
added_datetime = VALUES(added_datetime),
preview_image = VALUES(preview_image),
seeders = VALUES(seeders),
seeders_link = VALUES(seeders_link),
leechers = VALUES(leechers),
leechers_link = VALUES(leechers_link);
"""
# ============================================================
# 6) PROCESS ALL REAL ROWS
# ============================================================
# Parse each candidate row and upsert it; rows parse_row rejects
# (no ?id= in the details link) come back as None and are skipped.
for cells in real_rows:
    data = parse_row(cells)
    if not data:
        continue
    print("💾 Saving:", data["title_visible"])
    cursor.execute(insert_sql, data)
print("\n✅ DONE — All torrents saved to MySQL.")
driver.quit()

219
40 ParseviaRequests.py Normal file
View File

@@ -0,0 +1,219 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import requests
from bs4 import BeautifulSoup
import pymysql
from datetime import datetime
# ==============================
# CONFIG
# ==============================
BASE_URL = "https://sktorrent.eu/torrent/torrents_v2.php?active=0"
# Despite the .json extension, load_cookies() parses this file as a
# tab-separated Netscape-format cookie export — TODO confirm the file
# on disk really is in that format (the Selenium script writes JSON).
COOKIES_FILE = "sktorrent_cookies.json"
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
HEADERS = {"User-Agent": USER_AGENT}
# NOTE(review): same hard-coded DB credentials as the Selenium scraper —
# consider a shared config / environment variables instead.
DB_CFG = {
    "host": "192.168.1.76",
    "port": 3307,
    "user": "root",
    "password": "Vlado9674+",
    "database": "torrents",
    "charset": "utf8mb4",
    "cursorclass": pymysql.cursors.DictCursor,
}
# ==============================
# COOKIE LOADER
# ==============================
def load_cookies(path):
    """Read a Netscape-format cookie export and return a name→value dict.

    Comment lines (leading '#') and lines without a tab separator are
    ignored; only lines with the full 7 Netscape fields contribute.
    """
    jar = {}
    with open(path, "r", encoding="utf-8") as fh:
        for raw in fh:
            if raw.startswith("#") or "\t" not in raw:
                continue  # comment or malformed line
            fields = raw.strip().split("\t")
            if len(fields) >= 7:
                # Netscape layout: domain, flag, path, secure, expiry, name, value
                jar[fields[5]] = fields[6]
    print(f"🍪 Loaded {len(jar)} cookies.")
    return jar
# ==============================
# MYSQL INSERT
# ==============================
def insert_torrent(db, t):
    """Insert one parsed torrent dict into the torrents table.

    Uses INSERT IGNORE, so a row whose unique key already exists is
    silently skipped. Commits after each insert.
    """
    # Column order must match the VALUES placeholders below.
    columns = (
        "category",
        "title_visible",
        "title_full",
        "size_pretty",
        "added_datetime",
        "seeders",
        "seeders_link",
        "leechers",
        "leechers_link",
        "preview_image",
        "details_link",
        "torrent_hash",
    )
    sql = """
    INSERT IGNORE INTO torrents (
        category,
        title_visible,
        title_full,
        size_pretty,
        added_datetime,
        seeders,
        seeders_link,
        leechers,
        leechers_link,
        preview_image,
        details_link,
        torrent_hash
    ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    """
    with db.cursor() as cur:
        cur.execute(sql, tuple(t[name] for name in columns))
    db.commit()
# ==============================
# PARSER
# ==============================
def parse_torrent_row(cols):
    """Parse a <tr> with exactly the structure of a torrent row.

    `cols` are the BeautifulSoup <td> tags of one row; returns a field
    dict ready for insert_torrent(), or None when the row has no
    details link or no 40-hex torrent id.
    """
    category = cols[0].get_text(strip=True)

    # cols[1] is the download.gif cell — intentionally ignored.
    main_td = cols[2]

    a_title = main_td.find("a", href=re.compile("details.php"))
    if not a_title:
        return None
    href = a_title.get("href")

    # The 40-hex ?id= value doubles as the torrent hash.
    hash_m = re.search(r"id=([A-Fa-f0-9]{40})", href)
    if not hash_m:
        return None

    # Size and added-date live in the cell's free text, e.g.
    # "GR ... Velkost 1.7 GB | Pridany 18/11/2025 o 07:00"
    blob = main_td.get_text(" ", strip=True)
    size_m = re.search(r"Velkost ([\d\.]+ ?[GMK]B)", blob)
    date_m = re.search(r"Pridany (\d{2}/\d{2}/\d{4}) o (\d{2}:\d{2})", blob)

    added_dt = None
    if date_m:
        day_str, clock_str = date_m.groups()
        added_dt = datetime.strptime(day_str + " " + clock_str, "%d/%m/%Y %H:%M")

    # Preview image URL is buried in an anchor's onmouseover attribute.
    preview = None
    hover_a = main_td.find("a", onmouseover=True)
    if hover_a:
        m2 = re.search(r"img src=//([^ ]+)", hover_a.get("onmouseover", ""))
        if m2:
            preview = "https://" + m2.group(1)

    def _peer(cell):
        # Seeders (cols[4]) and leechers (cols[5]) share this cell shape.
        a = cell.find("a")
        if not a:
            return 0, None
        return int(a.get_text(strip=True)), "https://sktorrent.eu/torrent/" + a.get("href")

    seeders, seeders_link = _peer(cols[4])
    leechers, leechers_link = _peer(cols[5])

    return {
        "category": category,
        "title_visible": a_title.get_text(strip=True),
        "title_full": a_title.get("title", "").strip(),
        "size_pretty": size_m.group(1) if size_m else None,
        "added_datetime": added_dt,
        "seeders": seeders,
        "seeders_link": seeders_link,
        "leechers": leechers,
        "leechers_link": leechers_link,
        "preview_image": preview,
        "details_link": "https://sktorrent.eu/torrent/" + href,
        "torrent_hash": hash_m.group(1),
    }
# ==============================
# MAIN
# ==============================
def main():
    """Fetch the listing page with saved cookies, parse every torrent
    row, and store each one into MySQL via insert_torrent()."""
    cookies = load_cookies(COOKIES_FILE)
    session = requests.Session()
    session.headers.update(HEADERS)
    session.cookies.update(cookies)
    print("🌍 Downloading HTML...")
    r = session.get(BASE_URL, timeout=30)
    r.raise_for_status()
    soup = BeautifulSoup(r.text, "html.parser")
    tbody = soup.find("tbody")
    if not tbody:
        print("❌ Could not find <tbody>")
        return
    rows = tbody.find_all("tr")
    print(f"Found {len(rows)} <tr> rows.")
    # NOTE(review): the connection is never closed — acceptable for a
    # one-shot script, but consider contextlib.closing if this grows.
    db = pymysql.connect(**DB_CFG)
    inserted = 0
    skipped = 0
    for tr in rows:
        cols = tr.find_all("td")
        if len(cols) != 7:
            continue  # ignore header & separator rows
        # parse_torrent_row returns None for rows without a valid id.
        data = parse_torrent_row(cols)
        if not data:
            skipped += 1
            continue
        insert_torrent(db, data)
        inserted += 1
        print(f"✔ Inserted {data['torrent_hash']}")
    print(f"\n===== DONE =====")
    print(f"Inserted: {inserted}")
    print(f"Skipped: {skipped}")
if __name__ == "__main__":
    main()

View File

@@ -22,7 +22,7 @@ def get_medicus_connection():
fdb.Connection object on success fdb.Connection object on success
None on failure None on failure
""" """
if socket.gethostname().strip()=="NTBVBHP470G10": if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"):
MEDICUS_CFG = dict( MEDICUS_CFG = dict(
dsn=r"192.168.1.4:z:\medicus 3\data\medicus.fdb", dsn=r"192.168.1.4:z:\medicus 3\data\medicus.fdb",
user="SYSDBA", user="SYSDBA",
@@ -50,7 +50,7 @@ def get_mysql_connection():
""" """
Return a PyMySQL connection or None if the connection fails. Return a PyMySQL connection or None if the connection fails.
""" """
if socket.gethostname().strip() == "NTBVBHP470G10": if socket.gethostname().strip() in ("NTBVBHP470G10","Z230"):
MYSQL_CFG = dict( MYSQL_CFG = dict(
host="192.168.1.76", host="192.168.1.76",
port=3307, port=3307,