"""Standalone automated setup and runner for the pharmacy client connector.

Run from the pharmacy workstation:

    py PharmacyConnector.py

The script:

1. Verifies required Python packages and installs any missing ones.
2. Checks the existing connector config and its DPAPI-protected secrets.
3. If the connector is not activated, prompts for the SSH/admin setup inputs.
4. Fetches the cloud admin token over SSH, creates an activation code, activates
   the local connector, and saves the config using this script's DPAPI logic.
5. Runs the connector. On Windows, an already-activated connector relaunches
   itself with pythonw.exe so it can run without a visible console window.

Pass --no-run if you only want setup/activation.
"""

from __future__ import annotations

import argparse
import base64
import ctypes
import getpass
import hashlib
import hmac
import importlib
import json
import mimetypes
import os
import re
import socket
import subprocess
import sys
import time
from dataclasses import asdict, dataclass, field
from datetime import date, datetime, timezone
from decimal import Decimal
from html import escape
from pathlib import Path
from types import ModuleType
from typing import Any, Callable, Iterable
from urllib.parse import urlencode, urlparse


CONNECTOR_VERSION = "0.1.0-standalone"
WINDOW_BOOTSTRAP_ENV = "PHARMACY_CONNECTOR_WINDOW_BOOTSTRAPPED"
DEFAULT_CLOUD_URL = "https://www.crimsonscript.com"
DEFAULT_SSH_HOST = "199.19.73.94"
DEFAULT_SSH_USER = "root"
DEFAULT_ADMIN_TOKEN_COMMAND = "grep '^CLOUD_PHARMACY_ADMIN_TOKEN=' /etc/cloud-pharmacy.env | cut -d= -f2-"
DEFAULT_SQL_DRIVER = "ODBC Driver 17 for SQL Server"
DEFAULT_SQL_DATABASE = "PharmSQL"
DEFAULT_SQL_USER = "sa"
CONNECTOR_FETCHMANY_CHUNK_SIZE = max(1, int(os.environ.get("PHARMACY_CONNECTOR_FETCHMANY_CHUNK_SIZE", "1000")))
CONNECTOR_LONG_POLL_SECONDS = max(0, int(os.environ.get("PHARMACY_CONNECTOR_LONG_POLL_SECONDS", "20")))

REQUIRED_IMPORTS = (
    ("requests", "requests"),
    ("pyodbc", "pyodbc"),
    ("win32crypt", "pywin32"),
    ("paramiko", "paramiko"),
    ("playwright.sync_api", "playwright"),
)

JOB_SQL_QUERY = "sql.query"
JOB_SQL_EXECUTE = "sql.execute"
JOB_FILE_FETCH = "file.fetch"
JOB_FILE_RENDER = "file.render"
JOB_FILE_DELETE = "file.delete"
JOB_FILE_WRITE = "file.write"
JOB_CONNECTOR_HEALTH = "connector.health"
JOB_KINRAY_REFRESH = "kinray.refresh"

STATUS_SUCCEEDED = "succeeded"
STATUS_FAILED = "failed"

FORBIDDEN_QUERY_KEYWORDS = {
    "alter",
    "backup",
    "create",
    "delete",
    "drop",
    "exec",
    "execute",
    "grant",
    "insert",
    "merge",
    "restore",
    "revoke",
    "truncate",
    "update",
}
WRITE_KEYWORDS = {"alter", "create", "delete", "drop", "insert", "merge", "truncate", "update"}
TOKEN_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
KINRAY_LOGIN_URL = "https://kinrayweblink.cardinalhealth.com/login"
KINRAY_REPORTS_URL = "https://kinrayweblink.cardinalhealth.com/tools/report"
KINRAY_NORMAL_CHROME_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/148.0.0.0 Safari/537.36"
)
KINRAY_DEFAULT_START_DATE = date(2024, 4, 1)
KINRAY_DEFAULT_TIMEOUT_MS = 30_000
KINRAY_REPORT_TIMEOUT_MS = 120_000


@dataclass
class ConfigStatus:
    usable: bool
    reason: str


def default_config_path() -> Path:
    program_data = os.environ.get("PROGRAMDATA")
    if program_data:
        return Path(program_data) / "CloudPharmacyConnector" / "connector.json"
    return Path.home() / ".cloud_pharmacy_connector" / "connector.json"


def default_log_path() -> Path:
    return default_config_path().with_name("connector.log")


def windows_has_console() -> bool:
    if os.name != "nt":
        return True
    try:
        return bool(ctypes.windll.kernel32.GetConsoleWindow())
    except Exception:
        return bool(getattr(sys, "stdout", None) and sys.stdout.isatty())


def configure_background_stdio() -> None:
    if os.name != "nt" or windows_has_console():
        return
    log_path = default_log_path()
    try:
        log_path.parent.mkdir(parents=True, exist_ok=True)
        log_file = open(log_path, "a", encoding="utf-8", buffering=1)
    except OSError:
        fallback_root = Path(os.environ.get("LOCALAPPDATA") or Path.home())
        log_path = fallback_root / "CloudPharmacyConnector" / "connector.log"
        log_path.parent.mkdir(parents=True, exist_ok=True)
        log_file = open(log_path, "a", encoding="utf-8", buffering=1)
    sys.stdout = log_file
    sys.stderr = log_file


def windows_python_executable(want_console: bool) -> str:
    executable = Path(sys.executable)
    target_name = "python.exe" if want_console else "pythonw.exe"
    candidate = executable.with_name(target_name)
    if candidate.exists():
        return str(candidate)
    if not want_console and executable.name.lower() == "python.exe":
        return str(executable)
    return sys.executable


def strip_window_bootstrap_args(argv: list[str]) -> list[str]:
    return [arg for arg in argv if arg not in {"--interactive-setup", "--no-window-bootstrap"}]


def spawn_windows_silent(argv: list[str] | None = None) -> None:
    if os.name != "nt":
        return
    creationflags = getattr(subprocess, "CREATE_NO_WINDOW", 0) | getattr(subprocess, "DETACHED_PROCESS", 0)
    env = os.environ.copy()
    env[WINDOW_BOOTSTRAP_ENV] = "1"
    subprocess.Popen(
        [windows_python_executable(want_console=False), str(Path(__file__).resolve()), *strip_window_bootstrap_args(argv or sys.argv[1:])],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        close_fds=True,
        creationflags=creationflags,
        env=env,
    )


def spawn_windows_interactive_setup(argv: list[str] | None = None) -> None:
    if os.name != "nt":
        return
    creationflags = getattr(subprocess, "CREATE_NEW_CONSOLE", 0)
    env = os.environ.copy()
    env[WINDOW_BOOTSTRAP_ENV] = "1"
    subprocess.Popen(
        [
            windows_python_executable(want_console=True),
            str(Path(__file__).resolve()),
            *strip_window_bootstrap_args(argv or sys.argv[1:]),
            "--interactive-setup",
            "--no-window-bootstrap",
        ],
        creationflags=creationflags,
        env=env,
    )


class ConnectorError(RuntimeError):
    def __init__(self, code: str, message: str) -> None:
        super().__init__(message)
        self.code = code


class KinrayMfaRequired(RuntimeError):
    pass


@dataclass(frozen=True)
class KinrayConfig:
    username: str
    password: str
    start_date: date
    end_date: date
    download_dir: Path
    profile_dir: Path
    auth_state_path: Path
    headless: bool
    timeout_ms: int = KINRAY_DEFAULT_TIMEOUT_MS

    @property
    def output_filename(self) -> str:
        return f"kinray_invoice_detail_{self.start_date.isoformat()}_to_{self.end_date.isoformat()}.zip"


def default_sql_server() -> str:
    return fr"{socket.gethostname()}\SQLEXPRESS"


def default_sql_connection_string(sql_password: str) -> str:
    sql_server = default_sql_server()
    return (
        f"DRIVER={{{DEFAULT_SQL_DRIVER}}};"
        f"SERVER={sql_server};"
        f"DATABASE={DEFAULT_SQL_DATABASE};"
        f"UID={DEFAULT_SQL_USER};"
        f"PWD={sql_password}"
    )


def utc_now() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def canonical_json(data: Any) -> bytes:
    return json.dumps(data, sort_keys=True, separators=(",", ":"), ensure_ascii=True).encode("utf-8")


def unsigned_payload(envelope: dict[str, Any]) -> dict[str, Any]:
    payload = dict(envelope)
    payload.pop("server_signature", None)
    return payload


def unsigned_result_payload(result: dict[str, Any]) -> dict[str, Any]:
    payload = dict(result)
    payload.pop("connector_signature", None)
    return payload


