Files
reporty/PSA/01 PSA.py
T
2026-04-08 20:56:26 +02:00

266 lines
8.2 KiB
Python

import firebirdsql as fb,os
import pandas as pd
# Connection settings for the Firebird 2.5 server, reached over TCP.
# The database path must be written the way the *server* sees it (a Windows path).
# NOTE(review): credentials are hard-coded here; consider moving them to configuration.
_connect_kwargs = {
    "host": "localhost",
    "port": 3050,
    "database": r"c:\Medicus 3\data\MEDICUS.FDB",  # local test Medicus
    "user": "SYSDBA",
    "password": "masterkey",
    "charset": "WIN1250",  # adjust if needed
}
conn = fb.connect(**_connect_kwargs)
# Tiny helper to fetch directly into DataFrame (avoids the pandas/SQLAlchemy warning)
def query_df(sql, params=None, connection=None):
    """Execute *sql* and return the complete result set as a DataFrame.

    Args:
        sql: SQL text to execute.
        params: Optional sequence of bind parameters; ``None`` means none.
        connection: Optional DB-API connection to run the query on. When
            omitted, the module-level ``conn`` is used.

    Returns:
        A ``pandas.DataFrame`` with one column per result column, named
        after the cursor description with surrounding whitespace stripped.
    """
    db = conn if connection is None else connection
    cur = db.cursor()
    try:
        cur.execute(sql, params or ())
        rows = cur.fetchall()
        cols = [d[0].strip() for d in cur.description]  # Firebird pads column names
    finally:
        # Release the server-side statement even when execute/fetch fails.
        cur.close()
    return pd.DataFrame(rows, columns=cols)
# Connectivity check: a one-row query against the system table.
_smoke = query_df("SELECT 1 AS ONE FROM RDB$DATABASE")
print(_smoke)
# Preview of the first 100 rows of the kar table.
df = query_df("SELECT FIRST 100 * FROM kar")
print(df)
from datetime import datetime
# Main report query: one output row per lab result line (labvd) whose method
# name (labmetod.nazev) contains 'PSA', joined to its header (labvh), the
# patient (kar), the unit (labjedn) and the reference scale (labskaly).
# Rows are ordered by vh.datum, newest first.
# Two correlated subqueries read dokladd, matched to the patient by rodcis:
#   MINULE     - latest datose with kod '01130' strictly before vh.datum
#   VYKODOVANO - comma-separated "date code" list of kod 01130..01134 whose
#                datose lies within 7 days either side of vh.datum
sql = """
SELECT
/*vh.idvh,*/
vh.idpacient,
kar.prijmeni,
kar.jmeno,
kar.rodcis,
vh.datum,
/*vh.idhodn,*/
/*vd.poradi,*/
/*vd.idmetod,*/
(
SELECT MAX(dd.datose)
FROM dokladd dd
WHERE dd.rodcis = kar.rodcis
AND dd.kod = '01130'
AND dd.datose < vh.datum
) AS minule,
(
SELECT LIST(CAST(dd.datose AS VARCHAR(10)) || ' ' || dd.kod, ', ')
FROM dokladd dd
WHERE dd.rodcis = kar.rodcis
AND (dd.kod = '01130' OR dd.kod = '01131' OR dd.kod = '01132' OR dd.kod = '01133' OR dd.kod = '01134')
AND dd.datose BETWEEN vh.datum - 7 AND vh.datum + 7
) AS vykodovano,
lm.kodtext,
lm.nazev,
vd.vysl,
lj.jedn,
ls.normdol,
ls.normhor
FROM labvh vh
JOIN labvd vd ON vd.idvh = vh.idvh
JOIN kar ON kar.idpac = vh.idpacient
JOIN labmetod lm ON lm.idmetod = vd.idmetod
JOIN labjedn lj ON lj.idjedn = vd.idjedn
JOIN labskaly ls ON ls.idskaly = vd.idskaly
WHERE lm.nazev CONTAINING 'PSA'
/*ORDER BY kar.idpac, vh.datum, vd.poradi;*/
ORDER BY vh.datum desc;
"""
# Run the query; df_direct is the frame that the later steps extend and export.
df_direct = query_df(sql)
import re
import numpy as np
# --- MINULE: expand with ", další XXXXX" based on codes billed around that date ---
# Load every dokladd row carrying one of the three codes that decide the suffix.
_dokladd_sql = """
SELECT rodcis, datose, kod FROM dokladd
WHERE kod = '01131' OR kod = '01132' OR kod = '01133'
"""
df_dokladd = query_df(_dokladd_sql)
# Convert DATOSE to pandas datetimes so it can be compared against Timestamps.
df_dokladd['DATOSE'] = pd.to_datetime(df_dokladd['DATOSE'])
def compute_minule_str(row, dokladd=None):
    """Format a row's MINULE date, optionally followed by ``, další ...``.

    The suffix is chosen from the codes found in the billing table for the
    same RODCIS within 7 days either side of MINULE (bounds inclusive):
    ``01133`` gives ``NIKDY``, otherwise ``01131`` gives MINULE + 4 years,
    otherwise ``01132`` gives MINULE + 2 years; with none of them there is
    no suffix.

    Args:
        row: Mapping or Series providing ``'MINULE'`` and ``'RODCIS'``.
        dokladd: Optional DataFrame with columns ``RODCIS``, ``DATOSE``
            (datetime) and ``KOD``. Defaults to the module-level
            ``df_dokladd``.

    Returns:
        ``'YYYY-MM-DD'`` or ``'YYYY-MM-DD, další <value>'``; ``None`` when
        MINULE is missing.
    """
    minule = row['MINULE']
    # pd.isna covers None, float NaN and NaT; NaT would otherwise reach
    # strftime below and raise.
    if pd.isna(minule):
        return None
    billed = df_dokladd if dokladd is None else dokladd
    minule_ts = pd.Timestamp(minule)
    window = pd.Timedelta(days=7)
    mask = (
        (billed['RODCIS'] == row['RODCIS'])
        & (billed['DATOSE'] >= minule_ts - window)
        & (billed['DATOSE'] <= minule_ts + window)
    )
    codes = set(billed.loc[mask, 'KOD'])
    date_str = minule_ts.strftime('%Y-%m-%d')
    # Precedence matters: 01133 wins over 01131, which wins over 01132.
    if '01133' in codes:
        dalsi_str = 'NIKDY'
    elif '01131' in codes:
        dalsi_str = (minule_ts + pd.DateOffset(years=4)).strftime('%Y-%m-%d')
    elif '01132' in codes:
        dalsi_str = (minule_ts + pd.DateOffset(years=2)).strftime('%Y-%m-%d')
    else:
        return date_str
    return f"{date_str}, další {dalsi_str}"
