File size: 7,321 Bytes
a705dd3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
#!/usr/bin/env python3
"""
update_pins.py
- Fetch newest SHAs (release tag or default branch) for SAM2 + MatAnyone
- Update ARG lines in Dockerfile: SAM2_SHA / MATANYONE_SHA
- Supports dry-run and manual pins
- Uses GitHub API; set GITHUB_TOKEN to avoid rate limits (optional)
"""
import os
import re
import sys
import json
import argparse
from urllib.parse import urlparse
import requests
from datetime import datetime, timezone
from shutil import copyfile
# Dockerfile that is edited when --dockerfile is not given on the command line.
DOCKERFILE_PATH = "Dockerfile"

# Upstream repositories whose commits get pinned (must match your Dockerfile ARGs).
SAM2_REPO_URL = "https://github.com/facebookresearch/segment-anything-2"
MATANY_REPO_URL = "https://github.com/pq-yang/MatAnyone"

# One shared HTTP session so every GitHub request carries the same headers.
SESSION = requests.Session()
if os.environ.get("GITHUB_TOKEN"):
    # The token is optional; when present it is sent as a bearer credential.
    SESSION.headers["Authorization"] = f"Bearer {os.environ['GITHUB_TOKEN']}"
SESSION.headers["Accept"] = "application/vnd.github+json"
SESSION.headers["User-Agent"] = "update-pins-script"
def gh_owner_repo(repo_url: str) -> tuple[str, str]:
    """Split a GitHub repository URL into its ``(owner, repo)`` pair.

    Only the URL path is inspected. The first two non-empty path segments
    are taken as owner and repository name, so extra segments such as
    ``/tree/main`` are ignored. A trailing ``.git`` on the repository name
    is removed so clone-style URLs yield the plain repository name.

    Raises:
        ValueError: If the path does not contain a usable owner and
            repository name.
    """
    path = urlparse(repo_url).path
    # Drop empty segments so doubled or trailing slashes cannot produce "".
    parts = [segment for segment in path.split("/") if segment]
    if len(parts) < 2:
        raise ValueError(f"Invalid repo URL: {repo_url}")
    owner, repo = parts[0], parts[1].removesuffix(".git")
    if not repo:
        # The second segment was exactly ".git", which names no repository.
        raise ValueError(f"Invalid repo URL: {repo_url}")
    return owner, repo
def gh_api(path: str):
    """Send a GET request to the GitHub REST API and return the decoded JSON.

    ``path`` is appended verbatim to ``https://api.github.com``. The request
    goes through the module-level ``SESSION`` with a 30 second timeout.

    Raises:
        RuntimeError: If the response status code is 400 or higher.
    """
    response = SESSION.get(f"https://api.github.com{path}", timeout=30)
    if response.status_code < 400:
        return response.json()
    raise RuntimeError(f"GitHub API error {response.status_code}: {response.text}")
def get_latest_release_sha(repo_url: str) -> tuple[str, str]:
    """Return ``(ref_desc, commit_sha)`` using the latest release tag.

    Queries the repository's ``releases/latest`` endpoint, reads its
    ``tag_name`` and resolves that tag to the SHA it ultimately points at.
    ``ref_desc`` has the form ``release:<tag>``.

    Raises:
        ValueError: If ``repo_url`` cannot be split into owner and repo.
        RuntimeError: If any API call or response lookup fails; the
            underlying exception is attached as ``__cause__``.
    """
    owner, repo = gh_owner_repo(repo_url)
    try:
        release = gh_api(f"/repos/{owner}/{repo}/releases/latest")
        tag = release["tag_name"]
        # Resolve the tag ref to the object it points at.
        ref = gh_api(f"/repos/{owner}/{repo}/git/ref/tags/{tag}")
        obj = ref["object"]
        # An annotated tag points at a tag object rather than a commit, and
        # a tag object may itself point at another tag: peel every layer.
        while obj["type"] == "tag":
            obj = gh_api(f"/repos/{owner}/{repo}/git/tags/{obj['sha']}")["object"]
        return (f"release:{tag}", obj["sha"])
    except Exception as e:
        # Chain explicitly so the original failure stays visible in tracebacks.
        raise RuntimeError(f"Could not get latest release for {repo}: {e}") from e
