Push Tracker
ria-toolkit-oss/src/ria_toolkit_oss_cli/ria_toolkit_oss/upload.py
2026-06-04 15:06:28 -04:00

473 lines
16 KiB
Python

"""ria upload — stream large files to a RIA Hub Project via the LFS API.
How it works
------------
1. The file is hashed locally; its SHA-256 digest is the LFS object ID
(oid) and is sent together with the file's size in bytes.
2. A single POST to the repo's LFS batch endpoint returns an upload URL
(and headers) for any object the server does not already have.
3. The file is sent to that URL in a single streaming PUT request, read
from disk in fixed-size chunks — it is never fully loaded into memory,
so files of any size work. If the server also returns a verify action,
it is called after the upload.
4. A commit is created via the Gitea contents API that records the LFS
pointer (a small text file) so the file appears in the repo tree.
No server-side changes are required — this uses the same authenticated LFS
protocol that `git lfs push` uses internally.
"""
import base64
import hashlib
import http.client
import json
import math
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
import click
# Size of each read() when hashing or streaming a file. Working in fixed
# 8 MiB pieces keeps memory use flat no matter how large the file is.
_CHUNK = 8 * 2**20
# Content type used for the JSON bodies exchanged with the Git LFS API.
LFS_MEDIA_TYPE = "application/vnd.git-lfs+json"
# ---------------------------------------------------------------------------
# Credential helpers (reused from setup_repo pattern)
# ---------------------------------------------------------------------------
def _get_stored_credentials(hub_url: str) -> tuple[str | None, str | None]:
import subprocess
parsed = urllib.parse.urlparse(hub_url)
payload = f"protocol={parsed.scheme}\nhost={parsed.netloc}\n\n"
try:
result = subprocess.run(
["git", "credential", "fill"],
input=payload,
capture_output=True,
text=True,
timeout=5,
)
creds = {}
for line in result.stdout.splitlines():
k, sep, v = line.partition("=")
if sep:
creds[k.strip()] = v
return creds.get("username"), creds.get("password")
except Exception:
return None, None
def _resolve_credentials(hub: str) -> tuple[str, str]:
    """Return ``(username, password)`` to authenticate against *hub*.

    Credentials stored in git are used only when both the username and the
    password are present. Otherwise the user is prompted for both; the
    password prompt hides its input.
    """
    stored_user, stored_secret = _get_stored_credentials(hub)
    if not (stored_user and stored_secret):
        click.echo(f"No stored credentials found for {hub}.")
        stored_user = click.prompt("RIA Hub username")
        stored_secret = click.prompt("Password / personal access token", hide_input=True)
    return stored_user, stored_secret
def _basic_auth(username: str, password: str) -> str:
return "Basic " + base64.b64encode(f"{username}:{password}".encode()).decode()
# ---------------------------------------------------------------------------
# File helpers
# ---------------------------------------------------------------------------
def _hash_file(path: str) -> tuple[str, int]:
    """Stream *path* from disk and return ``(sha256_hex, byte_size)``.

    The file is read in ``_CHUNK``-sized pieces, so memory use does not grow
    with the file's size.
    """
    digest = hashlib.sha256()
    total_bytes = 0
    with open(path, "rb") as stream:
        # iter(callable, sentinel) keeps calling read() until it returns b"".
        for piece in iter(lambda: stream.read(_CHUNK), b""):
            digest.update(piece)
            total_bytes += len(piece)
    return digest.hexdigest(), total_bytes
