feat: upstream cookie proxy, HLS connection pool, multi-link TG notifications
Build and Push Docker Image / build (push) Successful in 15s

- Add upstream Cookie support for HLS full-proxy mode (CloudFront signed cookies
  stored server-side as opaque tokens; never exposed in proxy URLs)
- Add HTTP connection pool for HLS proxy upstream requests to avoid per-request
  TLS handshake overhead; introduce HLS_PROXY_TIMEOUT separate from probe timeout
- Add per-link TG start notification with 30s merge window: each newly-live link
  fires independently, links that come online within the window are merged into
  one message with names joined by ' & '
- Fix TG reconnect grace period (TG_RECONNECT_GRACE_SECS=60): suppress both
  stop and start notifications for brief RTMP disconnects
- Fix stream probe to check all links for TG-enabled streams; non-TG streams
  still stop at first valid link to avoid unnecessary probes
- Filter high-frequency HTTP access log entries (HLS segments, heartbeat, etc.)
- Add json-file logging driver config to docker-compose for reliable log access
This commit is contained in:
Stardream
2026-05-31 00:58:44 +10:00
parent 8e1ed10ba5
commit 6d39c512d7
3 changed files with 481 additions and 107 deletions
+5
View File
@@ -22,6 +22,11 @@ services:
- ./videos:/app/videos
# Mount additional directories as needed:
# - /local/path:/app/media/label
logging:
driver: json-file
options:
max-size: "10m"
max-file: "3"
postgres:
image: postgres:16-alpine
+80 -15
View File
@@ -452,7 +452,8 @@
min-height: 42px;
}
.link-row .link-drm-config {
.link-row .link-drm-config,
.link-row .link-upstream-auth {
grid-column: 2 / -2;
border: 1px solid var(--line);
border-radius: 7px;
@@ -460,7 +461,8 @@
overflow: hidden;
}
.link-row .link-drm-config > summary {
.link-row .link-drm-config > summary,
.link-row .link-upstream-auth > summary {
display: flex;
align-items: center;
justify-content: space-between;
@@ -473,7 +475,8 @@
list-style: none;
}
.link-row .link-drm-config > summary::-webkit-details-marker {
.link-row .link-drm-config > summary::-webkit-details-marker,
.link-row .link-upstream-auth > summary::-webkit-details-marker {
display: none;
}
@@ -487,6 +490,26 @@
letter-spacing: 0.04em;
}
.link-row .link-upstream-auth > summary::after {
content: "Cookie";
border-radius: 999px;
padding: 2px 7px;
background: var(--line);
color: var(--text);
font-size: 0.68rem;
letter-spacing: 0.04em;
}
.link-row .link-upstream-auth > .upstream-auth-body {
padding: 0 11px 11px;
}
.link-row .link-upstream-auth > .upstream-auth-body textarea {
width: 100%;
box-sizing: border-box;
min-height: 60px;
}
.link-row .link-drm-grid {
display: grid;
grid-template-columns: 0.7fr 1.5fr;
@@ -604,6 +627,12 @@
background: rgba(220, 38, 38, 0.08);
}
.stream-check-status.is-warning,
.stream-live-state.is-warning {
color: #b45309;
background: rgba(245, 158, 11, 0.1);
}
:root[data-theme="dark"] .stream-check-status.is-online,
:root[data-theme="dark"] .stream-live-state.is-online {
color: var(--mint);
@@ -614,6 +643,11 @@
color: #fb7185;
}
:root[data-theme="dark"] .stream-check-status.is-warning,
:root[data-theme="dark"] .stream-live-state.is-warning {
color: #fbbf24;
}
.obs-grid {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
@@ -1047,6 +1081,7 @@
}
.link-row .link-drm-config,
.link-row .link-upstream-auth,
.link-row .link-drm-wide {
grid-column: 1 / -1;
}
@@ -2042,6 +2077,7 @@
'form.link_name': '视角名称',
'form.link_type': '类型',
'form.proxy_mode': '代理模式',
'form.upstream_cookie': '上游 Cookie',
'form.link_url': '播放链接',
'form.key_override':'Key Override',
'form.clearkey': 'ClearKey 信息',
@@ -2101,6 +2137,7 @@
'ph.link_name': '视角名',
'ph.link_url': '链接 (m3u8/flv/mpd)',
'ph.proxy_mode': '代理模式',
'ph.upstream_cookie': '粘贴 Cookie 字符串,如 CloudFront-Key-Pair-Id=xxx; CloudFront-Policy=yyy; CloudFront-Signature=zzz',
'ph.main_url_disabled':'已使用 DRM 专用播放链接',
'ph.key_aes': 'AES-128 Key Hex,可多行: main-video=hex',
'ph.clearkey': 'ClearKey 信息,如 {"kid":"key"}',
@@ -2156,6 +2193,7 @@
'probe.no_info': '没有监测到推流信息',
'probe.detecting': '正在检测推流信息...',
'probe.detected': '已检测到推流信息',
'probe.cookie_proxy_mismatch': '已检测到推流信号,但当前代理模式不支持 Cookie 转发,观众将无法正常播放,请改为完整代理模式',
'probe.waiting': '等待自动检测...',
'probe.closed': '直播已关闭',
'probe.drm_config_missing':'检测到 DRM 流,但缺少匹配的 DRM 配置',
@@ -2463,6 +2501,7 @@
'form.link_name': 'Name',
'form.link_type': 'Type',
'form.proxy_mode': 'Proxy mode',
'form.upstream_cookie': 'Upstream Cookie',
'form.link_url': 'Playback URL',
'form.key_override':'Key Override',
'form.clearkey': 'ClearKey',
@@ -2522,6 +2561,7 @@
'ph.link_name': 'Source name',
'ph.link_url': 'URL (m3u8/flv/mpd)',
'ph.proxy_mode': 'Proxy mode',
'ph.upstream_cookie': 'Paste cookie string, e.g. CloudFront-Key-Pair-Id=xxx; CloudFront-Policy=yyy; CloudFront-Signature=zzz',
'ph.main_url_disabled':'Using DRM-specific playback URL',
'ph.key_aes': 'AES-128 Key Hex, multi-line: main-video=hex',
'ph.clearkey': 'ClearKey JSON, e.g. {"kid":"key"}',
@@ -2577,6 +2617,7 @@
'probe.no_info': 'No stream detected',
'probe.detecting': 'Detecting stream...',
'probe.detected': 'Stream detected',
'probe.cookie_proxy_mismatch': 'Stream signal detected, but the current proxy mode does not forward cookies — viewers will not be able to play. Switch to Full Proxy mode.',
'probe.waiting': 'Waiting for detection...',
'probe.closed': 'Stream disabled',
'probe.drm_config_missing':'DRM stream detected, but matching DRM config is missing',
@@ -3101,11 +3142,12 @@
};
};
const probeUrl = async (url, type = '', drmConfigs = []) => {
const probeUrl = async (url, type = '', drmConfigs = [], upstreamCookie = '') => {
const res = await apiCall('check_stream_url', {
url,
type: inferLinkType(url, type),
drmConfigs
drmConfigs,
upstreamCookie
});
return res.data || { valid: false, message: t('probe.no_info') };
};
@@ -3123,12 +3165,15 @@
row.dataset.probeActive = '1';
if (!silent) setProbeStatus(statusEl, 'is-checking', t('probe.detecting'));
try {
const result = await probeUrl(url, type, getRowDrmConfigs(row));
const upstreamCookie = row.querySelector('.l-upstream-cookie')?.value.trim() || '';
const result = await probeUrl(url, type, getRowDrmConfigs(row), upstreamCookie);
if (!row.isConnected || row.dataset.probeToken !== token) return;
const proxyMode = row.querySelector('.l-proxy-mode')?.value || 'auto';
const cookieMismatch = result.valid && upstreamCookie && (proxyMode === 'direct' || proxyMode === 'manifest');
setProbeStatus(
statusEl,
result.valid ? 'is-online' : 'is-offline',
result.valid ? t('probe.detected') : (t('probe.' + result.code) || t('probe.no_info'))
cookieMismatch ? 'is-warning' : (result.valid ? 'is-online' : 'is-offline'),
cookieMismatch ? t('probe.cookie_proxy_mismatch') : (result.valid ? t('probe.detected') : (t('probe.' + result.code) || t('probe.no_info')))
);
} catch (e) {
if (row.isConnected && row.dataset.probeToken === token) {
@@ -3156,11 +3201,24 @@
});
};
const applyProbeResult = (streamId, result) => {
const hasCookieProxyMismatch = (stream) => {
if (!stream?.links_json) return false;
try {
const links = JSON.parse(stream.links_json);
return Array.isArray(links) && links.some(l => {
const cookie = (l.upstreamCookie || l.upstream_cookie || '').trim();
const mode = (l.proxyMode || l.proxy_mode || 'auto').toLowerCase();
return cookie && (mode === 'direct' || mode === 'manifest');
});
} catch { return false; }
};
const applyProbeResult = (streamId, result, stream = null) => {
const mismatch = result.valid && stream && hasCookieProxyMismatch(stream);
setSavedProbeStatus(
streamId,
result.valid ? 'is-online' : 'is-offline',
result.valid ? t('probe.detected') : (t('probe.' + result.code) || t('probe.no_info'))
mismatch ? 'is-warning' : (result.valid ? 'is-online' : 'is-offline'),
mismatch ? t('probe.cookie_proxy_mismatch') : (result.valid ? t('probe.detected') : (t('probe.' + result.code) || t('probe.no_info')))
);
};
@@ -3172,7 +3230,7 @@
if (showChecking) setSavedProbeStatus(stream.id, 'is-checking', t('probe.detecting'));
try {
const res = await apiCall('check_stream', { id: stream.id });
applyProbeResult(stream.id, res.data || { valid: false });
applyProbeResult(stream.id, res.data || { valid: false }, stream);
} catch (e) {
setSavedProbeStatus(stream.id, 'is-offline', t('probe.no_info'));
}
@@ -3194,7 +3252,7 @@
try {
const res = await apiCall('check_stream', { id: stream.id });
const result = res.data || { valid: false };
applyProbeResult(stream.id, result);
applyProbeResult(stream.id, result, stream);
} catch (e) {
setSavedProbeStatus(stream.id, 'is-offline', t('probe.no_info'));
}
@@ -4207,6 +4265,7 @@
name: row.querySelector('.l-name').value,
type: row.querySelector('.l-type').value,
proxyMode: row.querySelector('.l-proxy-mode')?.value || 'auto',
upstreamCookie: row.querySelector('.l-upstream-cookie')?.value.trim() || '',
url: fallbackUrl,
key: row.querySelector('.l-key').value.trim(),
clearkey: row.querySelector('.l-clearkey').value.trim(),
@@ -4323,7 +4382,7 @@
handle.addEventListener('pointercancel', finish);
};
const addLinkUI = (name = 'Default', url = '', key = '', clearkey = '', type = '', drmType = '', licenseUrl = '', licenseHeaders = '', pssh = '', certificateUrl = '', drmConfigs = [], proxyMode = 'auto') => {
const addLinkUI = (name = 'Default', url = '', key = '', clearkey = '', type = '', drmType = '', licenseUrl = '', licenseHeaders = '', pssh = '', certificateUrl = '', drmConfigs = [], proxyMode = 'auto', upstreamCookie = '') => {
const rawDrmType = String(drmType || '').toLowerCase();
const normalizedDrmType = rawDrmType === 'widevine' || rawDrmType === 'fairplay' ? rawDrmType : '';
const normalizedProxyMode = ['auto', 'direct', 'full', 'manifest'].includes(String(proxyMode || '').toLowerCase()) ? String(proxyMode || '').toLowerCase() : 'auto';
@@ -4378,6 +4437,12 @@
<div class="link-field-label">${t('form.clearkey')}</div>
<textarea class="l-clearkey" rows="2" placeholder="${t('ph.clearkey')}">${escapeHtml(clearkey)}</textarea>
</div>
<details class="link-upstream-auth" ${upstreamCookie ? 'open' : ''}>
<summary>${t('form.upstream_cookie')}</summary>
<div class="upstream-auth-body">
<textarea class="l-upstream-cookie" rows="3" placeholder="${t('ph.upstream_cookie')}">${escapeHtml(upstreamCookie)}</textarea>
</div>
</details>
<details class="link-drm-config" ${normalizedDrmConfigs.length ? 'open' : ''}>
<summary>${t('drm.config')}</summary>
<div class="link-drm-list"></div>
@@ -4754,7 +4819,7 @@
els.cancelBtn.classList.remove('hidden');
els.linksContainer.innerHTML = '';
const links = JSON.parse(stream.links_json || '[]');
links.forEach(l => addLinkUI(l.name, l.url, l.key, l.clearkey, l.type, l.drmType || l.drm_type || '', l.licenseUrl || l.license_url || '', l.licenseHeaders || l.license_headers || '', l.pssh || '', l.certificateUrl || l.certificate_url || '', l.drmConfigs || l.drm_configs || [], l.proxyMode || l.proxy_mode || 'auto'));
links.forEach(l => addLinkUI(l.name, l.url, l.key, l.clearkey, l.type, l.drmType || l.drm_type || '', l.licenseUrl || l.license_url || '', l.licenseHeaders || l.license_headers || '', l.pssh || '', l.certificateUrl || l.certificate_url || '', l.drmConfigs || l.drm_configs || [], l.proxyMode || l.proxy_mode || 'auto', l.upstreamCookie || l.upstream_cookie || ''));
if (links.length === 0) addLinkUI();
} else {
resetForm();
+396 -92
View File
@@ -2,6 +2,8 @@
from __future__ import annotations
import base64
import http.client
import queue as _queue_mod
import socket
import csv
import hashlib
@@ -61,8 +63,11 @@ PASSWORD_HASH_ITERATIONS = 240000
INITIAL_ADMIN_PASSWORD_BYTES = 18
PUBLIC_ID_BYTES = 9
STREAM_PROBE_TIMEOUT = float(os.getenv("STREAM_PROBE_TIMEOUT", "4"))
HLS_PROXY_TIMEOUT = float(os.getenv("HLS_PROXY_TIMEOUT", "15"))
TELEGRAM_TIMEOUT = float(os.getenv("TELEGRAM_TIMEOUT", "6"))
STREAM_MONITOR_INTERVAL = max(5, int(os.getenv("STREAM_MONITOR_INTERVAL", "10")))
TG_RECONNECT_GRACE_SECS = max(0, int(os.getenv("TG_RECONNECT_GRACE_SECS", "60")))
TG_START_MERGE_SECS = max(0, int(os.getenv("TG_START_MERGE_SECS", "30")))
SRS_HTTP_ORIGIN = os.getenv("SRS_HTTP_ORIGIN", "http://srs:8080").rstrip("/")
OBS_ROUTE_SLUG_LENGTH = max(12, int(os.getenv("OBS_ROUTE_SLUG_LENGTH", "22")))
URL_PATH_SAFE = "/._~!$&'()*+,;=:@"
@@ -91,6 +96,18 @@ VIDEO_EXTS = frozenset({".mp4", ".mkv", ".avi", ".flv", ".ts", ".mov", ".wmv", "
active_pushes: dict[str, dict] = {}
_pushes_lock = threading.Lock()
_pending_stop_timers: dict[int, threading.Timer] = {}
_pending_stop_lock = threading.Lock()
_pending_start_timers: dict[int, tuple[threading.Timer, list[str]]] = {}
_pending_start_lock = threading.Lock()
_upstream_pools: dict[tuple, _queue_mod.Queue] = {} # (scheme, host, port) -> Queue[conn]
_upstream_pools_lock = threading.Lock()
_UPSTREAM_POOL_SIZE = 4
_proxy_cookie_tokens: dict[str, str] = {} # token_id -> cookie string (server-side only)
_proxy_cookie_refs: dict[str, str] = {} # cookie -> cookie_ref (reverse map for stable proxy URLs)
_proxy_cookie_tokens_lock = threading.Lock()
_PROXY_COOKIE_TOKEN_MAX = 5000
HLS_PROXY_PREFIX = "/proxy/hls"
HLS_MANIFEST_PROXY_PREFIX = "/proxy/hls-manifest"
VIEWER_TOKEN_TTL = 300 # seconds
@@ -336,6 +353,7 @@ def init_stats_tables(conn) -> None:
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_viewer_events_stream_time ON viewer_events(stream_id, event_at)")
conn.execute("ALTER TABLE stream_probe_states ADD COLUMN IF NOT EXISTS live_links_json TEXT NOT NULL DEFAULT '[]'")
@@ -425,7 +443,104 @@ def decode_proxy_target(value: str) -> str:
return base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
def hls_proxy_url_token(url: str) -> str:
def _upstream_conn_borrow(scheme: str, host: str, port: int, timeout: float) -> http.client.HTTPConnection:
key = (scheme, host, port)
with _upstream_pools_lock:
if key not in _upstream_pools:
_upstream_pools[key] = _queue_mod.Queue(maxsize=_UPSTREAM_POOL_SIZE)
pool = _upstream_pools[key]
try:
conn = pool.get_nowait()
conn.timeout = timeout
return conn
except _queue_mod.Empty:
if scheme == "https":
return http.client.HTTPSConnection(host, port, timeout=timeout)
return http.client.HTTPConnection(host, port, timeout=timeout)
def _upstream_conn_return(scheme: str, host: str, port: int, conn: http.client.HTTPConnection) -> None:
key = (scheme, host, port)
with _upstream_pools_lock:
pool = _upstream_pools.get(key)
if pool is not None:
try:
pool.put_nowait(conn)
return
except _queue_mod.Full:
pass
try:
conn.close()
except Exception:
pass
def _upstream_fetch(
url: str,
headers: dict[str, str],
timeout: float,
) -> tuple[http.client.HTTPResponse, http.client.HTTPConnection, str, str, int]:
"""Fetch URL via a pooled persistent connection.
Returns (response, conn, scheme, host, port).
Caller must fully consume the response body, then call _upstream_conn_return().
On any error the connection is closed automatically."""
parsed = urlparse(url)
scheme = parsed.scheme.lower()
host = parsed.hostname or ""
port = int(parsed.port or (443 if scheme == "https" else 80))
path = parsed.path or "/"
if parsed.query:
path = f"{path}?{parsed.query}"
for attempt in range(2):
conn = _upstream_conn_borrow(scheme, host, port, timeout)
try:
conn.request("GET", path, headers=headers)
resp = conn.getresponse()
return resp, conn, scheme, host, port
except Exception:
try:
conn.close()
except Exception:
pass
if attempt == 0:
continue
raise
def _issue_proxy_cookie_token(cookie: str) -> str:
"""Return a stable signed opaque reference for this cookie, creating one if needed.
The same cookie string always returns the same ref so HLS segment proxy URLs remain
stable across manifest re-fetches, preventing spurious ABR resets."""
with _proxy_cookie_tokens_lock:
if cookie in _proxy_cookie_refs:
return _proxy_cookie_refs[cookie]
token_id = secrets.token_urlsafe(16)
if len(_proxy_cookie_tokens) >= _PROXY_COOKIE_TOKEN_MAX:
evicted_id = next(iter(_proxy_cookie_tokens))
evicted_cookie = _proxy_cookie_tokens.pop(evicted_id)
_proxy_cookie_refs.pop(evicted_cookie, None)
_proxy_cookie_tokens[token_id] = cookie
ref = f"{token_id}.{sign(f'proxy-cookie-token:{token_id}')}"
_proxy_cookie_refs[cookie] = ref
return ref
def _resolve_proxy_cookie_token(ref: str) -> str:
"""Verify ref signature and return the stored cookie, or '' if invalid/not found."""
if "." not in ref:
return ""
token_id, sig = ref.split(".", 1)
if not hmac.compare_digest(sig, sign(f"proxy-cookie-token:{token_id}")):
return ""
with _proxy_cookie_tokens_lock:
return _proxy_cookie_tokens.get(token_id, "")
def hls_proxy_url_token(url: str, cookie_ref: str = "") -> str:
# cookie_ref is an opaque token issued by _issue_proxy_cookie_token, not the raw cookie.
if cookie_ref:
return sign(f"hls-proxy-url:{url}:{cookie_ref}")
return sign(f"hls-proxy-url:{url}")
@@ -453,8 +568,13 @@ def verify_viewer_token(token: str, stream_id: int) -> bool:
return hmac.compare_digest(sig, sign(f"viewer-token:{payload}"))
def hls_proxy_path(url: str) -> str:
def hls_proxy_path(url: str, upstream_cookie: str = "") -> str:
encoded = encode_proxy_target(url)
if upstream_cookie:
cookie_ref = _issue_proxy_cookie_token(upstream_cookie)
token = hls_proxy_url_token(url, cookie_ref)
encoded_ref = base64.urlsafe_b64encode(cookie_ref.encode("utf-8")).decode("ascii").rstrip("=")
return f"{HLS_PROXY_PREFIX}/{token}/{encoded}/{encoded_ref}"
return f"{HLS_PROXY_PREFIX}/{hls_proxy_url_token(url)}/{encoded}"
@@ -477,7 +597,8 @@ def playback_url_for_mode(url: str, link: dict[str, object], proxy_mode: str) ->
return url
if mode == "manifest":
return hls_manifest_proxy_path(url)
return hls_proxy_path(url)
upstream_cookie = str(link.get("upstreamCookie", link.get("upstream_cookie", ""))).strip()
return hls_proxy_path(url, upstream_cookie)
PLAYABLE_VIDEO_EXTS = frozenset({".mp4", ".mkv", ".mov", ".webm", ".m4v"})
@@ -540,6 +661,8 @@ def add_playback_urls(links: list[dict[str, object]]) -> list[dict[str, object]]
config_item["playback_url"] = playback_url_for_mode(drm_playback_url, probe_item, proxy_mode)
prepared_configs.append(config_item)
item["drmConfigs"] = prepared_configs
# Cookie is already embedded in playback_url; strip it from viewer-facing data.
item.pop("upstreamCookie", None)
prepared.append(item)
return prepared
@@ -672,6 +795,7 @@ def normalize_links(raw: object) -> list[dict[str, object]]:
"type": link_type,
"url": url,
"proxyMode": proxy_mode,
"upstreamCookie": str(item.get("upstreamCookie", item.get("upstream_cookie", ""))).strip(),
"key": str(item.get("key", "")).strip(),
"clearkey": str(item.get("clearkey", "")).strip(),
"drmConfigs": drm_configs,
@@ -1145,6 +1269,7 @@ def probe_stream_url(
raw_url: object,
type_hint: object = "",
drm_configs: object | None = None,
upstream_cookie: str = "",
) -> dict[str, object]:
url = str(raw_url or "").strip()
if not url:
@@ -1166,6 +1291,8 @@ def probe_stream_url(
"User-Agent": "StreamHall/1.0",
"Accept": "*/*",
}
if upstream_cookie:
headers["Cookie"] = upstream_cookie
if not is_hls and not is_dash:
headers["Range"] = "bytes=0-4095"
@@ -1468,15 +1595,25 @@ def probe_stream_links(row: dict[str, object]) -> dict[str, object]:
links = normalize_links(json.loads(row["links_json"] or "[]"))
except json.JSONDecodeError:
links = []
probe_all = int(row.get("tg_notify_enabled", 0) or 0) == 1
all_live_names: list[str] = []
first_result: dict[str, object] | None = None
for index, link in enumerate(links):
result = probe_stream_url(link["url"], link.get("type", ""), link.get("drmConfigs", []))
result = probe_stream_url(link["url"], link.get("type", ""), link.get("drmConfigs", []), link.get("upstreamCookie", ""))
if result["valid"]:
return {
**result,
"index": index,
"url": link["url"],
"link_name": link["name"],
}
all_live_names.append(link["name"])
if first_result is None:
first_result = {
**result,
"index": index,
"url": link["url"],
"link_name": link["name"],
}
if not probe_all:
break
if first_result is not None:
first_result["all_live_names"] = all_live_names
return first_result
return stream_probe_response(False)
@@ -1518,6 +1655,32 @@ def notification_context_for_row(
}
def _send_tg_live_notification(
row: dict[str, object],
is_live: bool,
probe_result: dict[str, object],
headers: object | None = None,
) -> None:
if int(row["tg_notify_enabled"] or 0) != 1:
return
settings = telegram_settings()
label = normalize_stream_label(row.get("stream_label", "LIVE")).lower()
notify_key = f"telegram_{label}_notify_{'start' if is_live else 'stop'}"
template_key = f"telegram_{label}_{'start' if is_live else 'stop'}_template"
status = "start" if is_live else "stop"
if settings.get(notify_key, "0") != "1":
return
text = render_message_template(
settings.get(template_key, ""),
notification_context_for_row(row, probe_result, status, headers, settings),
)
stream_id = int(row["id"])
try:
send_telegram_message(settings, text)
except Exception as exc:
print(f"Telegram notification failed for stream {stream_id}: {exc}")
def maybe_notify_stream_transition(
row: dict[str, object],
is_live: bool,
@@ -1525,50 +1688,110 @@ def maybe_notify_stream_transition(
headers: object | None = None,
) -> None:
stream_id = int(row["id"])
current_live_names: list[str] = probe_result.get("all_live_names", []) if is_live else []
if is_live and not current_live_names and probe_result.get("link_name"):
current_live_names = [str(probe_result["link_name"])]
with db() as conn:
previous = conn.execute(
"SELECT is_live FROM stream_probe_states WHERE stream_id = ?", (stream_id,)
"SELECT is_live, live_links_json FROM stream_probe_states WHERE stream_id = ?", (stream_id,)
).fetchone()
previous_live = None if previous is None else bool(previous["is_live"])
previous_is_live = None if previous is None else bool(previous["is_live"])
try:
previous_live_names: set[str] = set(json.loads((previous["live_links_json"] or "[]") if previous else "[]"))
except (json.JSONDecodeError, TypeError):
previous_live_names = set()
conn.execute(
"""
INSERT INTO stream_probe_states (stream_id, is_live, updated_at)
VALUES (?, ?, ?)
INSERT INTO stream_probe_states (stream_id, is_live, live_links_json, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(stream_id) DO UPDATE SET
is_live = excluded.is_live,
live_links_json = excluded.live_links_json,
updated_at = excluded.updated_at
""",
(stream_id, 1 if is_live else 0, now()),
(stream_id, 1 if is_live else 0, json.dumps(current_live_names), now()),
)
if previous_live == is_live:
return
if previous_live is None and not is_live:
if previous_is_live is None and not is_live:
return
if int(row["tg_notify_enabled"] or 0) != 1:
return
settings = telegram_settings()
label = normalize_stream_label(row.get("stream_label", "LIVE")).lower()
if is_live:
notify_key = f"telegram_{label}_notify_start"
template_key = f"telegram_{label}_start_template"
status = "start"
else:
notify_key = f"telegram_{label}_notify_stop"
template_key = f"telegram_{label}_stop_template"
status = "stop"
if settings.get(notify_key, "0") != "1":
return
template = settings.get(template_key, "")
newly_live = [n for n in current_live_names if n not in previous_live_names]
text = render_message_template(
template,
notification_context_for_row(row, probe_result, status, headers, settings),
)
try:
send_telegram_message(settings, text)
except Exception as exc:
print(f"Telegram notification failed for stream {stream_id}: {exc}")
if is_live:
with _pending_stop_lock:
stop_timer = _pending_stop_timers.pop(stream_id, None)
if stop_timer is not None:
stop_timer.cancel()
return
if not newly_live:
return
# Schedule start notification with merge window
if TG_START_MERGE_SECS <= 0:
merged = dict(probe_result, link_name=" & ".join(newly_live))
_send_tg_live_notification(row, True, merged, headers)
return
with _pending_start_lock:
existing = _pending_start_timers.get(stream_id)
if existing:
existing[0].cancel()
all_names = list(dict.fromkeys(existing[1] + newly_live))
else:
all_names = list(newly_live)
def _fire_start() -> None:
with _pending_start_lock:
entry = _pending_start_timers.get(stream_id)
if entry is None or entry[0] is not _the_start_timer:
return
del _pending_start_timers[stream_id]
merged = dict(probe_result, link_name=" & ".join(all_names))
_send_tg_live_notification(row, True, merged, headers)
_the_start_timer = threading.Timer(TG_START_MERGE_SECS, _fire_start)
_the_start_timer.daemon = True
_pending_start_timers[stream_id] = (_the_start_timer, all_names)
_the_start_timer.start()
else:
with _pending_start_lock:
start_entry = _pending_start_timers.pop(stream_id, None)
if start_entry:
start_entry[0].cancel()
if not previous_is_live:
return
if TG_RECONNECT_GRACE_SECS <= 0:
_send_tg_live_notification(row, False, probe_result, headers)
return
def _delayed_stop() -> None:
with _pending_stop_lock:
if _pending_stop_timers.get(stream_id) is not _the_timer:
return
del _pending_stop_timers[stream_id]
try:
with db() as conn:
state = conn.execute(
"SELECT is_live FROM stream_probe_states WHERE stream_id = ?", (stream_id,)
).fetchone()
if state and bool(state["is_live"]):
return
except Exception:
pass
_send_tg_live_notification(row, False, probe_result, headers)
_the_timer = threading.Timer(TG_RECONNECT_GRACE_SECS, _delayed_stop)
_the_timer.daemon = True
with _pending_stop_lock:
old = _pending_stop_timers.pop(stream_id, None)
if old is not None:
old.cancel()
_pending_stop_timers[stream_id] = _the_timer
_the_timer.start()
def notify_current_live_if_needed(stream_id: int, headers: object | None = None) -> None:
@@ -1586,11 +1809,25 @@ def notify_current_live_if_needed(stream_id: int, headers: object | None = None)
label = normalize_stream_label(row.get("stream_label", "LIVE")).lower()
if settings.get(f"telegram_{label}_notify_start", "0") != "1":
return
live_names = result.get("all_live_names", [result["link_name"]] if result.get("link_name") else [])
link_name = " & ".join(live_names) if live_names else result.get("link_name", "")
text = render_message_template(
settings.get(f"telegram_{label}_start_template", ""),
notification_context_for_row(row, result, "start", headers, settings),
notification_context_for_row(row, dict(result, link_name=link_name), "start", headers, settings),
)
# Cancel any start timer the monitor may have already scheduled,
# so it doesn't fire a merged notification that re-includes these links.
with _pending_start_lock:
entry = _pending_start_timers.pop(stream_id, None)
if entry:
entry[0].cancel()
send_telegram_message(settings, text)
# Mark these links as already-notified so the monitor won't re-send.
with db() as conn:
conn.execute(
"UPDATE stream_probe_states SET live_links_json = ? WHERE stream_id = ?",
(json.dumps(live_names), stream_id),
)
except Exception as exc:
print(f"Telegram current live notification failed for stream {stream_id}: {exc}")
@@ -1637,14 +1874,14 @@ def rewrite_hls_manifest(manifest: str, slug: str, stream_key: str) -> str:
return "\n".join(rewritten) + ("\n" if manifest.endswith("\n") else "")
def rewrite_external_hls_manifest(manifest: str, base_url: str) -> str:
def rewrite_external_hls_manifest(manifest: str, base_url: str, upstream_cookie: str = "") -> str:
# Rewrite all URLs in an external HLS manifest (segment lines and URI="..."
# attributes such as EXT-X-KEY) to route through the signed /proxy/hls/
# endpoint, enabling cross-origin playback and key override in the player.
def proxied_uri(value: str) -> str:
if not value or value.startswith(("data:", "skd:")):
return value
return hls_proxy_path(urljoin(base_url, value))
return hls_proxy_path(urljoin(base_url, value), upstream_cookie)
rewritten: list[str] = []
for line in manifest.splitlines():
@@ -1686,10 +1923,15 @@ def monitor_streams_loop() -> None:
try:
with db() as conn:
rows = conn.execute("SELECT * FROM streams ORDER BY id DESC").fetchall()
for row in rows:
check_stream_row_live(row)
except Exception as exc:
print(f"Stream monitor failed: {exc}")
print(f"Stream monitor: failed to fetch streams: {exc}", flush=True)
time.sleep(STREAM_MONITOR_INTERVAL)
continue
for row in rows:
try:
check_stream_row_live(row)
except Exception as exc:
print(f"Stream monitor: error on stream {row.get('id')}: {exc}", flush=True)
time.sleep(STREAM_MONITOR_INTERVAL)
@@ -1697,7 +1939,13 @@ class StreamHallHandler(BaseHTTPRequestHandler):
server_version = "StreamHall/1.0"
def log_message(self, fmt: str, *args: object) -> None:
print(f"{self.address_string()} - {fmt % args}")
raw_path = self.path if hasattr(self, "path") else ""
path = raw_path.split("?", 1)[0]
if path.startswith(("/h/", "/proxy/hls/", "/proxy/hls-manifest/")):
return
if any(k in raw_path for k in ("viewer_heartbeat", "check_player_stream", "check_stream", "stream_stats_summary")):
return
print(f"{self.address_string()} - {fmt % args}", flush=True)
def do_GET(self) -> None:
parsed = urlparse(self.path)
@@ -2503,12 +2751,18 @@ class StreamHallHandler(BaseHTTPRequestHandler):
if cur.rowcount == 0:
raise AppError("stream_not_found")
if enabled and row:
headers = dict(self.headers)
threading.Thread(
target=notify_current_live_if_needed,
args=(stream_id, headers),
daemon=True,
).start()
# Reset live_links_json and cancel any pending start timer so the monitor's
# next cycle treats all live links as newly-live via the merge window.
# Using a single notification path eliminates duplicate-send races.
with db() as conn:
conn.execute(
"UPDATE stream_probe_states SET live_links_json = '[]' WHERE stream_id = ?",
(stream_id,),
)
with _pending_start_lock:
entry = _pending_start_timers.pop(stream_id, None)
if entry:
entry[0].cancel()
self.send_json({"status": "success", "enabled": enabled})
def api_reorder_streams(self) -> None:
@@ -2578,7 +2832,12 @@ class StreamHallHandler(BaseHTTPRequestHandler):
def api_check_stream_url(self) -> None:
body = self.read_json()
result = probe_stream_url(body.get("url", ""), body.get("type", ""), body.get("drmConfigs", body.get("drm_configs", [])))
result = probe_stream_url(
body.get("url", ""),
body.get("type", ""),
body.get("drmConfigs", body.get("drm_configs", [])),
str(body.get("upstreamCookie", body.get("upstream_cookie", "")) or "").strip(),
)
self.send_json({"status": "success", "data": result})
def api_discover_drm(self) -> None:
@@ -3240,7 +3499,7 @@ class StreamHallHandler(BaseHTTPRequestHandler):
upstream_url = f"{SRS_HTTP_ORIGIN}{upstream_path}{query}"
try:
req = Request(upstream_url, headers={"User-Agent": "StreamHall/1.0"})
opener = urlopen(req, timeout=STREAM_PROBE_TIMEOUT) if rewrite_manifest else urlopen(req)
opener = urlopen(req, timeout=HLS_PROXY_TIMEOUT) if rewrite_manifest else urlopen(req)
with opener as resp:
if rewrite_manifest:
body = resp.read(1024 * 1024)
@@ -3274,14 +3533,28 @@ class StreamHallHandler(BaseHTTPRequestHandler):
self.send_error(HTTPStatus.BAD_GATEWAY)
def proxy_hls_route(self, request_path: str, send_body: bool = True) -> None:
# URL format: /proxy/hls/<token>/<base64-encoded-url>
# The token is an HMAC of the full target URL; it ensures that only URLs
# generated by hls_proxy_path() can be proxied, preventing open-proxy abuse.
# URL format: /proxy/hls/<token>/<base64-encoded-url>[/<base64-encoded-cookie-ref>]
# When a cookie is needed, the last segment is a server-issued opaque token reference
# (token_id.hmac_sig encoded as base64). The actual cookie is stored server-side only
# and never appears in the URL, preventing extraction from browser network logs.
parts = request_path.strip("/").split("/")
if len(parts) != 4 or parts[0] != "proxy" or parts[1] != "hls":
if parts[:2] != ["proxy", "hls"]:
self.send_error(HTTPStatus.NOT_FOUND)
return
if len(parts) == 4:
token, encoded_url = parts[2], parts[3]
cookie_ref = ""
elif len(parts) == 5:
token, encoded_url, encoded_ref = parts[2], parts[3], parts[4]
try:
padded = encoded_ref + "=" * (-len(encoded_ref) % 4)
cookie_ref = base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
except Exception:
self.send_error(HTTPStatus.BAD_REQUEST)
return
else:
self.send_error(HTTPStatus.NOT_FOUND)
return
token, encoded_url = parts[2], parts[3]
try:
target_url = decode_proxy_target(encoded_url)
except (ValueError, UnicodeDecodeError):
@@ -3291,49 +3564,80 @@ class StreamHallHandler(BaseHTTPRequestHandler):
if parsed.scheme not in ("http", "https"):
self.send_error(HTTPStatus.BAD_REQUEST)
return
if not hmac.compare_digest(token, hls_proxy_url_token(target_url)):
if not hmac.compare_digest(token, hls_proxy_url_token(target_url, cookie_ref)):
self.send_error(HTTPStatus.FORBIDDEN)
return
upstream_cookie = ""
if cookie_ref:
upstream_cookie = _resolve_proxy_cookie_token(cookie_ref)
if not upstream_cookie:
# Token not found — server may have restarted; player page needs reload.
self.send_error(HTTPStatus.FORBIDDEN)
return
try:
reject_private_http_url(target_url)
except AppError:
self.send_error(HTTPStatus.FORBIDDEN)
return
upstream_headers: dict[str, str] = {"User-Agent": "StreamHall/1.0"}
if upstream_cookie:
upstream_headers["Cookie"] = upstream_cookie
try:
req = Request(target_url, headers={"User-Agent": "StreamHall/1.0"})
with urlopen(req, timeout=STREAM_PROBE_TIMEOUT if parsed.path.lower().endswith(".m3u8") else 20) as resp:
content_type = resp.headers.get("Content-Type") or mimetypes.guess_type(parsed.path)[0] or "application/octet-stream"
is_manifest = parsed.path.lower().endswith(".m3u8") or "mpegurl" in content_type.lower()
if is_manifest:
body = resp.read(2 * 1024 * 1024)
content = rewrite_external_hls_manifest(decode_probe_text(body), target_url).encode("utf-8")
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "application/vnd.apple.mpegurl; charset=utf-8")
self.send_header("Content-Length", str(len(content)))
self.send_header("Cache-Control", "no-cache")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
if send_body:
self.wfile.write(content)
return
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", content_type)
self.send_header("Cache-Control", "no-cache")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
if not send_body:
return
while True:
chunk = resp.read(65536)
if not chunk:
break
self.wfile.write(chunk)
except HTTPError as exc:
self.send_error(HTTPStatus(exc.code) if exc.code in HTTPStatus._value2member_map_ else HTTPStatus.BAD_GATEWAY)
except (URLError, TimeoutError, OSError):
resp, conn, _scheme, _host, _port = _upstream_fetch(target_url, upstream_headers, HLS_PROXY_TIMEOUT)
except (http.client.HTTPException, OSError, TimeoutError):
self.send_error(HTTPStatus.BAD_GATEWAY)
return
if resp.status >= 400:
resp.read()
_upstream_conn_return(_scheme, _host, _port, conn)
self.send_error(HTTPStatus(resp.status) if resp.status in HTTPStatus._value2member_map_ else HTTPStatus.BAD_GATEWAY)
return
content_type = resp.getheader("Content-Type") or mimetypes.guess_type(parsed.path)[0] or "application/octet-stream"
is_manifest = parsed.path.lower().endswith(".m3u8") or "mpegurl" in content_type.lower()
if is_manifest:
body = resp.read(2 * 1024 * 1024)
_upstream_conn_return(_scheme, _host, _port, conn)
content = rewrite_external_hls_manifest(decode_probe_text(body), target_url, upstream_cookie).encode("utf-8")
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "application/vnd.apple.mpegurl; charset=utf-8")
self.send_header("Content-Length", str(len(content)))
self.send_header("Cache-Control", "no-cache")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
if send_body:
self.wfile.write(content)
return
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", content_type)
self.send_header("Cache-Control", "no-cache")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
if not send_body:
resp.read()
_upstream_conn_return(_scheme, _host, _port, conn)
return
conn_returned = False
try:
while True:
chunk = resp.read(65536)
if not chunk:
break
self.wfile.write(chunk)
_upstream_conn_return(_scheme, _host, _port, conn)
conn_returned = True
except OSError:
pass
finally:
if not conn_returned:
try:
conn.close()
except Exception:
pass
def proxy_hls_manifest_route(self, request_path: str, send_body: bool = True) -> None:
# URL format: /proxy/hls-manifest/<token>/<base64-encoded-url>