You have a directory of evidence files that need blockchain-anchored timestamps. Maybe it's a pre-loss documentation archive. Maybe it's a set of site inspection photos captured before a dispute filed.
A for-loop over the directory will work until it doesn't. Hit a rate limit, crash without saving state, and you're starting over with no record of what actually made it through.
This is the worker pattern I'd reach for when bulk archiving through ProofLedger's v1 API. Three things the naive loop skips: concurrent submissions, 429 backoff that reads the Retry-After header, and resumable progress so a crash doesn't reset the whole job.
Hash First, Then Submit
The ProofLedger v1 API anchors a SHA-256 digest, not the file itself. The file stays on your machine. Step one is always local hashing.
import hashlib
def hash_file(path: str) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
Chunked reads keep memory flat regardless of file size. A 4GB video and a 10-page PDF go through the same function.
The base API call takes the digest in the request body, with your API key in the Authorization header.
import requests
API_URL = "https://proofledger.io/api/v1/proof"
API_KEY = "sk_YOUR_KEY_HERE"
def anchor_hash(digest: str, filename: str) -> dict:
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
payload = {"sha256": digest, "filename": filename}
resp = requests.post(API_URL, json=payload, headers=headers, timeout=30)
resp.raise_for_status()
return resp.json()
That's fine for a single file. A batch job needs real error handling on top.
Handling 429s
The API is rate-limited. When you're submitting files concurrently, you'll hit 429s. The right response is to back off and retry, reading the Retry-After header rather than guessing.
import time
def anchor_hash(digest: str, filename: str) -> dict:
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
payload = {"sha256": digest, "filename": filename}
for attempt in range(6):
try:
resp = requests.post(API_URL, json=payload, headers=headers, timeout=30)
except requests.RequestException:
if attempt == 5:
raise
time.sleep(2 ** attempt)
continue
if resp.status_code == 201:
return resp.json()
if resp.status_code == 429:
wait = int(resp.headers.get("Retry-After", 60))
print(f" Rate limited. Waiting {wait}s (attempt {attempt + 1}/6)")
time.sleep(wait)
continue
if resp.status_code in (400, 401, 403):
raise ValueError(f"Non-retryable {resp.status_code}: {resp.text}")
time.sleep(2 ** attempt)
raise RuntimeError(f"Anchoring failed for {filename} after 6 attempts")
The 400/401/403 group is non-retryable. A 400 means your hash is malformed. A 401 or 403 means your key is wrong or on the wrong plan tier. Retrying those wastes time and burns rate limit. The requests.RequestException branch handles network timeouts and connection drops separately from HTTP status codes.
Six attempts is enough ceiling to clear a sustained rate-limit window without hanging the job indefinitely.
Concurrency
ThreadPoolExecutor with five workers gets you parallel submissions without hammering the endpoint hard enough to trigger a flood of 429s.
The choice of as_completed over executor.map matters here. With executor.map, you'd have to wait for all futures before processing any results. With as_completed, each resolved future triggers a disk write immediately. If the script crashes, you restart and pick up where the last successful write landed.
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
MAX_WORKERS = 5
def process_file(path: Path) -> tuple:
digest = hash_file(str(path))
result = anchor_hash(digest, path.name)
return str(path), digest, result
Five concurrent threads also keeps log output readable. Bump MAX_WORKERS to 20 and you get 20 interleaved error messages whenever something goes wrong.
Resumable Progress
Write to disk after every successful anchor. Every one. The progress file is cheap to write; restarting an entire archive job is not.
import json
PROGRESS_FILE = "anchor_progress.json"
def load_progress() -> dict:
p = Path(PROGRESS_FILE)
return json.loads(p.read_text()) if p.exists() else {}
def save_progress(progress: dict) -> None:
Path(PROGRESS_FILE).write_text(json.dumps(progress, indent=2))
The progress dict is keyed by file path. Each entry stores the SHA-256 digest, proof ID, status, and certificate URL. Any path already in the file is skipped on startup.
One caveat: this assumes files in your archive are immutable after capture. If a file might get overwritten between runs, you'd want to re-hash and compare the stored digest before deciding to skip.
The duplicate_of Response
If you submit a hash that's already been anchored, the API returns a duplicate_of field pointing to the original proof ID.
It can come from multiple places. Two team members photographing the same document separately. A file copied into multiple subdirectories before ingestion. The same report exported under two filenames.
When you see duplicate_of, the original proof is canonical. Don't create a new anchor for identical bytes.
def record_result(progress: dict, key: str, digest: str, result: dict) -> None:
entry = {
"sha256": digest,
"proof_id": result.get("id"),
"status": result.get("status"),
"certificate_url": result.get("certificate_url"),
}
if result.get("duplicate_of"):
entry["duplicate_of"] = result["duplicate_of"]
print(f" Duplicate: {Path(key).name} -> original {result['duplicate_of']}")
else:
print(f" Anchored: {Path(key).name} -> {result.get('id')}")
progress[key] = entry
save_progress(progress)
The certificate_url from the original anchor is what you'd attach for chain-of-custody documentation. The duplicate_of field gives you the ID to find it.
Full Script
import hashlib
import json
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
API_URL = "https://proofledger.io/api/v1/proof"
API_KEY = "sk_YOUR_KEY_HERE"
PROGRESS_FILE = "anchor_progress.json"
MAX_WORKERS = 5
def hash_file(path: str) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
def load_progress() -> dict:
p = Path(PROGRESS_FILE)
return json.loads(p.read_text()) if p.exists() else {}
def save_progress(progress: dict) -> None:
Path(PROGRESS_FILE).write_text(json.dumps(progress, indent=2))
def anchor_hash(digest: str, filename: str) -> dict:
headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
payload = {"sha256": digest, "filename": filename}
for attempt in range(6):
try:
resp = requests.post(API_URL, json=payload, headers=headers, timeout=30)
except requests.RequestException:
if attempt == 5:
raise
time.sleep(2 ** attempt)
continue
if resp.status_code == 201:
return resp.json()
if resp.status_code == 429:
time.sleep(int(resp.headers.get("Retry-After", 60)))
continue
if resp.status_code in (400, 401, 403):
raise ValueError(f"{resp.status_code}: {resp.text}")
time.sleep(2 ** attempt)
raise RuntimeError(f"Failed: {filename}")
def record_result(progress: dict, key: str, digest: str, result: dict) -> None:
entry = {
"sha256": digest,
"proof_id": result.get("id"),
"status": result.get("status"),
"certificate_url": result.get("certificate_url"),
}
if result.get("duplicate_of"):
entry["duplicate_of"] = result["duplicate_of"]
print(f" Duplicate: {Path(key).name} -> original {result['duplicate_of']}")
else:
print(f" Anchored: {Path(key).name}")
progress[key] = entry
save_progress(progress)
def main(directory: str) -> None:
progress = load_progress()
pending = [
f for f in Path(directory).rglob("*")
if f.is_file() and str(f) not in progress
]
print(f"Pending: {len(pending)}")
def process(path: Path) -> tuple:
digest = hash_file(str(path))
result = anchor_hash(digest, path.name)
return str(path), digest, result
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
futures = {pool.submit(process, f): f for f in pending}
for future in as_completed(futures):
f = futures[future]
try:
key, digest, result = future.result()
record_result(progress, key, digest, result)
except Exception as e:
print(f" Error: {f.name}: {e}")
if __name__ == "__main__":
main("./evidence-archive")
What to Run Next
The progress file is useful beyond crash recovery. Once the batch finishes, parse it to pull all certificate_url values for a summary report. Filter for duplicate_of entries to find which files in your archive have identical content.
For third-party verification, the GET /api/v1/verify?hash=<sha256> endpoint is public and requires no authentication. Opposing counsel, auditors, or anyone with the original file can independently confirm the anchor. It's rate-limited to 120 requests per hour per IP, which is plenty for verification workflows.
For offline verification against a saved proof JSON, the verify-proof package handles that without any network calls.