# Listing metadata (from the code viewer): 200 lines, 6.0 KiB, Python
import firebirdsql as fb
|
|
import pandas as pd
|
|
|
|
# TCP connection to the Firebird 2.5 server. The database path is the path as
# seen by the *server* (a Windows path), not a path on this client.
# NOTE(review): user and password are hard-coded here; consider moving them
# out of the source file.
_FIREBIRD_SETTINGS = {
    "host": "192.168.1.4",
    "port": 3050,
    "database": r"z:\Medicus 3\data\MEDICUS.FDB",  # raw string for backslashes
    "user": "SYSDBA",
    "password": "masterkey",
    "charset": "WIN1250",  # adjust if needed
}
conn = fb.connect(**_FIREBIRD_SETTINGS)
# Tiny helper to fetch directly into DataFrame (avoids the pandas/SQLAlchemy warning)
def query_df(sql, params=None):
    """Run *sql* on the module-level ``conn`` and return the rows as a DataFrame.

    Args:
        sql: SQL statement text; may contain ``?`` placeholders.
        params: Optional sequence of values bound to the placeholders. When
            omitted (or empty) an empty tuple is passed to the driver.

    Returns:
        A ``pandas.DataFrame`` holding every fetched row, with column names
        taken from the cursor description and stripped of padding.
    """
    cur = conn.cursor()
    try:
        cur.execute(sql, params or ())
        rows = cur.fetchall()
        cols = [d[0].strip() for d in cur.description]  # Firebird pads column names
    finally:
        # Always release the cursor, including when execute/fetch raises.
        cur.close()
    return pd.DataFrame(rows, columns=cols)
# Smoke test: a trivial one-row query, printed as a DataFrame.
_smoke = query_df("SELECT 1 AS ONE FROM RDB$DATABASE")
print(_smoke)

# Your table: fetch the first 100 rows of ``kar`` and show them.
df = query_df("SELECT FIRST 100 * FROM kar")
print(df)
from datetime import datetime
|
|
# Half-open reporting window: rows with vh.datum >= start and vh.datum < end.
start, end = datetime(2025, 1, 1), datetime(2026, 1, 1)

# Lab results joined with patient, method, unit and scale tables, restricted
# to methods whose name contains 'PSA'. The correlated sub-select collapses
# the matching dokladd codes (01130-01134) dated within +/-7 days of the
# result into a single text cell named "vykodovano".
sql = """
SELECT
/*vh.idvh,*/
vh.idpacient,
kar.prijmeni,
kar.jmeno,
kar.rodcis,
vh.datum,
/*vh.idhodn,*/
/*vd.poradi,*/
/*vd.idmetod,*/
/* NEW: list of matching dokladd entries within ±7 days, one cell */
(
SELECT LIST(CAST(dd.datose AS VARCHAR(10)) || ' ' || dd.kod, ', ')
FROM dokladd dd
WHERE dd.rodcis = kar.rodcis
AND (dd.kod = '01130' or dd.kod = '01131' OR dd.kod = '01132' OR dd.kod = '01133' OR dd.kod = '01134')
AND dd.datose BETWEEN vh.datum - 7 AND vh.datum + 7
) AS vykodovano,
lm.kodtext,
lm.nazev,
vd.vysl,
lj.jedn,
ls.normdol,
ls.normhor
FROM labvh vh
JOIN labvd vd ON vd.idvh = vh.idvh
JOIN kar ON kar.idpac = vh.idpacient
JOIN labmetod lm ON lm.idmetod = vd.idmetod
JOIN labjedn lj ON lj.idjedn = vd.idjedn
JOIN labskaly ls ON ls.idskaly = vd.idskaly
WHERE vh.datum >= ?
AND vh.datum < ?
AND lm.nazev CONTAINING 'PSA'
ORDER BY kar.idpac, vh.datum, vd.poradi;
"""

# The two "?" placeholders are bound, in order, to the window bounds.
_window = (start, end)
df_direct = query_df(sql, _window)
import re
|
|
import numpy as np
|
|
|
|
# --- 0) Helper: parse numeric value from string like "5,6", "<0.1", "3.2 mmol/L" ---
# First number in the text: optional sign, digits, optional "." or "," decimal
# part, optional exponent.
num_re = re.compile(r'[-+]?\d+(?:[.,]\d+)?(?:[eE][-+]?\d+)?')


def to_num(x):
    """Extract the first number found in *x* as a float.

    The value is converted with ``str()`` first, so both text such as
    ``"3.2 mmol/L"`` and plain numbers are accepted. A decimal comma is
    treated as a decimal point.

    Qualifier heuristic: a value starting with ``"<"`` (below the detection
    limit) yields half of the stated number; a value starting with ``">"``
    yields the stated number itself ("at least").

    Args:
        x: Raw result value; may be ``None``.

    Returns:
        The parsed float, or ``numpy.nan`` when *x* is ``None``, blank,
        contains no number, or the number is not finite (e.g. ``"1e999"``).
    """
    if x is None:
        return np.nan

    text = str(x).strip()
    if not text:
        return np.nan

    match = num_re.search(text.replace('\u00A0', ' '))  # NBSP -> plain space
    if match is None:
        return np.nan

    try:
        value = float(match.group(0).replace(',', '.'))
    except ValueError:
        return np.nan

    # float() turns an overflowing exponent into +/-inf instead of raising;
    # an infinite "result" is meaningless downstream, so report it as missing.
    if value in (float("inf"), float("-inf")):
        return np.nan

    # "<x" -> half of x. ">x" needs no special case: it is simply x.
    if text.startswith('<'):
        return value * 0.5
    return value