def sign_envelope(envelope: dict[str, Any], signing_secret: str) -> str:
    digest = hmac.new(signing_secret.encode("utf-8"), canonical_json(unsigned_payload(envelope)), hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def verify_signature(envelope: dict[str, Any], signing_secret: str) -> bool:
    signature = str(envelope.get("server_signature") or "")
    if not signature:
        return False
    return hmac.compare_digest(signature, sign_envelope(envelope, signing_secret))


def sign_result(result: dict[str, Any], signing_secret: str) -> str:
    digest = hmac.new(signing_secret.encode("utf-8"), canonical_json(unsigned_result_payload(result)), hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def attach_result_signature(result: dict[str, Any], signing_secret: str) -> dict[str, Any]:
    signed = dict(result)
    signed["connector_signature"] = sign_result(signed, signing_secret)
    return signed


def require_https_url(url: str) -> str:
    normalized = str(url).rstrip("/")
    if urlparse(normalized).scheme.lower() != "https":
        raise RuntimeError("Connector cloud URL must use https.")
    return normalized


def sql_hash(sql: str) -> str:
    return hashlib.sha256(sql.encode("utf-8")).hexdigest()


def strip_sql_comments(sql: str) -> str:
    sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.DOTALL)
    return re.sub(r"--[^\r\n]*", " ", sql)


def normalize_sql(sql: str) -> str:
    return re.sub(r"\s+", " ", strip_sql_comments(sql).strip())


def sql_tokens(sql: str) -> list[str]:
    return [match.group(0).lower() for match in TOKEN_RE.finditer(strip_sql_comments(sql))]


def statement_count(sql: str) -> int:
    cleaned = normalize_sql(sql)
    if not cleaned:
        return 0
    return len([part for part in cleaned.split(";") if part.strip()])


def validate_select_sql(sql: str) -> None:
    cleaned = normalize_sql(sql)
    if not cleaned:
        raise ConnectorError("unsafe_sql_query", "SQL query is empty.")
    if statement_count(cleaned) != 1:
        raise ConnectorError("unsafe_sql_query", "Only one SELECT statement is allowed.")
    tokens = sql_tokens(cleaned)
    if not tokens or tokens[0] not in {"select", "with"}:
        raise ConnectorError("unsafe_sql_query", "sql.query jobs must start with SELECT or WITH.")
    forbidden = FORBIDDEN_QUERY_KEYWORDS.intersection(tokens)
    if forbidden:
        raise ConnectorError("unsafe_sql_query", "sql.query contains forbidden keyword(s): " + ", ".join(sorted(forbidden)))


def validate_execute_sql(sql: str) -> None:
    cleaned = normalize_sql(sql)
    if not cleaned:
        raise ConnectorError("unsafe_sql_execute", "SQL execute text is empty.")
    tokens = set(sql_tokens(cleaned))
    if not WRITE_KEYWORDS.intersection(tokens):
        raise ConnectorError("unsafe_sql_execute", "sql.execute must be a named write operation.")
    if {"exec", "execute"}.intersection(tokens):
        raise ConnectorError("unsafe_sql_execute", "Stored procedure execution is not allowed.")


def parse_iso_date(value: Any, fallback: date) -> date:
    if not value:
        return fallback
    try:
        return date.fromisoformat(str(value)[:10])
    except ValueError:
        return fallback


def kinray_base_dir() -> Path:
    root = os.environ.get("LOCALAPPDATA") or os.environ.get("PROGRAMDATA") or str(Path.home())
    return Path(root) / "CloudPharmacyConnector" / "kinray"


def kinray_legacy_auth_dir() -> Path:
    root = os.environ.get("LOCALAPPDATA") or str(Path.home() / "AppData" / "Local")
    return Path(root) / "KinrayAutomation" / "auth"


def kinray_first_visible(scope: Any, candidates: Iterable[Callable[[], Any] | str], timeout_ms: int = 2_500):
    from playwright.sync_api import Error as PlaywrightError, TimeoutError as PlaywrightTimeoutError

    for candidate in candidates:
        locator = candidate() if callable(candidate) else scope.locator(candidate)
        try:
            if locator.first.wait_for(state="visible", timeout=timeout_ms) is None:
                return locator.first
        except (PlaywrightTimeoutError, PlaywrightError):
            continue
    return None


def kinray_click_first(scope: Any, candidates: Iterable[Callable[[], Any] | str], name: str) -> None:
    locator = kinray_first_visible(scope, candidates)
    if locator is None:
        raise RuntimeError(f"Could not find {name}.")
    locator.click()


def kinray_fill_first(scope: Any, candidates: Iterable[Callable[[], Any] | str], value: str, name: str) -> None:
    locator = kinray_first_visible(scope, candidates)
    if locator is None:
        raise RuntimeError(f"Could not find {name}.")
    locator.fill(value)


def kinray_wait_for_network(page: Any, timeout_ms: int = KINRAY_DEFAULT_TIMEOUT_MS) -> None:
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    try:
        page.wait_for_load_state("domcontentloaded", timeout=timeout_ms)
    except PlaywrightTimeoutError:
        pass


def kinray_wait_for_page_navigation(page: Any, timeout_ms: int = KINRAY_REPORT_TIMEOUT_MS) -> None:
    from playwright.sync_api import Error as PlaywrightError, TimeoutError as PlaywrightTimeoutError

    for attempt in range(1, 4):
        try:
            page.wait_for_load_state("domcontentloaded", timeout=timeout_ms)
            return
        except PlaywrightTimeoutError:
            return
        except PlaywrightError as exc:
            if "navigating" not in str(exc).lower() or attempt == 3:
                raise
            page.wait_for_timeout(2_000)


def kinray_wait_for_auth_redirects(page: Any, timeout_ms: int = 60_000) -> None:
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    try:
        page.wait_for_url(
            lambda url: "/auth-code/callback" not in str(url) and "/signin/" not in str(url),
            timeout=timeout_ms,
        )
    except PlaywrightTimeoutError:
        pass
    kinray_wait_for_network(page, min(timeout_ms, KINRAY_DEFAULT_TIMEOUT_MS))


def kinray_save_auth_state(context: Any, config: KinrayConfig) -> None:
    from playwright.sync_api import Error as PlaywrightError

    config.auth_state_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        context.storage_state(path=config.auth_state_path, indexed_db=True)
    except TypeError:
        context.storage_state(path=config.auth_state_path)
    except PlaywrightError:
        pass


def kinray_safe_goto(page: Any, url: str, timeout_ms: int = KINRAY_REPORT_TIMEOUT_MS) -> None:
    from playwright.sync_api import Error as PlaywrightError

    for attempt in range(1, 4):
        try:
            page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
            return
        except PlaywrightError as exc:
            if "interrupted by another navigation" not in str(exc) or attempt == 3:
                raise
            kinray_wait_for_auth_redirects(page, timeout_ms=30_000)


def kinray_is_login_form_visible(page: Any) -> bool:
    username = kinray_first_visible(
        page,
        [
            lambda: page.get_by_label(re.compile("user|email|login", re.I)),
            "input[name*='user' i]",
            "input[name*='email' i]",
            "input[type='email']",
            "input[type='text']",
        ],
        timeout_ms=1_000,
    )
    password = kinray_first_visible(
        page,
        [
            lambda: page.get_by_label(re.compile("password", re.I)),
            "input[name*='pass' i]",
            "input[type='password']",
        ],
        timeout_ms=1_000,
    )
    return username is not None and password is not None


def kinray_login(page: Any, config: KinrayConfig) -> None:
    if not page.url.startswith("http"):
        kinray_safe_goto(page, KINRAY_LOGIN_URL)
    kinray_wait_for_network(page, config.timeout_ms)
    if not kinray_is_login_form_visible(page):
        return
    kinray_fill_first(
        page,
        [
            lambda: page.get_by_label(re.compile("user|email|login", re.I)),
            "input[name*='user' i]",
            "input[name*='email' i]",
            "input[type='email']",
            "input[type='text']",
        ],
        config.username,
        "username/email field",
    )
    kinray_fill_first(
        page,
        [
            lambda: page.get_by_label(re.compile("password", re.I)),
            "input[name*='pass' i]",
            "input[type='password']",
        ],
        config.password,
        "password field",
    )
    kinray_click_first(
        page,
        [
            lambda: page.get_by_role("button", name=re.compile("sign in|log in|login", re.I)),
            "button[type='submit']",
            "input[type='submit']",
        ],
        "sign-in button",
    )
    kinray_wait_for_network(page, config.timeout_ms)
    kinray_handle_mfa_if_present(page, config)


def kinray_check_remember_device_options(page: Any) -> None:
    from playwright.sync_api import Error as PlaywrightError

    remember_text = re.compile(
        "do not challenge me on this device again|don't challenge me on this device again|"
        "remember this device|keep me signed in|stay signed in|don't ask again|do not ask again",
        re.I,
    )
    for candidate in [
        lambda: page.get_by_role("checkbox", name=remember_text),
        lambda: page.get_by_label(remember_text),
        "input[type='checkbox']",
    ]:
        locator = candidate() if callable(candidate) else page.locator(candidate)
        try:
            count = min(locator.count(), 8)
        except PlaywrightError:
            continue
        for index in range(count):
            option = locator.nth(index)
            try:
                if option.is_visible(timeout=1_000) and not option.is_checked():
                    option.check(force=True)
                    return
            except PlaywrightError:
                continue


def kinray_prompt_mfa_code_in_browser(page: Any, call_button: Any | None, config: KinrayConfig) -> str:
    result = {"code": "", "cancelled": False, "call_requested": False}
    helper = page.context.new_page()

    def request_phone_call() -> str:
        if call_button is None:
            return "No phone call option was found. Enter the verification code shown by Kinray."
        result["call_requested"] = True
        return "Requesting phone call..."

    def submit_code(code: str) -> str:
        result["code"] = str(code or "").strip()
        return "Code received. You can return to the pharmacy page."

    def cancel_mfa() -> str:
        result["cancelled"] = True
        return "Verification cancelled."

    helper.expose_function("requestKinrayPhoneCall", request_phone_call)
    helper.expose_function("submitKinrayCode", submit_code)
    helper.expose_function("cancelKinrayMfa", cancel_mfa)
    call_disabled = "false" if call_button is not None else "true"
    initial_status = (
        "Click Place Call to request the verification phone call."
        if call_button is not None
        else "Enter the verification code shown by Kinray."
    )
    helper.set_content(
        f"""
<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>Kinray Verification</title>
  <style>
    body {{ font-family: Segoe UI, Arial, sans-serif; margin: 0; background: #f4f6f8; color: #1f2933; }}
    main {{ max-width: 520px; margin: 48px auto; background: white; border: 1px solid #d9e2ec; border-radius: 8px; padding: 24px; }}
    h1 {{ font-size: 22px; margin: 0 0 12px; }}
    p {{ line-height: 1.45; }}
    label {{ display: block; font-weight: 600; margin: 18px 0 6px; }}
    input {{ box-sizing: border-box; width: 100%; font-size: 22px; padding: 10px 12px; letter-spacing: 2px; }}
    .actions {{ display: flex; gap: 10px; margin-top: 18px; }}
    button {{ border: 0; border-radius: 6px; background: #174ea6; color: white; padding: 10px 14px; font-weight: 600; cursor: pointer; }}
    button.secondary {{ background: #52606d; }}
    button:disabled {{ background: #bcccdc; cursor: not-allowed; }}
    #status {{ background: #eef4ff; border-left: 4px solid #174ea6; padding: 10px 12px; }}
  </style>
</head>
<body>
  <main>
    <h1>Kinray Verification</h1>
    <p id="status">{escape(initial_status)}</p>
    <label for="code">Verification code</label>
    <input id="code" autocomplete="one-time-code" inputmode="numeric" autofocus>
    <div class="actions">
      <button id="call" type="button">Place Call</button>
      <button id="submit" type="button">Submit Code</button>
      <button id="cancel" class="secondary" type="button">Cancel</button>
    </div>
  </main>
  <script>
    const statusEl = document.getElementById("status");
    const codeEl = document.getElementById("code");
    const callEl = document.getElementById("call");
    callEl.disabled = {call_disabled};
    callEl.addEventListener("click", async () => {{
      callEl.disabled = true;
      statusEl.textContent = "Requesting phone call...";
      try {{
        statusEl.textContent = await window.requestKinrayPhoneCall();
      }} catch (err) {{
        callEl.disabled = false;
        statusEl.textContent = "Could not request call: " + err;
      }}
      codeEl.focus();
    }});
    document.getElementById("submit").addEventListener("click", async () => {{
      const code = codeEl.value.trim();
      if (!code) {{
        statusEl.textContent = "Enter the verification code.";
        codeEl.focus();
        return;
      }}
      statusEl.textContent = await window.submitKinrayCode(code);
    }});
    document.getElementById("cancel").addEventListener("click", async () => {{
      statusEl.textContent = await window.cancelKinrayMfa();
    }});
    codeEl.addEventListener("keydown", event => {{
      if (event.key === "Enter") document.getElementById("submit").click();
    }});
  </script>
</body>
</html>
        """,
        wait_until="domcontentloaded",
    )
    helper.bring_to_front()
    try:
        deadline = time.monotonic() + max(300, int(config.timeout_ms / 1000) * 10)
        while time.monotonic() < deadline:
            if result["call_requested"]:
                result["call_requested"] = False
                try:
                    kinray_request_mfa_phone_call(page, call_button, config)
                    helper.evaluate(
                        "message => { document.getElementById('status').textContent = message; document.getElementById('code').focus(); }",
                        "Phone call requested. Enter the verification code from the call.",
                    )
                except Exception as exc:
                    helper.evaluate(
                        "message => { document.getElementById('status').textContent = message; document.getElementById('call').disabled = false; }",
                        f"Could not request call: {exc}",
                    )
            if result["code"]:
                return result["code"]
            if result["cancelled"]:
                return ""
            if helper.is_closed():
                return ""
            helper.wait_for_timeout(250)
        raise RuntimeError("Timed out waiting for Kinray verification code.")
    finally:
        if not helper.is_closed():
            helper.close()


def kinray_request_mfa_phone_call(page: Any, call_button: Any, config: KinrayConfig) -> None:
    call_button.click()
    kinray_wait_for_network(page, config.timeout_ms)


def kinray_handle_mfa_if_present(page: Any, config: KinrayConfig) -> None:
    mfa_marker = kinray_first_visible(
        page,
        [
            lambda: page.get_by_text(re.compile("verification|verify|two.step|2.step|security code|passcode", re.I)),
            "input[name*='code' i]",
            "input[id*='code' i]",
            "input[type='tel']",
        ],
        timeout_ms=7_000,
    )
    if mfa_marker is None:
        return
    if config.headless:
        raise KinrayMfaRequired("Kinray requires two-step verification.")

    call_button = kinray_first_visible(
        page,
        [
            "a.call-request-button",
            "a.link-button:has-text('Call')",
            lambda: page.get_by_role("link", name=re.compile("^\\s*call\\s*$", re.I)),
            lambda: page.get_by_role("button", name=re.compile("^\\s*call\\s*$|phone call|voice call", re.I)),
            "button:has-text('Call')",
            "input[type='button'][value='Call']",
            "input[type='submit'][value='Call']",
        ],
        timeout_ms=3_000,
    )
    phone_option = kinray_first_visible(
        page,
        [
            lambda: page.get_by_role("radio", name=re.compile("call|phone|voice", re.I)),
            lambda: page.get_by_label(re.compile("call|phone|voice", re.I)),
            lambda: page.get_by_text(re.compile("call me|phone call|voice call", re.I)),
        ],
        timeout_ms=3_000,
    )
    if phone_option is not None:
        phone_option.click()
        call_button = kinray_first_visible(
            page,
            [
                "a.call-request-button",
                "a.link-button:has-text('Call')",
                lambda: page.get_by_role("link", name=re.compile("^\\s*call\\s*$", re.I)),
                lambda: page.get_by_role("button", name=re.compile("^\\s*call\\s*$|phone call|voice call", re.I)),
                "button:has-text('Call')",
                "input[type='button'][value='Call']",
                "input[type='submit'][value='Call']",
            ],
            timeout_ms=3_000,
        )

    code = kinray_prompt_mfa_code_in_browser(page, call_button, config)
    if not code:
        raise RuntimeError("Verification code was empty.")
    kinray_fill_first(
        page,
        [
            lambda: page.get_by_label(re.compile("code|passcode|verification", re.I)),
            "input[name*='code' i]",
            "input[id*='code' i]",
            "input[type='tel']",
            "input[type='text']",
        ],
        code,
        "MFA verification code field",
    )
    kinray_check_remember_device_options(page)
    kinray_click_first(
        page,
        [
            lambda: page.get_by_role("button", name=re.compile("verify|submit|continue|sign in|next", re.I)),
            "button[type='submit']",
            "input[type='submit']",
        ],
        "MFA submit button",
    )
    kinray_wait_for_auth_redirects(page)


def kinray_navigate_to_reports(page: Any, config: KinrayConfig) -> None:
    kinray_wait_for_auth_redirects(page, timeout_ms=30_000)
    kinray_safe_goto(page, KINRAY_REPORTS_URL)
    kinray_wait_for_network(page, config.timeout_ms)
    kinray_handle_mfa_if_present(page, config)
    if kinray_is_login_form_visible(page):
        kinray_login(page, config)
        kinray_safe_goto(page, KINRAY_REPORTS_URL)
        kinray_wait_for_network(page, config.timeout_ms)
        kinray_handle_mfa_if_present(page, config)
    if kinray_first_visible(
        page,
        [
            lambda: page.get_by_text(re.compile("Invoice Detail|report", re.I)),
            lambda: page.get_by_label(re.compile("report", re.I)),
            "select[name*='report' i]",
            "select[id*='report' i]",
        ],
        timeout_ms=15_000,
    ) is None:
        raise RuntimeError("Reports page did not load expected report controls.")


def kinray_month_short(target_date: date) -> str:
    return target_date.strftime("%b").upper()


def kinray_month_full(target_date: date) -> str:
    return target_date.strftime("%B")


def kinray_visible_calendar_years(page: Any) -> list[int]:
    from playwright.sync_api import Error as PlaywrightError

    try:
        values = page.locator(".mat-calendar-body-cell").evaluate_all(
            """cells => cells
                .map(cell => (cell.innerText || '').trim())
                .filter(text => /^\\d{4}$/.test(text))
                .map(text => Number(text))"""
        )
        return [int(value) for value in values]
    except PlaywrightError:
        return []


def kinray_click_calendar_cell(page: Any, text_pattern: Any, name: str) -> None:
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    cell = page.locator(".mat-calendar-body-cell").filter(has_text=text_pattern).first
    try:
        cell.wait_for(state="visible", timeout=5_000)
        cell.click()
    except PlaywrightTimeoutError as exc:
        raise RuntimeError(f"Could not find calendar {name}.") from exc


def kinray_select_calendar_year(page: Any, target_year: int) -> None:
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    for _ in range(10):
        year_cell = page.locator(".mat-calendar-body-cell").filter(has_text=re.compile(rf"^\s*{target_year}\s*$")).first
        try:
            year_cell.wait_for(state="visible", timeout=1_000)
            year_cell.click()
            return
        except PlaywrightTimeoutError:
            years = kinray_visible_calendar_years(page)
            if not years:
                raise RuntimeError("Calendar year grid did not render.")
            if target_year < min(years):
                page.locator("button.mat-calendar-previous-button").click()
            elif target_year > max(years):
                page.locator("button.mat-calendar-next-button").click()
            else:
                raise RuntimeError(f"Calendar year {target_year} was not selectable.")
    raise RuntimeError(f"Could not navigate calendar to year {target_year}.")


def kinray_select_date_with_calendar(page: Any, input_name: str, target_date: date) -> None:
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    field = page.locator(f"input[name='{input_name}']").first
    field.wait_for(state="visible", timeout=15_000)
    field.locator("xpath=ancestor::mat-form-field[1]//button[@aria-label='Open calendar']").click()
    page.wait_for_selector(".mat-datepicker-content, mat-datepicker-content, .mat-calendar", timeout=10_000)
    page.locator(".mat-calendar-period-button").first.click()
    kinray_select_calendar_year(page, target_date.year)
    kinray_click_calendar_cell(page, re.compile(rf"^\s*{kinray_month_short(target_date)}\s*$", re.I), "month")
    day_label = f"{kinray_month_full(target_date)} {target_date.day}, {target_date.year}"
    day_cell = page.locator(f".mat-calendar-body-cell[aria-label='{day_label}']").first
    try:
        day_cell.wait_for(state="visible", timeout=5_000)
        day_cell.click()
    except PlaywrightTimeoutError as exc:
        raise RuntimeError(f"Could not select calendar day {day_label}.") from exc


def kinray_formatted_for_input(target_date: date, input_type: str | None) -> str:
    return target_date.isoformat() if input_type == "date" else target_date.strftime("%m/%d/%Y")


def kinray_fill_date_field(page: Any, labels: list[str], target_date: date, field_name: str) -> None:
    candidates: list[Any] = []
    for label in labels:
        candidates.append(lambda label=label: page.get_by_label(re.compile(label, re.I)))
        candidates.append(f"input[name*='{label}' i]")
        candidates.append(f"input[id*='{label}' i]")
    locator = kinray_first_visible(page, candidates, timeout_ms=3_000)
    if locator is None:
        raise RuntimeError(f"Could not find {field_name} date field.")
    locator.fill(kinray_formatted_for_input(target_date, locator.get_attribute("type")))
    locator.evaluate(
        """element => {
            element.dispatchEvent(new Event('input', { bubbles: true }));
            element.dispatchEvent(new Event('change', { bubbles: true }));
            element.blur();
        }"""
    )


def kinray_select_invoice_detail_report(page: Any) -> None:
    from playwright.sync_api import Error as PlaywrightError

    mat_select = kinray_first_visible(
        page,
        [
            "mat-select[name='reports']",
            lambda: page.get_by_role("combobox", name=re.compile("report|select report", re.I)),
        ],
        timeout_ms=10_000,
    )
    if mat_select is not None:
        mat_select.click()
        option = page.locator(".cdk-overlay-container mat-option, [role='option']").filter(
            has_text=re.compile(r"^\s*Invoice Detail\s*$", re.I)
        ).first
        option.wait_for(state="visible", timeout=10_000)
        option.click()
        page.locator("input[name='startDate']").wait_for(state="visible", timeout=15_000)
        page.locator("input[name='endDate']").wait_for(state="visible", timeout=15_000)
        return
    report_choice = kinray_first_visible(
        page,
        [
            lambda: page.get_by_label(re.compile("report", re.I)),
            "select[name*='report' i]",
            "select[id*='report' i]",
        ],
        timeout_ms=5_000,
    )
    if report_choice is not None:
        try:
            report_choice.select_option(label="Invoice Detail")
            return
        except PlaywrightError:
            pass
    kinray_click_first(
        page,
        [
            lambda: page.get_by_role("combobox", name=re.compile("report", re.I)),
            lambda: page.get_by_text(re.compile("select report|report", re.I)),
            "div[role='combobox']",
        ],
        "report selector",
    )
    kinray_click_first(
        page,
        [
            lambda: page.get_by_role("option", name=re.compile("Invoice Detail", re.I)),
            lambda: page.get_by_text(re.compile("^\\s*Invoice Detail\\s*$", re.I)),
        ],
        "Invoice Detail option",
    )


def kinray_run_report(page: Any, context: Any, config: KinrayConfig) -> Any:
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    kinray_select_invoice_detail_report(page)
    if kinray_first_visible(page, ["input[name='startDate']", "input[name='endDate']"], timeout_ms=2_000):
        kinray_select_date_with_calendar(page, "startDate", config.start_date)
        kinray_select_date_with_calendar(page, "endDate", config.end_date)
    else:
        kinray_fill_date_field(page, ["start", "from", "begin"], config.start_date, "start")
        kinray_fill_date_field(page, ["end", "to", "through"], config.end_date, "end")
    view_button = kinray_first_visible(
        page,
        [
            lambda: page.get_by_role("button", name=re.compile("view report|run report|submit|view", re.I)),
            "button[type='submit']",
            "input[type='submit']",
        ],
        timeout_ms=5_000,
    )
    if view_button is None:
        raise RuntimeError("Could not find View Report button.")
    page.wait_for_function(
        """() => {
            const buttons = [...document.querySelectorAll('button')];
            const button = buttons.find(b => /view report/i.test((b.innerText || '').trim()));
            return button && !button.disabled;
        }""",
        timeout=15_000,
    )
    try:
        with context.expect_page(timeout=60_000) as popup_info:
            view_button.click()
        report_page = popup_info.value
    except PlaywrightTimeoutError:
        report_page = page
    kinray_wait_for_page_navigation(report_page)
    return report_page


def kinray_page_and_frames(page: Any) -> list[Any]:
    return [page, *page.frames]


def kinray_wait_for_report_ready(page: Any) -> None:
    from playwright.sync_api import Error as PlaywrightError, TimeoutError as PlaywrightTimeoutError

    deadline = time.time() + (KINRAY_REPORT_TIMEOUT_MS / 1000)
    while time.time() < deadline:
        try:
            page.wait_for_load_state("domcontentloaded", timeout=5_000)
        except (PlaywrightTimeoutError, PlaywrightError):
            pass
        for scope in kinray_page_and_frames(page):
            invalid = kinray_first_visible(
                scope,
                [lambda scope=scope: scope.get_by_text(re.compile("Invalid connection", re.I))],
                timeout_ms=500,
            )
            if invalid is not None:
                raise RuntimeError("Report viewer returned 'Invalid connection'.")
            marker = kinray_first_visible(
                scope,
                [
                    ".wingTestWorkbenchTBExportAction",
                    "button[title*='Export' i]",
                    lambda scope=scope: scope.get_by_title(re.compile("Export", re.I)),
                    lambda scope=scope: scope.get_by_text(re.compile("Invoice Details|File\\s+Query\\s+Analyze", re.I)),
                    "table",
                ],
                timeout_ms=1_500,
            )
            if marker is not None:
                return
        page.wait_for_timeout(2_000)
    raise RuntimeError("Report viewer did not appear before timeout.")


def kinray_handle_report_popup(report_page: Any, config: KinrayConfig) -> Any:
    if kinray_is_login_form_visible(report_page):
        kinray_login(report_page, config)
    kinray_wait_for_report_ready(report_page)
    return report_page


def kinray_export_candidates(scope: Any) -> list[Any]:
    return [
        ".wingTestWorkbenchTBExportAction",
        "button[title*='Export' i]",
        "[aria-label*='Export' i]",
        lambda: scope.get_by_role("button", name=re.compile("export", re.I)),
        lambda: scope.get_by_title(re.compile("export", re.I)),
        lambda: scope.get_by_text(re.compile("^\\s*export\\s*$", re.I)),
    ]


def kinray_csv_candidates(scope: Any) -> list[Any]:
    return [
        "li[role='option']:has-text('CSV')",
        ".wingTestExportMasterListEntry:has-text('CSV')",
        lambda: scope.get_by_role("option", name=re.compile("^\\s*CSV\\s*$", re.I)),
        lambda: scope.get_by_role("radio", name=re.compile("^\\s*csv\\s*$|comma", re.I)),
        lambda: scope.get_by_label(re.compile("^\\s*csv\\s*$|comma", re.I)),
        lambda: scope.get_by_text(re.compile("^\\s*csv\\s*$|comma separated", re.I)),
        "input[value*='CSV' i]",
    ]


def kinray_choose_csv_export(page: Any) -> Any:
    for scope in kinray_page_and_frames(page):
        csv_option = kinray_first_visible(scope, kinray_csv_candidates(scope), timeout_ms=3_000)
        if csv_option is not None:
            csv_option.click()
            return scope
    raise RuntimeError("Could not select CSV in the export dialog.")


def kinray_export_confirm_candidates(scope: Any) -> list[Any]:
    return [
        "button.wingTestConfirmExportButton",
        "button[title='Export']",
        lambda: scope.get_by_role("button", name=re.compile("^\\s*export\\s*$", re.I)),
        "button:has-text('Export')",
    ]


def kinray_unique_output_path(directory: Path, filename: str) -> Path:
    directory.mkdir(parents=True, exist_ok=True)
    candidate = directory / filename
    if not candidate.exists():
        return candidate
    stem = candidate.stem
    suffix = candidate.suffix
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    for counter in range(1, 1000):
        numbered = directory / f"{stem}_{timestamp}_{counter}{suffix}"
        if not numbered.exists():
            return numbered
    raise RuntimeError(f"Could not find an available filename for {filename}.")


def kinray_export_csv(page: Any, config: KinrayConfig) -> Path:
    export_scope = None
    for attempt in range(1, 6):
        page.wait_for_timeout(2_000 if attempt == 1 else 4_000)
        export_button = None
        for scope in kinray_page_and_frames(page):
            export_button = kinray_first_visible(scope, kinray_export_candidates(scope), timeout_ms=1_500)
            if export_button is not None:
                break
        if export_button is not None:
            export_button.click()
        else:
            page.keyboard.press("Control+E")
        page.wait_for_timeout(2_000)
        try:
            export_scope = kinray_choose_csv_export(page)
            break
        except RuntimeError:
            continue
    if export_scope is None:
        raise RuntimeError("Could not open export dialog after multiple attempts.")
    final_path = kinray_unique_output_path(config.download_dir, config.output_filename)
    with page.expect_download(timeout=KINRAY_REPORT_TIMEOUT_MS) as download_info:
        kinray_click_first(export_scope, kinray_export_confirm_candidates(export_scope), "Export button in export dialog")
    download = download_info.value
    download.save_as(final_path)
    return final_path


def kinray_launch_context(playwright: Any, config: KinrayConfig) -> Any:
    from playwright.sync_api import Error as PlaywrightError

    config.profile_dir.mkdir(parents=True, exist_ok=True)
    launch_options = {
        "user_data_dir": str(config.profile_dir),
        "headless": config.headless,
        "accept_downloads": True,
        "user_agent": KINRAY_NORMAL_CHROME_UA,
        "viewport": {"width": 1280, "height": 720},
        "locale": "en-US",
        "timezone_id": "America/New_York",
    }
    try:
        return playwright.chromium.launch_persistent_context(channel="chrome", **launch_options)
    except PlaywrightError:
        return playwright.chromium.launch_persistent_context(**launch_options)


def kinray_download_invoice_report(config: KinrayConfig) -> Path:
    from playwright.sync_api import sync_playwright

    context = None
    with sync_playwright() as playwright:
        context = kinray_launch_context(playwright, config)
        context.set_default_timeout(config.timeout_ms)
        context.set_default_navigation_timeout(KINRAY_REPORT_TIMEOUT_MS)
        try:
            page = context.new_page()
            kinray_safe_goto(page, KINRAY_LOGIN_URL)
            kinray_login(page, config)
            kinray_navigate_to_reports(page, config)
            page.wait_for_timeout(1_000)
            kinray_save_auth_state(context, config)
            report_page = kinray_run_report(page, context, config)
            page = kinray_handle_report_popup(report_page, config)
            output_path = kinray_export_csv(page, config)
            kinray_save_auth_state(context, config)
            return output_path
        finally:
            if context is not None:
                context.close()


def dpapi_protect(data: bytes) -> str:
    try:
        import win32crypt  # type: ignore

        protected = win32crypt.CryptProtectData(data, "Cloud Pharmacy Connector", None, None, None, 0)
        return base64.b64encode(protected).decode("ascii")
    except Exception as exc:
        raise RuntimeError("Windows DPAPI/pywin32 is required to store connector secrets.") from exc


def dpapi_unprotect(value: str) -> bytes:
    try:
        import win32crypt  # type: ignore

        _description, data = win32crypt.CryptUnprotectData(base64.b64decode(value), None, None, None, 0)
        return bytes(data)
    except Exception as exc:
        raise RuntimeError("Could not decrypt connector config with this Windows user/machine.") from exc


@dataclass
class ConnectorConfig:
    cloud_url: str
    tenant_id: str
    connector_id: str
    connector_token: str
    signing_secret: str
    sql_connection_string: str
    poll_interval_seconds: int = 5
    allowed_file_roots: list[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        self.cloud_url = require_https_url(self.cloud_url)

    def save(self, path: Path) -> None:
        path.parent.mkdir(parents=True, exist_ok=True)
        public = {
            "cloud_url": self.cloud_url,
            "tenant_id": self.tenant_id,
            "connector_id": self.connector_id,
            "poll_interval_seconds": self.poll_interval_seconds,
            "allowed_file_roots": self.allowed_file_roots,
        }
        secrets = {
            "connector_token": self.connector_token,
            "signing_secret": self.signing_secret,
            "sql_connection_string": self.sql_connection_string,
        }
        public["protected_secrets"] = dpapi_protect(json.dumps(secrets).encode("utf-8"))
        path.write_text(json.dumps(public, indent=2, sort_keys=True), encoding="utf-8")

    @classmethod
    def load(cls, path: Path) -> "ConnectorConfig":
        public = json.loads(path.read_text(encoding="utf-8"))
        secrets = json.loads(dpapi_unprotect(str(public["protected_secrets"])).decode("utf-8"))
        return cls(
            cloud_url=str(public["cloud_url"]).rstrip("/"),
            tenant_id=str(public["tenant_id"]),
            connector_id=str(public["connector_id"]),
            connector_token=str(secrets["connector_token"]),
            signing_secret=str(secrets["signing_secret"]),
            sql_connection_string=str(secrets["sql_connection_string"]),
            poll_interval_seconds=int(public.get("poll_interval_seconds", 5)),
            allowed_file_roots=[str(item) for item in public.get("allowed_file_roots", [])],
        )


def tenant_dashboard_url(config: ConnectorConfig) -> str:
    return f"{config.cloud_url}/?{urlencode({'tenant_id': config.tenant_id})}"


@dataclass
class JobEnvelope:
    tenant_id: str
    connector_id: str
    job_id: str
    operation_id: str
    job_type: str
    parameters: dict[str, Any] = field(default_factory=dict)
    timeout_seconds: int = 60
    max_rows: int = 10_000
    max_bytes: int = 10 * 1024 * 1024
    created_by: str = "system"
    created_at: str = field(default_factory=utc_now)
    server_signature: str = ""

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "JobEnvelope":
        return cls(
            tenant_id=str(data.get("tenant_id", "")),
            connector_id=str(data.get("connector_id", "")),
            job_id=str(data.get("job_id", "")),
            operation_id=str(data.get("operation_id", "")),
            job_type=str(data.get("job_type", "")),
            parameters=dict(data.get("parameters") or {}),
            timeout_seconds=int(data.get("timeout_seconds", 60)),
            max_rows=int(data.get("max_rows", 10_000)),
            max_bytes=int(data.get("max_bytes", 10 * 1024 * 1024)),
            created_by=str(data.get("created_by", "system")),
            created_at=str(data.get("created_at") or utc_now()),
            server_signature=str(data.get("server_signature", "")),
        )

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)


@dataclass
class JobResult:
    tenant_id: str
    connector_id: str
    job_id: str
    status: str
    started_at: str
    finished_at: str
    row_count: int = 0
    result: dict[str, Any] | list[Any] | None = None
    result_ref: str | None = None
    error_code: str | None = None
    error_message: str | None = None
    connector_version: str = CONNECTOR_VERSION
    connector_signature: str = ""

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)


