Files
MeshChatX/scripts/ci/github-upload-bunny-storage-release-assets.py
T

180 lines
5.3 KiB
Python

#!/usr/bin/env python3
"""Upload a directory tree to bunny.net Edge Storage (HTTP PUT per object)."""
from __future__ import annotations
import hashlib
import json
import mimetypes
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from urllib.parse import quote
def encode_object_rel(rel: str) -> str:
    """Percent-encode each ``/``-separated segment of ``rel``.

    The slashes that separate segments are kept; every other reserved
    character inside a segment is escaped (``safe=""``).
    """
    segments = rel.split("/")
    escaped = [quote(segment, safe="") for segment in segments]
    return "/".join(escaped)
def mime_for(path: Path) -> str:
    """Return the Content-Type to send for ``path``.

    ``.wasm`` files (case-insensitive) are always ``application/wasm``;
    anything else is looked up by file name, falling back to
    ``application/octet-stream`` when the type cannot be guessed.
    """
    if path.suffix.lower() != ".wasm":
        content_type = mimetypes.guess_type(path.name)[0]
        return content_type if content_type else "application/octet-stream"
    return "application/wasm"
def get_json(url: str, access_key: str, timeout: int = 120) -> object:
    """GET ``url`` and return its body parsed as JSON.

    Args:
        url: Address to fetch.
        access_key: Value sent in the ``AccessKey`` request header.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        The decoded JSON document (the body is decoded as UTF-8 first).
    """
    auth_headers = {"AccessKey": access_key}
    request = urllib.request.Request(url, headers=auth_headers, method="GET")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
    text = raw.decode("utf-8")
    return json.loads(text)