def _lfs_pointer_text(oid: str, size: int) -> str:
return f"version https://git-lfs.github.com/spec/v1\noid sha256:{oid}\nsize {size}\n"
# ---------------------------------------------------------------------------
# LFS batch API
# ---------------------------------------------------------------------------
def _lfs_batch(
    hub: str,
    owner: str,
    repo: str,
    objects: list[dict],
    username: str,
    password: str,
) -> dict:
    """POST an upload request to ``/{owner}/{repo}.git/info/lfs/objects/batch``.

    The request body is ``{"operation": "upload", "transfers": ["basic"],
    "objects": objects}``. It is sent with HTTP Basic auth and the LFS media
    type as both ``Content-Type`` and ``Accept``.

    Args:
        hub: RIA Hub base URL; a trailing slash is tolerated.
        owner: repository owner; percent-encoded into the URL.
        repo: repository name; percent-encoded into the URL.
        objects: list of ``{"oid": <sha256 hex>, "size": <bytes>}`` dicts.
        username: Basic-auth username.
        password: Basic-auth password or token.

    Returns:
        The parsed JSON response object.

    Raises:
        RuntimeError: on an HTTP error status, a network/socket failure, or a
            response body that is not a JSON object. Callers only need to
            catch ``RuntimeError``.
    """
    url = (
        f"{hub.rstrip('/')}"
        f"/{urllib.parse.quote(owner, safe='')}"
        f"/{urllib.parse.quote(repo, safe='')}"
        f".git/info/lfs/objects/batch"
    )
    body = json.dumps(
        {
            "operation": "upload",
            "transfers": ["basic"],
            "objects": objects,
        }
    ).encode()
    req = urllib.request.Request(url, data=body, method="POST")
    req.add_header("Content-Type", LFS_MEDIA_TYPE)
    req.add_header("Accept", LFS_MEDIA_TYPE)
    req.add_header("Authorization", _basic_auth(username, password))
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        body_text = e.read().decode(errors="replace")
        raise RuntimeError(f"LFS batch request failed (HTTP {e.code}): {body_text}") from e
    except OSError as e:
        # URLError (DNS failure, connection refused, TLS problems) and socket
        # timeouts are OSError subclasses; report them instead of a traceback.
        reason = getattr(e, "reason", e)
        raise RuntimeError(f"LFS batch request to {url} failed: {reason}") from e

    try:
        data = json.loads(raw)
    except ValueError as e:  # JSONDecodeError and UnicodeDecodeError
        raise RuntimeError(f"LFS batch response from {url} is not valid JSON: {e}") from e
    if not isinstance(data, dict):
        raise RuntimeError(f"LFS batch response from {url} is not a JSON object")
    return data
# ---------------------------------------------------------------------------
# Streaming PUT upload
# ---------------------------------------------------------------------------
def _stream_upload(href: str, headers: dict, file_path: str, size: int) -> None:
"""
PUT file_path content to href, streaming in _CHUNK-sized pieces.
Uses http.client directly so Content-Length is set without buffering
the whole file in memory. Works for files of any size.
"""
parsed = urllib.parse.urlparse(href)
host = parsed.netloc
path = parsed.path
if parsed.query:
path += "?" + parsed.query
if parsed.scheme == "https":
conn = http.client.HTTPSConnection(host, timeout=300)
else:
conn = http.client.HTTPConnection(host, timeout=300)
all_headers = dict(headers or {})
all_headers.setdefault("Content-Type", "application/octet-stream")
all_headers["Content-Length"] = str(size)
try:
conn.connect()
conn.putrequest("PUT", path)
for k, v in all_headers.items():
conn.putheader(k, v)
conn.endheaders()
with open(file_path, "rb") as f:
while True:
chunk = f.read(_CHUNK)
if not chunk:
break
conn.send(chunk)
resp = conn.getresponse()
resp.read() # drain
if resp.status not in (200, 201):
raise RuntimeError(f"LFS object upload failed: HTTP {resp.status}")
finally:
conn.close()
# ---------------------------------------------------------------------------
# Gitea contents API — create / update a file to record the LFS pointer
# ---------------------------------------------------------------------------
def _get_file_sha(
    hub: str,
    owner: str,
    repo: str,
    path: str,
    branch: str,
    username: str,
    password: str,
) -> str | None:
    """Return the blob SHA of an existing file at *path* on *branch*, or ``None``.

    Issues ``GET {hub}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={branch}``
    with HTTP Basic auth.

    Returns:
        The ``"sha"`` field of the response when it is a JSON object.
        ``None`` when the server answers 404, or when the response is not a
        JSON object (a JSON list — presumably a directory listing — has no
        single blob SHA).

    Raises:
        urllib.error.HTTPError: for any HTTP error status other than 404.
        urllib.error.URLError: on network failure.
        ValueError: if the response body is not valid JSON.
    """
    url = (
        f"{hub.rstrip('/')}/api/v1"
        f"/repos/{urllib.parse.quote(owner, safe='')}/{urllib.parse.quote(repo, safe='')}"
        f"/contents/{urllib.parse.quote(path)}"
        f"?ref={urllib.parse.quote(branch)}"
    )
    req = urllib.request.Request(url)
    req.add_header("Authorization", _basic_auth(username, password))
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read())
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return None
        raise
    # Calling .get on a list would raise AttributeError; treat any
    # non-object payload as "no single file here".
    if not isinstance(data, dict):
        return None
    return data.get("sha")