class LocalConnector:
    def __init__(self, config: ConnectorConfig) -> None:
        requests = importlib.import_module("requests")

        self.config = config
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {config.connector_token}",
                "User-Agent": f"cloud-pharmacy-connector/{CONNECTOR_VERSION}",
            }
        )

    def poll_once(self) -> bool:
        cloud_url = require_https_url(self.config.cloud_url)
        response = self.session.post(
            f"{cloud_url}/connector/jobs/poll",
            json={"version": CONNECTOR_VERSION, "wait_seconds": CONNECTOR_LONG_POLL_SECONDS},
            timeout=max(45, CONNECTOR_LONG_POLL_SECONDS + 15),
        )
        if response.status_code == 204:
            return False
        response.raise_for_status()
        envelope = JobEnvelope.from_dict(response.json())
        result = self.execute_envelope(envelope)
        signed_result = attach_result_signature(result.to_dict(), self.config.signing_secret)
        upload = self.session.post(
            f"{cloud_url}/connector/jobs/{envelope.job_id}/result",
            json=signed_result,
            timeout=45,
        )
        upload.raise_for_status()
        return True

    def run_forever(self) -> None:
        print(f"Connector running for {self.config.connector_id}. Cloud: {tenant_dashboard_url(self.config)}")
        while True:
            try:
                handled = self.poll_once()
                if not handled and CONNECTOR_LONG_POLL_SECONDS <= 0:
                    time.sleep(self.config.poll_interval_seconds)
            except KeyboardInterrupt:
                raise
            except Exception as exc:
                print(f"[{utc_now()}] polling failed: {exc}")
                time.sleep(max(5, self.config.poll_interval_seconds))

    def execute_envelope(self, envelope: JobEnvelope) -> JobResult:
        started_at = utc_now()
        try:
            if envelope.tenant_id != self.config.tenant_id or envelope.connector_id != self.config.connector_id:
                raise ConnectorError("wrong_connector", "Job was not addressed to this connector.")
            if not verify_signature(envelope.to_dict(), self.config.signing_secret):
                raise ConnectorError("bad_signature", "Job signature verification failed.")
            payload = self.execute_job(envelope)
            return JobResult(
                tenant_id=envelope.tenant_id,
                connector_id=envelope.connector_id,
                job_id=envelope.job_id,
                status=STATUS_SUCCEEDED,
                started_at=started_at,
                finished_at=utc_now(),
                row_count=int(payload.pop("_row_count", 0)),
                result=payload,
            )
        except ConnectorError as exc:
            return JobResult(
                tenant_id=envelope.tenant_id,
                connector_id=envelope.connector_id,
                job_id=envelope.job_id,
                status=STATUS_FAILED,
                started_at=started_at,
                finished_at=utc_now(),
                error_code=exc.code,
                error_message=str(exc),
            )
        except Exception as exc:
            return JobResult(
                tenant_id=envelope.tenant_id,
                connector_id=envelope.connector_id,
                job_id=envelope.job_id,
                status=STATUS_FAILED,
                started_at=started_at,
                finished_at=utc_now(),
                error_code=type(exc).__name__,
                error_message=str(exc),
            )

    def execute_job(self, envelope: JobEnvelope) -> dict[str, Any]:
        if envelope.job_type == JOB_CONNECTOR_HEALTH:
            return self.health(bool(envelope.parameters.get("include_database", True)))
        if envelope.job_type == JOB_SQL_QUERY:
            return self.sql_query(envelope)
        if envelope.job_type == JOB_SQL_EXECUTE:
            return self.sql_execute(envelope)
        if envelope.job_type == JOB_FILE_FETCH:
            return self.file_fetch(envelope)
        if envelope.job_type == JOB_FILE_DELETE:
            return self.file_delete(envelope)
        if envelope.job_type == JOB_FILE_WRITE:
            return self.file_write(envelope)
        if envelope.job_type == JOB_KINRAY_REFRESH:
            return self.kinray_refresh(envelope)
        if envelope.job_type == JOB_FILE_RENDER:
            raise ConnectorError("not_implemented", "file.render is not implemented in the standalone MVP connector.")
        raise ConnectorError("unsupported_job_type", f"Unsupported job type: {envelope.job_type}")

    def _connect_sql(self):
        try:
            import pyodbc
        except Exception as exc:
            raise ConnectorError("pyodbc_missing", "pyodbc is required on the pharmacy computer.") from exc
        return pyodbc.connect(self.config.sql_connection_string, timeout=20)

    @staticmethod
    def _json_value(value: Any) -> Any:
        if isinstance(value, (datetime, date)):
            return value.isoformat()
        if isinstance(value, Decimal):
            return str(value)
        if isinstance(value, bytes):
            return base64.b64encode(value).decode("ascii")
        return value

    def _rows_to_dicts(self, cursor, max_rows: int) -> list[dict[str, Any]]:
        columns = [column[0] for column in cursor.description or []]
        limit = max(0, int(max_rows or 0))
        if limit <= 0 or not columns:
            return []
        rows: list[dict[str, Any]] = []
        json_value = self._json_value
        while len(rows) < limit:
            batch = cursor.fetchmany(min(CONNECTOR_FETCHMANY_CHUNK_SIZE, limit - len(rows)))
            if not batch:
                break
            rows.extend({column: json_value(value) for column, value in zip(columns, row)} for row in batch)
        return rows

    def sql_query(self, envelope: JobEnvelope) -> dict[str, Any]:
        sql = str(envelope.parameters.get("sql") or "")
        params = list(envelope.parameters.get("params") or [])
        validate_select_sql(sql)
        with self._connect_sql() as connection:
            cursor = connection.cursor()
            cursor.execute(sql, *params)
            rows = self._rows_to_dicts(cursor, envelope.max_rows)
        return {"rows": rows, "sql_hash": sql_hash(sql), "_row_count": len(rows)}

    def sql_execute(self, envelope: JobEnvelope) -> dict[str, Any]:
        sql = str(envelope.parameters.get("sql") or "")
        params = list(envelope.parameters.get("params") or [])
        expected_hash = str(envelope.parameters.get("expected_sql_hash") or "")
        approval_id = str(envelope.parameters.get("approval_id") or "")
        idempotency_key = str(envelope.parameters.get("idempotency_key") or "")
        max_rowcount = int(envelope.parameters.get("expected_max_rowcount", 0))
        if not approval_id or not idempotency_key:
            raise ConnectorError("missing_approval", "sql.execute requires approval_id and idempotency_key.")
        if expected_hash != sql_hash(sql):
            raise ConnectorError("sql_hash_mismatch", "Signed SQL hash does not match SQL text.")
        validate_execute_sql(sql)
        with self._connect_sql() as connection:
            connection.autocommit = False
            try:
                cursor = connection.cursor()
                cursor.execute(sql, *params)
                rowcount = max(0, int(cursor.rowcount or 0))
                if max_rowcount and rowcount > max_rowcount:
                    raise ConnectorError("rowcount_limit_exceeded", f"Operation affected {rowcount} rows; limit is {max_rowcount}.")
                connection.commit()
            except Exception:
                connection.rollback()
                raise
        return {"rowcount": rowcount, "sql_hash": expected_hash, "_row_count": rowcount}

    def _allowed_path(self, raw_path: str) -> Path:
        if not self.config.allowed_file_roots:
            raise ConnectorError("file_roots_not_configured", "No local file roots are configured.")
        target = Path(raw_path).expanduser().resolve()
        for root_text in self.config.allowed_file_roots:
            root = Path(root_text).expanduser().resolve()
            try:
                target.relative_to(root)
                return target
            except ValueError:
                continue
        raise ConnectorError("path_not_allowed", "Requested file is outside configured allowed roots.")

    def file_fetch(self, envelope: JobEnvelope) -> dict[str, Any]:
        path = self._allowed_path(str(envelope.parameters.get("path") or ""))
        if not path.is_file():
            raise ConnectorError("file_not_found", "Requested file was not found.")
        size = path.stat().st_size
        if size > envelope.max_bytes:
            raise ConnectorError("file_too_large", f"Requested file is {size} bytes; limit is {envelope.max_bytes}.")
        data = path.read_bytes()
        mime_type = mimetypes.guess_type(str(path))[0] or "application/octet-stream"
        return {
            "filename": path.name,
            "mime_type": mime_type,
            "size": size,
            "content_base64": base64.b64encode(data).decode("ascii"),
            "_row_count": 1,
        }

    def file_delete(self, envelope: JobEnvelope) -> dict[str, Any]:
        path = self._allowed_path(str(envelope.parameters.get("path") or ""))
        existed = path.exists()
        if existed:
            if not path.is_file():
                raise ConnectorError("not_a_file", "Requested path is not a file.")
            path.unlink()
        return {"path": str(path), "existed": existed, "deleted": existed, "_row_count": 1 if existed else 0}

    def file_write(self, envelope: JobEnvelope) -> dict[str, Any]:
        path = self._allowed_path(str(envelope.parameters.get("path") or ""))
        encoded = str(envelope.parameters.get("content_base64") or "")
        overwrite = bool(envelope.parameters.get("overwrite", False))
        if not encoded:
            raise ConnectorError("missing_file_content", "file.write requires content_base64.")
        data = base64.b64decode(encoded)
        if len(data) > envelope.max_bytes:
            raise ConnectorError("file_too_large", f"Write content is {len(data)} bytes; limit is {envelope.max_bytes}.")
        if path.exists() and not overwrite:
            raise ConnectorError("file_exists", "Requested file already exists and overwrite is false.")
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(data)
        return {"path": str(path), "size": len(data), "written": True, "_row_count": 1}

    def kinray_refresh(self, envelope: JobEnvelope) -> dict[str, Any]:
        username = str(envelope.parameters.get("username") or "")
        password = str(envelope.parameters.get("password") or "")
        if not username or not password:
            raise ConnectorError("missing_kinray_credentials", "kinray.refresh requires username and password.")

        base_dir = kinray_base_dir()
        download_dir = Path(str(envelope.parameters.get("download_dir") or base_dir / "downloads")).expanduser()
        legacy_auth_dir = kinray_legacy_auth_dir()
        profile_dir = Path(str(envelope.parameters.get("profile_dir") or legacy_auth_dir / "kinray_playwright_profile")).expanduser()
        auth_state_path = Path(str(envelope.parameters.get("auth_state_path") or legacy_auth_dir / "kinray_state.json")).expanduser()
        start_date = parse_iso_date(envelope.parameters.get("start_date"), KINRAY_DEFAULT_START_DATE)
        end_date = parse_iso_date(envelope.parameters.get("end_date"), date.today())
        timeout_ms = int(envelope.parameters.get("timeout_ms") or KINRAY_DEFAULT_TIMEOUT_MS)

        config = KinrayConfig(
            username=username,
            password=password,
            start_date=start_date,
            end_date=end_date,
            download_dir=download_dir,
            profile_dir=profile_dir,
            auth_state_path=auth_state_path,
            headless=True,
            timeout_ms=timeout_ms,
        )
        mode = "headless"
        try:
            downloaded_path = kinray_download_invoice_report(config)
        except KinrayMfaRequired:
            mode = "visible_mfa"
            downloaded_path = kinray_download_invoice_report(
                KinrayConfig(
                    username=username,
                    password=password,
                    start_date=start_date,
                    end_date=end_date,
                    download_dir=download_dir,
                    profile_dir=profile_dir,
                    auth_state_path=auth_state_path,
                    headless=False,
                    timeout_ms=timeout_ms,
                )
            )

        downloaded_path = Path(downloaded_path)
        if not downloaded_path.is_file():
            raise ConnectorError("kinray_file_missing", f"Kinray download did not create a file: {downloaded_path}")
        size = downloaded_path.stat().st_size
        if size > envelope.max_bytes:
            raise ConnectorError("file_too_large", f"Downloaded Kinray file is {size} bytes; limit is {envelope.max_bytes}.")
        data = downloaded_path.read_bytes()
        return {
            "filename": downloaded_path.name,
            "path": str(downloaded_path),
            "mime_type": mimetypes.guess_type(str(downloaded_path))[0] or "application/zip",
            "size": size,
            "content_base64": base64.b64encode(data).decode("ascii"),
            "start_date": start_date.isoformat(),
            "end_date": end_date.isoformat(),
            "mode": mode,
            "_row_count": 1,
        }

    def health(self, include_database: bool) -> dict[str, Any]:
        payload: dict[str, Any] = {
            "connector_version": CONNECTOR_VERSION,
            "tenant_id": self.config.tenant_id,
            "connector_id": self.config.connector_id,
            "hostname": os.environ.get("COMPUTERNAME") or os.environ.get("HOSTNAME") or "",
            "allowed_file_roots": self.config.allowed_file_roots,
            "_row_count": 1,
        }
        try:
            import pyodbc

            payload["odbc_drivers"] = list(pyodbc.drivers())
        except Exception as exc:
            payload["odbc_error"] = str(exc)
        if include_database:
            try:
                with self._connect_sql() as connection:
                    cursor = connection.cursor()
                    cursor.execute("SELECT DNAME FROM DF0001")
                    row = cursor.fetchone()
                    payload["database_ok"] = True
                    payload["pharmacy_name"] = str(row[0]) if row else ""
            except Exception as exc:
                payload["database_ok"] = False
                payload["database_error"] = str(exc)
        return payload