def get_latest_default_branch_sha(repo_url: str) -> tuple[str, str]:
    """Return ``(ref_desc, commit_sha)`` for the head of the default branch.

    The default branch name is read from the repository metadata, then the
    branch endpoint supplies the commit SHA. ``ref_desc`` has the form
    ``branch:<name>``.
    """
    owner, repo = gh_owner_repo(repo_url)
    repo_path = f"/repos/{owner}/{repo}"
    branch_name = gh_api(repo_path)["default_branch"]
    head = gh_api(f"{repo_path}/branches/{branch_name}")
    return (f"branch:{branch_name}", head["commit"]["sha"])
def get_sha_for_ref(repo_url: str, ref: str) -> tuple[str, str]:
    """
    Resolve any Git ref (branch name, tag name, or commit SHA) to a commit SHA.

    Returns ``(ref_desc, sha)`` where ``ref_desc`` is ``branch:<ref>``,
    ``tag:<ref>`` or ``commit:<first 7 chars of ref>``. A full 40-character
    lowercase hex SHA is returned unchanged without calling the API.
    Otherwise the ref is tried as a branch, then as a tag, then as a commit;
    the first lookup that succeeds wins.

    Raises:
        ValueError: If ``repo_url`` cannot be split into owner and repo.
        RuntimeError: If none of the lookups succeeds. The message includes
            the last underlying error, which is also attached as
            ``__cause__``, so failures such as rate limiting are not
            reported as a plain "unknown ref".
    """
    owner, repo = gh_owner_repo(repo_url)
    # If it's already a full SHA, just return it.
    if re.fullmatch(r"[0-9a-f]{40}", ref):
        return (f"commit:{ref[:7]}", ref)

    base = f"/repos/{owner}/{repo}"
    # Try branches/<ref>, then tags/<ref>, then commits/<ref>.
    candidates = [
        ("branch", f"{base}/branches/{ref}"),
        ("tag", f"{base}/git/ref/tags/{ref}"),
        ("commit", f"{base}/commits/{ref}"),
    ]
    last_error = None
    for kind, path in candidates:
        try:
            data = gh_api(path)
            if kind == "branch":
                return (f"branch:{ref}", data["commit"]["sha"])
            if kind == "tag":
                obj = data["object"]
                # Annotated tags point at tag objects, possibly nested;
                # peel until the object is no longer a tag.
                while obj["type"] == "tag":
                    obj = gh_api(f"{base}/git/tags/{obj['sha']}")["object"]
                return (f"tag:{ref}", obj["sha"])
            return (f"commit:{ref[:7]}", data["sha"])
        except Exception as e:
            # Best effort: remember why this attempt failed and try the next kind.
            last_error = e
    raise RuntimeError(
        f"Could not resolve ref '{ref}' for {repo} (last error: {last_error})"
    ) from last_error
def update_dockerfile_arg(dockerfile_text: str, arg_name: str, new_value: str) -> str:
    """
    Replace every line like:
        ARG SAM2_SHA=...
    with:
        ARG SAM2_SHA=<new_value>

    Only lines that start with ``ARG``, followed by whitespace, the exact
    ``arg_name`` and ``=`` are rewritten. The line terminator is left
    untouched, so text with CRLF line endings keeps them.

    Returns the updated text.

    Raises:
        RuntimeError: If no matching ARG line exists in ``dockerfile_text``.
    """
    # Stop at the first CR/LF rather than using `.*$`: `.` also matches "\r",
    # which would swallow the carriage return of a CRLF line ending.
    pattern = rf"^(ARG\s+{re.escape(arg_name)}=)[^\r\n]*"

    # Use a callable replacement to avoid backreference ambiguity (e.g., \12).
    def repl(m: re.Match) -> str:
        return m.group(1) + new_value

    new_text, n = re.subn(pattern, repl, dockerfile_text, flags=re.MULTILINE)
    if n == 0:
        raise RuntimeError(f"ARG {arg_name}=… line not found in Dockerfile.")
    return new_text
def _resolve_pin(repo_url: str, explicit_ref, mode: str) -> tuple[str, str]:
    """Resolve one repository to ``(ref_desc, commit_sha)``; an explicit ref overrides ``mode``."""
    if explicit_ref:
        return get_sha_for_ref(repo_url, explicit_ref)
    if mode == "release":
        return get_latest_release_sha(repo_url)
    return get_latest_default_branch_sha(repo_url)


