|
|
|
|
|
""" |
|
|
update_pins.py |
|
|
- Fetch newest SHAs (release tag or default branch) for SAM2 + MatAnyone |
|
|
- Update ARG lines in Dockerfile: SAM2_SHA / MATANYONE_SHA |
|
|
- Supports dry-run and manual pins |
|
|
- Uses GitHub API; set GITHUB_TOKEN to avoid rate limits (optional) |
|
|
""" |
|
|
|
|
|
import os |
|
|
import re |
|
|
import sys |
|
|
import json |
|
|
import argparse |
|
|
from urllib.parse import urlparse |
|
|
import requests |
|
|
from datetime import datetime, timezone |
|
|
from shutil import copyfile |
|
|
|
|
|
# Default path of the Dockerfile whose ARG pins get rewritten.
DOCKERFILE_PATH = "Dockerfile"

# Upstream repositories whose commits are pinned in the Dockerfile.
SAM2_REPO_URL = "https://github.com/facebookresearch/segment-anything-2"
MATANY_REPO_URL = "https://github.com/pq-yang/MatAnyone"

# One shared HTTP session so every GitHub API request carries the same headers.
SESSION = requests.Session()

# Authenticate only when a non-empty GITHUB_TOKEN is present in the environment.
if os.environ.get("GITHUB_TOKEN"):
    SESSION.headers["Authorization"] = "Bearer " + os.environ["GITHUB_TOKEN"]

# Headers sent on every request through this session.
SESSION.headers["Accept"] = "application/vnd.github+json"
SESSION.headers["User-Agent"] = "update-pins-script"
|
|
|
|
|
def gh_owner_repo(repo_url: str) -> tuple[str, str]:
    """Split a GitHub repository URL into its ``(owner, repo)`` pair.

    Empty path segments (doubled or trailing slashes) are ignored, and a
    trailing ``.git`` on the repository name, as found in clone URLs, is
    removed so the result can be interpolated directly into API paths.

    Args:
        repo_url: Repository URL such as ``https://github.com/owner/repo``.

    Returns:
        The first two non-empty path segments as ``(owner, repo)``.

    Raises:
        ValueError: If the URL path does not contain both an owner and a
            repository name.
    """
    segments = [seg for seg in urlparse(repo_url).path.split("/") if seg]
    if len(segments) < 2:
        raise ValueError(f"Invalid repo URL: {repo_url}")

    owner = segments[0]
    # Clone URLs end in ".git", which is not part of the repository name.
    repo = segments[1].removesuffix(".git")
    if not repo:
        raise ValueError(f"Invalid repo URL: {repo_url}")
    return owner, repo
|
|
|
|
|
def gh_api(path: str):
    """GET ``path`` from the GitHub REST API and return the decoded JSON body.

    ``path`` is appended verbatim to ``https://api.github.com``. The request
    is sent through the module-level ``SESSION`` with a 30-second timeout.

    Raises:
        RuntimeError: If the response status code is 400 or higher; the
            message carries the status code and the response text.
    """
    response = SESSION.get("https://api.github.com" + path, timeout=30)
    if response.status_code < 400:
        return response.json()
    raise RuntimeError(f"GitHub API error {response.status_code}: {response.text}")
|
|
|
|
|
def get_latest_release_sha(repo_url: str) -> tuple[str, str]:
    """Return ``(ref_desc, commit_sha)`` for the repository's latest release.

    The latest release's tag name is looked up, then the tag ref is resolved
    to the object it finally points at.

    Args:
        repo_url: GitHub repository URL.

    Returns:
        ``("release:<tag>", sha)``, where ``sha`` is the SHA of the object
        reached after peeling all annotated-tag layers.

    Raises:
        ValueError: If ``repo_url`` cannot be split into owner and repo.
        RuntimeError: If any lookup fails; the original error is chained as
            the cause.
    """
    owner, repo = gh_owner_repo(repo_url)
    try:
        release = gh_api(f"/repos/{owner}/{repo}/releases/latest")
        tag = release["tag_name"]

        target = gh_api(f"/repos/{owner}/{repo}/git/ref/tags/{tag}")["object"]
        # An annotated tag ref points at a tag object rather than a commit,
        # and a tag object may itself target another tag; follow the chain
        # until a non-tag object is reached.
        while target["type"] == "tag":
            target = gh_api(f"/repos/{owner}/{repo}/git/tags/{target['sha']}")["object"]

        return (f"release:{tag}", target["sha"])
    except Exception as e:
        # Chain the cause so the underlying failure is visible in tracebacks.
        raise RuntimeError(f"Could not get latest release for {repo}: {e}") from e