def import_or_none(module_name: str) -> ModuleType | None:
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None


def ensure_python_packages(skip_install: bool = False) -> None:
    if os.name != "nt":
        raise SystemExit("This connector setup requires Windows because it stores secrets with Windows DPAPI.")

    missing_packages: list[str] = []
    for module_name, package_name in REQUIRED_IMPORTS:
        if import_or_none(module_name) is None:
            missing_packages.append(package_name)

    if not missing_packages:
        print("Python package check passed.")
        return

    unique_packages = sorted(set(missing_packages))
    if skip_install:
        raise SystemExit("Missing Python packages: " + ", ".join(unique_packages))

    print("Installing missing Python packages: " + ", ".join(unique_packages))
    subprocess.check_call([sys.executable, "-m", "pip", "install", *unique_packages])
    importlib.invalidate_caches()

    still_missing = [module for module, _package in REQUIRED_IMPORTS if import_or_none(module) is None]
    if still_missing:
        raise SystemExit("Package installation finished, but these imports still failed: " + ", ".join(still_missing))
    print("Python package installation completed.")


def dpapi_roundtrip_check() -> None:
    sample = b"cloud-pharmacy-connector-auto-dpapi-check"
    protected = dpapi_protect(sample)
    unprotected = dpapi_unprotect(protected)
    if unprotected != sample:
        raise SystemExit("Windows DPAPI check failed. Connector secrets cannot be stored safely.")