def delete_path(url: str, access_key: str, timeout: int = 120) -> None:
    """Issue an HTTP DELETE for ``url``; a 404 response counts as success.

    Args:
        url: Address of the object or directory to delete.
        access_key: Value sent in the ``AccessKey`` request header.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Raises:
        SystemExit: On any HTTP error other than 404 (the message includes up
            to 500 bytes of the response body), or when the server answers
            with a non-error status other than 200, 201 or 204.
    """
    req = urllib.request.Request(
        url,
        method="DELETE",
        headers={"AccessKey": access_key},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            code = resp.getcode()
    except urllib.error.HTTPError as e:
        if e.code == 404:
            # Nothing to delete: the target is already absent.
            return
        # Surface the server's explanation instead of discarding it, matching
        # the error format used for uploads.
        err_body = e.read(500)
        raise SystemExit(f"HTTP {e.code} DELETE {url}: {err_body!r}") from e
    if code not in (200, 201, 204):
        raise SystemExit(f"unexpected DELETE status {code} for {url}")
def prune_other_versions(
    base: str,
    access_key: str,
    track: str,
    keep_version: str,
) -> None:
    """Delete each directory listed under ``track/`` whose name differs from ``keep_version``.

    The listing is fetched from ``<base>/<track>/``. A 404 on that listing, or
    a response that is not a JSON list, means there is nothing to prune.
    Entries are considered only when they are dicts with a truthy
    ``IsDirectory`` and a non-empty string ``ObjectName``; each name is
    handled at most once. Every deletion is announced on stderr first.
    """
    root = base.rstrip("/")
    try:
        entries = get_json(f"{root}/{encode_object_rel(track)}/", access_key)
    except urllib.error.HTTPError as err:
        if err.code != 404:
            raise
        return
    if not isinstance(entries, list):
        return

    handled: set[str] = set()
    for entry in entries:
        if not (isinstance(entry, dict) and entry.get("IsDirectory")):
            continue
        dirname = entry.get("ObjectName")
        if not isinstance(dirname, str) or not dirname or dirname in handled:
            continue
        handled.add(dirname)
        if dirname == keep_version:
            # This is the version being published; leave it in place.
            continue
        object_rel = f"{track}/{dirname}"
        target = f"{root}/{encode_object_rel(object_rel)}"
        print(f"prune: DELETE {target}", file=sys.stderr)
        delete_path(target, access_key)
def put_file(
    url: str,
    body: bytes,
    access_key: str,
    content_type: str,
    max_attempts: int = 4,
    base_delay: float = 0.5,
) -> None:
    """Upload ``body`` to ``url`` with HTTP PUT, retrying transient failures.

    The request carries ``AccessKey`` and ``Content-Type`` headers plus a
    ``Checksum`` header holding the upper-case hex SHA-256 digest of ``body``.
    HTTP 5xx responses and connection-level errors are retried with
    exponential backoff (``base_delay``, ``2 * base_delay``, ...).

    Args:
        url: Full object URL to PUT to.
        body: Exact bytes to store.
        access_key: Value sent in the ``AccessKey`` header.
        content_type: Value sent in the ``Content-Type`` header.
        max_attempts: Total number of tries; must be at least 1.
        base_delay: Seconds slept before the first retry; doubled on each
            subsequent retry.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        SystemExit: On a non-retryable HTTP error, on a success status other
            than 200/201, or when the final attempt also fails.
    """
    if max_attempts < 1:
        # Without this guard the loop would never run and the function would
        # return as though the upload had succeeded.
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    checksum = hashlib.sha256(body).hexdigest().upper()
    # Bodies larger than 50,000,000 bytes get a longer socket timeout.
    timeout = 600 if len(body) > 50_000_000 else 120
    headers = {
        "AccessKey": access_key,
        "Content-Type": content_type,
        "Checksum": checksum,
    }

    for attempt in range(1, max_attempts + 1):
        can_retry = attempt < max_attempts
        delay = base_delay * (2 ** (attempt - 1))
        req = urllib.request.Request(
            url,
            data=body,
            method="PUT",
            headers=headers,
        )
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                code = resp.getcode()
        except urllib.error.HTTPError as e:
            # Must come before the OSError clause: HTTPError is a URLError,
            # which in turn is an OSError.
            err_body = e.read(500)
            if 500 <= e.code < 600 and can_retry:
                time.sleep(delay)
                continue
            raise SystemExit(
                f"HTTP {e.code} for {url}: {err_body!r}",
            ) from e
        except OSError as e:
            # Covers URLError and TimeoutError as before, and additionally
            # connection resets / remote disconnects raised while reading the
            # response, which urllib does not wrap in URLError.
            if can_retry:
                time.sleep(delay)
                continue
            raise SystemExit(f"request failed for {url}: {e}") from e
        if code in (200, 201):
            return
        raise SystemExit(f"unexpected status {code} for {url}")
def main() -> None:
    """Upload every file under the directory named by ``sys.argv[1]``.

    Configuration is read from the environment:

    * ``BUNNY_STORAGE_BASE_URL`` -- base URL objects are PUT under (required).
    * ``BUNNY_STORAGE_ACCESS_KEY`` -- value of the ``AccessKey`` header
      (required).
    * ``BUNNY_STORAGE_OBJECT_PREFIX`` -- optional path prepended to every
      object name. When it has the form ``master/<version>[/...]`` or
      ``dev/<version>[/...]``, every other version directory under that track
      is deleted before the upload starts.

    Each uploaded URL is printed to stdout. The process exits with status 1
    when configuration is missing, no directory argument was given, the
    argument is not a directory, or the directory contains no files.
    """
    base = os.environ.get("BUNNY_STORAGE_BASE_URL", "").rstrip("/")
    key = os.environ.get("BUNNY_STORAGE_ACCESS_KEY", "")
    prefix = os.environ.get("BUNNY_STORAGE_OBJECT_PREFIX", "").strip("/")
    if not base or not key:
        print(
            "BUNNY_STORAGE_BASE_URL and BUNNY_STORAGE_ACCESS_KEY must be set",
            file=sys.stderr,
        )
        sys.exit(1)

    if len(sys.argv) < 2:
        prog = sys.argv[0] if sys.argv else "upload"
        print(f"usage: {prog} DIRECTORY", file=sys.stderr)
        sys.exit(1)

    # Validate all local input before touching remote storage, so a bad
    # argument can never delete the published versions and then upload nothing.
    root = Path(sys.argv[1]).resolve()
    if not root.is_dir():
        print(f"not a directory: {root}", file=sys.stderr)
        sys.exit(1)
    files = sorted(p for p in root.rglob("*") if p.is_file())
    if not files:
        print(f"no files under {root}", file=sys.stderr)
        sys.exit(1)

    # Only the second segment names the version directory. Passing a longer
    # remainder (e.g. "v1/linux") would match no listed directory and cause
    # the current version to be pruned as well.
    parts = prefix.split("/")
    if len(parts) >= 2 and parts[0] in ("master", "dev") and parts[1]:
        prune_other_versions(base, key, parts[0], parts[1])

    for path in files:
        rel = path.relative_to(root).as_posix()
        object_rel = f"{prefix}/{rel}" if prefix else rel
        url = f"{base}/{encode_object_rel(object_rel)}"
        put_file(url, path.read_bytes(), key, mime_for(path))
        print(url)
if __name__ == "__main__":
    # Run the upload only when executed as a script, not when imported.
    main()