"""
Runnable proof — Premium ETL เฟส 2 (Warehouse & Load)
=====================================================
พิสูจน์ตรรกะ load จริงด้วย sqlite (stdlib) — ตรรกะ UPSERT/quality เดียวกับ loader.py (Postgres)
รัน:  python load_demo.py
ทำ:
  1) อ่าน output เฟส 1 (input_from_phase1.json)
  2) ฉีดข้อมูลพังจงใจ 1 แถว (total ติดลบ) เพื่อทดสอบ quality gate
  3) สร้าง warehouse (sqlite) + UPSERT
  4) รันโหลดซ้ำ -> พิสูจน์ idempotent (จำนวนแถวเท่าเดิม)
  5) query สรุปยอดขายรายแพลตฟอร์ม
"""
from __future__ import annotations

import os
import json
import sqlite3

from quality import quality_gate

# Warehouse schema. Both statements use IF NOT EXISTS, so running the script
# against an already-initialised database is harmless.
# - orders: the primary key includes sku, so one order can occupy several
#   rows (one per platform / platform_order_id / sku combination).
# - inventory: one row per (platform, sku).
DDL = """
CREATE TABLE IF NOT EXISTS orders (
  platform TEXT, platform_order_id TEXT, order_date TEXT, total_amount REAL,
  status TEXT, sku TEXT, qty INTEGER, unit_price REAL,
  PRIMARY KEY (platform, platform_order_id, sku)
);
CREATE TABLE IF NOT EXISTS inventory (
  platform TEXT, sku TEXT, stock INTEGER, updated_at TEXT,
  PRIMARY KEY (platform, sku)
);
"""

# Upsert for one order row, bound by named parameters (each row must be a
# mapping with these keys). On a primary-key conflict the existing row's
# total_amount, status, qty and unit_price are overwritten with the incoming
# values; order_date is NOT in the update list, so the first-loaded value is
# kept.
UPSERT_O = """
INSERT INTO orders (platform, platform_order_id, order_date, total_amount, status, sku, qty, unit_price)
VALUES (:platform, :platform_order_id, :order_date, :total_amount, :status, :sku, :qty, :unit_price)
ON CONFLICT (platform, platform_order_id, sku) DO UPDATE SET
  total_amount=excluded.total_amount, status=excluded.status,
  qty=excluded.qty, unit_price=excluded.unit_price
"""
# Upsert for one inventory row, bound by named parameters. On a
# (platform, sku) conflict, stock and updated_at are overwritten.
UPSERT_I = """
INSERT INTO inventory (platform, sku, stock, updated_at)
VALUES (:platform, :sku, :stock, :updated_at)
ON CONFLICT (platform, sku) DO UPDATE SET stock=excluded.stock, updated_at=excluded.updated_at
"""


def load_into(conn, payload):
    """Run *payload* through the quality gate and upsert the clean rows.

    Args:
        conn: Open database connection (a ``sqlite3.Connection`` in this
            demo). It is used as a transaction context manager.
        payload: Raw phase-1 output, passed unchanged to ``quality_gate``.

    Returns:
        The object returned by ``quality_gate``. This function reads its
        ``"clean_orders"`` and ``"clean_inventory"`` entries; each is
        expected to be a sequence of mappings whose keys match the named
        parameters of ``UPSERT_O`` / ``UPSERT_I``.

    Raises:
        Whatever ``quality_gate`` or the database driver raises. If a
        database error occurs, the transaction is rolled back first, so
        neither table keeps a partial load.
    """
    gated = quality_gate(payload)
    # `with conn` commits when the body succeeds and rolls back when it
    # raises, so orders and inventory are written atomically.
    with conn:
        conn.executemany(UPSERT_O, gated["clean_orders"])
        conn.executemany(UPSERT_I, gated["clean_inventory"])
    return gated


def main():
    """Run the phase-2 load demo against an in-memory SQLite warehouse.

    Steps:
      1. Read the phase-1 output JSON (path taken from the ``INPUT``
         environment variable, default ``input_from_phase1.json``).
      2. Append one deliberately broken order row (negative amounts) that
         the quality gate is intended to reject.
      3. Create the schema and load the payload.
      4. Load the same payload again and compare row counts to show the
         load is idempotent.
      5. Query order count and revenue per platform.

    A report is printed to stdout and the same results are written as JSON
    to the path in the ``OUT`` environment variable (default
    ``load_result.json``).
    """
    src = os.environ.get("INPUT", "input_from_phase1.json")
    with open(src, encoding="utf-8") as fh:
        # json.load builds a fresh object tree, so it can be mutated below
        # without an extra defensive copy.
        payload = json.load(fh)

    # 2) Inject one intentionally bad row -> the quality gate must drop it.
    payload["orders"].append({"platform": "shopee", "platform_order_id": "BAD-1",
                              "order_date": "2026-06-06T00:00:00Z", "total_amount": -999,
                              "status": "x", "sku": "SKU-X", "qty": 1, "unit_price": -999})

    conn = sqlite3.connect(":memory:")
    try:
        conn.row_factory = sqlite3.Row
        conn.executescript(DDL)

        g1 = load_into(conn, payload)
        n1 = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]

        # 4) Reload the same window -> row count must not grow (idempotent).
        load_into(conn, payload)
        n2 = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]

        # 5) Sales summary per platform. Rows are converted to plain dicts
        #    here so nothing depends on the connection after it is closed.
        by_platform = [dict(r) for r in conn.execute(
            "SELECT platform, COUNT(DISTINCT platform_order_id) AS orders, "
            "ROUND(SUM(total_amount),2) AS revenue FROM orders GROUP BY platform ORDER BY platform"
        ).fetchall()]
    finally:
        conn.close()

    rejected_reasons = [r["reason"] for r in g1["rejected"]]
    idempotent = n1 == n2

    print("=== Quality Gate ===")
    print(json.dumps(g1["stats"], ensure_ascii=False))
    print("rejected:", rejected_reasons)
    print("\n=== Idempotency ===")
    print(f"orders after run #1 = {n1} | after run #2 = {n2} | idempotent = {idempotent}")
    print("\n=== ยอดขายรายแพลตฟอร์ม (จาก warehouse) ===")
    for r in by_platform:
        print(f"  {r['platform']:10} orders={r['orders']}  revenue=฿{r['revenue']:,.0f}")

    out = {"stats": g1["stats"], "rejected_reasons": rejected_reasons,
           "idempotent": idempotent, "rows_after_run1": n1, "rows_after_run2": n2,
           "by_platform": by_platform}
    # Use a context manager so the result file is flushed and closed
    # deterministically instead of whenever the file object is collected.
    with open(os.environ.get("OUT", "load_result.json"), "w", encoding="utf-8") as fh:
        json.dump(out, fh, ensure_ascii=False, indent=2)


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