def inspect_config(config_path: Path) -> ConfigStatus:
    if not config_path.exists():
        return ConfigStatus(False, f"No connector config exists at {config_path}.")

    try:
        public_config = json.loads(config_path.read_text(encoding="utf-8"))
    except Exception as exc:
        return ConfigStatus(False, f"Connector config exists, but it is not readable JSON: {exc}")

    if not public_config.get("protected_secrets"):
        return ConfigStatus(False, "Connector config exists, but it does not contain DPAPI-protected secrets.")

    try:
        loaded = ConnectorConfig.load(config_path)
    except Exception as exc:
        return ConfigStatus(False, f"Connector config exists, but DPAPI could not decrypt it: {exc}")

    required_values = {
        "cloud_url": loaded.cloud_url,
        "tenant_id": loaded.tenant_id,
        "connector_id": loaded.connector_id,
        "connector_token": loaded.connector_token,
        "signing_secret": loaded.signing_secret,
        "sql_connection_string": loaded.sql_connection_string,
    }
    missing = [name for name, value in required_values.items() if not value]
    if missing:
        return ConfigStatus(False, "Connector config is missing required value(s): " + ", ".join(missing))

    return ConfigStatus(True, f"Existing connector config is usable at {config_path}.")


def config_publicly_looks_activated(config_path: Path) -> bool:
    if not config_path.exists():
        return False
    try:
        public_config = json.loads(config_path.read_text(encoding="utf-8"))
    except Exception:
        return False
    required_public_values = ("cloud_url", "tenant_id", "connector_id", "protected_secrets")
    return all(bool(public_config.get(name)) for name in required_public_values)