# Build the column row by row instead of DataFrame.apply(axis=1): on an empty
# frame apply returns a DataFrame, which cannot be assigned to a single column.
df_direct['MINULE'] = [compute_minule_str(row) for _, row in df_direct.iterrows()]
# --- DALŠÍ: next PSA billing date based on codes in VYKODOVANO ---
def compute_dalsi(row):
    """Return the DALŠÍ value for one report row.

    The codes are looked up as substrings of the row's VYKODOVANO text:
    ``01133`` gives ``'NIKDY'``, otherwise ``01131`` gives DATUM + 4 years,
    otherwise ``01132`` gives DATUM + 2 years.

    Args:
        row: Mapping or Series providing ``'VYKODOVANO'`` and ``'DATUM'``.

    Returns:
        ``'NIKDY'``, a ``'YYYY-MM-DD'`` string, or ``None`` when no relevant
        code is present or the date needed for the calculation is missing.
    """
    vykod = str(row['VYKODOVANO'] or '')
    if '01133' in vykod:
        return 'NIKDY'
    if '01131' in vykod:
        years = 4
    elif '01132' in vykod:
        years = 2
    else:
        return None
    datum = row['DATUM']
    # A missing date would become NaT, and NaT.strftime raises.
    if pd.isna(datum):
        return None
    return (pd.Timestamp(datum) + pd.DateOffset(years=years)).strftime('%Y-%m-%d')
# Build the column row by row instead of DataFrame.apply(axis=1): on an empty
# frame apply returns a DataFrame, which cannot be assigned to a single column.
df_direct['DALŠÍ'] = [compute_dalsi(row) for _, row in df_direct.iterrows()]
# Reorder: DALŠÍ immediately after VYKODOVANO
cols = list(df_direct.columns)
cols.remove('DALŠÍ')
cols.insert(cols.index('VYKODOVANO') + 1, 'DALŠÍ')
df_direct = df_direct[cols]
# --- 0) Helper: parse numeric value from string like "5,6", "<0.1", "3.2 mmol/L" ---
# First number in the text: optional sign, digits, optional decimal part with
# '.' or ',' as separator, optional exponent.
num_re = re.compile(r'[-+]?\d+(?:[.,]\d+)?(?:[eE][-+]?\d+)?')


def to_num(x):
    """Extract the first number found in *x* as a float.

    Args:
        x: Any value; it is converted with ``str()`` before parsing.

    Returns:
        The parsed float, or NaN when *x* is ``None``, blank, or contains no
        number. A value whose text starts with ``<`` (below detection limit)
        is halved; a leading ``>`` leaves the number unchanged.
    """
    if x is None:
        return np.nan
    s = str(x).strip()
    if not s:
        return np.nan
    # Non-breaking spaces are turned into ordinary spaces before searching.
    m = num_re.search(s.replace('\u00A0', ' '))
    if m is None:
        return np.nan
    # The pattern only matches text float() accepts once ',' becomes '.'.
    val = float(m.group(0).replace(',', '.'))
    return val * 0.5 if s.startswith('<') else val