def _commit_lfs_pointer(
    hub: str,
    owner: str,
    repo: str,
    remote_path: str,
    pointer_text: str,
    branch: str,
    message: str,
    username: str,
    password: str,
) -> None:
    """Create or update *remote_path* on *branch* so it contains *pointer_text*.

    If the file already exists, its blob SHA is looked up first and sent with
    a PUT (update). Otherwise the file is created with a POST. Both go to the
    ``/api/v1/repos/{owner}/{repo}/contents/{remote_path}`` endpoint with
    *message* as the commit message. The content is base64-encoded.

    Raises:
        RuntimeError: if looking up the existing file or writing the commit
            fails (HTTP error status, network failure, or unparseable lookup
            response).
    """
    url = (
        f"{hub.rstrip('/')}/api/v1"
        f"/repos/{urllib.parse.quote(owner, safe='')}/{urllib.parse.quote(repo, safe='')}"
        f"/contents/{urllib.parse.quote(remote_path)}"
    )
    try:
        existing_sha = _get_file_sha(hub, owner, repo, remote_path, branch, username, password)
    except urllib.error.HTTPError as e:
        body_text = e.read().decode(errors="replace")
        raise RuntimeError(
            f"Failed to look up existing '{remote_path}' on branch '{branch}' (HTTP {e.code}): {body_text}"
        ) from e
    except (OSError, ValueError) as e:
        # Network failure (URLError/timeout) or a lookup body that is not JSON.
        reason = getattr(e, "reason", e)
        raise RuntimeError(f"Failed to look up existing '{remote_path}' on branch '{branch}': {reason}") from e

    body: dict = {
        "message": message,
        "content": base64.b64encode(pointer_text.encode()).decode(),
        "branch": branch,
    }
    if existing_sha:
        body["sha"] = existing_sha
    # Update an existing file with PUT (sha required); create a new one with POST.
    method = "PUT" if existing_sha else "POST"
    req = urllib.request.Request(url, data=json.dumps(body).encode(), method=method)
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", _basic_auth(username, password))
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            resp.read()
    except urllib.error.HTTPError as e:
        body_text = e.read().decode(errors="replace")
        raise RuntimeError(f"Failed to commit LFS pointer for '{remote_path}' (HTTP {e.code}): {body_text}") from e
    except OSError as e:
        reason = getattr(e, "reason", e)
        raise RuntimeError(f"Failed to commit LFS pointer for '{remote_path}': {reason}") from e
# ---------------------------------------------------------------------------
# Per-file upload logic
# ---------------------------------------------------------------------------
def _verify_upload(verify_action: dict, oid: str, size: int) -> None:
    """POST ``{"oid": oid, "size": size}`` to an LFS batch ``verify`` action.

    Sends the LFS media type headers plus any headers the batch response
    supplied for the action. Raises whatever ``urllib`` raises on failure;
    the caller decides how to react.
    """
    vreq = urllib.request.Request(
        verify_action["href"],
        data=json.dumps({"oid": oid, "size": size}).encode(),
        method="POST",
    )
    vreq.add_header("Content-Type", LFS_MEDIA_TYPE)
    vreq.add_header("Accept", LFS_MEDIA_TYPE)
    for name, value in (verify_action.get("header") or {}).items():
        vreq.add_header(name, value)
    with urllib.request.urlopen(vreq, timeout=15):
        pass


def _remote_path_for(remote_dir: str, filename: str) -> str:
    """Join *remote_dir* and *filename* into a repo-relative path with no leading "/"."""
    if not remote_dir:
        return filename
    return f"{remote_dir.rstrip('/')}/{filename}".lstrip("/")