def prompt_text(prompt: str, default: str | None = None) -> str:
    suffix = f" [{default}]" if default else ""
    while True:
        value = input(f"{prompt}{suffix}: ").strip()
        if value:
            return value
        if default:
            return default
        print("This value is required.")


def prompt_secret(prompt: str) -> str:
    while True:
        value = getpass.getpass(prompt + ": ")
        if value:
            return value
        print("This value is required.")


def prompt_yes_no(prompt: str, default: bool = False) -> bool:
    default_text = "Y/n" if default else "y/N"
    while True:
        value = input(f"{prompt} [{default_text}]: ").strip().lower()
        if not value:
            return default
        if value in {"y", "yes"}:
            return True
        if value in {"n", "no"}:
            return False
        print("Please enter y or n.")


def build_sql_connection_string(sql_password: str) -> str:
    if os.environ.get("PHARMACY_CONNECTOR_SQL_CONNECTION", "").strip():
        print("Ignoring PHARMACY_CONNECTOR_SQL_CONNECTION; using the standard local PharmSQL connection.")
    print(f"Using local PharmSQL Server: {default_sql_server()}")
    return default_sql_connection_string(sql_password)


def warn_if_odbc_driver_missing(expected_driver: str) -> None:
    pyodbc = importlib.import_module("pyodbc")
    try:
        drivers = list(pyodbc.drivers())
    except Exception as exc:
        print(f"Could not list ODBC drivers: {exc}")
        return

    if expected_driver not in drivers:
        print(f"Warning: '{expected_driver}' was not found in installed ODBC drivers.")
        if drivers:
            print("Installed ODBC drivers: " + ", ".join(drivers))