# --- 1) Prepare numeric columns + ratio in pandas before export ---
# Parse the textual result (VYSL) and the upper norm (NORMHOR) with to_num.
for _src, _dst in (("VYSL", "VYSL_NUM"), ("NORMHOR", "NORMHOR_NUM")):
    df_direct[_dst] = df_direct[_src].apply(to_num)
# A zero upper norm becomes NaN so the division yields NaN, not infinity.
den = df_direct["NORMHOR_NUM"].replace(0, np.nan)
_ratio = df_direct["VYSL_NUM"] / den
df_direct["RATIO"] = _ratio.clip(lower=0)  # can exceed 1 if over ULN
from datetime import datetime
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter
from openpyxl.styles import Alignment, Border, Side
from openpyxl.formatting.rule import ColorScaleRule
from openpyxl.styles import PatternFill
from openpyxl.formatting.rule import FormulaRule
# Output directory for the report; created when it does not exist yet.
base_path = Path(r"u:\Dropbox\!!!Days\Downloads Z230")
base_path.mkdir(parents=True, exist_ok=True)
# ================= DELETE OLD PSA REPORTS ==================
# Best effort: a file that cannot be removed is reported and skipped.
for fname in os.listdir(base_path):
    if not fname.endswith("PSA report.xlsx"):
        continue
    try:
        (base_path / fname).unlink()
    except OSError as e:  # only filesystem errors are expected here
        print(f"⚠️ Could not delete {fname}: {e}")
    else:
        print(f"🗑️ Deleted old PSA report: {fname}")
# ================= CREATE NEW FILENAME ==================
# The timestamp prefix uses '-' instead of ':' in the time part.
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
output_file = base_path / f"{timestamp} PSA report.xlsx"
print(f"📄 New PSA report will be saved as: {output_file}")
# ---- 2) Export DataFrame to Excel ----
# Writes df_direct to a sheet named "PSA" without the index column.
df_direct.to_excel(output_file, index=False, sheet_name="PSA")
# ---- 3) Open with openpyxl for formatting ----
wb = load_workbook(output_file)
ws = wb["PSA"]
# Auto width for columns: longest text in the column plus padding, capped at 50.
for col in ws.columns:
    col_letter = get_column_letter(col[0].column)
    max_len = max(
        (len(str(cell.value)) for cell in col if cell.value is not None),
        default=0,  # column with no values at all
    )
    ws.column_dimensions[col_letter].width = min(max_len + 2, 50)  # cap width
# Thin border on all four sides, built from one shared Side definition.
_thin_side = Side(style="thin")
thin_border = Border(
    left=_thin_side,
    right=_thin_side,
    top=_thin_side,
    bottom=_thin_side,
)
# Border every cell of the used range; columns A, B and E are also centred.
_centered_letters = ("A", "B", "E")
for row in ws.iter_rows(min_row=1, max_row=ws.max_row, min_col=1, max_col=ws.max_column):
    for cell in row:
        cell.border = thin_border
        if cell.column_letter not in _centered_letters:
            continue
        cell.alignment = Alignment(horizontal="center")
# Auto-filter over the whole used range, with the header row frozen.
ws.auto_filter.ref = ws.dimensions
ws.freeze_panes = "A2"
# Map header text -> 1-based column index, read from the first sheet row.
hdr = {c.value: i for i, c in enumerate(ws[1], start=1)}
vysl_idx = hdr.get("VYSL")
ratio_idx = hdr.get("RATIO")
if not (vysl_idx and ratio_idx):
    raise RuntimeError("Missing required columns: VYSL and/or RATIO")
vysl_col = get_column_letter(vysl_idx)
ratio_col = get_column_letter(ratio_idx)
max_row = ws.max_row
rng_vysl = f"{vysl_col}2:{vysl_col}{max_row}"
green = PatternFill(start_color="63BE7B", end_color="63BE7B", fill_type="solid")
yellow = PatternFill(start_color="FFEB84", end_color="FFEB84", fill_type="solid")
red = PatternFill(start_color="F8696B", end_color="F8696B", fill_type="solid")
# Non-overlapping rules; stop when one matches.
# The VYSL cell is coloured from the RATIO value in the same row. A blank
# RATIO cell compares as 0, so the green rule requires a real number;
# otherwise rows without a computable ratio would be filled green.
_ratio_ref = f"${ratio_col}2"
_rules = (
    (f"AND(ISNUMBER({_ratio_ref}), {_ratio_ref}<=0.80)", green),
    (f"AND({_ratio_ref}>0.80, {_ratio_ref}<1)", yellow),
    (f"{_ratio_ref}>=1", red),
)
# With only the header row present the range would run backwards (row 2 to
# row 1), so the rules are added only when there is at least one data row.
if max_row >= 2:
    for _formula, _fill in _rules:
        ws.conditional_formatting.add(
            rng_vysl,
            FormulaRule(formula=[_formula], fill=_fill, stopIfTrue=True),
        )
wb.save(output_file)
print(f"Saved: {output_file}")