def _upload_single_file(
    hub: str,
    owner: str,
    repo_name: str,
    username: str,
    password: str,
    file_path: str,
    remote_dir: str,
    message: str | None,
    branch: str,
) -> None:
    """Hash one file, upload it to LFS if the server lacks it, then commit its pointer.

    Progress goes to stdout and errors to stderr. Any fatal error exits the
    process with status 1 via ``sys.exit``.

    Args:
        hub: RIA Hub base URL.
        owner: repository owner.
        repo_name: repository name.
        username: Basic-auth username.
        password: Basic-auth password or token.
        file_path: local file to upload.
        remote_dir: directory inside the repo ("" means the repo root).
        message: commit message; defaults to ``chore: upload <filename> via ria``.
        branch: branch to commit the pointer to.
    """
    filename = os.path.basename(file_path)
    size_mb = os.path.getsize(file_path) / (1024 * 1024)
    click.echo(f"\n {filename} ({size_mb:.1f} MB)")
    click.echo(" Hashing...", nl=False)
    oid, size = _hash_file(file_path)
    click.echo(f" sha256:{oid[:12]}...")

    try:
        batch = _lfs_batch(hub, owner, repo_name, [{"oid": oid, "size": size}], username, password)
    except (RuntimeError, OSError, ValueError) as e:
        # OSError/ValueError cover network and JSON failures that were not
        # already wrapped in RuntimeError.
        click.echo(f"\n Error: {e}", err=True)
        sys.exit(1)

    objects = batch.get("objects") or []
    if not objects:
        click.echo(" Already in LFS — skipping upload.")
    else:
        obj = objects[0]
        if "error" in obj:
            error = obj["error"] or {}
            err_msg = error.get("message", "unknown error")
            err_code = error.get("code", 0)
            if err_code == 413 or "quota" in err_msg.lower() or "limit" in err_msg.lower():
                click.echo(
                    f"\n Error: storage quota exceeded for this repo.\n Server: {err_msg}",
                    err=True,
                )
            else:
                click.echo(f"\n Error from server: {err_msg}", err=True)
            sys.exit(1)

        actions = obj.get("actions") or {}
        upload_action = actions.get("upload")
        if not upload_action:
            # No upload action means the server already has the object.
            click.echo(" Already in LFS — skipping upload.")
        else:
            href = upload_action["href"]
            up_headers = upload_action.get("header") or {}
            chunks = math.ceil(size / _CHUNK)
            click.echo(f" Uploading ({size_mb:.1f} MB, {chunks} chunk{'s' if chunks != 1 else ''})...")
            try:
                _stream_upload_progress(href, up_headers, file_path, size)
            except (RuntimeError, OSError, http.client.HTTPException) as e:
                # Connection resets, timeouts and protocol errors are as fatal
                # as a bad status code; report them without a traceback.
                click.echo(f"\n Upload failed: {e}", err=True)
                sys.exit(1)
            click.echo(" Upload complete.")

            verify_action = actions.get("verify")
            if verify_action:
                try:
                    _verify_upload(verify_action, oid, size)
                except Exception as e:
                    # Verify stays non-fatal, but a failure is now reported
                    # instead of silently ignored.
                    click.echo(f" Warning: LFS verify step failed ({e}); continuing.", err=True)

    pointer = _lfs_pointer_text(oid, size)
    remote_path = _remote_path_for(remote_dir, filename)
    commit_msg = message or f"chore: upload {filename} via ria"
    click.echo(f" Committing pointer → {remote_path}...", nl=False)
    try:
        _commit_lfs_pointer(hub, owner, repo_name, remote_path, pointer, branch, commit_msg, username, password)
    except (RuntimeError, OSError, ValueError) as e:
        click.echo(f"\n Error: {e}", err=True)
        sys.exit(1)
    click.echo(" done.")