def fetch_admin_token_via_ssh(args: argparse.Namespace, ssh_password: str) -> str:
    paramiko = importlib.import_module("paramiko")
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(
            hostname=args.ssh_host,
            port=args.ssh_port,
            username=args.ssh_user,
            password=ssh_password,
            look_for_keys=False,
            allow_agent=False,
            timeout=20,
            banner_timeout=20,
            auth_timeout=20,
        )
        _stdin, stdout, stderr = client.exec_command(args.admin_token_command, timeout=20)
        token = stdout.read().decode("utf-8", errors="replace").strip()
        error_text = stderr.read().decode("utf-8", errors="replace").strip()
        exit_status = stdout.channel.recv_exit_status()
    finally:
        client.close()

    if exit_status != 0:
        raise RuntimeError(f"SSH admin token command failed with exit code {exit_status}: {error_text}")
    if not token:
        raise RuntimeError("SSH admin token command succeeded, but returned an empty token.")
    return token


def create_activation_code(cloud_url: str, admin_token: str, tenant_id: str, display_name: str) -> str:
    requests = importlib.import_module("requests")
    cloud_url = require_https_url(cloud_url)
    response = requests.post(
        f"{cloud_url.rstrip('/')}/api/admin/activation-codes",
        headers={"X-Admin-Token": admin_token},
        json={"tenant_id": tenant_id, "display_name": display_name},
        timeout=30,
    )
    try:
        response.raise_for_status()
    except Exception as exc:
        body = response.text.strip()
        detail = f" Server response: {body}" if body else ""
        raise RuntimeError(f"Unable to create activation code for client ID {tenant_id!r}.{detail}") from exc
    payload: dict[str, Any] = response.json()
    activation_code = str(payload.get("activation_code") or "")
    if not activation_code:
        raise RuntimeError("Cloud server did not return an activation_code.")
    return activation_code