# --- 1) Prepare numeric columns + ratio in pandas before export ---
# Assumes df_direct exists with columns 'VYSL' and 'NORMHOR' (case per your SELECT)
for _source in ("VYSL", "NORMHOR"):
    df_direct[f"{_source}_NUM"] = df_direct[_source].apply(to_num)

# Avoid division by zero/NaN: a zero upper limit is masked as NaN first.
den = df_direct["NORMHOR_NUM"].replace(0, np.nan)
_raw_ratio = df_direct["VYSL_NUM"] / den
# Negative ratios are floored at 0; values can exceed 1 if over ULN.
df_direct["RATIO"] = _raw_ratio.clip(lower=0)
from datetime import datetime
|
|
from pathlib import Path
|
|
from openpyxl import load_workbook
|
|
from openpyxl.utils import get_column_letter
|
|
from openpyxl.styles import Alignment, Border, Side
|
|
from openpyxl.formatting.rule import ColorScaleRule
|
|
from openpyxl.styles import PatternFill
|
|
from openpyxl.formatting.rule import FormulaRule
|
|
|
|
# ---- 1) Build timestamped output path ----
# Raw string: the Windows path contains backslashes ("\D", "\!") that are not
# valid escape sequences in a normal string literal.
base_path = Path(r"u:\Dropbox\!!!Days\Downloads Z230")
base_path.mkdir(parents=True, exist_ok=True)  # ensure folder exists
# Second-resolution timestamp keeps successive exports from overwriting each other.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = base_path / f"lab_results_2025_{timestamp}.xlsx"
# ---- 2) Export DataFrame to Excel ----
# Assumes df_direct already exists (your joined query result)
df_direct.to_excel(output_file, index=False, sheet_name="PSA")

# ---- 3) Open with openpyxl for formatting ----
wb = load_workbook(output_file)
ws = wb["PSA"]

# Auto width for columns: longest rendered cell value plus padding, capped at 50.
for col in ws.columns:
    max_len = max(
        (len(str(cell.value)) for cell in col if cell.value is not None),
        default=0,  # column with no values at all
    )
    col_letter = get_column_letter(col[0].column)
    ws.column_dimensions[col_letter].width = min(max_len + 2, 50)  # cap width

# Thin border style
thin_border = Border(
    left=Side(style="thin"),
    right=Side(style="thin"),
    top=Side(style="thin"),
    bottom=Side(style="thin"),
)

# Apply borders to all cells and center A, B, E.
# The alignment object and the column set are built once, not once per cell.
centered = Alignment(horizontal="center")
centered_columns = {"A", "B", "E"}
for row in ws.iter_rows(min_row=1, max_row=ws.max_row, min_col=1, max_col=ws.max_column):
    for cell in row:
        cell.border = thin_border
        if cell.column_letter in centered_columns:
            cell.alignment = centered

# Enable filter on header row and freeze it
ws.auto_filter.ref = ws.dimensions
ws.freeze_panes = "A2"
# map headers: header text -> 1-based column index
hdr = {c.value: i for i, c in enumerate(ws[1], start=1)}
vysl_idx = hdr.get("VYSL")
ratio_idx = hdr.get("RATIO")
if not (vysl_idx and ratio_idx):
    raise RuntimeError("Missing required columns: VYSL and/or RATIO")

vysl_col = get_column_letter(vysl_idx)
ratio_col = get_column_letter(ratio_idx)
max_row = ws.max_row
# With only the header row present max_row is 1; keep the range from
# collapsing into the reversed "X2:X1".
rng_vysl = f"{vysl_col}2:{vysl_col}{max(max_row, 2)}"

green = PatternFill(start_color="63BE7B", end_color="63BE7B", fill_type="solid")
yellow = PatternFill(start_color="FFEB84", end_color="FFEB84", fill_type="solid")
red = PatternFill(start_color="F8696B", end_color="F8696B", fill_type="solid")

# Non-overlapping rules; stop when one matches.
# The fill goes on the VYSL cells, but the condition reads the RATIO cell of
# the same row (column absolute, row relative).
ratio_ref = f"${ratio_col}2"
rules = (
    # A blank RATIO cell (NaN is exported as empty) compares as 0 in Excel and
    # would be painted green; ISNUMBER keeps rows without a ratio uncoloured.
    (f"AND(ISNUMBER({ratio_ref}), {ratio_ref}<=0.80)", green),
    (f"AND({ratio_ref}>0.80, {ratio_ref}<1)", yellow),
    (f"{ratio_ref}>=1", red),
)
for formula, fill in rules:
    ws.conditional_formatting.add(
        rng_vysl,
        FormulaRule(formula=[formula], fill=fill, stopIfTrue=True),
    )

wb.save(output_file)
print(f"Saved: {output_file}")