def _stream_upload_progress(href: str, headers: dict | None, file_path: str, size: int) -> None:
    """PUT *file_path* to *href* in ``_CHUNK`` pieces while showing a click progress bar.

    ``Content-Length`` is declared up front from *size*, so the file is never
    buffered whole in memory. ``Content-Type`` defaults to
    ``application/octet-stream`` unless *headers* supplies one.

    Args:
        href: absolute ``http``/``https`` upload URL.
        headers: extra request headers (may be ``None``).
        file_path: local file to send.
        size: byte count to declare; also the progress bar's length.

    Raises:
        RuntimeError: if the server answers with a non-2xx status. The
            message is ``HTTP <status>`` plus a snippet of any response body.
        OSError, http.client.HTTPException: on connection/protocol failures.
    """
    parsed = urllib.parse.urlparse(href)
    # The request target must be at least "/" even when href has no path.
    path_q = (parsed.path or "/") + (f"?{parsed.query}" if parsed.query else "")
    if parsed.scheme == "https":
        conn = http.client.HTTPSConnection(parsed.netloc, timeout=300)
    else:
        conn = http.client.HTTPConnection(parsed.netloc, timeout=300)

    # Drop any supplied Content-Length (any letter case) so only the declared
    # *size* is sent.
    all_headers = {k: v for k, v in (headers or {}).items() if k.lower() != "content-length"}
    if not any(k.lower() == "content-type" for k in all_headers):
        all_headers["Content-Type"] = "application/octet-stream"
    all_headers["Content-Length"] = str(size)

    # try/finally guarantees the socket is closed even if the connection,
    # file read, or response parsing raises.
    try:
        with click.progressbar(
            length=size,
            label=" ",
            width=40,
            show_eta=True,
            show_percent=True,
            # Empty fill/empty chars rendered an invisible bar; use visible ASCII.
            fill_char="#",
            empty_char="-",
        ) as bar:
            conn.connect()
            conn.putrequest("PUT", path_q)
            for name, value in all_headers.items():
                conn.putheader(name, value)
            conn.endheaders()
            with open(file_path, "rb") as f:
                while chunk := f.read(_CHUNK):
                    conn.send(chunk)
                    bar.update(len(chunk))
        resp = conn.getresponse()
        body = resp.read()
    finally:
        conn.close()

    # Accept any 2xx success status, not only 200/201.
    if not 200 <= resp.status < 300:
        detail = body.decode(errors="replace").strip()[:500]
        raise RuntimeError(f"HTTP {resp.status}" + (f": {detail}" if detail else ""))
# ---------------------------------------------------------------------------
# Command
# ---------------------------------------------------------------------------
@click.command("upload")
@click.argument("files", nargs=-1, required=True)
@click.option(
    "--repo", required=True, metavar="OWNER/NAME", help="Target repository on RIA Hub (e.g. benchinnery/my-dataset)."
)
@click.option("--hub", default="http://localhost:3000", show_default=True, metavar="URL", help="RIA Hub base URL.")
@click.option("--branch", default="main", show_default=True, help="Branch to commit the files to.")
@click.option(
    "--path",
    "remote_dir",
    default="",
    metavar="DIR",
    help="Remote directory path inside the repo (default: repo root).",
)
@click.option("--message", "-m", default=None, help="Commit message (default: 'chore: upload <filename> via ria').")
def upload(
    files: tuple[str, ...],
    repo: str,
    hub: str,
    branch: str,
    remote_dir: str,
    message: str | None,
) -> None:
    """Upload large files to a RIA Hub Project via Git LFS.

    Files are streamed directly to the repo's LFS object store — nothing is
    buffered into memory, so files of any size work. Each file creates one
    commit recording the LFS pointer.

    \b
    Examples:
      ria upload recording.sigmf-data --repo benchinnery/my-recordings
      ria upload *.npy --repo benchinnery/my-recordings --branch main
      ria upload big.pt --repo benchinnery/models --path weights/
    """
    import glob

    # Validate repo argument: exactly one "/" separating two non-empty parts
    # ("owner/", "/name" and "a/b/c" would all produce broken API URLs).
    owner, sep, repo_name = repo.partition("/")
    if not (sep and owner and repo_name) or "/" in repo_name:
        click.echo("Error: --repo must be in the form OWNER/NAME.", err=True)
        sys.exit(1)

    # Expand and validate files. Wildcards are usually expanded by the shell,
    # but some shells may pass patterns like "*.npy" through unexpanded, so
    # expand them here as well. Each distinct file is uploaded only once.
    resolved: list[str] = []
    seen: set[str] = set()
    for pattern in files:
        if os.path.isfile(pattern):
            matches = [pattern]
        else:
            matches = sorted(m for m in glob.glob(pattern) if os.path.isfile(m))
        if not matches:
            click.echo(f"Error: '{pattern}' is not a file or does not exist.", err=True)
            sys.exit(1)
        for match in matches:
            abs_path = os.path.abspath(match)
            if abs_path not in seen:
                seen.add(abs_path)
                resolved.append(abs_path)

    # Credentials
    hub = hub.rstrip("/")
    username, password = _resolve_credentials(hub)
    click.echo(f"Uploading {len(resolved)} file(s) to {owner}/{repo_name} on {hub}...")
    for file_path in resolved:
        _upload_single_file(hub, owner, repo_name, username, password, file_path, remote_dir, message, branch)
    click.echo(f"\nAll done. {len(resolved)} file(s) uploaded to {owner}/{repo_name}.")