def activate_connector(args: argparse.Namespace, activation_code: str, display_name: str) -> None:
    requests = importlib.import_module("requests")
    cloud_url = require_https_url(args.cloud_url)
    response = requests.post(
        f"{cloud_url}/connector/activate",
        json={
            "activation_code": activation_code,
            "display_name": display_name,
            "version": CONNECTOR_VERSION,
            "cloud_url": cloud_url,
        },
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()
    if not payload.get("ok"):
        raise SystemExit(payload)
    sql_password = str(payload.get("sql_password") or "")
    if not sql_password:
        raise SystemExit("Cloud server did not return sql_password during activation.")
    sql_connection_string = build_sql_connection_string(sql_password)
    config = ConnectorConfig(
        cloud_url=cloud_url,
        tenant_id=payload["tenant_id"],
        connector_id=payload["connector_id"],
        connector_token=payload["connector_token"],
        signing_secret=payload["signing_secret"],
        sql_connection_string=sql_connection_string,
        poll_interval_seconds=int(payload.get("poll_interval_seconds", 5)),
        allowed_file_roots=args.allowed_file_root or [],
    )
    config.save(args.config)
    print(f"Activated connector {config.connector_id}")
    print(f"Config saved to {args.config}")


def run_connector(config_path: Path, once: bool = False) -> None:
    config = ConnectorConfig.load(config_path)
    connector = LocalConnector(config)
    if once:
        connector.poll_once()
    else:
        connector.run_forever()


def collect_activation_inputs(args: argparse.Namespace) -> tuple[str, str, str]:
    hostname = socket.gethostname() or "Pharmacy workstation"
    tenant_id = args.tenant_id or prompt_text("client_id / tenant_id")
    display_name = args.display_name or prompt_text("display_name", hostname)
    ssh_password = args.ssh_password or prompt_secret(f"SSH password for {args.ssh_user}@{args.ssh_host}")
    return tenant_id, display_name, ssh_password


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Automate Cloud Pharmacy connector setup and run.")
    parser.add_argument("--cloud-url", default=DEFAULT_CLOUD_URL)
    parser.add_argument("--config", type=Path, default=default_config_path())
    parser.add_argument("--tenant-id", default="")
    parser.add_argument("--display-name", default="")
    parser.add_argument("--allowed-file-root", action="append", default=[])

    parser.add_argument("--ssh-host", default=DEFAULT_SSH_HOST)
    parser.add_argument("--ssh-user", default=DEFAULT_SSH_USER)
    parser.add_argument("--ssh-port", type=int, default=22)
    parser.add_argument("--ssh-password", default="")
    parser.add_argument("--admin-token-command", default=DEFAULT_ADMIN_TOKEN_COMMAND)

    parser.add_argument("--sql-driver", default=DEFAULT_SQL_DRIVER)

    parser.add_argument("--force-reactivate", action="store_true")
    parser.add_argument("--skip-dependency-install", action="store_true")
    parser.add_argument("--check-only", action="store_true")
    parser.add_argument("--no-run", action="store_true")
    parser.add_argument("--once", action="store_true")
    parser.add_argument("--interactive-setup", action="store_true", help=argparse.SUPPRESS)
    parser.add_argument("--no-window-bootstrap", action="store_true", help=argparse.SUPPRESS)
    return parser.parse_args(argv)


def windows_startup_requires_interaction(args: argparse.Namespace) -> bool:
    if args.check_only or args.no_run or args.once:
        return False
    if args.force_reactivate:
        return True
    return not config_publicly_looks_activated(args.config)


def bootstrap_windows_window_mode() -> None:
    if os.name != "nt":
        return
    if os.environ.get(WINDOW_BOOTSTRAP_ENV) == "1":
        return
    if "--help" in sys.argv[1:] or "-h" in sys.argv[1:]:
        return

    args = parse_args(sys.argv[1:])
    if args.no_window_bootstrap:
        return

    if windows_startup_requires_interaction(args):
        if not windows_has_console():
            spawn_windows_interactive_setup(sys.argv[1:])
            raise SystemExit(0)
        return

    if windows_has_console() and not (args.check_only or args.no_run or args.once):
        spawn_windows_silent(sys.argv[1:])
        raise SystemExit(0)


def main() -> int:
    configure_background_stdio()
    args = parse_args()
    args.cloud_url = args.cloud_url.rstrip("/")

    ensure_python_packages(skip_install=args.skip_dependency_install)
    dpapi_roundtrip_check()
    warn_if_odbc_driver_missing(args.sql_driver)

    status = inspect_config(args.config)
    print(status.reason)

    if args.check_only:
        return 0 if status.usable else 1

    should_activate = args.force_reactivate or not status.usable
    if os.name == "nt" and should_activate and not windows_has_console():
        spawn_windows_interactive_setup(sys.argv[1:])
        return 0

    if should_activate:
        if args.config.exists() and not args.force_reactivate and not prompt_yes_no(
            "Reactivate and overwrite the current connector config?", default=False
        ):
            print("Leaving existing connector config unchanged.")
            return 1

        tenant_id, display_name, ssh_password = collect_activation_inputs(args)
        print("Fetching admin token over SSH.")
        admin_token = fetch_admin_token_via_ssh(args, ssh_password)
        print("Creating activation code.")
        activation_code = create_activation_code(args.cloud_url, admin_token, tenant_id, display_name)
        print("Activating local connector.")
        activate_connector(args, activation_code, display_name)

        status = inspect_config(args.config)
        if not status.usable:
            raise SystemExit(status.reason)

    if args.no_run:
        print("Setup complete. Connector run skipped because --no-run was provided.")
        return 0

    if os.name == "nt" and args.interactive_setup:
        print("Setup complete. Starting connector in the background.")
        spawn_windows_silent(sys.argv[1:])
        return 0

    print("Starting connector. Press Ctrl+C to stop.")
    try:
        run_connector(args.config, once=args.once)
    except KeyboardInterrupt:
        print("Connector stopped.")
    return 0


if __name__ == "__main__":
    bootstrap_windows_window_mode()
    raise SystemExit(main())
