diff --git a/docker-compose.yml b/docker-compose.yml
index 0b15120..6a254a1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -22,6 +22,11 @@ services:
- ./videos:/app/videos
# Mount additional directories as needed:
# - /local/path:/app/media/label
+ logging:
+ driver: json-file
+ options:
+ max-size: "10m"
+ max-file: "3"
postgres:
image: postgres:16-alpine
diff --git a/public/admin.html b/public/admin.html
index 557d320..cf310b2 100644
--- a/public/admin.html
+++ b/public/admin.html
@@ -452,7 +452,8 @@
min-height: 42px;
}
- .link-row .link-drm-config {
+ .link-row .link-drm-config,
+ .link-row .link-upstream-auth {
grid-column: 2 / -2;
border: 1px solid var(--line);
border-radius: 7px;
@@ -460,7 +461,8 @@
overflow: hidden;
}
- .link-row .link-drm-config > summary {
+ .link-row .link-drm-config > summary,
+ .link-row .link-upstream-auth > summary {
display: flex;
align-items: center;
justify-content: space-between;
@@ -473,7 +475,8 @@
list-style: none;
}
- .link-row .link-drm-config > summary::-webkit-details-marker {
+ .link-row .link-drm-config > summary::-webkit-details-marker,
+ .link-row .link-upstream-auth > summary::-webkit-details-marker {
display: none;
}
@@ -487,6 +490,26 @@
letter-spacing: 0.04em;
}
+ .link-row .link-upstream-auth > summary::after {
+ content: "Cookie";
+ border-radius: 999px;
+ padding: 2px 7px;
+ background: var(--line);
+ color: var(--text);
+ font-size: 0.68rem;
+ letter-spacing: 0.04em;
+ }
+
+ .link-row .link-upstream-auth > .upstream-auth-body {
+ padding: 0 11px 11px;
+ }
+
+ .link-row .link-upstream-auth > .upstream-auth-body textarea {
+ width: 100%;
+ box-sizing: border-box;
+ min-height: 60px;
+ }
+
.link-row .link-drm-grid {
display: grid;
grid-template-columns: 0.7fr 1.5fr;
@@ -604,6 +627,12 @@
background: rgba(220, 38, 38, 0.08);
}
+ .stream-check-status.is-warning,
+ .stream-live-state.is-warning {
+ color: #b45309;
+ background: rgba(245, 158, 11, 0.1);
+ }
+
:root[data-theme="dark"] .stream-check-status.is-online,
:root[data-theme="dark"] .stream-live-state.is-online {
color: var(--mint);
@@ -614,6 +643,11 @@
color: #fb7185;
}
+ :root[data-theme="dark"] .stream-check-status.is-warning,
+ :root[data-theme="dark"] .stream-live-state.is-warning {
+ color: #fbbf24;
+ }
+
.obs-grid {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
@@ -1047,6 +1081,7 @@
}
.link-row .link-drm-config,
+ .link-row .link-upstream-auth,
.link-row .link-drm-wide {
grid-column: 1 / -1;
}
@@ -2042,6 +2077,7 @@
'form.link_name': '视角名称',
'form.link_type': '类型',
'form.proxy_mode': '代理模式',
+ 'form.upstream_cookie': '上游 Cookie',
'form.link_url': '播放链接',
'form.key_override':'Key Override',
'form.clearkey': 'ClearKey 信息',
@@ -2101,6 +2137,7 @@
'ph.link_name': '视角名',
'ph.link_url': '链接 (m3u8/flv/mpd)',
'ph.proxy_mode': '代理模式',
+ 'ph.upstream_cookie': '粘贴 Cookie 字符串,如 CloudFront-Key-Pair-Id=xxx; CloudFront-Policy=yyy; CloudFront-Signature=zzz',
'ph.main_url_disabled':'已使用 DRM 专用播放链接',
'ph.key_aes': 'AES-128 Key Hex,可多行: main-video=hex',
'ph.clearkey': 'ClearKey 信息,如 {"kid":"key"}',
@@ -2156,6 +2193,7 @@
'probe.no_info': '没有监测到推流信息',
'probe.detecting': '正在检测推流信息...',
'probe.detected': '已检测到推流信息',
+ 'probe.cookie_proxy_mismatch': '已检测到推流信号,但当前代理模式不支持 Cookie 转发,观众将无法正常播放,请改为完整代理模式',
'probe.waiting': '等待自动检测...',
'probe.closed': '直播已关闭',
'probe.drm_config_missing':'检测到 DRM 流,但缺少匹配的 DRM 配置',
@@ -2463,6 +2501,7 @@
'form.link_name': 'Name',
'form.link_type': 'Type',
'form.proxy_mode': 'Proxy mode',
+ 'form.upstream_cookie': 'Upstream Cookie',
'form.link_url': 'Playback URL',
'form.key_override':'Key Override',
'form.clearkey': 'ClearKey',
@@ -2522,6 +2561,7 @@
'ph.link_name': 'Source name',
'ph.link_url': 'URL (m3u8/flv/mpd)',
'ph.proxy_mode': 'Proxy mode',
+ 'ph.upstream_cookie': 'Paste cookie string, e.g. CloudFront-Key-Pair-Id=xxx; CloudFront-Policy=yyy; CloudFront-Signature=zzz',
'ph.main_url_disabled':'Using DRM-specific playback URL',
'ph.key_aes': 'AES-128 Key Hex, multi-line: main-video=hex',
'ph.clearkey': 'ClearKey JSON, e.g. {"kid":"key"}',
@@ -2577,6 +2617,7 @@
'probe.no_info': 'No stream detected',
'probe.detecting': 'Detecting stream...',
'probe.detected': 'Stream detected',
+ 'probe.cookie_proxy_mismatch': 'Stream signal detected, but the current proxy mode does not forward cookies — viewers will not be able to play. Switch to Full Proxy mode.',
'probe.waiting': 'Waiting for detection...',
'probe.closed': 'Stream disabled',
'probe.drm_config_missing':'DRM stream detected, but matching DRM config is missing',
@@ -3101,11 +3142,12 @@
};
};
- const probeUrl = async (url, type = '', drmConfigs = []) => {
+ const probeUrl = async (url, type = '', drmConfigs = [], upstreamCookie = '') => {
const res = await apiCall('check_stream_url', {
url,
type: inferLinkType(url, type),
- drmConfigs
+ drmConfigs,
+ upstreamCookie
});
return res.data || { valid: false, message: t('probe.no_info') };
};
@@ -3123,12 +3165,15 @@
row.dataset.probeActive = '1';
if (!silent) setProbeStatus(statusEl, 'is-checking', t('probe.detecting'));
try {
- const result = await probeUrl(url, type, getRowDrmConfigs(row));
+ const upstreamCookie = row.querySelector('.l-upstream-cookie')?.value.trim() || '';
+ const result = await probeUrl(url, type, getRowDrmConfigs(row), upstreamCookie);
if (!row.isConnected || row.dataset.probeToken !== token) return;
+ const proxyMode = row.querySelector('.l-proxy-mode')?.value || 'auto';
+ const cookieMismatch = result.valid && upstreamCookie && (proxyMode === 'direct' || proxyMode === 'manifest');
setProbeStatus(
statusEl,
- result.valid ? 'is-online' : 'is-offline',
- result.valid ? t('probe.detected') : (t('probe.' + result.code) || t('probe.no_info'))
+ cookieMismatch ? 'is-warning' : (result.valid ? 'is-online' : 'is-offline'),
+ cookieMismatch ? t('probe.cookie_proxy_mismatch') : (result.valid ? t('probe.detected') : (t('probe.' + result.code) || t('probe.no_info')))
);
} catch (e) {
if (row.isConnected && row.dataset.probeToken === token) {
@@ -3156,11 +3201,24 @@
});
};
- const applyProbeResult = (streamId, result) => {
+ const hasCookieProxyMismatch = (stream) => {
+ if (!stream?.links_json) return false;
+ try {
+ const links = JSON.parse(stream.links_json);
+ return Array.isArray(links) && links.some(l => {
+ const cookie = (l.upstreamCookie || l.upstream_cookie || '').trim();
+ const mode = (l.proxyMode || l.proxy_mode || 'auto').toLowerCase();
+ return cookie && (mode === 'direct' || mode === 'manifest');
+ });
+ } catch { return false; }
+ };
+
+ const applyProbeResult = (streamId, result, stream = null) => {
+ const mismatch = result.valid && stream && hasCookieProxyMismatch(stream);
setSavedProbeStatus(
streamId,
- result.valid ? 'is-online' : 'is-offline',
- result.valid ? t('probe.detected') : (t('probe.' + result.code) || t('probe.no_info'))
+ mismatch ? 'is-warning' : (result.valid ? 'is-online' : 'is-offline'),
+ mismatch ? t('probe.cookie_proxy_mismatch') : (result.valid ? t('probe.detected') : (t('probe.' + result.code) || t('probe.no_info')))
);
};
@@ -3172,7 +3230,7 @@
if (showChecking) setSavedProbeStatus(stream.id, 'is-checking', t('probe.detecting'));
try {
const res = await apiCall('check_stream', { id: stream.id });
- applyProbeResult(stream.id, res.data || { valid: false });
+ applyProbeResult(stream.id, res.data || { valid: false }, stream);
} catch (e) {
setSavedProbeStatus(stream.id, 'is-offline', t('probe.no_info'));
}
@@ -3194,7 +3252,7 @@
try {
const res = await apiCall('check_stream', { id: stream.id });
const result = res.data || { valid: false };
- applyProbeResult(stream.id, result);
+ applyProbeResult(stream.id, result, stream);
} catch (e) {
setSavedProbeStatus(stream.id, 'is-offline', t('probe.no_info'));
}
@@ -4207,6 +4265,7 @@
name: row.querySelector('.l-name').value,
type: row.querySelector('.l-type').value,
proxyMode: row.querySelector('.l-proxy-mode')?.value || 'auto',
+ upstreamCookie: row.querySelector('.l-upstream-cookie')?.value.trim() || '',
url: fallbackUrl,
key: row.querySelector('.l-key').value.trim(),
clearkey: row.querySelector('.l-clearkey').value.trim(),
@@ -4323,7 +4382,7 @@
handle.addEventListener('pointercancel', finish);
};
- const addLinkUI = (name = 'Default', url = '', key = '', clearkey = '', type = '', drmType = '', licenseUrl = '', licenseHeaders = '', pssh = '', certificateUrl = '', drmConfigs = [], proxyMode = 'auto') => {
+ const addLinkUI = (name = 'Default', url = '', key = '', clearkey = '', type = '', drmType = '', licenseUrl = '', licenseHeaders = '', pssh = '', certificateUrl = '', drmConfigs = [], proxyMode = 'auto', upstreamCookie = '') => {
const rawDrmType = String(drmType || '').toLowerCase();
const normalizedDrmType = rawDrmType === 'widevine' || rawDrmType === 'fairplay' ? rawDrmType : '';
const normalizedProxyMode = ['auto', 'direct', 'full', 'manifest'].includes(String(proxyMode || '').toLowerCase()) ? String(proxyMode || '').toLowerCase() : 'auto';
@@ -4378,6 +4437,12 @@
${t('form.clearkey')}
+
+ ${t('form.upstream_cookie')}
+
+
+
+
${t('drm.config')}
@@ -4754,7 +4819,7 @@
els.cancelBtn.classList.remove('hidden');
els.linksContainer.innerHTML = '';
const links = JSON.parse(stream.links_json || '[]');
- links.forEach(l => addLinkUI(l.name, l.url, l.key, l.clearkey, l.type, l.drmType || l.drm_type || '', l.licenseUrl || l.license_url || '', l.licenseHeaders || l.license_headers || '', l.pssh || '', l.certificateUrl || l.certificate_url || '', l.drmConfigs || l.drm_configs || [], l.proxyMode || l.proxy_mode || 'auto'));
+ links.forEach(l => addLinkUI(l.name, l.url, l.key, l.clearkey, l.type, l.drmType || l.drm_type || '', l.licenseUrl || l.license_url || '', l.licenseHeaders || l.license_headers || '', l.pssh || '', l.certificateUrl || l.certificate_url || '', l.drmConfigs || l.drm_configs || [], l.proxyMode || l.proxy_mode || 'auto', l.upstreamCookie || l.upstream_cookie || ''));
if (links.length === 0) addLinkUI();
} else {
resetForm();
diff --git a/server.py b/server.py
index 6333b36..b12d414 100644
--- a/server.py
+++ b/server.py
@@ -2,6 +2,8 @@
from __future__ import annotations
import base64
+import http.client
+import queue as _queue_mod
import socket
import csv
import hashlib
@@ -61,8 +63,11 @@ PASSWORD_HASH_ITERATIONS = 240000
INITIAL_ADMIN_PASSWORD_BYTES = 18
PUBLIC_ID_BYTES = 9
STREAM_PROBE_TIMEOUT = float(os.getenv("STREAM_PROBE_TIMEOUT", "4"))
+HLS_PROXY_TIMEOUT = float(os.getenv("HLS_PROXY_TIMEOUT", "15"))
TELEGRAM_TIMEOUT = float(os.getenv("TELEGRAM_TIMEOUT", "6"))
STREAM_MONITOR_INTERVAL = max(5, int(os.getenv("STREAM_MONITOR_INTERVAL", "10")))
+TG_RECONNECT_GRACE_SECS = max(0, int(os.getenv("TG_RECONNECT_GRACE_SECS", "60")))
+TG_START_MERGE_SECS = max(0, int(os.getenv("TG_START_MERGE_SECS", "30")))
SRS_HTTP_ORIGIN = os.getenv("SRS_HTTP_ORIGIN", "http://srs:8080").rstrip("/")
OBS_ROUTE_SLUG_LENGTH = max(12, int(os.getenv("OBS_ROUTE_SLUG_LENGTH", "22")))
URL_PATH_SAFE = "/._~!$&'()*+,;=:@"
@@ -91,6 +96,18 @@ VIDEO_EXTS = frozenset({".mp4", ".mkv", ".avi", ".flv", ".ts", ".mov", ".wmv", "
active_pushes: dict[str, dict] = {}
_pushes_lock = threading.Lock()
+_pending_stop_timers: dict[int, threading.Timer] = {}
+_pending_stop_lock = threading.Lock()
+_pending_start_timers: dict[int, tuple[threading.Timer, list[str]]] = {}
+_pending_start_lock = threading.Lock()
+_upstream_pools: dict[tuple, _queue_mod.Queue] = {} # (scheme, host, port) -> Queue[conn]
+_upstream_pools_lock = threading.Lock()
+_UPSTREAM_POOL_SIZE = 4
+
+_proxy_cookie_tokens: dict[str, str] = {} # token_id -> cookie string (server-side only)
+_proxy_cookie_refs: dict[str, str] = {} # cookie -> cookie_ref (reverse map for stable proxy URLs)
+_proxy_cookie_tokens_lock = threading.Lock()
+_PROXY_COOKIE_TOKEN_MAX = 5000
HLS_PROXY_PREFIX = "/proxy/hls"
HLS_MANIFEST_PROXY_PREFIX = "/proxy/hls-manifest"
VIEWER_TOKEN_TTL = 300 # seconds
@@ -336,6 +353,7 @@ def init_stats_tables(conn) -> None:
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_viewer_events_stream_time ON viewer_events(stream_id, event_at)")
+ conn.execute("ALTER TABLE stream_probe_states ADD COLUMN IF NOT EXISTS live_links_json TEXT NOT NULL DEFAULT '[]'")
@@ -425,7 +443,104 @@ def decode_proxy_target(value: str) -> str:
return base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
-def hls_proxy_url_token(url: str) -> str:
+def _upstream_conn_borrow(scheme: str, host: str, port: int, timeout: float) -> http.client.HTTPConnection:
+ key = (scheme, host, port)
+ with _upstream_pools_lock:
+ if key not in _upstream_pools:
+ _upstream_pools[key] = _queue_mod.Queue(maxsize=_UPSTREAM_POOL_SIZE)
+ pool = _upstream_pools[key]
+ try:
+ conn = pool.get_nowait()
+ conn.timeout = timeout
+ return conn
+ except _queue_mod.Empty:
+ if scheme == "https":
+ return http.client.HTTPSConnection(host, port, timeout=timeout)
+ return http.client.HTTPConnection(host, port, timeout=timeout)
+
+
+def _upstream_conn_return(scheme: str, host: str, port: int, conn: http.client.HTTPConnection) -> None:
+ key = (scheme, host, port)
+ with _upstream_pools_lock:
+ pool = _upstream_pools.get(key)
+ if pool is not None:
+ try:
+ pool.put_nowait(conn)
+ return
+ except _queue_mod.Full:
+ pass
+ try:
+ conn.close()
+ except Exception:
+ pass
+
+
+def _upstream_fetch(
+ url: str,
+ headers: dict[str, str],
+ timeout: float,
+) -> tuple[http.client.HTTPResponse, http.client.HTTPConnection, str, str, int]:
+ """Fetch URL via a pooled persistent connection.
+ Returns (response, conn, scheme, host, port).
+ Caller must fully consume the response body, then call _upstream_conn_return().
+ On any error the connection is closed automatically."""
+ parsed = urlparse(url)
+ scheme = parsed.scheme.lower()
+ host = parsed.hostname or ""
+ port = int(parsed.port or (443 if scheme == "https" else 80))
+ path = parsed.path or "/"
+ if parsed.query:
+ path = f"{path}?{parsed.query}"
+
+ for attempt in range(2):
+ conn = _upstream_conn_borrow(scheme, host, port, timeout)
+ try:
+ conn.request("GET", path, headers=headers)
+ resp = conn.getresponse()
+ return resp, conn, scheme, host, port
+ except Exception:
+ try:
+ conn.close()
+ except Exception:
+ pass
+ if attempt == 0:
+ continue
+ raise
+
+
+def _issue_proxy_cookie_token(cookie: str) -> str:
+ """Return a stable signed opaque reference for this cookie, creating one if needed.
+ The same cookie string always returns the same ref so HLS segment proxy URLs remain
+ stable across manifest re-fetches, preventing spurious ABR resets."""
+ with _proxy_cookie_tokens_lock:
+ if cookie in _proxy_cookie_refs:
+ return _proxy_cookie_refs[cookie]
+ token_id = secrets.token_urlsafe(16)
+ if len(_proxy_cookie_tokens) >= _PROXY_COOKIE_TOKEN_MAX:
+ evicted_id = next(iter(_proxy_cookie_tokens))
+ evicted_cookie = _proxy_cookie_tokens.pop(evicted_id)
+ _proxy_cookie_refs.pop(evicted_cookie, None)
+ _proxy_cookie_tokens[token_id] = cookie
+ ref = f"{token_id}.{sign(f'proxy-cookie-token:{token_id}')}"
+ _proxy_cookie_refs[cookie] = ref
+ return ref
+
+
+def _resolve_proxy_cookie_token(ref: str) -> str:
+ """Verify ref signature and return the stored cookie, or '' if invalid/not found."""
+ if "." not in ref:
+ return ""
+ token_id, sig = ref.split(".", 1)
+ if not hmac.compare_digest(sig, sign(f"proxy-cookie-token:{token_id}")):
+ return ""
+ with _proxy_cookie_tokens_lock:
+ return _proxy_cookie_tokens.get(token_id, "")
+
+
+def hls_proxy_url_token(url: str, cookie_ref: str = "") -> str:
+ # cookie_ref is an opaque token issued by _issue_proxy_cookie_token, not the raw cookie.
+ if cookie_ref:
+ return sign(f"hls-proxy-url:{url}:{cookie_ref}")
return sign(f"hls-proxy-url:{url}")
@@ -453,8 +568,13 @@ def verify_viewer_token(token: str, stream_id: int) -> bool:
return hmac.compare_digest(sig, sign(f"viewer-token:{payload}"))
-def hls_proxy_path(url: str) -> str:
+def hls_proxy_path(url: str, upstream_cookie: str = "") -> str:
encoded = encode_proxy_target(url)
+ if upstream_cookie:
+ cookie_ref = _issue_proxy_cookie_token(upstream_cookie)
+ token = hls_proxy_url_token(url, cookie_ref)
+ encoded_ref = base64.urlsafe_b64encode(cookie_ref.encode("utf-8")).decode("ascii").rstrip("=")
+ return f"{HLS_PROXY_PREFIX}/{token}/{encoded}/{encoded_ref}"
return f"{HLS_PROXY_PREFIX}/{hls_proxy_url_token(url)}/{encoded}"
@@ -477,7 +597,8 @@ def playback_url_for_mode(url: str, link: dict[str, object], proxy_mode: str) ->
return url
if mode == "manifest":
return hls_manifest_proxy_path(url)
- return hls_proxy_path(url)
+ upstream_cookie = str(link.get("upstreamCookie", link.get("upstream_cookie", ""))).strip()
+ return hls_proxy_path(url, upstream_cookie)
PLAYABLE_VIDEO_EXTS = frozenset({".mp4", ".mkv", ".mov", ".webm", ".m4v"})
@@ -540,6 +661,8 @@ def add_playback_urls(links: list[dict[str, object]]) -> list[dict[str, object]]
config_item["playback_url"] = playback_url_for_mode(drm_playback_url, probe_item, proxy_mode)
prepared_configs.append(config_item)
item["drmConfigs"] = prepared_configs
+ # Cookie is already embedded in playback_url; strip it from viewer-facing data.
+ item.pop("upstreamCookie", None)
prepared.append(item)
return prepared
@@ -672,6 +795,7 @@ def normalize_links(raw: object) -> list[dict[str, object]]:
"type": link_type,
"url": url,
"proxyMode": proxy_mode,
+ "upstreamCookie": str(item.get("upstreamCookie", item.get("upstream_cookie", ""))).strip(),
"key": str(item.get("key", "")).strip(),
"clearkey": str(item.get("clearkey", "")).strip(),
"drmConfigs": drm_configs,
@@ -1145,6 +1269,7 @@ def probe_stream_url(
raw_url: object,
type_hint: object = "",
drm_configs: object | None = None,
+ upstream_cookie: str = "",
) -> dict[str, object]:
url = str(raw_url or "").strip()
if not url:
@@ -1166,6 +1291,8 @@ def probe_stream_url(
"User-Agent": "StreamHall/1.0",
"Accept": "*/*",
}
+ if upstream_cookie:
+ headers["Cookie"] = upstream_cookie
if not is_hls and not is_dash:
headers["Range"] = "bytes=0-4095"
@@ -1468,15 +1595,25 @@ def probe_stream_links(row: dict[str, object]) -> dict[str, object]:
links = normalize_links(json.loads(row["links_json"] or "[]"))
except json.JSONDecodeError:
links = []
+ probe_all = int(row.get("tg_notify_enabled", 0) or 0) == 1
+ all_live_names: list[str] = []
+ first_result: dict[str, object] | None = None
for index, link in enumerate(links):
- result = probe_stream_url(link["url"], link.get("type", ""), link.get("drmConfigs", []))
+ result = probe_stream_url(link["url"], link.get("type", ""), link.get("drmConfigs", []), link.get("upstreamCookie", ""))
if result["valid"]:
- return {
- **result,
- "index": index,
- "url": link["url"],
- "link_name": link["name"],
- }
+ all_live_names.append(link["name"])
+ if first_result is None:
+ first_result = {
+ **result,
+ "index": index,
+ "url": link["url"],
+ "link_name": link["name"],
+ }
+ if not probe_all:
+ break
+ if first_result is not None:
+ first_result["all_live_names"] = all_live_names
+ return first_result
return stream_probe_response(False)
@@ -1518,6 +1655,32 @@ def notification_context_for_row(
}
+def _send_tg_live_notification(
+ row: dict[str, object],
+ is_live: bool,
+ probe_result: dict[str, object],
+ headers: object | None = None,
+) -> None:
+ if int(row["tg_notify_enabled"] or 0) != 1:
+ return
+ settings = telegram_settings()
+ label = normalize_stream_label(row.get("stream_label", "LIVE")).lower()
+ notify_key = f"telegram_{label}_notify_{'start' if is_live else 'stop'}"
+ template_key = f"telegram_{label}_{'start' if is_live else 'stop'}_template"
+ status = "start" if is_live else "stop"
+ if settings.get(notify_key, "0") != "1":
+ return
+ text = render_message_template(
+ settings.get(template_key, ""),
+ notification_context_for_row(row, probe_result, status, headers, settings),
+ )
+ stream_id = int(row["id"])
+ try:
+ send_telegram_message(settings, text)
+ except Exception as exc:
+ print(f"Telegram notification failed for stream {stream_id}: {exc}")
+
+
def maybe_notify_stream_transition(
row: dict[str, object],
is_live: bool,
@@ -1525,50 +1688,110 @@ def maybe_notify_stream_transition(
headers: object | None = None,
) -> None:
stream_id = int(row["id"])
+ current_live_names: list[str] = probe_result.get("all_live_names", []) if is_live else []
+ if is_live and not current_live_names and probe_result.get("link_name"):
+ current_live_names = [str(probe_result["link_name"])]
+
with db() as conn:
previous = conn.execute(
- "SELECT is_live FROM stream_probe_states WHERE stream_id = ?", (stream_id,)
+ "SELECT is_live, live_links_json FROM stream_probe_states WHERE stream_id = ?", (stream_id,)
).fetchone()
- previous_live = None if previous is None else bool(previous["is_live"])
+ previous_is_live = None if previous is None else bool(previous["is_live"])
+ try:
+ previous_live_names: set[str] = set(json.loads((previous["live_links_json"] or "[]") if previous else "[]"))
+ except (json.JSONDecodeError, TypeError):
+ previous_live_names = set()
conn.execute(
"""
- INSERT INTO stream_probe_states (stream_id, is_live, updated_at)
- VALUES (?, ?, ?)
+ INSERT INTO stream_probe_states (stream_id, is_live, live_links_json, updated_at)
+ VALUES (?, ?, ?, ?)
ON CONFLICT(stream_id) DO UPDATE SET
is_live = excluded.is_live,
+ live_links_json = excluded.live_links_json,
updated_at = excluded.updated_at
""",
- (stream_id, 1 if is_live else 0, now()),
+ (stream_id, 1 if is_live else 0, json.dumps(current_live_names), now()),
)
- if previous_live == is_live:
- return
- if previous_live is None and not is_live:
+
+ if previous_is_live is None and not is_live:
return
if int(row["tg_notify_enabled"] or 0) != 1:
return
- settings = telegram_settings()
- label = normalize_stream_label(row.get("stream_label", "LIVE")).lower()
- if is_live:
- notify_key = f"telegram_{label}_notify_start"
- template_key = f"telegram_{label}_start_template"
- status = "start"
- else:
- notify_key = f"telegram_{label}_notify_stop"
- template_key = f"telegram_{label}_stop_template"
- status = "stop"
- if settings.get(notify_key, "0") != "1":
- return
- template = settings.get(template_key, "")
+ newly_live = [n for n in current_live_names if n not in previous_live_names]
- text = render_message_template(
- template,
- notification_context_for_row(row, probe_result, status, headers, settings),
- )
- try:
- send_telegram_message(settings, text)
- except Exception as exc:
- print(f"Telegram notification failed for stream {stream_id}: {exc}")
+ if is_live:
+ with _pending_stop_lock:
+ stop_timer = _pending_stop_timers.pop(stream_id, None)
+ if stop_timer is not None:
+ stop_timer.cancel()
+ return
+ if not newly_live:
+ return
+ # Schedule start notification with merge window
+ if TG_START_MERGE_SECS <= 0:
+ merged = dict(probe_result, link_name=" & ".join(newly_live))
+ _send_tg_live_notification(row, True, merged, headers)
+ return
+
+ with _pending_start_lock:
+ existing = _pending_start_timers.get(stream_id)
+ if existing:
+ existing[0].cancel()
+ all_names = list(dict.fromkeys(existing[1] + newly_live))
+ else:
+ all_names = list(newly_live)
+
+ def _fire_start() -> None:
+ with _pending_start_lock:
+ entry = _pending_start_timers.get(stream_id)
+ if entry is None or entry[0] is not _the_start_timer:
+ return
+ del _pending_start_timers[stream_id]
+ merged = dict(probe_result, link_name=" & ".join(all_names))
+ _send_tg_live_notification(row, True, merged, headers)
+
+ _the_start_timer = threading.Timer(TG_START_MERGE_SECS, _fire_start)
+ _the_start_timer.daemon = True
+ _pending_start_timers[stream_id] = (_the_start_timer, all_names)
+ _the_start_timer.start()
+ else:
+ with _pending_start_lock:
+ start_entry = _pending_start_timers.pop(stream_id, None)
+ if start_entry:
+ start_entry[0].cancel()
+
+ if not previous_is_live:
+ return
+
+ if TG_RECONNECT_GRACE_SECS <= 0:
+ _send_tg_live_notification(row, False, probe_result, headers)
+ return
+
+ def _delayed_stop() -> None:
+ with _pending_stop_lock:
+ if _pending_stop_timers.get(stream_id) is not _the_timer:
+ return
+ del _pending_stop_timers[stream_id]
+ try:
+ with db() as conn:
+ state = conn.execute(
+ "SELECT is_live FROM stream_probe_states WHERE stream_id = ?", (stream_id,)
+ ).fetchone()
+ if state and bool(state["is_live"]):
+ return
+ except Exception:
+ pass
+ _send_tg_live_notification(row, False, probe_result, headers)
+
+ _the_timer = threading.Timer(TG_RECONNECT_GRACE_SECS, _delayed_stop)
+ _the_timer.daemon = True
+ with _pending_stop_lock:
+ old = _pending_stop_timers.pop(stream_id, None)
+ if old is not None:
+ old.cancel()
+ _pending_stop_timers[stream_id] = _the_timer
+ _the_timer.start()
def notify_current_live_if_needed(stream_id: int, headers: object | None = None) -> None:
@@ -1586,11 +1809,25 @@ def notify_current_live_if_needed(stream_id: int, headers: object | None = None)
label = normalize_stream_label(row.get("stream_label", "LIVE")).lower()
if settings.get(f"telegram_{label}_notify_start", "0") != "1":
return
+ live_names = result.get("all_live_names", [result["link_name"]] if result.get("link_name") else [])
+ link_name = " & ".join(live_names) if live_names else result.get("link_name", "")
text = render_message_template(
settings.get(f"telegram_{label}_start_template", ""),
- notification_context_for_row(row, result, "start", headers, settings),
+ notification_context_for_row(row, dict(result, link_name=link_name), "start", headers, settings),
)
+ # Cancel any start timer the monitor may have already scheduled,
+ # so it doesn't fire a merged notification that re-includes these links.
+ with _pending_start_lock:
+ entry = _pending_start_timers.pop(stream_id, None)
+ if entry:
+ entry[0].cancel()
send_telegram_message(settings, text)
+ # Mark these links as already-notified so the monitor won't re-send.
+ with db() as conn:
+ conn.execute(
+ "UPDATE stream_probe_states SET live_links_json = ? WHERE stream_id = ?",
+ (json.dumps(live_names), stream_id),
+ )
except Exception as exc:
print(f"Telegram current live notification failed for stream {stream_id}: {exc}")
@@ -1637,14 +1874,14 @@ def rewrite_hls_manifest(manifest: str, slug: str, stream_key: str) -> str:
return "\n".join(rewritten) + ("\n" if manifest.endswith("\n") else "")
-def rewrite_external_hls_manifest(manifest: str, base_url: str) -> str:
+def rewrite_external_hls_manifest(manifest: str, base_url: str, upstream_cookie: str = "") -> str:
# Rewrite all URLs in an external HLS manifest (segment lines and URI="..."
# attributes such as EXT-X-KEY) to route through the signed /proxy/hls/
# endpoint, enabling cross-origin playback and key override in the player.
def proxied_uri(value: str) -> str:
if not value or value.startswith(("data:", "skd:")):
return value
- return hls_proxy_path(urljoin(base_url, value))
+ return hls_proxy_path(urljoin(base_url, value), upstream_cookie)
rewritten: list[str] = []
for line in manifest.splitlines():
@@ -1686,10 +1923,15 @@ def monitor_streams_loop() -> None:
try:
with db() as conn:
rows = conn.execute("SELECT * FROM streams ORDER BY id DESC").fetchall()
- for row in rows:
- check_stream_row_live(row)
except Exception as exc:
- print(f"Stream monitor failed: {exc}")
+ print(f"Stream monitor: failed to fetch streams: {exc}", flush=True)
+ time.sleep(STREAM_MONITOR_INTERVAL)
+ continue
+ for row in rows:
+ try:
+ check_stream_row_live(row)
+ except Exception as exc:
+ print(f"Stream monitor: error on stream {row.get('id')}: {exc}", flush=True)
time.sleep(STREAM_MONITOR_INTERVAL)
@@ -1697,7 +1939,13 @@ class StreamHallHandler(BaseHTTPRequestHandler):
server_version = "StreamHall/1.0"
def log_message(self, fmt: str, *args: object) -> None:
- print(f"{self.address_string()} - {fmt % args}")
+ raw_path = self.path if hasattr(self, "path") else ""
+ path = raw_path.split("?", 1)[0]
+ if path.startswith(("/h/", "/proxy/hls/", "/proxy/hls-manifest/")):
+ return
+ if any(k in raw_path for k in ("viewer_heartbeat", "check_player_stream", "check_stream", "stream_stats_summary")):
+ return
+ print(f"{self.address_string()} - {fmt % args}", flush=True)
def do_GET(self) -> None:
parsed = urlparse(self.path)
@@ -2503,12 +2751,18 @@ class StreamHallHandler(BaseHTTPRequestHandler):
if cur.rowcount == 0:
raise AppError("stream_not_found")
if enabled and row:
- headers = dict(self.headers)
- threading.Thread(
- target=notify_current_live_if_needed,
- args=(stream_id, headers),
- daemon=True,
- ).start()
+ # Reset live_links_json and cancel any pending start timer so the monitor's
+ # next cycle treats all live links as newly-live via the merge window.
+ # Using a single notification path eliminates duplicate-send races.
+ with db() as conn:
+ conn.execute(
+ "UPDATE stream_probe_states SET live_links_json = '[]' WHERE stream_id = ?",
+ (stream_id,),
+ )
+ with _pending_start_lock:
+ entry = _pending_start_timers.pop(stream_id, None)
+ if entry:
+ entry[0].cancel()
self.send_json({"status": "success", "enabled": enabled})
def api_reorder_streams(self) -> None:
@@ -2578,7 +2832,12 @@ class StreamHallHandler(BaseHTTPRequestHandler):
def api_check_stream_url(self) -> None:
body = self.read_json()
- result = probe_stream_url(body.get("url", ""), body.get("type", ""), body.get("drmConfigs", body.get("drm_configs", [])))
+ result = probe_stream_url(
+ body.get("url", ""),
+ body.get("type", ""),
+ body.get("drmConfigs", body.get("drm_configs", [])),
+ str(body.get("upstreamCookie", body.get("upstream_cookie", "")) or "").strip(),
+ )
self.send_json({"status": "success", "data": result})
def api_discover_drm(self) -> None:
@@ -3240,7 +3499,7 @@ class StreamHallHandler(BaseHTTPRequestHandler):
upstream_url = f"{SRS_HTTP_ORIGIN}{upstream_path}{query}"
try:
req = Request(upstream_url, headers={"User-Agent": "StreamHall/1.0"})
- opener = urlopen(req, timeout=STREAM_PROBE_TIMEOUT) if rewrite_manifest else urlopen(req)
+ opener = urlopen(req, timeout=HLS_PROXY_TIMEOUT) if rewrite_manifest else urlopen(req)
with opener as resp:
if rewrite_manifest:
body = resp.read(1024 * 1024)
@@ -3274,14 +3533,28 @@ class StreamHallHandler(BaseHTTPRequestHandler):
self.send_error(HTTPStatus.BAD_GATEWAY)
def proxy_hls_route(self, request_path: str, send_body: bool = True) -> None:
- # URL format: /proxy/hls//
- # The token is an HMAC of the full target URL; it ensures that only URLs
- # generated by hls_proxy_path() can be proxied, preventing open-proxy abuse.
+ # URL format: /proxy/hls//[/]
+ # When a cookie is needed, the last segment is a server-issued opaque token reference
+ # (token_id.hmac_sig encoded as base64). The actual cookie is stored server-side only
+ # and never appears in the URL, preventing extraction from browser network logs.
parts = request_path.strip("/").split("/")
- if len(parts) != 4 or parts[0] != "proxy" or parts[1] != "hls":
+ if parts[:2] != ["proxy", "hls"]:
+ self.send_error(HTTPStatus.NOT_FOUND)
+ return
+ if len(parts) == 4:
+ token, encoded_url = parts[2], parts[3]
+ cookie_ref = ""
+ elif len(parts) == 5:
+ token, encoded_url, encoded_ref = parts[2], parts[3], parts[4]
+ try:
+ padded = encoded_ref + "=" * (-len(encoded_ref) % 4)
+ cookie_ref = base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
+ except Exception:
+ self.send_error(HTTPStatus.BAD_REQUEST)
+ return
+ else:
self.send_error(HTTPStatus.NOT_FOUND)
return
- token, encoded_url = parts[2], parts[3]
try:
target_url = decode_proxy_target(encoded_url)
except (ValueError, UnicodeDecodeError):
@@ -3291,49 +3564,80 @@ class StreamHallHandler(BaseHTTPRequestHandler):
if parsed.scheme not in ("http", "https"):
self.send_error(HTTPStatus.BAD_REQUEST)
return
- if not hmac.compare_digest(token, hls_proxy_url_token(target_url)):
+ if not hmac.compare_digest(token, hls_proxy_url_token(target_url, cookie_ref)):
self.send_error(HTTPStatus.FORBIDDEN)
return
+ upstream_cookie = ""
+ if cookie_ref:
+ upstream_cookie = _resolve_proxy_cookie_token(cookie_ref)
+ if not upstream_cookie:
+ # Token not found — server may have restarted; player page needs reload.
+ self.send_error(HTTPStatus.FORBIDDEN)
+ return
try:
reject_private_http_url(target_url)
except AppError:
self.send_error(HTTPStatus.FORBIDDEN)
return
+ upstream_headers: dict[str, str] = {"User-Agent": "StreamHall/1.0"}
+ if upstream_cookie:
+ upstream_headers["Cookie"] = upstream_cookie
try:
- req = Request(target_url, headers={"User-Agent": "StreamHall/1.0"})
- with urlopen(req, timeout=STREAM_PROBE_TIMEOUT if parsed.path.lower().endswith(".m3u8") else 20) as resp:
- content_type = resp.headers.get("Content-Type") or mimetypes.guess_type(parsed.path)[0] or "application/octet-stream"
- is_manifest = parsed.path.lower().endswith(".m3u8") or "mpegurl" in content_type.lower()
- if is_manifest:
- body = resp.read(2 * 1024 * 1024)
- content = rewrite_external_hls_manifest(decode_probe_text(body), target_url).encode("utf-8")
- self.send_response(HTTPStatus.OK)
- self.send_header("Content-Type", "application/vnd.apple.mpegurl; charset=utf-8")
- self.send_header("Content-Length", str(len(content)))
- self.send_header("Cache-Control", "no-cache")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.end_headers()
- if send_body:
- self.wfile.write(content)
- return
-
- self.send_response(HTTPStatus.OK)
- self.send_header("Content-Type", content_type)
- self.send_header("Cache-Control", "no-cache")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.end_headers()
- if not send_body:
- return
- while True:
- chunk = resp.read(65536)
- if not chunk:
- break
- self.wfile.write(chunk)
- except HTTPError as exc:
- self.send_error(HTTPStatus(exc.code) if exc.code in HTTPStatus._value2member_map_ else HTTPStatus.BAD_GATEWAY)
- except (URLError, TimeoutError, OSError):
+ resp, conn, _scheme, _host, _port = _upstream_fetch(target_url, upstream_headers, HLS_PROXY_TIMEOUT)
+ except (http.client.HTTPException, OSError, TimeoutError):
self.send_error(HTTPStatus.BAD_GATEWAY)
+ return
+
+ if resp.status >= 400:
+ resp.read()
+ _upstream_conn_return(_scheme, _host, _port, conn)
+ self.send_error(HTTPStatus(resp.status) if resp.status in HTTPStatus._value2member_map_ else HTTPStatus.BAD_GATEWAY)
+ return
+
+ content_type = resp.getheader("Content-Type") or mimetypes.guess_type(parsed.path)[0] or "application/octet-stream"
+ is_manifest = parsed.path.lower().endswith(".m3u8") or "mpegurl" in content_type.lower()
+
+ if is_manifest:
+ body = resp.read(2 * 1024 * 1024)
+ _upstream_conn_return(_scheme, _host, _port, conn)
+ content = rewrite_external_hls_manifest(decode_probe_text(body), target_url, upstream_cookie).encode("utf-8")
+ self.send_response(HTTPStatus.OK)
+ self.send_header("Content-Type", "application/vnd.apple.mpegurl; charset=utf-8")
+ self.send_header("Content-Length", str(len(content)))
+ self.send_header("Cache-Control", "no-cache")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.end_headers()
+ if send_body:
+ self.wfile.write(content)
+ return
+
+ self.send_response(HTTPStatus.OK)
+ self.send_header("Content-Type", content_type)
+ self.send_header("Cache-Control", "no-cache")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.end_headers()
+ if not send_body:
+ resp.read()
+ _upstream_conn_return(_scheme, _host, _port, conn)
+ return
+ conn_returned = False
+ try:
+ while True:
+ chunk = resp.read(65536)
+ if not chunk:
+ break
+ self.wfile.write(chunk)
+ _upstream_conn_return(_scheme, _host, _port, conn)
+ conn_returned = True
+ except OSError:
+ pass
+ finally:
+ if not conn_returned:
+ try:
+ conn.close()
+ except Exception:
+ pass
def proxy_hls_manifest_route(self, request_path: str, send_body: bool = True) -> None:
# URL format: /proxy/hls-manifest//