You set up a shared folder for evidence uploads. Files arrive throughout the day. By evening, you need SHA-256 hashes for everything that came in. Manual hashing works for ten files. It breaks at a hundred.

Here's how I built a file monitor that hashes incoming files automatically and stores the results in a way that survives server restarts.

Setting Up the File Watcher

Python's watchdog library provides a clean Observer pattern for monitoring filesystem events. The core components are an Observer (watches directories) and event handlers (respond to file changes).

import os
import hashlib
import json
import time
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class HashingEventHandler(FileSystemEventHandler):
    """Filesystem event handler that records hashes in a JSON database.

    The database is a dict loaded from ``hash_file`` when the handler is
    constructed and written back by :meth:`save_hash_db`. Created and moved
    files are forwarded to ``self.process_file``, which is not defined in
    this block and must be provided on the class.
    """

    def __init__(self, hash_file="file_hashes.json"):
        """Store the database path and load any previously saved entries.

        Args:
            hash_file: Path of the JSON file used to persist the database.
        """
        super().__init__()
        self.hash_file = hash_file
        self.hash_db = self.load_hash_db()

    def load_hash_db(self):
        """Load existing hash database or create empty one.

        Returns:
            The parsed JSON content of ``hash_file``, or ``{}`` when the
            file does not exist.
        """
        if os.path.exists(self.hash_file):
            with open(self.hash_file, 'r') as f:
                return json.load(f)
        return {}

    def save_hash_db(self):
        """Persist hash database to disk without ever exposing a partial file.

        The data is written to a sibling temporary file and then swapped in
        with ``os.replace``. Writing straight to ``hash_file`` would truncate
        it first, so a crash mid-write would leave invalid JSON behind and
        the next ``load_hash_db`` would fail.
        """
        tmp_path = f"{self.hash_file}.tmp"
        with open(tmp_path, 'w') as f:
            json.dump(self.hash_db, f, indent=2)
        # Same directory as the target, so the rename stays on one filesystem.
        os.replace(tmp_path, self.hash_file)

    def on_created(self, event):
        """Process a newly created file; directory events are ignored."""
        if event.is_directory:
            return
        self.process_file(event.src_path)

    def on_moved(self, event):
        """Process a moved file at its destination path; directories are ignored."""
        if event.is_directory:
            return
        self.process_file(event.dest_path)

The on_created method fires when a new file appears. on_moved handles files that are renamed or moved within the watched tree; a file moved in from outside the watched directory is typically reported as a created event instead. Both call the same processing function.

Hash Computation and Event Handling

The file processing needs to handle partial uploads, permission issues, and hash computation efficiently:

def process_file(self, file_path):
    """Hash a new or moved file and record the result in the hash database.

    Files are keyed by their resolved absolute path. A path that is already
    in the database is skipped immediately; otherwise the method waits for
    the file to stop changing, hashes it, stores the entry and saves the
    database.

    Args:
        file_path: Path of the file to process.

    Returns:
        None. Outcomes are reported with ``print``; exceptions are caught
        and reported rather than raised.
    """
    try:
        # Look the file up BEFORE the stability wait: the wait costs at
        # least a second and a half per file, which would otherwise be paid
        # for every already-known file on each startup scan.
        file_key = str(Path(file_path).resolve())
        if file_key in self.hash_db:
            print(f"Already processed: {file_path}")
            return

        # Wait for file to be completely written
        if not self.wait_for_stable_file(file_path):
            print(f"Skipping unstable file: {file_path}")
            return

        # Compute hash
        file_hash = self.compute_sha256(file_path)

        # Store result
        self.hash_db[file_key] = {
            "hash": file_hash,
            "processed_at": time.time(),
            "filename": os.path.basename(file_path),
            "size_bytes": os.path.getsize(file_path),
        }

        self.save_hash_db()
        print(f"Processed {os.path.basename(file_path)}: {file_hash}")

    except Exception as e:
        # Deliberately broad: one bad file must not propagate to the caller.
        print(f"Error processing {file_path}: {e}")

def wait_for_stable_file(self, file_path, timeout=30, *, poll_interval=1.0,
                         settle_time=0.5):
    """Wait for file to stop changing (upload complete).

    The file counts as stable once two consecutive polls report the same
    non-zero size and a final check after ``settle_time`` still agrees.
    A file that stays empty is never reported as stable.

    Args:
        file_path: Path of the file to watch.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep between size checks.
        settle_time: Seconds to wait before the final confirmation check.

    Returns:
        True if the size held steady through the final check. False if that
        check saw a change, the timeout expired, or an unexpected error
        occurred. This method never raises.
    """
    try:
        # A monotonic clock keeps the timeout correct even if the system
        # wall clock is adjusted while we are waiting.
        deadline = time.monotonic() + timeout
        last_size = -1

        while time.monotonic() < deadline:
            try:
                current_size = os.path.getsize(file_path)
                if current_size == last_size and current_size > 0:
                    time.sleep(settle_time)  # Final stability check
                    return os.path.getsize(file_path) == current_size
                last_size = current_size
            except OSError:
                # File might be missing or locked; try again on the next poll.
                pass
            time.sleep(poll_interval)

        return False
    except Exception:
        # Best-effort contract: anything unexpected is reported as "not stable".
        return False

def compute_sha256(self, file_path):
    """Return the hexadecimal SHA-256 digest of the file at ``file_path``.

    The file is opened in binary mode and read 8192 bytes at a time, so
    memory use does not grow with the size of the file.
    """
    digest = hashlib.sha256()

    with open(file_path, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)

    return digest.hexdigest()

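The stability check prevents processing partially uploaded files: a file counts as complete once its size is non-zero and unchanged between two consecutive polls. Large files might take several minutes to upload completely, so raise the 30-second default timeout for those; a file that is still growing when the timeout expires is skipped and not retried. The hash computation uses 8KB chunks to handle files of any size without loading everything into memory.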

Running the Monitor

The main loop sets up the observer and keeps it running:

def start_monitoring(watch_directory="./watched_files"):
    """Start the file monitoring service and block until it is interrupted.

    Creates ``watch_directory`` if needed, processes every file already in
    it (recursively), then starts a recursive observer and idles until the
    process is interrupted.

    Args:
        watch_directory: Directory tree to monitor for new files.
    """

    # Create watch directory if it doesn't exist
    os.makedirs(watch_directory, exist_ok=True)

    # Set up event handler and observer
    event_handler = HashingEventHandler()
    observer = Observer()
    observer.schedule(event_handler, watch_directory, recursive=True)

    # Process any existing files
    # NOTE(review): files that arrive while this scan is running may be
    # missed if the observer only delivers events after start() - confirm
    # against watchdog's behavior before relying on this for completeness.
    print("Processing existing files...")
    for root, _dirs, files in os.walk(watch_directory):
        for file in files:
            file_path = os.path.join(root, file)
            event_handler.process_file(file_path)

    # Start monitoring
    observer.start()
    print(f"Monitoring {watch_directory} for new files...")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping file monitor...")
    finally:
        # Runs for every exit path, not just Ctrl+C: a SystemExit raised by
        # a signal handler would otherwise skip stopping the observer.
        observer.stop()
        observer.join()

# Script entry point: monitor ./evidence_uploads only when this file is run
# directly, not when it is imported.
if __name__ == "__main__":
    start_monitoring("./evidence_uploads")

The monitor processes existing files on startup, then watches for new ones. Recursive watching handles files dropped into subdirectories.

Production Considerations

Real deployments need error recovery, logging, and integration points:

import logging
import signal
import sys
from datetime import datetime

# Configure logging: records at INFO level and above are formatted as
# "<timestamp> - <level> - <message>" and sent to two handlers, the
# file_monitor.log file and a stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('file_monitor.log'),
        logging.StreamHandler()
    ]
)

class ProductionHashingHandler(HashingEventHandler):
    """Hashing handler that adds logging and an optional external notification.

    Hashing and persistence are delegated to the inherited ``process_file``;
    this class reports newly recorded files through ``logging`` and, when a
    ``webhook_url`` is configured, calls :meth:`notify_external_system`.
    """

    def __init__(self, hash_file="file_hashes.json", webhook_url=None):
        """Initialize the base handler and remember the notification target.

        Args:
            hash_file: Path of the JSON hash database, passed to the base class.
            webhook_url: Optional notification target; notifications are
                skipped when it is falsy.
        """
        super().__init__(hash_file)
        self.webhook_url = webhook_url

    def process_file(self, file_path):
        """Enhanced processing with logging and webhooks.

        Runs the inherited processing, then logs and optionally notifies for
        files that were newly added to the database by this call. Files that
        were already recorded, skipped or failed produce no notification.

        Args:
            file_path: Path of the file to process.
        """
        try:
            # Same key the inherited process_file uses for the database.
            file_key = str(Path(file_path).resolve())
            already_known = file_key in self.hash_db

            super().process_file(file_path)

            record = self.hash_db.get(file_key)
            if already_known or record is None:
                # Nothing new was recorded, so there is no hash to report.
                return
            file_hash = record["hash"]

            # Optional: notify external system
            if self.webhook_url:
                self.notify_external_system(file_path, file_hash)

            logging.info(f"Processed {os.path.basename(file_path)}: {file_hash}")

        except Exception as e:
            logging.error(f"Error processing {file_path}: {e}")

    def notify_external_system(self, file_path, file_hash):
        """Send hash to external API or queue.

        Placeholder: builds the payload but does not transmit it yet.

        Args:
            file_path: Path of the processed file; only its base name is sent.
            file_hash: Hex digest recorded for the file.
        """
        from datetime import timezone

        # This is where you'd integrate with your anchoring service
        # Could be a REST API call, message queue, or database insert
        payload = {
            "filename": os.path.basename(file_path),
            "hash": file_hash,
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            # and returns a naive value.
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        # Implementation depends on your downstream system
        pass

def signal_handler(signum, frame):
    """Log the shutdown request and end the process with exit status 0.

    Written for use as a SIGTERM handler; ``signum`` and ``frame`` are
    accepted to fit the handler signature and are not otherwise used.
    """
    logging.info("Received shutdown signal")
    raise SystemExit(0)

# Install signal_handler for SIGTERM. This runs as soon as the module is
# executed, as a module-level side effect.
signal.signal(signal.SIGTERM, signal_handler)

For integration with external services, you'd add the appropriate API calls in notify_external_system(). This could send hashes to a blockchain anchoring service, update a database, or publish to a message queue.

What You've Built

This monitor handles the complete file-to-hash pipeline automatically. It deduplicates by resolved file path (so a file later overwritten at the same path is not hashed again), persists results across restarts, and waits for partial uploads to finish before hashing.

The hash database format makes it easy to query processed files or export hashes in bulk. You can extend it with additional metadata, implement retention policies, or integrate with downstream systems that need cryptographic proof of when files existed.

Run it on any shared folder where files arrive throughout the day. Point it at evidence uploads, backup directories, or content pipelines. The hashes are ready when you need them.