|
|
|
|
|
def get_latest_default_branch_sha(repo_url: str) -> tuple[str, str]:
    """Return ``(ref_desc, commit_sha)`` for the head of the default branch.

    The repository metadata is fetched to learn the default branch name, then
    that branch is fetched to read its head commit SHA. The description has
    the form ``"branch:<name>"``.
    """
    owner, repo = gh_owner_repo(repo_url)
    repo_path = f"/repos/{owner}/{repo}"

    branch_name = gh_api(repo_path)["default_branch"]
    head = gh_api(f"{repo_path}/branches/{branch_name}")

    return (f"branch:{branch_name}", head["commit"]["sha"])
|
|
|
|
|
def get_sha_for_ref(repo_url: str, ref: str) -> tuple[str, str]:
    """Resolve a Git ref (branch name, tag name, or commit SHA) to a SHA.

    A full 40-character hex SHA is returned as-is (lower-cased) without any
    API call. Otherwise the ref is tried as a branch, then as a tag, then as
    a commit-ish, and the first successful lookup wins.

    Args:
        repo_url: GitHub repository URL.
        ref: Branch name, tag name, or (possibly abbreviated) commit SHA.

    Returns:
        ``(ref_desc, sha)`` where ``ref_desc`` is ``"branch:<ref>"``,
        ``"tag:<ref>"`` or ``"commit:<first 7 chars>"``.

    Raises:
        ValueError: If ``repo_url`` cannot be split into owner and repo.
        RuntimeError: If no lookup succeeds. The message lists why each
            attempt failed and the last failure is chained as the cause.
    """
    owner, repo = gh_owner_repo(repo_url)

    # A full SHA needs no lookup; hex case is normalised to lower case.
    if re.fullmatch(r"[0-9a-fA-F]{40}", ref):
        full_sha = ref.lower()
        return (f"commit:{full_sha[:7]}", full_sha)

    base = f"/repos/{owner}/{repo}"
    attempts = (
        ("branch", f"{base}/branches/{ref}"),
        ("tag", f"{base}/git/ref/tags/{ref}"),
        ("commit", f"{base}/commits/{ref}"),
    )

    failures = []
    last_error = None
    for kind, path in attempts:
        try:
            data = gh_api(path)
            if kind == "branch":
                return (f"branch:{ref}", data["commit"]["sha"])
            if kind == "tag":
                target = data["object"]
                # Peel annotated tags (possibly nested) down to what they target.
                while target["type"] == "tag":
                    target = gh_api(f"{base}/git/tags/{target['sha']}")["object"]
                return (f"tag:{ref}", target["sha"])
            return (f"commit:{ref[:7]}", data["sha"])
        except Exception as exc:
            # Best effort: fall through to the next kind of ref, but remember
            # why this one failed so rate-limit or auth errors are reported
            # instead of being silently dropped.
            failures.append(f"{kind}: {exc}")
            last_error = exc

    details = "; ".join(failures)
    raise RuntimeError(
        f"Could not resolve ref '{ref}' for {repo} ({details})"
    ) from last_error
|
|
|
|
|
def update_dockerfile_arg(dockerfile_text: str, arg_name: str, new_value: str) -> str:
    """Rewrite the default value of every ``ARG <arg_name>=...`` line.

    A line like ``ARG SAM2_SHA=...`` becomes ``ARG SAM2_SHA=<new_value>``.
    The ``ARG`` keyword is matched case-insensitively and may be indented;
    the variable name is matched exactly. Everything before the value
    (indentation, keyword casing, spacing) and the line terminator
    (``\\n`` or ``\\r\\n``) are preserved.

    Args:
        dockerfile_text: Full text of the Dockerfile.
        arg_name: Name of the build argument to update.
        new_value: Replacement value, inserted literally.

    Returns:
        The Dockerfile text with all matching lines updated.

    Raises:
        RuntimeError: If no matching ``ARG`` line exists.
    """
    # [ \t] instead of \s keeps the match on one line, and [^\r\n]* stops
    # before a carriage return so CRLF line endings survive the rewrite.
    pattern = rf"^([ \t]*(?i:ARG)[ \t]+{re.escape(arg_name)}=)[^\r\n]*"

    def repl(m: re.Match) -> str:
        # A callable replacement keeps backslashes in new_value literal.
        return m.group(1) + new_value

    new_text, n = re.subn(pattern, repl, dockerfile_text, flags=re.MULTILINE)
    if n == 0:
        raise RuntimeError(f"ARG {arg_name}=… line not found in Dockerfile.")
    return new_text
