2505 lines
104 KiB
Python
2505 lines
104 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import csv
|
|
import hashlib
|
|
import hmac
|
|
import io
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import secrets
|
|
import threading
|
|
import time
|
|
from http import HTTPStatus
|
|
from http.cookies import SimpleCookie
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import parse_qs, quote, urlencode, urljoin, urlparse
|
|
from urllib.request import Request, urlopen
|
|
|
|
try:
|
|
import psycopg
|
|
from psycopg.rows import dict_row
|
|
except ImportError:
|
|
psycopg = None
|
|
dict_row = None
|
|
|
|
|
|
# StreamHall server - single-file Python HTTP server (no external framework).
|
|
# Each request gets its own thread (ThreadingHTTPServer) and its own PostgreSQL
|
|
# connection. Two daemon threads run in the background: one monitors stream
|
|
# liveness, the other cleans up stale viewer sessions.
|
|
# Admin endpoints require either a session cookie or a Bearer/query-param API key.
|
|
|
|
class AppError(Exception):
|
|
"""Carries an i18n error code and an optional English detail string.
|
|
The code maps to err.<code> in the frontend translation tables.
|
|
The detail (e.g. a third-party API description) is passed through as-is."""
|
|
def __init__(self, code: str, detail: str = ""):
|
|
super().__init__(code)
|
|
self.code = code
|
|
self.detail = detail
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parent
|
|
PUBLIC_DIR = ROOT / "public"
|
|
DATABASE_URL = os.getenv("DATABASE_URL", "").strip()
|
|
SECRET_KEY = os.getenv("SECRET_KEY", "change-this-secret").encode("utf-8")
|
|
SESSION_COOKIE = "streamhall_session"
|
|
SESSION_MAX_AGE = 60 * 60 * 24
|
|
PASSWORD_HASH_ITERATIONS = 240000
|
|
INITIAL_ADMIN_PASSWORD_BYTES = 18
|
|
PUBLIC_ID_BYTES = 9
|
|
STREAM_PROBE_TIMEOUT = float(os.getenv("STREAM_PROBE_TIMEOUT", "4"))
|
|
TELEGRAM_TIMEOUT = float(os.getenv("TELEGRAM_TIMEOUT", "6"))
|
|
STREAM_MONITOR_INTERVAL = max(5, int(os.getenv("STREAM_MONITOR_INTERVAL", "10")))
|
|
SRS_HTTP_ORIGIN = os.getenv("SRS_HTTP_ORIGIN", "http://srs:8080").rstrip("/")
|
|
OBS_ROUTE_SLUG_LENGTH = max(12, int(os.getenv("OBS_ROUTE_SLUG_LENGTH", "22")))
|
|
URL_PATH_SAFE = "/._~!$&'()*+,;=:@"
|
|
HLS_PROXY_PREFIX = "/proxy/hls"
|
|
HLS_URI_RE = re.compile(r'URI="([^"]+)"') # matches URI="..." attributes in HLS tag lines (e.g. EXT-X-KEY)
|
|
|
|
DEFAULT_SITE_SETTINGS = {
|
|
"site_title": "StreamHall",
|
|
"site_icon_url": "",
|
|
"site_description": "",
|
|
"site_description_en": "",
|
|
"site_nav_links": '[{"label":"直播列表","url":"#stream-list"}]',
|
|
"site_nav_links_en": '[{"label":"Streams","url":"#stream-list"}]',
|
|
"footer_markdown": "",
|
|
"footer_markdown_en": "",
|
|
"telegram_public_base_url": "",
|
|
}
|
|
|
|
DEFAULT_TELEGRAM_SETTINGS = {
|
|
"telegram_bot_token": "",
|
|
"telegram_chat_id": "",
|
|
"telegram_public_base_url": "",
|
|
"telegram_live_notify_start": "0",
|
|
"telegram_live_notify_stop": "0",
|
|
"telegram_live_start_template": "【开播提醒】{title}\n观看地址:{url}\n时间:{time}",
|
|
"telegram_live_stop_template": "【关播提醒】{title}\n时间:{time}",
|
|
"telegram_archive_notify_start": "0",
|
|
"telegram_archive_notify_stop": "0",
|
|
"telegram_archive_start_template": "【上架提醒】{title}\n观看地址:{url}\n时间:{time}",
|
|
"telegram_archive_stop_template": "【下架提醒】{title}\n时间:{time}",
|
|
}
|
|
|
|
|
|
|
|
def now() -> int:
|
|
return int(time.time())
|
|
|
|
|
|
class PostgresCursor:
|
|
def __init__(self, cursor):
|
|
self.cursor = cursor
|
|
self.rowcount = cursor.rowcount
|
|
|
|
def fetchone(self):
|
|
return self.cursor.fetchone()
|
|
|
|
def fetchall(self):
|
|
return self.cursor.fetchall()
|
|
|
|
|
|
class PostgresConnection:
|
|
def __init__(self):
|
|
if not DATABASE_URL:
|
|
raise RuntimeError("DATABASE_URL is required. StreamHall uses Postgres only.")
|
|
if psycopg is None:
|
|
raise RuntimeError("psycopg is required. Install requirements.txt before starting StreamHall.")
|
|
# Retry for up to 30 seconds so the app can start before Postgres is
|
|
# ready in the Docker Compose stack (postgres container may still be
|
|
# initialising when this container starts).
|
|
last_exc = None
|
|
for _ in range(30):
|
|
try:
|
|
self.conn = psycopg.connect(DATABASE_URL, row_factory=dict_row)
|
|
break
|
|
except Exception as exc:
|
|
last_exc = exc
|
|
time.sleep(1)
|
|
else:
|
|
raise RuntimeError(f"Could not connect to Postgres: {last_exc}")
|
|
|
|
def __enter__(self):
|
|
self.conn.__enter__()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return self.conn.__exit__(exc_type, exc, tb)
|
|
|
|
def execute(self, sql: str, params: tuple | list = ()):
|
|
# Translate SQLite-style ? placeholders to psycopg %s before executing.
|
|
return PostgresCursor(self.conn.execute(sql.replace("?", "%s"), params))
|
|
|
|
|
|
def db():
|
|
return PostgresConnection()
|
|
|
|
|
|
def generate_public_id(conn) -> str:
|
|
while True:
|
|
value = secrets.token_urlsafe(PUBLIC_ID_BYTES)
|
|
row = conn.execute("SELECT 1 FROM streams WHERE public_id = ?", (value,)).fetchone()
|
|
if not row:
|
|
return value
|
|
|
|
|
|
def obs_route_slug(stream_key: str) -> str:
|
|
# Derive a public URL slug from a stream key using HMAC-SHA256.
|
|
# The slug is deterministic (same key -> same slug) but non-reversible,
|
|
# so the real stream key is never exposed in the public HLS URL.
|
|
digest = hmac.new(SECRET_KEY, stream_key.encode("utf-8"), hashlib.sha256).digest()
|
|
return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")[:OBS_ROUTE_SLUG_LENGTH]
|
|
|
|
|
|
def normalize_stream_label(value: object) -> str:
|
|
label = str(value or "LIVE").strip().upper()
|
|
return label if label in ("LIVE", "ARCHIVE") else "LIVE"
|
|
|
|
|
|
def init_db() -> None:
|
|
init_postgres_db()
|
|
|
|
|
|
def init_postgres_db() -> None:
|
|
with db() as conn:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS streams (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
public_id TEXT NOT NULL DEFAULT '',
|
|
stream_label TEXT NOT NULL DEFAULT 'LIVE',
|
|
event_name TEXT NOT NULL,
|
|
stream_password TEXT NOT NULL DEFAULT '',
|
|
links_json TEXT NOT NULL DEFAULT '[]',
|
|
is_hidden INTEGER NOT NULL DEFAULT 0,
|
|
is_enabled INTEGER NOT NULL DEFAULT 1,
|
|
tg_notify_enabled INTEGER NOT NULL DEFAULT 0,
|
|
admin_pinned INTEGER NOT NULL DEFAULT 0,
|
|
public_pinned INTEGER NOT NULL DEFAULT 0,
|
|
created_at INTEGER NOT NULL,
|
|
updated_at INTEGER NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_streams_public_id ON streams(public_id)")
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS site_settings (
|
|
key TEXT PRIMARY KEY,
|
|
value TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS stream_probe_states (
|
|
stream_id INTEGER PRIMARY KEY,
|
|
is_live INTEGER NOT NULL DEFAULT 0,
|
|
updated_at INTEGER NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS obs_stream_routes (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
stream_key TEXT NOT NULL UNIQUE,
|
|
public_slug TEXT NOT NULL UNIQUE,
|
|
created_at INTEGER NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS api_keys (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
token_hash TEXT NOT NULL UNIQUE,
|
|
label TEXT NOT NULL DEFAULT '',
|
|
created_at INTEGER NOT NULL,
|
|
last_used_at INTEGER NOT NULL DEFAULT 0
|
|
)
|
|
"""
|
|
)
|
|
init_stats_tables(conn)
|
|
for key, value in DEFAULT_SITE_SETTINGS.items():
|
|
conn.execute(
|
|
"INSERT INTO site_settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO NOTHING",
|
|
(key, value),
|
|
)
|
|
for key, value in DEFAULT_TELEGRAM_SETTINGS.items():
|
|
conn.execute(
|
|
"INSERT INTO site_settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO NOTHING",
|
|
(key, value),
|
|
)
|
|
row = conn.execute("SELECT COUNT(*) AS count FROM streams").fetchone()
|
|
ensure_admin_password(conn)
|
|
|
|
|
|
def init_stats_tables(conn) -> None:
|
|
# ALTER TABLE ... ADD COLUMN IF NOT EXISTS is used for additive schema
|
|
# migrations: new columns are appended without dropping existing data,
|
|
# so upgrades on a live database are safe and idempotent.
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS viewer_sessions (
|
|
session_id TEXT PRIMARY KEY,
|
|
visitor_id TEXT NOT NULL,
|
|
stream_id INTEGER NOT NULL,
|
|
public_id TEXT NOT NULL,
|
|
ip_hash TEXT NOT NULL DEFAULT '',
|
|
user_agent TEXT NOT NULL DEFAULT '',
|
|
referer TEXT NOT NULL DEFAULT '',
|
|
device_type TEXT NOT NULL DEFAULT '',
|
|
started_at INTEGER NOT NULL,
|
|
last_seen_at INTEGER NOT NULL,
|
|
ended_at INTEGER NOT NULL DEFAULT 0,
|
|
is_active INTEGER NOT NULL DEFAULT 1,
|
|
play_state TEXT NOT NULL DEFAULT 'viewing'
|
|
)
|
|
"""
|
|
)
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_viewer_sessions_stream ON viewer_sessions(stream_id)")
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_viewer_sessions_last_seen ON viewer_sessions(last_seen_at)")
|
|
conn.execute("ALTER TABLE viewer_sessions ADD COLUMN IF NOT EXISTS ip_address TEXT NOT NULL DEFAULT ''")
|
|
conn.execute("ALTER TABLE viewer_sessions ADD COLUMN IF NOT EXISTS os TEXT NOT NULL DEFAULT ''")
|
|
conn.execute("ALTER TABLE viewer_sessions ADD COLUMN IF NOT EXISTS browser TEXT NOT NULL DEFAULT ''")
|
|
conn.execute("ALTER TABLE streams ADD COLUMN IF NOT EXISTS sort_order INTEGER NOT NULL DEFAULT 0")
|
|
conn.execute("UPDATE streams SET sort_order = id WHERE sort_order = 0")
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS viewer_events (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
session_id TEXT NOT NULL,
|
|
stream_id INTEGER NOT NULL,
|
|
event_type TEXT NOT NULL,
|
|
event_at INTEGER NOT NULL,
|
|
metadata TEXT NOT NULL DEFAULT '{{}}'
|
|
)
|
|
"""
|
|
)
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_viewer_events_stream_time ON viewer_events(stream_id, event_at)")
|
|
|
|
|
|
|
|
def reset_postgres_sequence(conn, table: str) -> None:
|
|
conn.execute(
|
|
f"SELECT setval(pg_get_serial_sequence('{table}', 'id'), COALESCE((SELECT MAX(id) FROM {table}), 1), (SELECT COUNT(*) FROM {table}) > 0)"
|
|
)
|
|
|
|
|
|
def b64encode_bytes(value: bytes) -> str:
|
|
return base64.urlsafe_b64encode(value).decode("ascii").rstrip("=")
|
|
|
|
|
|
def b64decode_bytes(value: str) -> bytes:
|
|
padded = value + "=" * (-len(value) % 4)
|
|
return base64.urlsafe_b64decode(padded.encode("ascii"))
|
|
|
|
|
|
def hash_admin_password(password: str) -> str:
|
|
salt = secrets.token_bytes(16)
|
|
digest = hashlib.pbkdf2_hmac(
|
|
"sha256",
|
|
password.encode("utf-8"),
|
|
salt,
|
|
PASSWORD_HASH_ITERATIONS,
|
|
)
|
|
return f"pbkdf2_sha256${PASSWORD_HASH_ITERATIONS}${b64encode_bytes(salt)}${b64encode_bytes(digest)}"
|
|
|
|
|
|
def verify_admin_password_hash(password: str, stored_hash: str) -> bool:
|
|
try:
|
|
algorithm, iterations_text, salt_text, digest_text = stored_hash.split("$", 3)
|
|
if algorithm != "pbkdf2_sha256":
|
|
return False
|
|
iterations = int(iterations_text)
|
|
salt = b64decode_bytes(salt_text)
|
|
expected = b64decode_bytes(digest_text)
|
|
actual = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
|
|
except Exception:
|
|
return False
|
|
return hmac.compare_digest(actual, expected)
|
|
|
|
|
|
def get_setting(conn, key: str) -> str:
|
|
row = conn.execute("SELECT value FROM site_settings WHERE key = ?", (key,)).fetchone()
|
|
return str(row["value"]) if row else ""
|
|
|
|
|
|
def set_setting(conn, key: str, value: str) -> None:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO site_settings (key, value)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(key) DO UPDATE SET value = excluded.value
|
|
""",
|
|
(key, value),
|
|
)
|
|
|
|
|
|
def generate_initial_admin_password() -> str:
|
|
return secrets.token_urlsafe(INITIAL_ADMIN_PASSWORD_BYTES)
|
|
|
|
|
|
def ensure_admin_password(conn) -> None:
|
|
if get_setting(conn, "admin_password_hash"):
|
|
return
|
|
password = generate_initial_admin_password()
|
|
set_setting(conn, "admin_password_hash", hash_admin_password(password))
|
|
set_setting(conn, "admin_password_changed_at", str(now()))
|
|
print("=" * 72, flush=True)
|
|
print("StreamHall initial admin password:", password, flush=True)
|
|
print("Save this password now. It is shown only once.", flush=True)
|
|
print("=" * 72, flush=True)
|
|
|
|
|
|
def sign(value: str) -> str:
|
|
sig = hmac.new(SECRET_KEY, value.encode("utf-8"), hashlib.sha256).digest()
|
|
return base64.urlsafe_b64encode(sig).decode("ascii").rstrip("=")
|
|
|
|
|
|
def encode_proxy_target(url: str) -> str:
|
|
return base64.urlsafe_b64encode(url.encode("utf-8")).decode("ascii").rstrip("=")
|
|
|
|
|
|
def decode_proxy_target(value: str) -> str:
|
|
padded = value + "=" * (-len(value) % 4)
|
|
return base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
|
|
|
|
|
|
def hls_proxy_host_token(host: str) -> str:
|
|
return sign(f"hls-proxy-host:{host.lower()}")
|
|
|
|
|
|
def hls_proxy_path(url: str) -> str:
|
|
host = urlparse(url).netloc
|
|
encoded = encode_proxy_target(url)
|
|
return f"{HLS_PROXY_PREFIX}/{hls_proxy_host_token(host)}/{encoded}"
|
|
|
|
|
|
def is_hls_link(link: dict[str, object]) -> bool:
|
|
raw_url = str(link.get("url") or "")
|
|
link_type = str(link.get("type") or "").strip().lower()
|
|
path = urlparse(raw_url).path.lower()
|
|
return link_type == "m3u8" or path.endswith(".m3u8")
|
|
|
|
|
|
def add_playback_urls(links: list[dict[str, object]]) -> list[dict[str, object]]:
|
|
prepared: list[dict[str, object]] = []
|
|
for link in links:
|
|
item = dict(link)
|
|
raw_url = str(item.get("url") or "")
|
|
parsed = urlparse(raw_url)
|
|
if is_hls_link(item) and parsed.scheme in ("http", "https"):
|
|
item["playback_url"] = hls_proxy_path(raw_url)
|
|
prepared.append(item)
|
|
return prepared
|
|
|
|
|
|
def make_session() -> str:
|
|
value = f"admin:{now()}"
|
|
token = base64.urlsafe_b64encode(value.encode("utf-8")).decode("ascii").rstrip("=")
|
|
return f"{token}.{sign(value)}"
|
|
|
|
|
|
def verify_session(token: str | None) -> bool:
|
|
if not token or "." not in token:
|
|
return False
|
|
payload, given_sig = token.rsplit(".", 1)
|
|
try:
|
|
padded = payload + "=" * (-len(payload) % 4)
|
|
value = base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
|
|
role, issued = value.split(":", 1)
|
|
issued_at = int(issued)
|
|
if role != "admin" or now() - issued_at > SESSION_MAX_AGE:
|
|
return False
|
|
# Reject sessions issued before the last password change so that
|
|
# rotating the password immediately invalidates all existing sessions.
|
|
if issued_at < admin_session_not_before():
|
|
return False
|
|
except Exception:
|
|
return False
|
|
return hmac.compare_digest(sign(value), given_sig)
|
|
|
|
|
|
def admin_session_not_before() -> int:
|
|
try:
|
|
with db() as conn:
|
|
return int(get_setting(conn, "admin_password_changed_at") or 0)
|
|
except Exception:
|
|
return 0
|
|
|
|
|
|
def normalize_links(raw: object) -> list[dict[str, str]]:
|
|
links = raw if isinstance(raw, list) else []
|
|
normalized = []
|
|
for item in links:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
name = str(item.get("name", "")).strip()
|
|
url = str(item.get("url", "")).strip()
|
|
if not name or not url:
|
|
continue
|
|
link_type = str(item.get("type", "")).strip().lower()
|
|
if link_type not in ("", "m3u8", "flv", "dash"):
|
|
link_type = ""
|
|
normalized.append(
|
|
{
|
|
"name": name,
|
|
"type": link_type,
|
|
"url": url,
|
|
"key": str(item.get("key", "")).strip(),
|
|
"clearkey": str(item.get("clearkey", "")).strip(),
|
|
}
|
|
)
|
|
return normalized
|
|
|
|
|
|
def normalize_site_settings(raw: dict[str, object]) -> dict[str, str]:
|
|
site_title = str(raw.get("siteTitle", raw.get("site_title", ""))).strip()
|
|
site_description = str(raw.get("siteDescription", raw.get("site_description", ""))).strip()
|
|
site_description_en = str(raw.get("siteDescriptionEn", raw.get("site_description_en", ""))).strip()
|
|
site_icon_url = str(raw.get("siteIconUrl", raw.get("site_icon_url", ""))).strip()
|
|
footer_markdown = str(raw.get("footerMarkdown", raw.get("footer_markdown", ""))).strip()
|
|
footer_markdown_en = str(raw.get("footerMarkdownEn", raw.get("footer_markdown_en", ""))).strip()
|
|
|
|
def _parse_nav_links(raw_val: object) -> list[dict[str, str]]:
|
|
if isinstance(raw_val, str):
|
|
try:
|
|
raw_val = json.loads(raw_val)
|
|
except json.JSONDecodeError:
|
|
raw_val = []
|
|
result: list[dict[str, str]] = []
|
|
if isinstance(raw_val, list):
|
|
for item in raw_val[:12]:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
label = str(item.get("label", "")).strip()[:24]
|
|
url = str(item.get("url", "")).strip()[:300]
|
|
if not label or not url:
|
|
continue
|
|
result.append({"label": label, "url": url})
|
|
return result
|
|
|
|
nav_links = _parse_nav_links(raw.get("navLinks", raw.get("site_nav_links", [])))
|
|
nav_links_en = _parse_nav_links(raw.get("navLinksEn", raw.get("site_nav_links_en", [])))
|
|
|
|
if not site_title:
|
|
raise AppError("site_title_empty")
|
|
if site_icon_url and not (
|
|
site_icon_url.startswith("/")
|
|
or site_icon_url.startswith("data:image/")
|
|
or urlparse(site_icon_url).scheme in ("http", "https")
|
|
):
|
|
raise AppError("site_icon_invalid")
|
|
public_base_url = str(raw.get("publicBaseUrl", raw.get("telegram_public_base_url", ""))).strip().rstrip("/")
|
|
return {
|
|
"site_title": site_title[:80],
|
|
"site_description": site_description[:300],
|
|
"site_description_en": site_description_en[:300],
|
|
"site_icon_url": site_icon_url[:500],
|
|
"site_nav_links": json.dumps(nav_links, ensure_ascii=False),
|
|
"site_nav_links_en": json.dumps(nav_links_en, ensure_ascii=False),
|
|
"footer_markdown": footer_markdown[:5000],
|
|
"footer_markdown_en": footer_markdown_en[:5000],
|
|
"telegram_public_base_url": public_base_url[:300],
|
|
}
|
|
|
|
|
|
def bool_setting(value: object) -> str:
|
|
if isinstance(value, bool):
|
|
return "1" if value else "0"
|
|
text = str(value or "").strip().lower()
|
|
return "1" if text in ("1", "true", "yes", "on") else "0"
|
|
|
|
|
|
def _tpl(raw: dict[str, object], camel_key: str, db_key: str) -> str:
|
|
return str(raw.get(camel_key, raw.get(db_key, DEFAULT_TELEGRAM_SETTINGS.get(db_key, "")))).strip()
|
|
|
|
|
|
def normalize_telegram_settings(raw: dict[str, object]) -> dict[str, str]:
|
|
return {
|
|
"telegram_bot_token": str(raw.get("botToken", raw.get("telegram_bot_token", ""))).strip()[:200],
|
|
"telegram_chat_id": str(raw.get("chatId", raw.get("telegram_chat_id", ""))).strip()[:120],
|
|
"telegram_public_base_url": str(
|
|
raw.get("publicBaseUrl", raw.get("telegram_public_base_url", ""))
|
|
).strip().rstrip("/")[:300],
|
|
"telegram_live_notify_start": bool_setting(raw.get("liveNotifyStart", raw.get("telegram_live_notify_start", "0"))),
|
|
"telegram_live_notify_stop": bool_setting(raw.get("liveNotifyStop", raw.get("telegram_live_notify_stop", "0"))),
|
|
"telegram_live_start_template": (_tpl(raw, "liveStartTemplate", "telegram_live_start_template") or DEFAULT_TELEGRAM_SETTINGS["telegram_live_start_template"])[:1000],
|
|
"telegram_live_stop_template": (_tpl(raw, "liveStopTemplate", "telegram_live_stop_template") or DEFAULT_TELEGRAM_SETTINGS["telegram_live_stop_template"])[:1000],
|
|
"telegram_archive_notify_start": bool_setting(raw.get("archiveNotifyStart", raw.get("telegram_archive_notify_start", "0"))),
|
|
"telegram_archive_notify_stop": bool_setting(raw.get("archiveNotifyStop", raw.get("telegram_archive_notify_stop", "0"))),
|
|
"telegram_archive_start_template": (_tpl(raw, "archiveStartTemplate", "telegram_archive_start_template") or DEFAULT_TELEGRAM_SETTINGS["telegram_archive_start_template"])[:1000],
|
|
"telegram_archive_stop_template": (_tpl(raw, "archiveStopTemplate", "telegram_archive_stop_template") or DEFAULT_TELEGRAM_SETTINGS["telegram_archive_stop_template"])[:1000],
|
|
}
|
|
|
|
|
|
def stream_probe_response(valid: bool, status_code: int | None = None) -> dict[str, object]:
|
|
return {
|
|
"valid": valid,
|
|
"code": "detected" if valid else "no_info",
|
|
"status_code": status_code,
|
|
}
|
|
|
|
|
|
def decode_probe_text(data: bytes) -> str:
|
|
for encoding in ("utf-8-sig", "utf-8", "latin-1"):
|
|
try:
|
|
return data.decode(encoding)
|
|
except UnicodeDecodeError:
|
|
continue
|
|
return ""
|
|
|
|
|
|
def probe_stream_url(raw_url: object, type_hint: object = "") -> dict[str, object]:
|
|
url = str(raw_url or "").strip()
|
|
if not url:
|
|
return stream_probe_response(False)
|
|
|
|
parsed = urlparse(url)
|
|
if parsed.scheme not in ("http", "https") or not parsed.netloc:
|
|
return stream_probe_response(False)
|
|
|
|
hint = str(type_hint or "").strip().lower()
|
|
path = parsed.path.lower()
|
|
is_hls = hint == "m3u8" or path.endswith(".m3u8")
|
|
is_dash = hint == "dash" or path.endswith(".mpd")
|
|
is_flv = hint == "flv" or path.endswith(".flv")
|
|
headers = {
|
|
"User-Agent": "StreamHall/1.0",
|
|
"Accept": "*/*",
|
|
}
|
|
if not is_hls and not is_dash:
|
|
headers["Range"] = "bytes=0-4095"
|
|
|
|
try:
|
|
with urlopen(Request(url, headers=headers), timeout=STREAM_PROBE_TIMEOUT) as resp:
|
|
status_code = int(getattr(resp, "status", resp.getcode()) or 0)
|
|
content_type = resp.headers.get("Content-Type", "").lower()
|
|
read_limit = 65536 if is_hls or is_dash or "mpegurl" in content_type or "xml" in content_type else 4096
|
|
data = resp.read(read_limit)
|
|
except HTTPError as exc:
|
|
return stream_probe_response(False, exc.code)
|
|
except (TimeoutError, URLError, OSError):
|
|
return stream_probe_response(False)
|
|
|
|
if status_code >= 400:
|
|
return stream_probe_response(False, status_code)
|
|
|
|
if is_hls or "mpegurl" in content_type or "m3u8" in content_type:
|
|
text = decode_probe_text(data)
|
|
has_playlist = "#EXTM3U" in text
|
|
has_live_media = any(
|
|
marker in text
|
|
for marker in ("#EXTINF", "#EXT-X-STREAM-INF", "#EXT-X-MEDIA-SEQUENCE", "#EXT-X-PART")
|
|
)
|
|
return stream_probe_response(has_playlist and has_live_media, status_code)
|
|
|
|
if is_dash or "dash+xml" in content_type:
|
|
return stream_probe_response(b"<MPD" in data[:2048], status_code)
|
|
|
|
if is_flv or "flv" in content_type:
|
|
return stream_probe_response(data.startswith(b"FLV") or len(data) > 0, status_code)
|
|
|
|
has_video_type = content_type.startswith("video/") or content_type.startswith("audio/")
|
|
is_binary_stream = "application/octet-stream" in content_type
|
|
return stream_probe_response(len(data) > 0 and (has_video_type or is_binary_stream), status_code)
|
|
|
|
|
|
def site_settings() -> dict[str, str]:
|
|
settings = dict(DEFAULT_SITE_SETTINGS)
|
|
with db() as conn:
|
|
rows = conn.execute("SELECT key, value FROM site_settings").fetchall()
|
|
for row in rows:
|
|
if row["key"] in settings:
|
|
settings[row["key"]] = row["value"]
|
|
return settings
|
|
|
|
|
|
def telegram_settings() -> dict[str, str]:
|
|
settings = dict(DEFAULT_TELEGRAM_SETTINGS)
|
|
with db() as conn:
|
|
rows = conn.execute("SELECT key, value FROM site_settings").fetchall()
|
|
for row in rows:
|
|
if row["key"] in settings:
|
|
settings[row["key"]] = row["value"]
|
|
return settings
|
|
|
|
|
|
class SafeTemplateValues(dict):
|
|
# Return the original {key} literal for any unknown placeholder instead of
|
|
# raising KeyError, so templates with unrecognised variables render safely.
|
|
def __missing__(self, key: str) -> str:
|
|
return "{" + key + "}"
|
|
|
|
|
|
def render_message_template(template: str, values: dict[str, object]) -> str:
|
|
try:
|
|
return template.format_map(SafeTemplateValues({key: str(value) for key, value in values.items()}))
|
|
except ValueError:
|
|
return template
|
|
|
|
|
|
def send_telegram_message(settings: dict[str, str], text: str) -> None:
|
|
token = settings.get("telegram_bot_token", "").strip()
|
|
chat_id = settings.get("telegram_chat_id", "").strip()
|
|
if not token or not chat_id:
|
|
raise AppError("tg_config_missing")
|
|
payload = urlencode(
|
|
{
|
|
"chat_id": chat_id,
|
|
"text": text,
|
|
"parse_mode": "HTML",
|
|
"disable_web_page_preview": "false",
|
|
}
|
|
).encode("utf-8")
|
|
req = Request(
|
|
f"https://api.telegram.org/bot{token}/sendMessage",
|
|
data=payload,
|
|
headers={
|
|
"Content-Type": "application/x-www-form-urlencoded; charset=utf-8",
|
|
"User-Agent": "StreamHall/1.0",
|
|
},
|
|
)
|
|
try:
|
|
with urlopen(req, timeout=TELEGRAM_TIMEOUT) as resp:
|
|
raw = resp.read(65536)
|
|
except HTTPError as exc:
|
|
raw = exc.read(65536)
|
|
try:
|
|
data = json.loads(raw.decode("utf-8"))
|
|
description = str(data.get("description") or f"TG API HTTP {exc.code}")
|
|
except (UnicodeDecodeError, json.JSONDecodeError):
|
|
description = f"TG API HTTP {exc.code}"
|
|
raise AppError("tg_api_error", detail=description) from exc
|
|
try:
|
|
data = json.loads(raw.decode("utf-8"))
|
|
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
|
|
raise AppError("tg_api_invalid") from exc
|
|
if not data.get("ok"):
|
|
description = str(data.get("description") or "")
|
|
raise AppError("tg_api_error", detail=description)
|
|
|
|
|
|
def find_stream(conn, stream_ref: object) -> dict[str, object] | None:
|
|
ref = str(stream_ref or "").strip()
|
|
if not ref:
|
|
return None
|
|
return conn.execute("SELECT * FROM streams WHERE public_id = ?", (ref,)).fetchone()
|
|
|
|
|
|
def verify_admin_password(password: str) -> bool:
|
|
with db() as conn:
|
|
stored_hash = get_setting(conn, "admin_password_hash")
|
|
return bool(stored_hash) and verify_admin_password_hash(password, stored_hash)
|
|
|
|
|
|
def client_ip_hash(headers: object) -> str:
|
|
# Hash the client IP with HMAC-SHA256 so unique visitors can be counted
|
|
# without storing the raw IP address in the database.
|
|
raw = headers.get("CF-Connecting-IP") or headers.get("X-Forwarded-For") or headers.get("X-Real-IP") or ""
|
|
ip = str(raw).split(",", 1)[0].strip()
|
|
if not ip:
|
|
return ""
|
|
return hmac.new(SECRET_KEY, ip.encode("utf-8"), hashlib.sha256).hexdigest()
|
|
|
|
|
|
def detect_device_type(user_agent: str) -> str:
|
|
ua = user_agent.lower()
|
|
if any(item in ua for item in ("mobile", "iphone", "android")):
|
|
return "mobile"
|
|
if any(item in ua for item in ("ipad", "tablet")):
|
|
return "tablet"
|
|
return "desktop"
|
|
|
|
|
|
def client_ip(headers: object) -> str:
|
|
raw = headers.get("CF-Connecting-IP") or headers.get("X-Forwarded-For") or headers.get("X-Real-IP") or ""
|
|
return str(raw).split(",", 1)[0].strip()
|
|
|
|
|
|
def detect_os(user_agent: str) -> str:
|
|
ua = user_agent.lower()
|
|
if "windows" in ua:
|
|
return "Windows"
|
|
if "iphone" in ua or "ipad" in ua:
|
|
return "iOS"
|
|
if "android" in ua:
|
|
return "Android"
|
|
if "mac os" in ua or "macintosh" in ua:
|
|
return "macOS"
|
|
if "linux" in ua:
|
|
return "Linux"
|
|
return "Other"
|
|
|
|
|
|
def detect_browser(user_agent: str) -> str:
|
|
ua = user_agent.lower()
|
|
if "edg/" in ua or "edge/" in ua:
|
|
return "Edge"
|
|
if "firefox/" in ua or "fxios/" in ua:
|
|
return "Firefox"
|
|
if "opr/" in ua or "opera/" in ua:
|
|
return "Opera"
|
|
if "chrome/" in ua or "crios/" in ua or "chromium/" in ua:
|
|
return "Chrome"
|
|
if "safari/" in ua:
|
|
return "Safari"
|
|
return "Other"
|
|
|
|
|
|
_geo_cache: dict[str, dict] = {}
|
|
_geo_cache_time: dict[str, float] = {}
|
|
_GEO_TTL = 21600 # 6-hour in-process cache to stay within ip-api.com free-tier rate limits
|
|
|
|
|
|
def batch_geoip(ips: list[str]) -> dict[str, dict]:
|
|
# Resolve IPs to country/region/city via ip-api.com batch endpoint (max 100 per call).
|
|
# Results are cached in memory for _GEO_TTL seconds. If the lookup fails, the
|
|
# affected IPs silently get an empty geo dict so stats still render.
|
|
result: dict[str, dict] = {}
|
|
to_fetch: list[str] = []
|
|
t = time.time()
|
|
for ip in ips:
|
|
if not ip:
|
|
continue
|
|
if ip in _geo_cache and t - _geo_cache_time.get(ip, 0) < _GEO_TTL:
|
|
result[ip] = _geo_cache[ip]
|
|
else:
|
|
to_fetch.append(ip)
|
|
if to_fetch:
|
|
try:
|
|
payload = json.dumps([
|
|
{"query": ip, "fields": "status,countryCode,country,regionName,city"}
|
|
for ip in to_fetch[:100]
|
|
]).encode("utf-8")
|
|
req = Request(
|
|
"http://ip-api.com/batch",
|
|
data=payload,
|
|
headers={"Content-Type": "application/json"},
|
|
method="POST",
|
|
)
|
|
with urlopen(req, timeout=4) as resp:
|
|
rows = json.loads(resp.read())
|
|
t2 = time.time()
|
|
for ip, row in zip(to_fetch, rows):
|
|
geo: dict[str, str] = {}
|
|
if isinstance(row, dict) and row.get("status") == "success":
|
|
geo = {
|
|
"countryCode": str(row.get("countryCode", "") or ""),
|
|
"country": str(row.get("country", "") or ""),
|
|
"region": str(row.get("regionName", "") or ""),
|
|
"city": str(row.get("city", "") or ""),
|
|
}
|
|
_geo_cache[ip] = geo
|
|
_geo_cache_time[ip] = t2
|
|
result[ip] = geo
|
|
except Exception:
|
|
pass
|
|
for ip in ips:
|
|
if ip and ip not in result:
|
|
result[ip] = {}
|
|
return result
|
|
|
|
|
|
def stream_public(row: dict[str, object]) -> dict[str, object]:
|
|
return {
|
|
"id": row["public_id"],
|
|
"stream_label": normalize_stream_label(row["stream_label"]),
|
|
"event_name": row["event_name"],
|
|
"has_password": 1 if row["stream_password"] else 0,
|
|
}
|
|
|
|
|
|
def stream_admin(row: dict[str, object]) -> dict[str, object]:
|
|
return {
|
|
"id": row["id"],
|
|
"public_id": row["public_id"],
|
|
"stream_label": normalize_stream_label(row["stream_label"]),
|
|
"event_name": row["event_name"],
|
|
"stream_password": row["stream_password"],
|
|
"links_json": row["links_json"],
|
|
"is_hidden": row["is_hidden"],
|
|
"is_enabled": row["is_enabled"],
|
|
"tg_notify_enabled": row["tg_notify_enabled"],
|
|
"sort_order": row["sort_order"],
|
|
"created_at": row["created_at"],
|
|
"updated_at": row["updated_at"],
|
|
}
|
|
|
|
|
|
def obs_route_public(row: dict[str, object]) -> dict[str, object]:
|
|
return {
|
|
"id": row["id"],
|
|
"stream_key": row["stream_key"],
|
|
"public_slug": row["public_slug"],
|
|
"created_at": row["created_at"],
|
|
}
|
|
|
|
|
|
def player_data(row: dict[str, object], settings: dict[str, str] | None = None) -> dict[str, object]:
|
|
try:
|
|
links = json.loads(row["links_json"] or "[]")
|
|
except json.JSONDecodeError:
|
|
links = []
|
|
site = settings or site_settings()
|
|
return {
|
|
"eventName": row["event_name"],
|
|
"siteTitle": site["site_title"],
|
|
"siteIconUrl": site.get("site_icon_url", ""),
|
|
"links": add_playback_urls(normalize_links(links)),
|
|
}
|
|
|
|
|
|
def probe_stream_links(row: dict[str, object]) -> dict[str, object]:
|
|
try:
|
|
links = normalize_links(json.loads(row["links_json"] or "[]"))
|
|
except json.JSONDecodeError:
|
|
links = []
|
|
for index, link in enumerate(links):
|
|
result = probe_stream_url(link["url"], link.get("type", ""))
|
|
if result["valid"]:
|
|
return {
|
|
**result,
|
|
"index": index,
|
|
"url": link["url"],
|
|
"link_name": link["name"],
|
|
}
|
|
return stream_probe_response(False)
|
|
|
|
|
|
def player_url_for_row(
|
|
row: dict[str, object],
|
|
headers: object | None = None,
|
|
tg_settings: dict[str, str] | None = None,
|
|
) -> str:
|
|
path = f"/player.html?id={quote(str(row['public_id']))}"
|
|
if headers is not None:
|
|
host = headers.get("X-Forwarded-Host") or headers.get("Host") or ""
|
|
proto = headers.get("X-Forwarded-Proto") or "http"
|
|
if host:
|
|
return f"{proto}://{host}{path}"
|
|
settings = tg_settings or telegram_settings()
|
|
base_url = settings.get("telegram_public_base_url", "").strip().rstrip("/")
|
|
return f"{base_url}{path}" if base_url else path
|
|
|
|
|
|
def notification_context_for_row(
|
|
row: dict[str, object],
|
|
probe_result: dict[str, object],
|
|
status: str,
|
|
headers: object | None = None,
|
|
tg_settings: dict[str, str] | None = None,
|
|
) -> dict[str, object]:
|
|
site = site_settings()
|
|
settings = tg_settings or telegram_settings()
|
|
return {
|
|
"title": row["event_name"],
|
|
"site_title": site["site_title"],
|
|
"url": player_url_for_row(row, headers, settings),
|
|
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now())),
|
|
"status": status,
|
|
"stream_id": row["id"],
|
|
"public_id": row["public_id"],
|
|
"link_name": probe_result.get("link_name", ""),
|
|
"source_url": probe_result.get("url", ""),
|
|
}
|
|
|
|
|
|
def maybe_notify_stream_transition(
|
|
row: dict[str, object],
|
|
is_live: bool,
|
|
probe_result: dict[str, object],
|
|
headers: object | None = None,
|
|
) -> None:
|
|
stream_id = int(row["id"])
|
|
with db() as conn:
|
|
previous = conn.execute(
|
|
"SELECT is_live FROM stream_probe_states WHERE stream_id = ?", (stream_id,)
|
|
).fetchone()
|
|
previous_live = None if previous is None else bool(previous["is_live"])
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO stream_probe_states (stream_id, is_live, updated_at)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(stream_id) DO UPDATE SET
|
|
is_live = excluded.is_live,
|
|
updated_at = excluded.updated_at
|
|
""",
|
|
(stream_id, 1 if is_live else 0, now()),
|
|
)
|
|
if previous_live == is_live:
|
|
return
|
|
if previous_live is None and not is_live:
|
|
return
|
|
if int(row["tg_notify_enabled"] or 0) != 1:
|
|
return
|
|
|
|
settings = telegram_settings()
|
|
label = normalize_stream_label(row.get("stream_label", "LIVE")).lower()
|
|
if is_live:
|
|
notify_key = f"telegram_{label}_notify_start"
|
|
template_key = f"telegram_{label}_start_template"
|
|
status = "start"
|
|
else:
|
|
notify_key = f"telegram_{label}_notify_stop"
|
|
template_key = f"telegram_{label}_stop_template"
|
|
status = "stop"
|
|
if settings.get(notify_key, "0") != "1":
|
|
return
|
|
template = settings.get(template_key, "")
|
|
|
|
text = render_message_template(
|
|
template,
|
|
notification_context_for_row(row, probe_result, status, headers, settings),
|
|
)
|
|
try:
|
|
send_telegram_message(settings, text)
|
|
except Exception as exc:
|
|
print(f"Telegram notification failed for stream {stream_id}: {exc}")
|
|
|
|
|
|
def notify_current_live_if_needed(stream_id: int, headers: object | None = None) -> None:
|
|
try:
|
|
with db() as conn:
|
|
row = conn.execute("SELECT * FROM streams WHERE id = ?", (stream_id,)).fetchone()
|
|
if not row:
|
|
return
|
|
if int(row["is_enabled"] or 0) != 1 or int(row["tg_notify_enabled"] or 0) != 1:
|
|
return
|
|
result = probe_stream_links(row)
|
|
if not result.get("valid"):
|
|
return
|
|
settings = telegram_settings()
|
|
label = normalize_stream_label(row.get("stream_label", "LIVE")).lower()
|
|
if settings.get(f"telegram_{label}_notify_start", "0") != "1":
|
|
return
|
|
text = render_message_template(
|
|
settings.get(f"telegram_{label}_start_template", ""),
|
|
notification_context_for_row(row, result, "start", headers, settings),
|
|
)
|
|
send_telegram_message(settings, text)
|
|
except Exception as exc:
|
|
print(f"Telegram current live notification failed for stream {stream_id}: {exc}")
|
|
|
|
|
|
def check_stream_row_live(
|
|
row: dict[str, object],
|
|
headers: object | None = None,
|
|
notify: bool = True,
|
|
) -> dict[str, object]:
|
|
if int(row["is_enabled"] or 0) != 1:
|
|
result = stream_probe_response(False)
|
|
result["code"] = "closed"
|
|
if notify:
|
|
maybe_notify_stream_transition(row, False, result, headers)
|
|
return result
|
|
result = probe_stream_links(row)
|
|
if notify:
|
|
maybe_notify_stream_transition(row, bool(result.get("valid")), result, headers)
|
|
return result
|
|
|
|
|
|
def rewrite_hls_manifest(manifest: str, slug: str, stream_key: str) -> str:
|
|
# Rewrite an SRS-generated HLS manifest so all segment URLs point to the
|
|
# public /h/<slug>/ proxy path. This hides the real stream key from clients
|
|
# while the server can still reverse-map slug -> key when proxying segments.
|
|
rewritten: list[str] = []
|
|
base = f"/h/{quote(slug, safe='')}/"
|
|
for line in manifest.splitlines():
|
|
text = line.strip()
|
|
if not text or text.startswith("#"):
|
|
rewritten.append(line)
|
|
continue
|
|
if text.startswith(("http://", "https://")):
|
|
rewritten.append(line)
|
|
continue
|
|
parsed = urlparse(text)
|
|
segment = parsed.path.lstrip("/")
|
|
if segment.startswith("live/"):
|
|
segment = segment.split("/", 1)[1]
|
|
if segment.startswith(stream_key):
|
|
segment = f"{slug}{segment[len(stream_key):]}"
|
|
query = f"?{parsed.query}" if parsed.query else ""
|
|
rewritten.append(f"{base}{quote(segment, safe=URL_PATH_SAFE)}{query}")
|
|
return "\n".join(rewritten) + ("\n" if manifest.endswith("\n") else "")
|
|
|
|
|
|
def rewrite_external_hls_manifest(manifest: str, base_url: str) -> str:
|
|
# Rewrite all URLs in an external HLS manifest (segment lines and URI="..."
|
|
# attributes such as EXT-X-KEY) to route through the signed /proxy/hls/
|
|
# endpoint, enabling cross-origin playback and key override in the player.
|
|
def proxied_uri(value: str) -> str:
|
|
if not value or value.startswith("data:"):
|
|
return value
|
|
return hls_proxy_path(urljoin(base_url, value))
|
|
|
|
rewritten: list[str] = []
|
|
for line in manifest.splitlines():
|
|
text = line.strip()
|
|
if not text:
|
|
rewritten.append(line)
|
|
continue
|
|
if text.startswith("#"):
|
|
rewritten.append(HLS_URI_RE.sub(lambda match: f'URI="{proxied_uri(match.group(1))}"', line))
|
|
continue
|
|
rewritten.append(proxied_uri(text))
|
|
return "\n".join(rewritten) + ("\n" if manifest.endswith("\n") else "")
|
|
|
|
|
|
def monitor_streams_loop() -> None:
|
|
while True:
|
|
try:
|
|
with db() as conn:
|
|
rows = conn.execute("SELECT * FROM streams ORDER BY id DESC").fetchall()
|
|
for row in rows:
|
|
check_stream_row_live(row)
|
|
except Exception as exc:
|
|
print(f"Stream monitor failed: {exc}")
|
|
time.sleep(STREAM_MONITOR_INTERVAL)
|
|
|
|
|
|
class StreamHallHandler(BaseHTTPRequestHandler):
|
|
server_version = "StreamHall/1.0"
|
|
|
|
def log_message(self, fmt: str, *args: object) -> None:
|
|
print(f"{self.address_string()} - {fmt % args}")
|
|
|
|
def do_GET(self) -> None:
|
|
parsed = urlparse(self.path)
|
|
if parsed.path.rstrip("/") == "/api":
|
|
self.handle_api(parsed)
|
|
return
|
|
if parsed.path.startswith("/h/"):
|
|
self.proxy_obs_route(parsed.path, parsed.query, send_body=True)
|
|
return
|
|
if parsed.path.startswith(f"{HLS_PROXY_PREFIX}/"):
|
|
self.proxy_hls_route(parsed.path, send_body=True)
|
|
return
|
|
self.serve_static(parsed.path)
|
|
|
|
def do_HEAD(self) -> None:
|
|
parsed = urlparse(self.path)
|
|
if parsed.path.rstrip("/") == "/api":
|
|
self.send_error(HTTPStatus.METHOD_NOT_ALLOWED)
|
|
return
|
|
if parsed.path.startswith("/h/"):
|
|
self.proxy_obs_route(parsed.path, parsed.query, send_body=False)
|
|
return
|
|
if parsed.path.startswith(f"{HLS_PROXY_PREFIX}/"):
|
|
self.proxy_hls_route(parsed.path, send_body=False)
|
|
return
|
|
self.serve_static(parsed.path, send_body=False)
|
|
|
|
def do_POST(self) -> None:
|
|
parsed = urlparse(self.path)
|
|
if parsed.path.rstrip("/") == "/api":
|
|
self.handle_api(parsed)
|
|
return
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
|
|
def is_admin(self) -> bool:
|
|
cookie = SimpleCookie(self.headers.get("Cookie"))
|
|
morsel = cookie.get(SESSION_COOKIE)
|
|
if verify_session(morsel.value if morsel else None):
|
|
return True
|
|
return self._verify_api_key()
|
|
|
|
def _verify_api_key(self) -> bool:
|
|
token = None
|
|
auth = self.headers.get("Authorization", "")
|
|
if auth.startswith("Bearer "):
|
|
token = auth[7:].strip()
|
|
if not token:
|
|
qs = parse_qs(urlparse(self.path).query)
|
|
token = qs.get("api_key", [""])[0]
|
|
if not token:
|
|
return False
|
|
token_hash = hashlib.sha256(token.encode("utf-8")).hexdigest()
|
|
try:
|
|
with db() as conn:
|
|
row = conn.execute(
|
|
"SELECT id FROM api_keys WHERE token_hash = ?", (token_hash,)
|
|
).fetchone()
|
|
if row:
|
|
conn.execute(
|
|
"UPDATE api_keys SET last_used_at = ? WHERE id = ?",
|
|
(now(), row["id"]),
|
|
)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
def read_json(self) -> dict[str, object]:
|
|
length = int(self.headers.get("Content-Length", "0") or "0")
|
|
if not length:
|
|
return {}
|
|
raw = self.rfile.read(length).decode("utf-8")
|
|
try:
|
|
data = json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
return data if isinstance(data, dict) else {}
|
|
|
|
def send_json(
|
|
self,
|
|
payload: dict[str, object],
|
|
status: HTTPStatus = HTTPStatus.OK,
|
|
extra_headers: dict[str, str] | None = None,
|
|
) -> None:
|
|
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
|
self.send_response(status)
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.send_header("Cache-Control", "no-cache")
|
|
self.send_header("Access-Control-Allow-Origin", "*")
|
|
self.send_header("X-Content-Type-Options", "nosniff")
|
|
for key, value in (extra_headers or {}).items():
|
|
self.send_header(key, value)
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def error_json(self, code: str, status: HTTPStatus = HTTPStatus.BAD_REQUEST, *, detail: str = "") -> None:
|
|
payload: dict[str, object] = {"status": "error", "code": code}
|
|
if detail:
|
|
payload["detail"] = detail
|
|
self.send_json(payload, status)
|
|
|
|
def handle_api(self, parsed) -> None:
|
|
action = parse_qs(parsed.query).get("action", [""])[0]
|
|
try:
|
|
if action == "site_settings":
|
|
self.api_site_settings()
|
|
elif action == "public_list":
|
|
self.api_public_list()
|
|
elif action == "get_player_data":
|
|
self.api_get_player_data(parse_qs(parsed.query))
|
|
elif action == "verify_password":
|
|
self.api_verify_password()
|
|
elif action == "check_player_stream":
|
|
self.api_check_player_stream()
|
|
elif action == "viewer_start":
|
|
self.api_viewer_start()
|
|
elif action == "viewer_heartbeat":
|
|
self.api_viewer_heartbeat()
|
|
elif action == "viewer_end":
|
|
self.api_viewer_end()
|
|
elif action == "login":
|
|
self.api_login()
|
|
elif action == "logout":
|
|
self.api_logout()
|
|
elif action == "session":
|
|
self.send_json({"status": "success", "logged_in": self.is_admin()})
|
|
elif action == "list_admin":
|
|
self.require_admin()
|
|
self.api_list_admin()
|
|
elif action == "stream_stats_summary":
|
|
self.require_admin()
|
|
self.api_stream_stats_summary()
|
|
elif action == "add":
|
|
self.require_admin()
|
|
self.api_add()
|
|
elif action == "update":
|
|
self.require_admin()
|
|
self.api_update()
|
|
elif action == "delete":
|
|
self.require_admin()
|
|
self.api_delete()
|
|
elif action == "set_stream_enabled":
|
|
self.require_admin()
|
|
self.api_set_stream_enabled()
|
|
elif action == "set_stream_tg_notify":
|
|
self.require_admin()
|
|
self.api_set_stream_tg_notify()
|
|
elif action == "reorder_streams":
|
|
self.require_admin()
|
|
self.api_reorder_streams()
|
|
elif action == "update_site_settings":
|
|
self.require_admin()
|
|
self.api_update_site_settings()
|
|
elif action == "update_admin_password":
|
|
self.require_admin()
|
|
self.api_update_admin_password()
|
|
elif action == "telegram_settings":
|
|
self.require_admin()
|
|
self.api_telegram_settings()
|
|
elif action == "update_telegram_settings":
|
|
self.require_admin()
|
|
self.api_update_telegram_settings()
|
|
elif action == "test_telegram":
|
|
self.require_admin()
|
|
self.api_test_telegram()
|
|
elif action == "check_stream_url":
|
|
self.require_admin()
|
|
self.api_check_stream_url()
|
|
elif action == "check_stream":
|
|
self.require_admin()
|
|
self.api_check_stream()
|
|
elif action == "list_obs_routes":
|
|
self.require_admin()
|
|
self.api_list_obs_routes()
|
|
elif action == "add_obs_route":
|
|
self.require_admin()
|
|
self.api_add_obs_route()
|
|
elif action == "delete_obs_route":
|
|
self.require_admin()
|
|
self.api_delete_obs_route()
|
|
elif action == "stats_overview":
|
|
self.require_admin()
|
|
self.api_stats_overview()
|
|
elif action == "stats_streams":
|
|
self.require_admin()
|
|
self.api_stats_streams()
|
|
elif action == "stats_timeseries":
|
|
self.require_admin()
|
|
self.api_stats_timeseries()
|
|
elif action == "stats_export_csv":
|
|
self.require_admin()
|
|
self.api_stats_export_csv()
|
|
elif action == "stats_stream_detail":
|
|
self.require_admin()
|
|
self.api_stats_stream_detail()
|
|
elif action == "stats_dashboard_realtime":
|
|
self.require_admin()
|
|
self.api_stats_dashboard_realtime()
|
|
elif action == "stats_stream_realtime":
|
|
self.require_admin()
|
|
self.api_stats_stream_realtime()
|
|
elif action == "stats_geo":
|
|
self.require_admin()
|
|
self.api_stats_geo()
|
|
elif action == "stats_sessions_page":
|
|
self.require_admin()
|
|
self.api_stats_sessions_page()
|
|
elif action == "list_api_keys":
|
|
self.require_admin()
|
|
self.api_list_api_keys()
|
|
elif action == "create_api_key":
|
|
self.require_admin()
|
|
self.api_create_api_key()
|
|
elif action == "delete_api_key":
|
|
self.require_admin()
|
|
self.api_delete_api_key()
|
|
else:
|
|
self.error_json("invalid_action")
|
|
except PermissionError:
|
|
self.error_json("auth_required", HTTPStatus.UNAUTHORIZED)
|
|
except AppError as exc:
|
|
self.error_json(exc.code, detail=exc.detail)
|
|
except ValueError as exc:
|
|
self.error_json(str(exc))
|
|
except Exception as exc:
|
|
self.error_json("server_error", HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(exc))
|
|
|
|
def require_admin(self) -> None:
|
|
if not self.is_admin():
|
|
raise PermissionError()
|
|
|
|
def player_url(self, row: dict[str, object]) -> str:
|
|
return player_url_for_row(row, self.headers)
|
|
|
|
def request_origin(self) -> str:
|
|
host = self.headers.get("X-Forwarded-Host") or self.headers.get("Host") or ""
|
|
proto = self.headers.get("X-Forwarded-Proto") or "http"
|
|
return f"{proto}://{host}" if host else ""
|
|
|
|
def notification_context(self, row: dict[str, object], probe_result: dict[str, object], status: str) -> dict[str, object]:
|
|
return notification_context_for_row(row, probe_result, status, self.headers)
|
|
|
|
def maybe_notify_stream_transition(self, row: dict[str, object], is_live: bool, probe_result: dict[str, object]) -> None:
|
|
maybe_notify_stream_transition(row, is_live, probe_result, self.headers)
|
|
|
|
def check_stream_row(self, row: dict[str, object], notify: bool = True) -> dict[str, object]:
|
|
return check_stream_row_live(row, self.headers, notify)
|
|
|
|
def api_site_settings(self) -> None:
|
|
settings = site_settings()
|
|
origin = self.request_origin()
|
|
# Auto-populate telegram_public_base_url from the request origin the
|
|
# first time the frontend loads, so Telegram notification links resolve
|
|
# to the correct host without requiring manual configuration.
|
|
if origin and not settings.get("telegram_public_base_url"):
|
|
settings["telegram_public_base_url"] = origin
|
|
with db() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO site_settings (key, value)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(key) DO UPDATE SET value = excluded.value
|
|
""",
|
|
("telegram_public_base_url", origin),
|
|
)
|
|
self.send_json({"status": "success", "data": settings})
|
|
|
|
def api_public_list(self) -> None:
|
|
with db() as conn:
|
|
rows = conn.execute(
|
|
"SELECT id, public_id, stream_label, event_name, stream_password FROM streams "
|
|
"WHERE is_hidden = 0 AND is_enabled = 1 "
|
|
"ORDER BY CASE stream_label WHEN 'LIVE' THEN 0 WHEN 'ARCHIVE' THEN 1 ELSE 2 END ASC, "
|
|
"sort_order ASC, id DESC"
|
|
).fetchall()
|
|
self.send_json({"status": "success", "data": [stream_public(row) for row in rows]})
|
|
|
|
def api_get_player_data(self, query: dict[str, list[str]]) -> None:
|
|
stream_ref = query.get("id", [""])[0]
|
|
with db() as conn:
|
|
row = find_stream(conn, stream_ref)
|
|
if not row:
|
|
raise AppError("stream_not_found")
|
|
if int(row["is_enabled"] or 0) != 1:
|
|
raise AppError("stream_disabled")
|
|
settings = site_settings()
|
|
if row["stream_password"]:
|
|
self.send_json(
|
|
{
|
|
"status": "success",
|
|
"data": {
|
|
"requires_password": True,
|
|
"eventName": row["event_name"],
|
|
"siteTitle": settings["site_title"],
|
|
"siteIconUrl": settings.get("site_icon_url", ""),
|
|
},
|
|
}
|
|
)
|
|
return
|
|
self.send_json({"status": "success", "data": player_data(row, settings)})
|
|
|
|
def api_verify_password(self) -> None:
|
|
body = self.read_json()
|
|
password = str(body.get("password", ""))
|
|
with db() as conn:
|
|
row = find_stream(conn, body.get("id", ""))
|
|
if not row:
|
|
raise AppError("stream_not_found")
|
|
if int(row["is_enabled"] or 0) != 1:
|
|
raise AppError("stream_disabled")
|
|
if row["stream_password"] and not hmac.compare_digest(row["stream_password"], password):
|
|
raise AppError("auth_incorrect_password")
|
|
self.send_json({"status": "success", "data": player_data(row)})
|
|
|
|
def api_check_player_stream(self) -> None:
|
|
body = self.read_json()
|
|
password = str(body.get("password", ""))
|
|
with db() as conn:
|
|
row = find_stream(conn, body.get("id", ""))
|
|
if not row:
|
|
raise AppError("stream_not_found")
|
|
if row["stream_password"] and not hmac.compare_digest(row["stream_password"], password):
|
|
raise AppError("auth_incorrect_password")
|
|
self.send_json({"status": "success", "data": self.check_stream_row(row)})
|
|
|
|
def api_viewer_start(self) -> None:
|
|
body = self.read_json()
|
|
visitor_id = str(body.get("visitorId", "")).strip()[:120] or secrets.token_urlsafe(18)
|
|
stream_ref = body.get("id", "")
|
|
referer = str(body.get("referer", self.headers.get("Referer", ""))).strip()[:500]
|
|
with db() as conn:
|
|
row = find_stream(conn, stream_ref)
|
|
if not row or int(row["is_enabled"] or 0) != 1:
|
|
raise AppError("stream_not_found_or_disabled")
|
|
session_id = secrets.token_urlsafe(18)
|
|
timestamp = now()
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO viewer_sessions
|
|
(session_id, visitor_id, stream_id, public_id, ip_hash, user_agent, referer,
|
|
device_type, ip_address, browser, os, started_at, last_seen_at, is_active, play_state)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, ?)
|
|
""",
|
|
(
|
|
session_id,
|
|
visitor_id,
|
|
row["id"],
|
|
row["public_id"],
|
|
client_ip_hash(self.headers),
|
|
self.headers.get("User-Agent", "")[:800],
|
|
referer,
|
|
detect_device_type(self.headers.get("User-Agent", "")),
|
|
client_ip(self.headers),
|
|
detect_browser(self.headers.get("User-Agent", "")),
|
|
detect_os(self.headers.get("User-Agent", "")),
|
|
timestamp,
|
|
timestamp,
|
|
str(body.get("state", "viewing"))[:40],
|
|
),
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO viewer_events (session_id, stream_id, event_type, event_at, metadata) VALUES (?, ?, ?, ?, ?)",
|
|
(session_id, row["id"], "start", timestamp, "{}"),
|
|
)
|
|
self.send_json({"status": "success", "data": {"sessionId": session_id, "visitorId": visitor_id}})
|
|
|
|
def api_viewer_heartbeat(self) -> None:
|
|
body = self.read_json()
|
|
session_id = str(body.get("sessionId", "")).strip()
|
|
if not session_id:
|
|
raise AppError("missing_session_id")
|
|
timestamp = now()
|
|
with db() as conn:
|
|
cur = conn.execute(
|
|
"UPDATE viewer_sessions SET last_seen_at = ?, is_active = 1, play_state = ? WHERE session_id = ?",
|
|
(timestamp, str(body.get("state", "viewing"))[:40], session_id),
|
|
)
|
|
if cur.rowcount == 0:
|
|
raise AppError("viewer_session_not_found")
|
|
self.send_json({"status": "success"})
|
|
|
|
def api_viewer_end(self) -> None:
|
|
body = self.read_json()
|
|
session_id = str(body.get("sessionId", "")).strip()
|
|
if not session_id:
|
|
self.send_json({"status": "success"})
|
|
return
|
|
timestamp = now()
|
|
with db() as conn:
|
|
row = conn.execute("SELECT stream_id FROM viewer_sessions WHERE session_id = ?", (session_id,)).fetchone()
|
|
conn.execute(
|
|
"UPDATE viewer_sessions SET last_seen_at = ?, ended_at = ?, is_active = 0, play_state = ? WHERE session_id = ?",
|
|
(timestamp, timestamp, str(body.get("state", "ended"))[:40], session_id),
|
|
)
|
|
if row:
|
|
conn.execute(
|
|
"INSERT INTO viewer_events (session_id, stream_id, event_type, event_at, metadata) VALUES (?, ?, ?, ?, ?)",
|
|
(session_id, row["stream_id"], "end", timestamp, "{}"),
|
|
)
|
|
self.send_json({"status": "success"})
|
|
|
|
def api_login(self) -> None:
|
|
body = self.read_json()
|
|
password = str(body.get("password", "")).strip()
|
|
if not verify_admin_password(password):
|
|
raise AppError("auth_incorrect_password")
|
|
token = make_session()
|
|
cookie = (
|
|
f"{SESSION_COOKIE}={token}; Path=/; Max-Age={SESSION_MAX_AGE}; "
|
|
"HttpOnly; SameSite=Lax"
|
|
)
|
|
self.send_json({"status": "success"}, extra_headers={"Set-Cookie": cookie})
|
|
|
|
def api_logout(self) -> None:
|
|
cookie = f"{SESSION_COOKIE}=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax"
|
|
self.send_json({"status": "success"}, extra_headers={"Set-Cookie": cookie})
|
|
|
|
def api_update_admin_password(self) -> None:
|
|
body = self.read_json()
|
|
current_password = str(body.get("currentPassword", "")).strip()
|
|
new_password = str(body.get("newPassword", "")).strip()
|
|
confirm_password = str(body.get("confirmPassword", "")).strip()
|
|
if not verify_admin_password(current_password):
|
|
raise AppError("auth_current_pw_wrong")
|
|
if len(new_password) < 10:
|
|
raise AppError("auth_pw_too_short")
|
|
if new_password != confirm_password:
|
|
raise AppError("auth_pw_mismatch")
|
|
with db() as conn:
|
|
set_setting(conn, "admin_password_hash", hash_admin_password(new_password))
|
|
set_setting(conn, "admin_password_changed_at", str(now()))
|
|
cookie = f"{SESSION_COOKIE}=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax"
|
|
self.send_json({"status": "success"}, extra_headers={"Set-Cookie": cookie})
|
|
|
|
def api_list_admin(self) -> None:
|
|
with db() as conn:
|
|
rows = conn.execute(
|
|
"SELECT * FROM streams ORDER BY "
|
|
"CASE stream_label WHEN 'LIVE' THEN 0 WHEN 'ARCHIVE' THEN 1 ELSE 2 END ASC, "
|
|
"sort_order ASC, id DESC"
|
|
).fetchall()
|
|
self.send_json({"status": "success", "data": [stream_admin(row) for row in rows]})
|
|
|
|
def api_stream_stats_summary(self) -> None:
|
|
online_after = now() - 45
|
|
local_now = time.localtime(now())
|
|
today_start = int(time.mktime((local_now.tm_year, local_now.tm_mon, local_now.tm_mday, 0, 0, 0, 0, 0, -1)))
|
|
with db() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT
|
|
stream_id,
|
|
SUM(CASE WHEN is_active = 1 AND last_seen_at >= ? THEN 1 ELSE 0 END) AS online,
|
|
SUM(CASE WHEN started_at >= ? THEN 1 ELSE 0 END) AS today_views,
|
|
COUNT(*) AS total_views,
|
|
MAX(last_seen_at) AS last_seen_at
|
|
FROM viewer_sessions
|
|
GROUP BY stream_id
|
|
""",
|
|
(online_after, today_start),
|
|
).fetchall()
|
|
data = {
|
|
str(row["stream_id"]): {
|
|
"online": int(row["online"] or 0),
|
|
"today_views": int(row["today_views"] or 0),
|
|
"total_views": int(row["total_views"] or 0),
|
|
"last_seen_at": int(row["last_seen_at"] or 0),
|
|
}
|
|
for row in rows
|
|
}
|
|
self.send_json({"status": "success", "data": data})
|
|
|
|
def api_add(self) -> None:
|
|
body = self.read_json()
|
|
event_name = str(body.get("eventName", "")).strip()
|
|
if not event_name:
|
|
raise AppError("stream_name_empty")
|
|
links = normalize_links(body.get("links", []))
|
|
stream_label = normalize_stream_label(body.get("streamLabel"))
|
|
with db() as conn:
|
|
max_order = conn.execute(
|
|
"SELECT COALESCE(MAX(sort_order), 0) FROM streams WHERE stream_label = ?",
|
|
(stream_label,),
|
|
).fetchone()[0]
|
|
params = (
|
|
generate_public_id(conn),
|
|
stream_label,
|
|
event_name,
|
|
str(body.get("streamPassword", "")).strip(),
|
|
json.dumps(links, ensure_ascii=False),
|
|
1 if body.get("isHidden") else 0,
|
|
int(max_order) + 1,
|
|
now(),
|
|
now(),
|
|
)
|
|
row = conn.execute(
|
|
"""
|
|
INSERT INTO streams
|
|
(public_id, stream_label, event_name, stream_password, links_json, is_hidden, sort_order, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
RETURNING id
|
|
""",
|
|
params,
|
|
).fetchone()
|
|
stream_id = row["id"]
|
|
self.send_json({"status": "success", "id": stream_id})
|
|
|
|
def api_update(self) -> None:
|
|
body = self.read_json()
|
|
stream_id = int(body.get("id", 0) or 0)
|
|
event_name = str(body.get("eventName", "")).strip()
|
|
if not stream_id:
|
|
raise AppError("missing_stream_id")
|
|
if not event_name:
|
|
raise AppError("stream_name_empty")
|
|
links = normalize_links(body.get("links", []))
|
|
stream_label = normalize_stream_label(body.get("streamLabel"))
|
|
with db() as conn:
|
|
cur = conn.execute(
|
|
"""
|
|
UPDATE streams
|
|
SET stream_label = ?, event_name = ?, stream_password = ?, links_json = ?, is_hidden = ?, updated_at = ?
|
|
WHERE id = ?
|
|
""",
|
|
(
|
|
stream_label,
|
|
event_name,
|
|
str(body.get("streamPassword", "")).strip(),
|
|
json.dumps(links, ensure_ascii=False),
|
|
1 if body.get("isHidden") else 0,
|
|
now(),
|
|
stream_id,
|
|
),
|
|
)
|
|
if cur.rowcount == 0:
|
|
raise AppError("stream_not_found")
|
|
self.send_json({"status": "success"})
|
|
|
|
def api_delete(self) -> None:
|
|
body = self.read_json()
|
|
stream_id = int(body.get("id", 0) or 0)
|
|
if not stream_id:
|
|
raise AppError("missing_stream_id")
|
|
with db() as conn:
|
|
conn.execute("DELETE FROM streams WHERE id = ?", (stream_id,))
|
|
conn.execute("DELETE FROM stream_probe_states WHERE stream_id = ?", (stream_id,))
|
|
self.send_json({"status": "success"})
|
|
|
|
def api_set_stream_enabled(self) -> None:
|
|
body = self.read_json()
|
|
stream_id = int(body.get("id", 0) or 0)
|
|
enabled = 1 if body.get("enabled") else 0
|
|
if not stream_id:
|
|
raise AppError("missing_stream_id")
|
|
with db() as conn:
|
|
cur = conn.execute(
|
|
"UPDATE streams SET is_enabled = ?, updated_at = ? WHERE id = ?",
|
|
(enabled, now(), stream_id),
|
|
)
|
|
row = conn.execute("SELECT * FROM streams WHERE id = ?", (stream_id,)).fetchone()
|
|
if cur.rowcount == 0 or not row:
|
|
raise AppError("stream_not_found")
|
|
result = None if enabled else stream_probe_response(False)
|
|
if not enabled:
|
|
result["code"] = "closed"
|
|
self.maybe_notify_stream_transition(row, False, result)
|
|
self.send_json({"status": "success", "enabled": enabled, "data": result})
|
|
|
|
def api_set_stream_tg_notify(self) -> None:
|
|
body = self.read_json()
|
|
stream_id = int(body.get("id", 0) or 0)
|
|
enabled = 1 if body.get("enabled") else 0
|
|
if not stream_id:
|
|
raise AppError("missing_stream_id")
|
|
with db() as conn:
|
|
cur = conn.execute(
|
|
"UPDATE streams SET tg_notify_enabled = ?, updated_at = ? WHERE id = ?",
|
|
(enabled, now(), stream_id),
|
|
)
|
|
row = conn.execute("SELECT * FROM streams WHERE id = ?", (stream_id,)).fetchone()
|
|
if cur.rowcount == 0:
|
|
raise AppError("stream_not_found")
|
|
if enabled and row:
|
|
headers = dict(self.headers)
|
|
threading.Thread(
|
|
target=notify_current_live_if_needed,
|
|
args=(stream_id, headers),
|
|
daemon=True,
|
|
).start()
|
|
self.send_json({"status": "success", "enabled": enabled})
|
|
|
|
def api_reorder_streams(self) -> None:
|
|
body = self.read_json()
|
|
ids = [int(x) for x in body.get("ids", [])]
|
|
label = str(body.get("label", "")).strip()
|
|
if not ids or not label:
|
|
raise AppError("obs_ids_required")
|
|
with db() as conn:
|
|
for order, stream_id in enumerate(ids):
|
|
conn.execute(
|
|
"UPDATE streams SET sort_order = ?, updated_at = ? WHERE id = ? AND stream_label = ?",
|
|
(order, now(), stream_id, label),
|
|
)
|
|
self.send_json({"status": "success"})
|
|
|
|
def api_update_site_settings(self) -> None:
|
|
settings = normalize_site_settings(self.read_json())
|
|
with db() as conn:
|
|
for key, value in settings.items():
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO site_settings (key, value)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(key) DO UPDATE SET value = excluded.value
|
|
""",
|
|
(key, value),
|
|
)
|
|
self.send_json({"status": "success", "data": settings})
|
|
|
|
def api_telegram_settings(self) -> None:
|
|
settings = telegram_settings()
|
|
origin = self.request_origin()
|
|
if origin and not settings.get("telegram_public_base_url"):
|
|
settings["telegram_public_base_url"] = origin
|
|
with db() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO site_settings (key, value)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(key) DO UPDATE SET value = excluded.value
|
|
""",
|
|
("telegram_public_base_url", origin),
|
|
)
|
|
self.send_json({"status": "success", "data": settings})
|
|
|
|
def api_update_telegram_settings(self) -> None:
|
|
settings = normalize_telegram_settings(self.read_json())
|
|
with db() as conn:
|
|
for key, value in settings.items():
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO site_settings (key, value)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(key) DO UPDATE SET value = excluded.value
|
|
""",
|
|
(key, value),
|
|
)
|
|
self.send_json({"status": "success", "data": settings})
|
|
|
|
def api_test_telegram(self) -> None:
|
|
settings = normalize_telegram_settings(self.read_json())
|
|
if not settings["telegram_bot_token"] or not settings["telegram_chat_id"]:
|
|
raise AppError("tg_config_missing")
|
|
send_telegram_message(settings, "StreamHall Telegram test message")
|
|
self.send_json({"status": "success", "message": "Test message sent"})
|
|
|
|
def api_check_stream_url(self) -> None:
|
|
body = self.read_json()
|
|
result = probe_stream_url(body.get("url", ""), body.get("type", ""))
|
|
self.send_json({"status": "success", "data": result})
|
|
|
|
def api_check_stream(self) -> None:
|
|
body = self.read_json()
|
|
stream_id = int(body.get("id", 0) or 0)
|
|
if not stream_id:
|
|
raise AppError("missing_stream_id")
|
|
with db() as conn:
|
|
row = conn.execute("SELECT * FROM streams WHERE id = ?", (stream_id,)).fetchone()
|
|
if not row:
|
|
raise AppError("stream_not_found")
|
|
self.send_json({"status": "success", "data": self.check_stream_row(row)})
|
|
|
|
def api_list_obs_routes(self) -> None:
|
|
with db() as conn:
|
|
rows = conn.execute("SELECT * FROM obs_stream_routes ORDER BY id DESC").fetchall()
|
|
self.send_json({"status": "success", "data": [obs_route_public(row) for row in rows]})
|
|
|
|
def api_add_obs_route(self) -> None:
|
|
body = self.read_json()
|
|
stream_key = str(body.get("streamKey", "")).strip()
|
|
if not stream_key:
|
|
raise AppError("stream_key_empty")
|
|
if len(stream_key) > 180:
|
|
raise AppError("stream_key_too_long")
|
|
slug = obs_route_slug(stream_key)
|
|
with db() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO obs_stream_routes (stream_key, public_slug, created_at)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(stream_key) DO UPDATE SET public_slug = excluded.public_slug
|
|
""",
|
|
(stream_key, slug, now()),
|
|
)
|
|
row = conn.execute("SELECT * FROM obs_stream_routes WHERE stream_key = ?", (stream_key,)).fetchone()
|
|
self.send_json({"status": "success", "data": obs_route_public(row)})
|
|
|
|
def api_delete_obs_route(self) -> None:
|
|
body = self.read_json()
|
|
route_id = int(body.get("id", 0) or 0)
|
|
if not route_id:
|
|
raise AppError("missing_route_id")
|
|
with db() as conn:
|
|
cur = conn.execute("DELETE FROM obs_stream_routes WHERE id = ?", (route_id,))
|
|
if cur.rowcount == 0:
|
|
raise AppError("route_not_found")
|
|
self.send_json({"status": "success"})
|
|
|
|
def api_stats_overview(self) -> None:
|
|
online_after = now() - 45
|
|
local_now = time.localtime(now())
|
|
today_start = int(time.mktime((local_now.tm_year, local_now.tm_mon, local_now.tm_mday, 0, 0, 0, 0, 0, -1)))
|
|
with db() as conn:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT
|
|
SUM(CASE WHEN is_active = 1 AND last_seen_at >= ? THEN 1 ELSE 0 END) AS total_online,
|
|
SUM(CASE WHEN started_at >= ? THEN 1 ELSE 0 END) AS today_views,
|
|
COUNT(*) AS total_views,
|
|
COUNT(DISTINCT visitor_id) AS unique_visitors,
|
|
COUNT(DISTINCT stream_id) AS streams_with_views
|
|
FROM viewer_sessions
|
|
""",
|
|
(online_after, today_start),
|
|
).fetchone()
|
|
device_rows = conn.execute(
|
|
"SELECT device_type, COUNT(*) AS cnt FROM viewer_sessions GROUP BY device_type"
|
|
).fetchall()
|
|
devices = {r["device_type"]: int(r["cnt"]) for r in device_rows}
|
|
self.send_json({"status": "success", "data": {
|
|
"total_online": int(row["total_online"] or 0),
|
|
"today_views": int(row["today_views"] or 0),
|
|
"total_views": int(row["total_views"] or 0),
|
|
"unique_visitors": int(row["unique_visitors"] or 0),
|
|
"streams_with_views": int(row["streams_with_views"] or 0),
|
|
"devices": devices,
|
|
}})
|
|
|
|
def api_stats_streams(self) -> None:
|
|
online_after = now() - 45
|
|
local_now = time.localtime(now())
|
|
today_start = int(time.mktime((local_now.tm_year, local_now.tm_mon, local_now.tm_mday, 0, 0, 0, 0, 0, -1)))
|
|
with db() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT
|
|
vs.stream_id,
|
|
s.event_name,
|
|
SUM(CASE WHEN vs.is_active = 1 AND vs.last_seen_at >= ? THEN 1 ELSE 0 END) AS online,
|
|
SUM(CASE WHEN vs.started_at >= ? THEN 1 ELSE 0 END) AS today_views,
|
|
COUNT(*) AS total_views,
|
|
COUNT(DISTINCT vs.visitor_id) AS unique_visitors,
|
|
SUM(CASE WHEN vs.device_type = 'mobile' THEN 1 ELSE 0 END) AS mobile,
|
|
SUM(CASE WHEN vs.device_type = 'tablet' THEN 1 ELSE 0 END) AS tablet,
|
|
SUM(CASE WHEN vs.device_type = 'desktop' THEN 1 ELSE 0 END) AS desktop,
|
|
AVG(CASE WHEN vs.ended_at > 0 THEN vs.ended_at - vs.started_at END) AS avg_duration,
|
|
MAX(vs.last_seen_at) AS last_seen_at
|
|
FROM viewer_sessions vs
|
|
LEFT JOIN streams s ON vs.stream_id = s.id
|
|
GROUP BY vs.stream_id, s.event_name
|
|
ORDER BY total_views DESC
|
|
""",
|
|
(online_after, today_start),
|
|
).fetchall()
|
|
self.send_json({"status": "success", "data": [
|
|
{
|
|
"stream_id": int(r["stream_id"]),
|
|
"event_name": str(r["event_name"] or ""),
|
|
"online": int(r["online"] or 0),
|
|
"today_views": int(r["today_views"] or 0),
|
|
"total_views": int(r["total_views"] or 0),
|
|
"unique_visitors": int(r["unique_visitors"] or 0),
|
|
"mobile": int(r["mobile"] or 0),
|
|
"tablet": int(r["tablet"] or 0),
|
|
"desktop": int(r["desktop"] or 0),
|
|
"avg_duration": round(float(r["avg_duration"]), 1) if r["avg_duration"] else None,
|
|
"last_seen_at": int(r["last_seen_at"] or 0),
|
|
}
|
|
for r in rows
|
|
]})
|
|
|
|
def api_stats_timeseries(self) -> None:
|
|
range_param = parse_qs(urlparse(self.path).query).get("range", ["today"])[0]
|
|
local_now = time.localtime(now())
|
|
today_start = int(time.mktime((local_now.tm_year, local_now.tm_mon, local_now.tm_mday, 0, 0, 0, 0, 0, -1)))
|
|
if range_param == "7d":
|
|
bucket_secs = 86400
|
|
ref_start = today_start - 6 * 86400
|
|
num_buckets = 7
|
|
elif range_param == "30d":
|
|
bucket_secs = 86400
|
|
ref_start = today_start - 29 * 86400
|
|
num_buckets = 30
|
|
else:
|
|
range_param = "today"
|
|
bucket_secs = 3600
|
|
ref_start = today_start
|
|
num_buckets = 24
|
|
with db() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT (started_at - ?) / ? AS bucket, COUNT(*) AS views
|
|
FROM viewer_sessions
|
|
WHERE started_at >= ?
|
|
GROUP BY bucket
|
|
""",
|
|
(ref_start, bucket_secs, ref_start),
|
|
).fetchall()
|
|
buckets = [0] * num_buckets
|
|
for r in rows:
|
|
idx = int(r["bucket"])
|
|
if 0 <= idx < num_buckets:
|
|
buckets[idx] = int(r["views"])
|
|
self.send_json({"status": "success", "data": {
|
|
"range": range_param,
|
|
"buckets": buckets,
|
|
"ref_start": ref_start,
|
|
"bucket_secs": bucket_secs,
|
|
}})
|
|
|
|
def api_stats_export_csv(self) -> None:
|
|
with db() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT vs.session_id, vs.visitor_id, s.event_name,
|
|
vs.device_type, vs.referer,
|
|
vs.started_at, vs.ended_at, vs.last_seen_at,
|
|
vs.is_active, vs.play_state
|
|
FROM viewer_sessions vs
|
|
LEFT JOIN streams s ON vs.stream_id = s.id
|
|
ORDER BY vs.started_at DESC
|
|
"""
|
|
).fetchall()
|
|
buf = io.StringIO()
|
|
writer = csv.writer(buf)
|
|
writer.writerow(["session_id", "visitor_id", "event_name", "device_type", "referer",
|
|
"started_at", "ended_at", "last_seen_at", "is_active", "play_state"])
|
|
for r in rows:
|
|
writer.writerow([
|
|
r["session_id"], r["visitor_id"], r["event_name"] or "",
|
|
r["device_type"], r["referer"],
|
|
r["started_at"], r["ended_at"], r["last_seen_at"],
|
|
r["is_active"], r["play_state"],
|
|
])
|
|
body = buf.getvalue().encode("utf-8-sig")
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", "text/csv; charset=utf-8")
|
|
self.send_header("Content-Disposition", 'attachment; filename="viewer_stats.csv"')
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.send_header("Cache-Control", "no-cache")
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def api_stats_stream_detail(self) -> None:
|
|
qs = parse_qs(urlparse(self.path).query)
|
|
stream_id = int(qs.get("id", [0])[0] or 0)
|
|
range_param = qs.get("range", ["7d"])[0]
|
|
if not stream_id:
|
|
raise AppError("missing_stream_id")
|
|
online_after = now() - 45
|
|
local_now = time.localtime(now())
|
|
today_start = int(time.mktime((local_now.tm_year, local_now.tm_mon, local_now.tm_mday, 0, 0, 0, 0, 0, -1)))
|
|
if range_param == "today":
|
|
bucket_secs, ref_start, num_buckets = 3600, today_start, 24
|
|
elif range_param == "30d":
|
|
bucket_secs, ref_start, num_buckets = 86400, today_start - 29 * 86400, 30
|
|
else:
|
|
range_param = "7d"
|
|
bucket_secs, ref_start, num_buckets = 86400, today_start - 6 * 86400, 7
|
|
with db() as conn:
|
|
summary = conn.execute(
|
|
"""
|
|
SELECT
|
|
SUM(CASE WHEN is_active = 1 AND last_seen_at >= ? THEN 1 ELSE 0 END) AS online,
|
|
SUM(CASE WHEN started_at >= ? THEN 1 ELSE 0 END) AS today_views,
|
|
COUNT(*) AS total_views,
|
|
COUNT(DISTINCT visitor_id) AS unique_visitors,
|
|
SUM(CASE WHEN device_type = 'mobile' THEN 1 ELSE 0 END) AS mobile,
|
|
SUM(CASE WHEN device_type = 'tablet' THEN 1 ELSE 0 END) AS tablet,
|
|
SUM(CASE WHEN device_type = 'desktop' THEN 1 ELSE 0 END) AS desktop,
|
|
AVG(CASE WHEN ended_at > 0 THEN ended_at - started_at END) AS avg_duration
|
|
FROM viewer_sessions WHERE stream_id = ?
|
|
""",
|
|
(online_after, today_start, stream_id),
|
|
).fetchone()
|
|
ts_rows = conn.execute(
|
|
"""
|
|
SELECT (started_at - ?) / ? AS bucket, COUNT(*) AS views
|
|
FROM viewer_sessions
|
|
WHERE stream_id = ? AND started_at >= ?
|
|
GROUP BY bucket
|
|
""",
|
|
(ref_start, bucket_secs, stream_id, ref_start),
|
|
).fetchall()
|
|
recent = conn.execute(
|
|
"""
|
|
SELECT started_at, ended_at, last_seen_at, device_type, play_state, is_active, ip_address, browser
|
|
FROM viewer_sessions
|
|
WHERE stream_id = ?
|
|
ORDER BY started_at DESC LIMIT 20
|
|
""",
|
|
(stream_id,),
|
|
).fetchall()
|
|
buckets = [0] * num_buckets
|
|
for r in ts_rows:
|
|
idx = int(r["bucket"])
|
|
if 0 <= idx < num_buckets:
|
|
buckets[idx] = int(r["views"])
|
|
ips = [str(r["ip_address"] or "") for r in recent]
|
|
geo_map = batch_geoip(ips)
|
|
self.send_json({"status": "success", "data": {
|
|
"summary": {
|
|
"online": int(summary["online"] or 0),
|
|
"today_views": int(summary["today_views"] or 0),
|
|
"total_views": int(summary["total_views"] or 0),
|
|
"unique_visitors": int(summary["unique_visitors"] or 0),
|
|
"mobile": int(summary["mobile"] or 0),
|
|
"tablet": int(summary["tablet"] or 0),
|
|
"desktop": int(summary["desktop"] or 0),
|
|
"avg_duration": round(float(summary["avg_duration"]), 1) if summary["avg_duration"] else None,
|
|
},
|
|
"timeseries": {
|
|
"range": range_param, "buckets": buckets,
|
|
"ref_start": ref_start, "bucket_secs": bucket_secs,
|
|
},
|
|
"recent_sessions": [
|
|
{
|
|
"started_at": int(r["started_at"]),
|
|
"ended_at": int(r["ended_at"] or 0),
|
|
"last_seen_at": int(r["last_seen_at"] or 0),
|
|
"device_type": str(r["device_type"] or ""),
|
|
"play_state": str(r["play_state"] or ""),
|
|
"is_active": int(r["is_active"]),
|
|
"ip_address": str(r["ip_address"] or ""),
|
|
"browser": str(r["browser"] or ""),
|
|
"geo": geo_map.get(str(r["ip_address"] or ""), {}),
|
|
}
|
|
for r in recent
|
|
],
|
|
}})
|
|
|
|
def _stream_live_snapshot(self, stream_id: int) -> dict:
|
|
online_after = now() - 45
|
|
local_now = time.localtime(now())
|
|
today_start = int(time.mktime(
|
|
(local_now.tm_year, local_now.tm_mon, local_now.tm_mday, 0, 0, 0, 0, 0, -1)
|
|
))
|
|
with db() as conn:
|
|
summary = conn.execute(
|
|
"""
|
|
SELECT
|
|
SUM(CASE WHEN is_active=1 AND last_seen_at >= ? THEN 1 ELSE 0 END) AS online,
|
|
SUM(CASE WHEN started_at >= ? THEN 1 ELSE 0 END) AS today_views,
|
|
COUNT(*) AS total_views,
|
|
COUNT(DISTINCT visitor_id) AS unique_visitors,
|
|
SUM(CASE WHEN device_type='mobile' THEN 1 ELSE 0 END) AS mobile,
|
|
SUM(CASE WHEN device_type='tablet' THEN 1 ELSE 0 END) AS tablet,
|
|
SUM(CASE WHEN device_type='desktop' THEN 1 ELSE 0 END) AS desktop,
|
|
AVG(CASE WHEN ended_at > 0 THEN ended_at - started_at END) AS avg_duration
|
|
FROM viewer_sessions WHERE stream_id = ?
|
|
""",
|
|
(online_after, today_start, stream_id),
|
|
).fetchone()
|
|
browser_rows = conn.execute(
|
|
"SELECT browser, COUNT(*) AS cnt FROM viewer_sessions WHERE stream_id = ? AND browser != '' GROUP BY browser ORDER BY cnt DESC LIMIT 6",
|
|
(stream_id,),
|
|
).fetchall()
|
|
os_rows = conn.execute(
|
|
"SELECT os, COUNT(*) AS cnt FROM viewer_sessions WHERE stream_id = ? AND os != '' GROUP BY os ORDER BY cnt DESC LIMIT 6",
|
|
(stream_id,),
|
|
).fetchall()
|
|
recent = conn.execute(
|
|
"""
|
|
SELECT started_at, ended_at, last_seen_at, device_type, play_state,
|
|
is_active, ip_address, browser
|
|
FROM viewer_sessions WHERE stream_id = ?
|
|
ORDER BY started_at DESC LIMIT 20
|
|
""",
|
|
(stream_id,),
|
|
).fetchall()
|
|
ips = [str(r["ip_address"] or "") for r in recent]
|
|
geo_map = batch_geoip(ips)
|
|
return {
|
|
"summary": {
|
|
"online": int(summary["online"] or 0),
|
|
"today_views": int(summary["today_views"] or 0),
|
|
"total_views": int(summary["total_views"] or 0),
|
|
"unique_visitors": int(summary["unique_visitors"] or 0),
|
|
"mobile": int(summary["mobile"] or 0),
|
|
"tablet": int(summary["tablet"] or 0),
|
|
"desktop": int(summary["desktop"] or 0),
|
|
"avg_duration": round(float(summary["avg_duration"]), 1) if summary["avg_duration"] else None,
|
|
"browsers": [{"name": r["browser"], "cnt": int(r["cnt"])} for r in browser_rows],
|
|
"oses": [{"name": r["os"], "cnt": int(r["cnt"])} for r in os_rows],
|
|
},
|
|
"recent_sessions": [
|
|
{
|
|
"started_at": int(r["started_at"]),
|
|
"ended_at": int(r["ended_at"] or 0),
|
|
"last_seen_at": int(r["last_seen_at"] or 0),
|
|
"device_type": str(r["device_type"] or ""),
|
|
"play_state": str(r["play_state"] or ""),
|
|
"is_active": int(r["is_active"]),
|
|
"ip_address": str(r["ip_address"] or ""),
|
|
"browser": str(r["browser"] or ""),
|
|
"geo": geo_map.get(str(r["ip_address"] or ""), {}),
|
|
}
|
|
for r in recent
|
|
],
|
|
}
|
|
|
|
def _dashboard_snapshot(self) -> dict:
|
|
online_after = now() - 45
|
|
local_now = time.localtime(now())
|
|
today_start = int(time.mktime(
|
|
(local_now.tm_year, local_now.tm_mon, local_now.tm_mday, 0, 0, 0, 0, 0, -1)
|
|
))
|
|
with db() as conn:
|
|
ov = conn.execute(
|
|
"""
|
|
SELECT
|
|
SUM(CASE WHEN is_active = 1 AND last_seen_at >= ? THEN 1 ELSE 0 END) AS total_online,
|
|
SUM(CASE WHEN started_at >= ? THEN 1 ELSE 0 END) AS today_views,
|
|
COUNT(*) AS total_views,
|
|
COUNT(DISTINCT visitor_id) AS unique_visitors,
|
|
COUNT(DISTINCT stream_id) AS streams_with_views
|
|
FROM viewer_sessions
|
|
""",
|
|
(online_after, today_start),
|
|
).fetchone()
|
|
device_rows = conn.execute(
|
|
"SELECT device_type, COUNT(*) AS cnt FROM viewer_sessions GROUP BY device_type"
|
|
).fetchall()
|
|
browser_rows = conn.execute(
|
|
"SELECT browser, COUNT(*) AS cnt FROM viewer_sessions WHERE browser != '' GROUP BY browser ORDER BY cnt DESC LIMIT 6"
|
|
).fetchall()
|
|
os_rows = conn.execute(
|
|
"SELECT os, COUNT(*) AS cnt FROM viewer_sessions WHERE os != '' GROUP BY os ORDER BY cnt DESC LIMIT 6"
|
|
).fetchall()
|
|
stream_rows = conn.execute(
|
|
"""
|
|
SELECT
|
|
vs.stream_id,
|
|
s.event_name,
|
|
SUM(CASE WHEN vs.is_active = 1 AND vs.last_seen_at >= ? THEN 1 ELSE 0 END) AS online,
|
|
SUM(CASE WHEN vs.started_at >= ? THEN 1 ELSE 0 END) AS today_views,
|
|
COUNT(*) AS total_views,
|
|
COUNT(DISTINCT vs.visitor_id) AS unique_visitors,
|
|
SUM(CASE WHEN vs.device_type = 'mobile' THEN 1 ELSE 0 END) AS mobile,
|
|
SUM(CASE WHEN vs.device_type = 'tablet' THEN 1 ELSE 0 END) AS tablet,
|
|
SUM(CASE WHEN vs.device_type = 'desktop' THEN 1 ELSE 0 END) AS desktop,
|
|
AVG(CASE WHEN vs.ended_at > 0 THEN vs.ended_at - vs.started_at END) AS avg_duration,
|
|
MAX(vs.last_seen_at) AS last_seen_at
|
|
FROM viewer_sessions vs
|
|
LEFT JOIN streams s ON vs.stream_id = s.id
|
|
GROUP BY vs.stream_id, s.event_name
|
|
ORDER BY total_views DESC
|
|
""",
|
|
(online_after, today_start),
|
|
).fetchall()
|
|
devices = {r["device_type"]: int(r["cnt"]) for r in device_rows}
|
|
browsers = [{"name": r["browser"], "cnt": int(r["cnt"])} for r in browser_rows]
|
|
oses = [{"name": r["os"], "cnt": int(r["cnt"])} for r in os_rows]
|
|
return {
|
|
"overview": {
|
|
"total_online": int(ov["total_online"] or 0),
|
|
"today_views": int(ov["today_views"] or 0),
|
|
"total_views": int(ov["total_views"] or 0),
|
|
"unique_visitors": int(ov["unique_visitors"] or 0),
|
|
"streams_with_views": int(ov["streams_with_views"] or 0),
|
|
"devices": devices,
|
|
"browsers": browsers,
|
|
"oses": oses,
|
|
},
|
|
"streams": [
|
|
{
|
|
"stream_id": int(r["stream_id"]),
|
|
"event_name": str(r["event_name"] or ""),
|
|
"online": int(r["online"] or 0),
|
|
"today_views": int(r["today_views"] or 0),
|
|
"total_views": int(r["total_views"] or 0),
|
|
"unique_visitors": int(r["unique_visitors"] or 0),
|
|
"mobile": int(r["mobile"] or 0),
|
|
"tablet": int(r["tablet"] or 0),
|
|
"desktop": int(r["desktop"] or 0),
|
|
"avg_duration": round(float(r["avg_duration"]), 1) if r["avg_duration"] else None,
|
|
"last_seen_at": int(r["last_seen_at"] or 0),
|
|
}
|
|
for r in stream_rows
|
|
],
|
|
}
|
|
|
|
def api_stats_geo(self) -> None:
|
|
qs = parse_qs(urlparse(self.path).query)
|
|
stream_id = qs.get("id", [None])[0]
|
|
range_ = qs.get("range", ["30d"])[0]
|
|
since_map = {"today": 86400, "7d": 7 * 86400, "30d": 30 * 86400}
|
|
since = (now() - since_map[range_]) if range_ in since_map else 0
|
|
with db() as conn:
|
|
cond = "WHERE ip_address != ''"
|
|
params: list = []
|
|
if since:
|
|
cond += " AND started_at >= ?"
|
|
params.append(since)
|
|
if stream_id:
|
|
cond += " AND stream_id = ?"
|
|
params.append(int(stream_id))
|
|
rows = conn.execute(
|
|
f"SELECT DISTINCT ip_address FROM viewer_sessions {cond}", params
|
|
).fetchall()
|
|
ips = [r["ip_address"] for r in rows]
|
|
geo_map = batch_geoip(ips)
|
|
country_counts: dict[str, dict] = {}
|
|
for ip in ips:
|
|
g = geo_map.get(ip, {})
|
|
code = g.get("countryCode", "")
|
|
if not code:
|
|
continue
|
|
if code not in country_counts:
|
|
country_counts[code] = {"code": code, "name": g.get("country", code), "count": 0}
|
|
country_counts[code]["count"] += 1
|
|
countries = sorted(country_counts.values(), key=lambda x: x["count"], reverse=True)
|
|
self.send_json({"status": "ok", "data": {"countries": countries}})
|
|
|
|
def api_list_api_keys(self) -> None:
|
|
with db() as conn:
|
|
rows = conn.execute(
|
|
"SELECT id, label, created_at, last_used_at FROM api_keys ORDER BY created_at DESC"
|
|
).fetchall()
|
|
self.send_json({"status": "success", "data": [dict(r) for r in rows]})
|
|
|
|
def api_create_api_key(self) -> None:
|
|
body = self.read_json()
|
|
label = str(body.get("label", "")).strip()[:80]
|
|
token = secrets.token_urlsafe(32)
|
|
token_hash = hashlib.sha256(token.encode("utf-8")).hexdigest()
|
|
ts = now()
|
|
with db() as conn:
|
|
conn.execute(
|
|
"INSERT INTO api_keys (token_hash, label, created_at, last_used_at) VALUES (?, ?, ?, 0)",
|
|
(token_hash, label, ts),
|
|
)
|
|
row = conn.execute(
|
|
"SELECT id, label, created_at, last_used_at FROM api_keys WHERE token_hash = ?",
|
|
(token_hash,),
|
|
).fetchone()
|
|
result = dict(row)
|
|
result["token"] = token
|
|
self.send_json({"status": "success", "data": result})
|
|
|
|
def api_delete_api_key(self) -> None:
|
|
body = self.read_json()
|
|
key_id = int(body.get("id", 0))
|
|
if not key_id:
|
|
raise AppError("missing_id")
|
|
with db() as conn:
|
|
conn.execute("DELETE FROM api_keys WHERE id = ?", (key_id,))
|
|
self.send_json({"status": "success"})
|
|
|
|
def api_stats_sessions_page(self) -> None:
|
|
qs = parse_qs(urlparse(self.path).query)
|
|
stream_id = int(qs.get("id", [0])[0] or 0)
|
|
offset = max(0, int(qs.get("offset", [0])[0] or 0))
|
|
limit = min(max(1, int(qs.get("limit", [50])[0] or 50)), 200)
|
|
order_col = qs.get("order_by", ["last_seen_at"])[0]
|
|
order_dir = qs.get("order_dir", ["desc"])[0].upper()
|
|
SORT_COLS = {
|
|
"last_seen_at": "last_seen_at",
|
|
"started_at": "started_at",
|
|
"duration": "CASE WHEN ended_at > 0 THEN ended_at - started_at ELSE last_seen_at - started_at END",
|
|
"device_type": "device_type",
|
|
"browser": "browser",
|
|
"ip_address": "ip_address",
|
|
}
|
|
if order_dir not in ("ASC", "DESC"):
|
|
order_dir = "DESC"
|
|
col_expr = SORT_COLS.get(order_col, "last_seen_at")
|
|
with db() as conn:
|
|
total = conn.execute(
|
|
"SELECT COUNT(*) AS cnt FROM viewer_sessions WHERE stream_id = ?", (stream_id,)
|
|
).fetchone()["cnt"]
|
|
rows = conn.execute(
|
|
"SELECT started_at, ended_at, last_seen_at, device_type, "
|
|
"play_state, is_active, ip_address, browser "
|
|
"FROM viewer_sessions WHERE stream_id = ? "
|
|
f"ORDER BY {col_expr} {order_dir} LIMIT ? OFFSET ?",
|
|
(stream_id, limit, offset),
|
|
).fetchall()
|
|
geo_map = batch_geoip([r["ip_address"] for r in rows if r["ip_address"]])
|
|
sessions = [{**dict(r), "geo": geo_map.get(r["ip_address"], {})} for r in rows]
|
|
self.send_json({"status": "ok", "data": {
|
|
"sessions": sessions, "total": total, "offset": offset, "limit": limit,
|
|
}})
|
|
|
|
def api_stats_dashboard_realtime(self) -> None:
|
|
try:
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", "text/event-stream; charset=utf-8")
|
|
self.send_header("Cache-Control", "no-cache")
|
|
self.send_header("X-Accel-Buffering", "no")
|
|
self.end_headers()
|
|
except OSError:
|
|
return
|
|
while True:
|
|
try:
|
|
msg = ("data: " + json.dumps(self._dashboard_snapshot(), ensure_ascii=False) + "\n\n").encode("utf-8")
|
|
self.wfile.write(msg)
|
|
self.wfile.flush()
|
|
except OSError:
|
|
break
|
|
time.sleep(1)
|
|
|
|
def api_stats_stream_realtime(self) -> None:
|
|
qs = parse_qs(urlparse(self.path).query)
|
|
stream_id = int(qs.get("id", [0])[0] or 0)
|
|
if not stream_id:
|
|
raise AppError("missing_stream_id")
|
|
try:
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", "text/event-stream; charset=utf-8")
|
|
self.send_header("Cache-Control", "no-cache")
|
|
self.send_header("X-Accel-Buffering", "no")
|
|
self.end_headers()
|
|
except OSError:
|
|
return
|
|
while True:
|
|
try:
|
|
snapshot = self._stream_live_snapshot(stream_id)
|
|
msg = ("data: " + json.dumps(snapshot, ensure_ascii=False) + "\n\n").encode("utf-8")
|
|
self.wfile.write(msg)
|
|
self.wfile.flush()
|
|
except OSError:
|
|
break
|
|
time.sleep(1)
|
|
|
|
def proxy_obs_route(self, request_path: str, request_query: str = "", send_body: bool = True) -> None:
|
|
parts = request_path.strip("/").split("/")
|
|
if len(parts) < 2 or parts[0] != "h":
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
return
|
|
slug = parts[1]
|
|
tail = "/".join(parts[2:])
|
|
flv_slug = slug[:-4] if slug.endswith(".flv") else ""
|
|
lookup_slug = flv_slug or slug
|
|
with db() as conn:
|
|
row = conn.execute(
|
|
"SELECT * FROM obs_stream_routes WHERE public_slug = ?", (lookup_slug,)
|
|
).fetchone()
|
|
if not row:
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
return
|
|
|
|
stream_key = row["stream_key"]
|
|
if flv_slug:
|
|
upstream_path = f"/live/{quote(stream_key, safe='')}.flv"
|
|
content_type = "video/x-flv"
|
|
rewrite_manifest = False
|
|
elif tail in ("", "index.m3u8"):
|
|
upstream_path = f"/live/{quote(stream_key, safe='')}.m3u8"
|
|
content_type = "application/vnd.apple.mpegurl"
|
|
rewrite_manifest = True
|
|
else:
|
|
if tail.startswith(lookup_slug):
|
|
tail = f"{stream_key}{tail[len(lookup_slug):]}"
|
|
upstream_path = f"/live/{quote(tail, safe=URL_PATH_SAFE)}"
|
|
content_type = mimetypes.guess_type(tail)[0] or "application/octet-stream"
|
|
rewrite_manifest = tail.split("?", 1)[0].endswith(".m3u8")
|
|
|
|
query = f"?{request_query}" if request_query else ""
|
|
upstream_url = f"{SRS_HTTP_ORIGIN}{upstream_path}{query}"
|
|
try:
|
|
req = Request(upstream_url, headers={"User-Agent": "StreamHall/1.0"})
|
|
opener = urlopen(req, timeout=STREAM_PROBE_TIMEOUT) if rewrite_manifest else urlopen(req)
|
|
with opener as resp:
|
|
if rewrite_manifest:
|
|
body = resp.read(1024 * 1024)
|
|
text = decode_probe_text(body)
|
|
content = rewrite_hls_manifest(text, lookup_slug, stream_key).encode("utf-8")
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", f"{content_type}; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(content)))
|
|
self.send_header("Cache-Control", "no-cache")
|
|
self.send_header("Access-Control-Allow-Origin", "*")
|
|
self.end_headers()
|
|
if send_body:
|
|
self.wfile.write(content)
|
|
return
|
|
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", content_type)
|
|
self.send_header("Cache-Control", "no-cache")
|
|
self.send_header("Access-Control-Allow-Origin", "*")
|
|
self.end_headers()
|
|
if not send_body:
|
|
return
|
|
while True:
|
|
chunk = resp.read(65536)
|
|
if not chunk:
|
|
break
|
|
self.wfile.write(chunk)
|
|
except HTTPError as exc:
|
|
self.send_error(HTTPStatus(exc.code) if exc.code in HTTPStatus._value2member_map_ else HTTPStatus.BAD_GATEWAY)
|
|
except (URLError, TimeoutError, OSError):
|
|
self.send_error(HTTPStatus.BAD_GATEWAY)
|
|
|
|
def proxy_hls_route(self, request_path: str, send_body: bool = True) -> None:
|
|
# URL format: /proxy/hls/<token>/<base64-encoded-url>
|
|
# The token is an HMAC of the target hostname; it ensures that only URLs
|
|
# generated by hls_proxy_path() can be proxied, preventing open-proxy abuse.
|
|
parts = request_path.strip("/").split("/")
|
|
if len(parts) != 4 or parts[0] != "proxy" or parts[1] != "hls":
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
return
|
|
token, encoded_url = parts[2], parts[3]
|
|
try:
|
|
target_url = decode_proxy_target(encoded_url)
|
|
except (ValueError, UnicodeDecodeError):
|
|
self.send_error(HTTPStatus.BAD_REQUEST)
|
|
return
|
|
parsed = urlparse(target_url)
|
|
if parsed.scheme not in ("http", "https"):
|
|
self.send_error(HTTPStatus.BAD_REQUEST)
|
|
return
|
|
if not hmac.compare_digest(token, hls_proxy_host_token(parsed.netloc)):
|
|
self.send_error(HTTPStatus.FORBIDDEN)
|
|
return
|
|
|
|
try:
|
|
req = Request(target_url, headers={"User-Agent": "StreamHall/1.0"})
|
|
with urlopen(req, timeout=STREAM_PROBE_TIMEOUT if parsed.path.lower().endswith(".m3u8") else 20) as resp:
|
|
content_type = resp.headers.get("Content-Type") or mimetypes.guess_type(parsed.path)[0] or "application/octet-stream"
|
|
is_manifest = parsed.path.lower().endswith(".m3u8") or "mpegurl" in content_type.lower()
|
|
if is_manifest:
|
|
body = resp.read(2 * 1024 * 1024)
|
|
content = rewrite_external_hls_manifest(decode_probe_text(body), target_url).encode("utf-8")
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", "application/vnd.apple.mpegurl; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(content)))
|
|
self.send_header("Cache-Control", "no-cache")
|
|
self.send_header("Access-Control-Allow-Origin", "*")
|
|
self.end_headers()
|
|
if send_body:
|
|
self.wfile.write(content)
|
|
return
|
|
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", content_type)
|
|
self.send_header("Cache-Control", "no-cache")
|
|
self.send_header("Access-Control-Allow-Origin", "*")
|
|
self.end_headers()
|
|
if not send_body:
|
|
return
|
|
while True:
|
|
chunk = resp.read(65536)
|
|
if not chunk:
|
|
break
|
|
self.wfile.write(chunk)
|
|
except HTTPError as exc:
|
|
self.send_error(HTTPStatus(exc.code) if exc.code in HTTPStatus._value2member_map_ else HTTPStatus.BAD_GATEWAY)
|
|
except (URLError, TimeoutError, OSError):
|
|
self.send_error(HTTPStatus.BAD_GATEWAY)
|
|
|
|
def serve_static(self, request_path: str, send_body: bool = True) -> None:
|
|
routes = {
|
|
"/": "index.html",
|
|
"/index.html": "index.html",
|
|
"/player.html": "player.html",
|
|
"/admin": "admin.html",
|
|
"/admin.html": "admin.html",
|
|
}
|
|
rel = routes.get(request_path, request_path.lstrip("/"))
|
|
target = (PUBLIC_DIR / rel).resolve()
|
|
try:
|
|
target.relative_to(PUBLIC_DIR.resolve())
|
|
except ValueError:
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
return
|
|
if not target.exists() or not target.is_file():
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
return
|
|
content = target.read_bytes()
|
|
mime = mimetypes.guess_type(str(target))[0] or "application/octet-stream"
|
|
if target.suffix == ".js":
|
|
mime = "text/javascript"
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", f"{mime}; charset=utf-8" if mime.startswith("text/") else mime)
|
|
self.send_header("Content-Length", str(len(content)))
|
|
self.send_header("Cache-Control", "no-cache" if target.suffix == ".html" else "public, max-age=86400")
|
|
self.send_header("X-Content-Type-Options", "nosniff")
|
|
self.end_headers()
|
|
if send_body:
|
|
self.wfile.write(content)
|
|
|
|
|
|
_SESSION_STALE_SECS = 75 # 5 missed 15s heartbeats → consider ended
|
|
|
|
|
|
def cleanup_stale_sessions_loop() -> None:
|
|
while True:
|
|
time.sleep(120)
|
|
try:
|
|
cutoff = now() - _SESSION_STALE_SECS
|
|
with db() as conn:
|
|
conn.execute(
|
|
"""
|
|
UPDATE viewer_sessions
|
|
SET ended_at = last_seen_at, is_active = 0, play_state = 'ended'
|
|
WHERE is_active = 1 AND last_seen_at < ?
|
|
""",
|
|
(cutoff,),
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def main() -> None:
|
|
init_db()
|
|
threading.Thread(target=monitor_streams_loop, daemon=True).start()
|
|
threading.Thread(target=cleanup_stale_sessions_loop, daemon=True).start()
|
|
host = os.getenv("HOST", "0.0.0.0")
|
|
port = int(os.getenv("PORT", "8080"))
|
|
httpd = ThreadingHTTPServer((host, port), StreamHallHandler)
|
|
print(f"StreamHall listening on http://{host}:{port}")
|
|
httpd.serve_forever()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|