def main():
    """Command-line entry point: resolve pins and rewrite the Dockerfile ARGs.

    Steps:
      1. Parse the command-line options.
      2. Resolve a commit SHA for SAM2 and for MatAnyone, from an explicit
         ref if one was given, otherwise according to ``--mode``.
      3. Print the pins (as JSON with ``--json``).
      4. Replace the ``SAM2_SHA`` and ``MATANYONE_SHA`` ARG values in the
         Dockerfile text.
      5. With ``--dry-run`` print the result and stop. Otherwise write the
         file, first copying it to ``<dockerfile>.bak`` unless
         ``--no-backup`` was given.

    If the resolved pins are already in the Dockerfile, nothing is written
    and the existing backup is left alone, so re-running the script cannot
    replace the backup of the previous state with a copy of the current one.

    Raises:
        FileNotFoundError: If the Dockerfile path does not exist.
        RuntimeError: If a pin cannot be resolved or an ARG line is missing.
    """
    ap = argparse.ArgumentParser(description="Update pinned SHAs in Dockerfile.")
    ap.add_argument("--mode", choices=["release", "default-branch"], default="release",
                    help="Where to pull pins from (latest GitHub release tag or default branch head).")
    ap.add_argument("--sam2-ref", help="Explicit ref for SAM2 (tag/branch/sha). Overrides --mode.")
    ap.add_argument("--matany-ref", help="Explicit ref for MatAnyone (tag/branch/sha). Overrides --mode.")
    ap.add_argument("--dockerfile", default=DOCKERFILE_PATH, help="Path to Dockerfile.")
    ap.add_argument("--dry-run", action="store_true", help="Show changes but do not write file.")
    ap.add_argument("--json", action="store_true", help="Print resulting pins as JSON.")
    ap.add_argument("--no-backup", action="store_true", help="Do not create a Dockerfile.bak backup.")
    args = ap.parse_args()

    # Resolve SHAs
    sam2_refdesc, sam2_sha = _resolve_pin(SAM2_REPO_URL, args.sam2_ref, args.mode)
    mat_refdesc, mat_sha = _resolve_pin(MATANY_REPO_URL, args.matany_ref, args.mode)

    result = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "mode": args.mode,
        "SAM2": {"repo": SAM2_REPO_URL, "ref": sam2_refdesc, "sha": sam2_sha},
        "MatAnyone": {"repo": MATANY_REPO_URL, "ref": mat_refdesc, "sha": mat_sha},
    }

    # Show pins
    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(f"[Pins] SAM2 -> {sam2_refdesc} -> {sam2_sha}")
        print(f"[Pins] MatAnyone -> {mat_refdesc} -> {mat_sha}")

    # Read Dockerfile
    if not os.path.isfile(args.dockerfile):
        raise FileNotFoundError(f"Dockerfile not found at: {args.dockerfile}")
    with open(args.dockerfile, "r", encoding="utf-8") as f:
        original_text = f.read()

    # Update lines
    text = update_dockerfile_arg(original_text, "SAM2_SHA", sam2_sha)
    text = update_dockerfile_arg(text, "MATANYONE_SHA", mat_sha)

    if args.dry_run:
        print("\n--- Dockerfile (preview) ---\n")
        print(text)
        return

    # Nothing changed: skip the write so an existing .bak keeps the last
    # genuinely different version instead of being overwritten.
    if text == original_text:
        print(f"\n✅ {args.dockerfile} already has these pins; nothing to write.")
        return

    # Backup
    if not args.no_backup:
        copyfile(args.dockerfile, args.dockerfile + ".bak")

    # Write
    with open(args.dockerfile, "w", encoding="utf-8") as f:
        f.write(text)
    print(f"\n✅ Updated {args.dockerfile} with new pins.")
# Script entry point: run main() and turn any failure into a one-line
# message on stderr plus exit status 1.
if __name__ == "__main__":
    try:
        main()
    except Exception as exc:
        print(f"\n❌ Error: {exc}", file=sys.stderr)
        raise SystemExit(1)
|