|
|
|
|
|
def _resolve_pin(repo_url: str, explicit_ref, mode: str) -> tuple[str, str]:
    """Return ``(ref_desc, sha)`` for one repo: explicit ref if given, else by mode."""
    if explicit_ref:
        return get_sha_for_ref(repo_url, explicit_ref)
    if mode == "release":
        return get_latest_release_sha(repo_url)
    return get_latest_default_branch_sha(repo_url)


def main():
    """Command-line entry point: resolve the pins and rewrite the Dockerfile.

    Parses the command line, resolves a commit SHA for SAM2 and MatAnyone
    (explicit ``--*-ref`` wins over ``--mode``), prints the pins (as JSON
    with ``--json``), and substitutes them into the ``SAM2_SHA`` and
    ``MATANYONE_SHA`` ARG lines. With ``--dry-run`` the updated text is only
    printed; otherwise the file is overwritten, after copying it to
    ``<dockerfile>.bak`` unless ``--no-backup`` is given.

    Raises:
        FileNotFoundError: If the Dockerfile does not exist.
        RuntimeError: If a pin cannot be resolved or an ARG line is missing.
    """
    ap = argparse.ArgumentParser(description="Update pinned SHAs in Dockerfile.")
    ap.add_argument("--mode", choices=["release", "default-branch"], default="release",
                    help="Where to pull pins from (latest GitHub release tag or default branch head).")
    ap.add_argument("--sam2-ref", help="Explicit ref for SAM2 (tag/branch/sha). Overrides --mode.")
    ap.add_argument("--matany-ref", help="Explicit ref for MatAnyone (tag/branch/sha). Overrides --mode.")
    ap.add_argument("--dockerfile", default=DOCKERFILE_PATH, help="Path to Dockerfile.")
    ap.add_argument("--dry-run", action="store_true", help="Show changes but do not write file.")
    ap.add_argument("--json", action="store_true", help="Print resulting pins as JSON.")
    ap.add_argument("--no-backup", action="store_true", help="Do not create a Dockerfile.bak backup.")
    args = ap.parse_args()

    # Fail fast: without a Dockerfile there is nothing to update, so do not
    # spend GitHub API calls (and rate limit) before finding that out.
    if not os.path.isfile(args.dockerfile):
        raise FileNotFoundError(f"Dockerfile not found at: {args.dockerfile}")

    # Resolve both pins; an explicit ref takes precedence over --mode.
    sam2_refdesc, sam2_sha = _resolve_pin(SAM2_REPO_URL, args.sam2_ref, args.mode)
    mat_refdesc, mat_sha = _resolve_pin(MATANY_REPO_URL, args.matany_ref, args.mode)

    result = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "mode": args.mode,
        "SAM2": {"repo": SAM2_REPO_URL, "ref": sam2_refdesc, "sha": sam2_sha},
        "MatAnyone": {"repo": MATANY_REPO_URL, "ref": mat_refdesc, "sha": mat_sha},
    }

    # Report the resolved pins.
    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(f"[Pins] SAM2 -> {sam2_refdesc} -> {sam2_sha}")
        print(f"[Pins] MatAnyone -> {mat_refdesc} -> {mat_sha}")

    # Load the Dockerfile and substitute both ARG values.
    with open(args.dockerfile, "r", encoding="utf-8") as f:
        text = f.read()

    text = update_dockerfile_arg(text, "SAM2_SHA", sam2_sha)
    text = update_dockerfile_arg(text, "MATANYONE_SHA", mat_sha)

    if args.dry_run:
        print("\n--- Dockerfile (preview) ---\n")
        print(text)
        return

    # Keep a copy of the previous file unless the user opted out.
    if not args.no_backup:
        copyfile(args.dockerfile, args.dockerfile + ".bak")

    with open(args.dockerfile, "w", encoding="utf-8") as f:
        f.write(text)

    print(f"\n✅ Updated {args.dockerfile} with new pins.")
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: report any failure as a single line on stderr and
    # exit with status 1 instead of showing a traceback.
    try:
        main()
    except Exception as exc:
        print(f"\n❌ Error: {exc}", file=sys.stderr)
        raise SystemExit(1)
|
|
|