You have a directory of evidence files that need blockchain-anchored timestamps. Maybe it's a pre-loss documentation archive. Maybe it's a set of site inspection photos captured before a dispute was filed.

A for-loop over the directory will work until it doesn't. Hit a rate limit, crash without saving state, and you're starting over with no record of what actually made it through.

This is the worker pattern I'd reach for when bulk archiving through ProofLedger's v1 API. Three things the naive loop skips: concurrent submissions, 429 backoff that reads the Retry-After header, and resumable progress so a crash doesn't reset the whole job.

Hash First, Then Submit

The ProofLedger v1 API anchors a SHA-256 digest, not the file itself. The file stays on your machine. Step one is always local hashing.

import hashlib

def hash_file(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()

Chunked reads keep memory flat regardless of file size: at most 64 KiB is held at a time. A 4 GB video and a 10-page PDF go through the same function.

The base API call takes the digest in the request body, with your API key in the Authorization header.

import requests

API_URL = "https://proofledger.io/api/v1/proof"
API_KEY = "sk_YOUR_KEY_HERE"

def anchor_hash(digest: str, filename: str) -> dict:
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {"sha256": digest, "filename": filename}
    resp = requests.post(API_URL, json=payload, headers=headers, timeout=30)
    resp.raise_for_status()
    return resp.json()

That's fine for a single file. A batch job needs real error handling on top.

Handling 429s

The API is rate-limited. When you're submitting files concurrently, you'll hit 429s. The right response is to back off and retry, reading the Retry-After header rather than guessing.

import time

def anchor_hash(digest: str, filename: str) -> dict:
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {"sha256": digest, "filename": filename}

    for attempt in range(6):
        try:
            resp = requests.post(API_URL, json=payload, headers=headers, timeout=30)
        except requests.RequestException:
            if attempt == 5:
                raise
            time.sleep(2 ** attempt)
            continue

        if resp.status_code == 201:
            return resp.json()

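        if resp.status_code == 429:
            # Retry-After may be an HTTP date rather than seconds; fall back to 60s then.
            retry_after = resp.headers.get("Retry-After", "")
            wait = int(retry_after) if retry_after.isdigit() else 60
            print(f"  Rate limited. Waiting {wait}s (attempt {attempt + 1}/6)")
            time.sleep(wait)
            continue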

        if resp.status_code in (400, 401, 403):
            raise ValueError(f"Non-retryable {resp.status_code}: {resp.text}")

        time.sleep(2 ** attempt)

    raise RuntimeError(f"Anchoring failed for {filename} after 6 attempts")

The 400/401/403 group is non-retryable. A 400 means your hash is malformed. A 401 or 403 means your key is wrong or on the wrong plan tier. Retrying those wastes time and burns rate limit. The requests.RequestException branch handles network timeouts and connection drops separately from HTTP status codes.

Six attempts is enough to ride out a sustained rate-limit window without hanging the job indefinitely.
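HTTP allows Retry-After to be either a number of seconds or an HTTP date, and the source doesn't say which form ProofLedger sends. The sketch below is a hypothetical helper, parse_retry_after (not part of the API or the script above), that accepts both forms and falls back to a default when the header is missing or unreadable.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float = 60.0) -> float:
    """Return the number of seconds to wait for a Retry-After header value.

    Hypothetical helper: handles the delta-seconds form ("120") and the
    HTTP-date form ("Wed, 21 Oct 2015 07:28:00 GMT").
    """
    if not value:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Unparseable header: fall back rather than crash the worker.
        return default
    if when.tzinfo is None:
        # Treat a date without a zone as UTC.
        when = when.replace(tzinfo=timezone.utc)
    # A date in the past means "retry now", never a negative sleep.
    return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
```

Inside the 429 branch this would be called as time.sleep(parse_retry_after(resp.headers.get("Retry-After"))).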

Concurrency

ThreadPoolExecutor with five workers gets you parallel submissions without hammering the endpoint hard enough to trigger a flood of 429s.

The choice of as_completed over executor.map matters here. executor.map yields results in submission order, so one slow upload at the front of the queue holds back every result that finished behind it. With as_completed, each future is handed back the moment it resolves, so its result is written to disk immediately. If the script crashes, you restart and pick up where the last successful write landed.

from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

MAX_WORKERS = 5

def process_file(path: Path) -> tuple:
    digest = hash_file(str(path))
    result = anchor_hash(digest, path.name)
    return str(path), digest, result

Five concurrent threads also keeps log output readable. Bump MAX_WORKERS to 20 and you get 20 interleaved error messages whenever something goes wrong.
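To see the ordering difference between as_completed and executor.map, here is a small self-contained sketch. fake_anchor is a stand-in that sleeps instead of hashing and calling the API, and the job names and durations are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_anchor(name: str, seconds: float) -> str:
    """Stand-in for process_file: sleeps instead of doing real work."""
    time.sleep(seconds)
    return name


# Submission order: the slowest job goes first.
JOBS = [("slow", 0.4), ("fast", 0.05), ("medium", 0.2)]


def completion_order() -> list:
    """Collect results with as_completed: whichever job finishes first comes first."""
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(fake_anchor, name, secs) for name, secs in JOBS]
        return [future.result() for future in as_completed(futures)]


def map_order() -> list:
    """Collect results with map: always submission order, so 'slow' blocks the rest."""
    with ThreadPoolExecutor(max_workers=3) as pool:
        return list(pool.map(lambda job: fake_anchor(*job), JOBS))


if __name__ == "__main__":
    print("as_completed:", completion_order())
    print("map:         ", map_order())
```

With map, the "fast" result is ready after 0.05s but isn't handed to the loop until "slow" finishes, which is exactly the window in which a crash would lose work that had already succeeded.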

Resumable Progress

Write to disk after every successful anchor. Every one. The progress file is cheap to write; restarting an entire archive job is not.

import json

PROGRESS_FILE = "anchor_progress.json"

def load_progress() -> dict:
    p = Path(PROGRESS_FILE)
    return json.loads(p.read_text()) if p.exists() else {}

def save_progress(progress: dict) -> None:
    Path(PROGRESS_FILE).write_text(json.dumps(progress, indent=2))

The progress dict is keyed by file path. Each entry stores the SHA-256 digest, proof ID, status, and certificate URL. Any path already in the file is skipped on startup.
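One gap worth noting: write_text overwrites the progress file in place, so a crash in the middle of a write could leave truncated JSON behind. A common workaround, sketched here as a hypothetical save_progress_atomic variant (not part of the script above), is to write to a temporary file in the same directory and swap it in with os.replace.

```python
import json
import os
import tempfile
from pathlib import Path


def save_progress_atomic(progress: dict, progress_file: str = "anchor_progress.json") -> None:
    """Write the progress dict to a temp file, then rename it over the target."""
    target = Path(progress_file)
    # Same directory as the target, so the rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(
        dir=str(target.parent), prefix=target.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            json.dump(progress, tmp, indent=2)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before the swap
        os.replace(tmp_name, target)  # readers see the old file or the new one, never half
    except BaseException:
        # Don't leave a stray temp file behind if anything went wrong.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

The extra fsync costs a little per write, which is still cheap next to a network round trip per file.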

One caveat: this assumes files in your archive are immutable after capture. If a file might get overwritten between runs, you'd want to re-hash and compare the stored digest before deciding to skip.
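If files can change between runs, the skip check might look something like the sketch below. needs_anchoring is a hypothetical helper name; it assumes the progress entries carry the sha256 field described above, and it re-hashes every previously recorded file on startup, which costs a full read of the archive.

```python
import hashlib
from pathlib import Path


def hash_file(path: str) -> str:
    """Same chunked SHA-256 as earlier, repeated so this block runs on its own."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def needs_anchoring(path: Path, progress: dict) -> bool:
    """True if the file was never recorded, or its bytes changed since it was."""
    entry = progress.get(str(path))
    if entry is None:
        return True
    # Recorded before: skip only if the stored digest still matches the file.
    return entry.get("sha256") != hash_file(str(path))
```

The pending list would then filter on needs_anchoring(f, progress) instead of a plain path lookup.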

The duplicate_of Response

If you submit a hash that's already been anchored, the API returns a duplicate_of field pointing to the original proof ID.

It can come from multiple places. Two team members photographing the same document separately. A file copied into multiple subdirectories before ingestion. The same report exported under two filenames.

When you see duplicate_of, the original proof is canonical. Don't create a new anchor for identical bytes.

def record_result(progress: dict, key: str, digest: str, result: dict) -> None:
    entry = {
        "sha256": digest,
        "proof_id": result.get("id"),
        "status": result.get("status"),
        "certificate_url": result.get("certificate_url"),
    }
    if result.get("duplicate_of"):
        entry["duplicate_of"] = result["duplicate_of"]
        print(f"  Duplicate: {Path(key).name} -> original {result['duplicate_of']}")
    else:
        print(f"  Anchored: {Path(key).name} -> {result.get('id')}")

    progress[key] = entry
    save_progress(progress)

The certificate_url from the original anchor is what you'd attach for chain-of-custody documentation. The duplicate_of field gives you the ID to find it.
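When the original was anchored in the same batch, its certificate can be looked up straight from the progress dict. This is a minimal sketch under that assumption; canonical_certificate is a hypothetical helper, and an original anchored outside this batch would need a separate lookup that isn't covered here.

```python
from typing import Optional


def canonical_certificate(progress: dict, key: str) -> Optional[str]:
    """Return the certificate URL of the canonical proof for a progress entry.

    For a non-duplicate entry that is its own certificate_url. For a duplicate,
    search the progress dict for the entry whose proof_id matches duplicate_of.
    Returns None if the original isn't in this progress file.
    """
    entry = progress[key]
    original_id = entry.get("duplicate_of")
    if not original_id:
        return entry.get("certificate_url")
    for other in progress.values():
        if other.get("proof_id") == original_id and not other.get("duplicate_of"):
            return other.get("certificate_url")
    return None
```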

Full Script

import hashlib
import json
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

API_URL = "https://proofledger.io/api/v1/proof"
API_KEY = "sk_YOUR_KEY_HERE"
PROGRESS_FILE = "anchor_progress.json"
MAX_WORKERS = 5


def hash_file(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def load_progress() -> dict:
    p = Path(PROGRESS_FILE)
    return json.loads(p.read_text()) if p.exists() else {}


def save_progress(progress: dict) -> None:
    Path(PROGRESS_FILE).write_text(json.dumps(progress, indent=2))


def anchor_hash(digest: str, filename: str) -> dict:
    headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
    payload = {"sha256": digest, "filename": filename}

    for attempt in range(6):
        try:
            resp = requests.post(API_URL, json=payload, headers=headers, timeout=30)
        except requests.RequestException:
            if attempt == 5:
                raise
            time.sleep(2 ** attempt)
            continue
        if resp.status_code == 201:
            return resp.json()
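        if resp.status_code == 429:
            # Retry-After may be an HTTP date rather than seconds; fall back to 60s then.
            retry_after = resp.headers.get("Retry-After", "")
            time.sleep(int(retry_after) if retry_after.isdigit() else 60)
            continue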
        if resp.status_code in (400, 401, 403):
            raise ValueError(f"{resp.status_code}: {resp.text}")
        time.sleep(2 ** attempt)

    raise RuntimeError(f"Failed: {filename}")


def record_result(progress: dict, key: str, digest: str, result: dict) -> None:
    entry = {
        "sha256": digest,
        "proof_id": result.get("id"),
        "status": result.get("status"),
        "certificate_url": result.get("certificate_url"),
    }
    if result.get("duplicate_of"):
        entry["duplicate_of"] = result["duplicate_of"]
        print(f"  Duplicate: {Path(key).name} -> original {result['duplicate_of']}")
    else:
        print(f"  Anchored: {Path(key).name}")
    progress[key] = entry
    save_progress(progress)


def main(directory: str) -> None:
    progress = load_progress()
    pending = [
        f for f in Path(directory).rglob("*")
        if f.is_file() and str(f) not in progress
    ]
    print(f"Pending: {len(pending)}")

    def process(path: Path) -> tuple:
        digest = hash_file(str(path))
        result = anchor_hash(digest, path.name)
        return str(path), digest, result

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = {pool.submit(process, f): f for f in pending}
        for future in as_completed(futures):
            f = futures[future]
            try:
                key, digest, result = future.result()
                record_result(progress, key, digest, result)
            except Exception as e:
                print(f"  Error: {f.name}: {e}")


if __name__ == "__main__":
    main("./evidence-archive")

What to Run Next

The progress file is useful beyond crash recovery. Once the batch finishes, parse it to pull all certificate_url values for a summary report. Filter for duplicate_of entries to find which files in your archive have identical content.
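A summary pass over the progress file could look roughly like this. summarize is a hypothetical helper that relies only on the entry fields record_result writes (sha256, certificate_url, duplicate_of). Grouping by digest is an addition of mine: it also catches identical files when the duplicate_of filter alone would list each copy separately.

```python
import json
from collections import defaultdict
from pathlib import Path


def summarize(progress: dict) -> dict:
    """Collect certificate URLs, duplicate entries, and groups of identical files."""
    certificates = {
        path: entry["certificate_url"]
        for path, entry in progress.items()
        if entry.get("certificate_url")
    }
    duplicates = {
        path: entry["duplicate_of"]
        for path, entry in progress.items()
        if entry.get("duplicate_of")
    }
    # Files sharing a digest have byte-identical content.
    by_digest = defaultdict(list)
    for path, entry in progress.items():
        by_digest[entry["sha256"]].append(path)
    identical = {d: sorted(paths) for d, paths in by_digest.items() if len(paths) > 1}
    return {"certificates": certificates, "duplicates": duplicates, "identical": identical}


if __name__ == "__main__":
    progress_path = Path("anchor_progress.json")
    if progress_path.exists():
        print(json.dumps(summarize(json.loads(progress_path.read_text())), indent=2))
```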

For third-party verification, the GET /api/v1/verify?hash=<sha256> endpoint is public and requires no authentication. Opposing counsel, auditors, or anyone with the original file can independently confirm the anchor. It's rate-limited to 120 requests per hour per IP, which is plenty for verification workflows.
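For anyone scripting bulk verification, here is a small sketch of building the verify URL and pacing requests under the 120-per-hour limit. It assumes the verify endpoint lives on the same host as API_URL. The function names are hypothetical, and the response format isn't described here, so the sketch stops at constructing the request.

```python
from urllib.parse import urlencode

# Assumption: same host as the anchoring endpoint used earlier.
VERIFY_URL = "https://proofledger.io/api/v1/verify"
HOURLY_LIMIT = 120  # requests per hour per IP, as stated above


def verify_url(digest: str) -> str:
    """Build the public verification URL for a hex SHA-256 digest."""
    digest = digest.lower()
    if len(digest) != 64 or any(c not in "0123456789abcdef" for c in digest):
        raise ValueError("expected a 64-character hex SHA-256 digest")
    return f"{VERIFY_URL}?{urlencode({'hash': digest})}"


def min_spacing_seconds(limit_per_hour: int = HOURLY_LIMIT) -> float:
    """Seconds between requests needed to stay under an hourly limit."""
    return 3600 / limit_per_hour
```

At 120 requests per hour, an evenly paced loop sleeps 30 seconds between calls, so re-verifying a 1,000-file archive from one IP takes a little over eight hours.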

For offline verification against a saved proof JSON, the verify-proof package handles that without any network calls.