You drop a file into a folder. Within seconds, you get its SHA-256 hash logged, stored, and ready for whatever comes next. No manual hashing, no missed files, no gaps in your audit trail.

I built this pattern while working on ProofLedger's evidence intake pipeline. You need reliable hashing the moment files arrive, not later when someone remembers to run a script. Here's how to build a file-watcher that automatically hashes everything that lands in your monitored directory.

Setting Up the File Watcher

Python's watchdog library (installed with pip install watchdog) handles filesystem events across platforms. It fires callbacks when files are created, modified, moved, or deleted. We'll focus on creation events and hash each new file immediately.

import hashlib
import time
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileHashHandler(FileSystemEventHandler):
    def __init__(self, log_file="file_hashes.log"):
        self.log_file = Path(log_file)
        # parents=True so a nested log path such as logs/2024/hashes.log also works
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        
    def on_created(self, event):
        if event.is_directory:
            return
            
        file_path = Path(event.src_path)
        
        # Skip temp files and hidden files before doing any work
        if file_path.name.startswith('.') or file_path.suffix == '.tmp':
            return
        
        # Crude guard: give the writer a moment to finish (too short for large files)
        time.sleep(0.1)
            
        try:
            file_hash = self.hash_file(file_path)
            self.log_hash(file_path, file_hash)
            print(f"Hashed: {file_path.name} -> {file_hash[:12]}...")
            
        except (PermissionError, FileNotFoundError) as e:
            print(f"Error hashing {file_path.name}: {e}")
    
    def hash_file(self, file_path):
        """Calculate SHA-256 hash of file contents"""
        hash_sha256 = hashlib.sha256()
        
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
                
        return hash_sha256.hexdigest()
    
    def log_hash(self, file_path, file_hash):
        """Log the hash with timestamp and file info"""
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        file_size = file_path.stat().st_size
        
        log_entry = f"{timestamp},{file_path.name},{file_size},{file_hash}\n"
        
        # Explicit encoding so non-ASCII filenames are logged consistently
        with open(self.log_file, "a", encoding="utf-8") as log:
            log.write(log_entry)

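The on_created method fires every time a new file appears. We skip directories, temp files, and hidden files. The 0.1-second sleep is only a crude guard against hashing a file that is still being written: it helps with small files, but a large copy can easily take longer, and the sleep blocks watchdog's event thread while it runs.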
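To make the chunked read in hash_file concrete, here is a minimal standalone check that hashing in 4096-byte chunks gives the same digest as hashing the whole content in one call. The payload and temp file are made up for illustration, and only the standard library is used.

```python
import hashlib
import os
import tempfile

def hash_file(file_path, chunk_size=4096):
    """Chunked SHA-256, mirroring the handler's hash_file method."""
    digest = hashlib.sha256()
    with open(file_path, "rb") as f:
        # iter() with a sentinel keeps reading until f.read returns b""
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Illustrative payload, deliberately not a multiple of the chunk size
payload = b"evidence" * 10_000 + b"!"

with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)
    tmp_path = tmp.name

try:
    chunked = hash_file(tmp_path)
    one_shot = hashlib.sha256(payload).hexdigest()
    print(chunked == one_shot)
finally:
    os.remove(tmp_path)
```

The chunked version never holds more than one chunk in memory, which is why it is the safer default for a directory that may receive very large files.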

Running the Monitor

Setting up the observer is straightforward. Point it at your directory and let it run:

def start_monitoring(watch_directory):
    """Start monitoring a directory for new files"""
    watch_path = Path(watch_directory)
    watch_path.mkdir(parents=True, exist_ok=True)
    
    event_handler = FileHashHandler()
    observer = Observer()
    observer.schedule(event_handler, str(watch_path), recursive=False)
    
    observer.start()
    print(f"Monitoring {watch_path} for new files...")
    print("Press Ctrl+C to stop")
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        print("\nStopping monitor...")
    
    observer.join()

if __name__ == "__main__":
    # Monitor the ./incoming directory
    start_monitoring("./incoming")

Run the script (it creates the incoming directory if it doesn't exist) and drop files into it. Each new file prints its hash immediately and appends a line to file_hashes.log, a CSV-style log that tracks everything.

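The log format is simple: timestamp,filename,size,hash. That makes it easy to load into a database or feed to another system later. One caveat: fields are written unquoted, so a filename containing a comma adds an extra column; switch to Python's csv.writer if your filenames can contain commas.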
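As one way to get the log into a database, here is a minimal sketch that loads log lines into SQLite using only the standard library. The table name, column names, and sample rows are assumptions for illustration, not part of the watcher itself.

```python
import csv
import io
import sqlite3

# Two made-up log lines in the timestamp,filename,size,hash layout
sample_log = "\n".join([
    f"2024-01-15 09:30:00,report.pdf,1024,{'a' * 64}",
    f"2024-01-15 09:31:12,photo.jpg,2048,{'b' * 64}",
]) + "\n"

def load_log(conn, log_stream):
    """Insert every log row into a file_hashes table; return the row count."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS file_hashes "
        "(logged_at TEXT, filename TEXT, size INTEGER, sha256 TEXT)"
    )
    rows = [
        (ts, name, int(size), digest)
        for ts, name, size, digest in csv.reader(log_stream)
    ]
    conn.executemany("INSERT INTO file_hashes VALUES (?, ?, ?, ?)", rows)
    conn.commit()
    return len(rows)

conn = sqlite3.connect(":memory:")
count = load_log(conn, io.StringIO(sample_log))
largest = conn.execute(
    "SELECT filename FROM file_hashes ORDER BY size DESC LIMIT 1"
).fetchone()[0]
print(count, largest)
```

With a real log you would pass open("file_hashes.log", newline="") instead of the in-memory sample, and a file path instead of ":memory:".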

Handling Edge Cases

Real-world file monitoring needs to handle incomplete writes, locked files, and rapid file operations. Here's a more robust version:

import hashlib
import time
from pathlib import Path
from watchdog.events import FileSystemEventHandler

class RobustFileHashHandler(FileSystemEventHandler):
    def __init__(self, log_file="file_hashes.log", min_stable_time=1.0):
        self.log_file = Path(log_file)
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        self.min_stable_time = min_stable_time
        self.pending_files = {}
        
    def on_created(self, event):
        if event.is_directory:
            return
            
        file_path = Path(event.src_path)
        
        # Skip unwanted files
        if self.should_skip_file(file_path):
            return
            
        # Track file for stability checking
        self.pending_files[str(file_path)] = time.time()
        
    def on_modified(self, event):
        if not event.is_directory:
            file_path = str(Path(event.src_path))
            # Update timestamp for modified files
            if file_path in self.pending_files:
                self.pending_files[file_path] = time.time()
                
    def should_skip_file(self, file_path):
        """Check if we should skip this file"""
        if file_path.name.startswith('.'):
            return True
        if file_path.suffix in ['.tmp', '.part', '.crdownload']:
            return True
        return False
        
    def process_stable_files(self):
        """Process files that haven't changed recently"""
        current_time = time.time()
        stable_files = []
        
        for file_path, last_change in list(self.pending_files.items()):
            if current_time - last_change >= self.min_stable_time:
                stable_files.append(Path(file_path))
                del self.pending_files[file_path]
                
        for file_path in stable_files:
            if file_path.exists():
                try:
                    if self.file_is_accessible(file_path):
                        file_hash = self.hash_file(file_path)
                        self.log_hash(file_path, file_hash)
                        print(f"Hashed: {file_path.name} -> {file_hash[:12]}...")
                    else:
                        # Still locked: put it back so the next pass retries it
                        self.pending_files[str(file_path)] = time.time()
                        
                except Exception as e:
                    print(f"Error processing {file_path.name}: {e}")
                    
    def file_is_accessible(self, file_path):
        """Check if file is ready for reading"""
        try:
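            # A plain read-only open, not an exclusive one. It fails on Windows
            # while another process holds the file locked; on POSIX it usually
            # succeeds even mid-write, so the stability window is the real guard.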
            with open(file_path, "rb") as f:
                f.read(1)
            return True
        except (PermissionError, OSError):
            return False
            
    # ... (hash_file and log_hash methods same as before)

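This version waits for files to stabilize before hashing them. It tracks modification events and only processes files that haven't changed for at least min_stable_time seconds, which is much more reliable for large files or busy directories. One thing to watch: pending_files is modified from watchdog's event thread and read from whatever thread calls process_stable_files, so guard it with a threading.Lock if you see files processed twice or skipped.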
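The stabilization logic can be looked at on its own, separate from watchdog. The sketch below is one possible shape for it, not the handler's actual code: PendingTracker, touch, and pop_stable are hypothetical names, the lock is an addition, and the clock is injectable so the example runs without sleeping. Unlike the handler's on_modified, touch here also registers paths it has not seen before.

```python
import threading
import time

class PendingTracker:
    """Thread-safe map of path -> time of the last observed change."""

    def __init__(self, min_stable_time=1.0, clock=time.monotonic):
        self.min_stable_time = min_stable_time
        self.clock = clock  # injectable so the logic can run without sleeping
        self._pending = {}
        self._lock = threading.Lock()

    def touch(self, path):
        """Record a create or modify event for path."""
        with self._lock:
            self._pending[path] = self.clock()

    def pop_stable(self):
        """Remove and return paths that have been quiet for min_stable_time."""
        now = self.clock()
        with self._lock:
            stable = [
                path for path, last_change in self._pending.items()
                if now - last_change >= self.min_stable_time
            ]
            for path in stable:
                del self._pending[path]
        return stable

# Simulated clock so the example runs instantly
fake_now = [0.0]
tracker = PendingTracker(min_stable_time=1.0, clock=lambda: fake_now[0])

tracker.touch("a.bin")          # created at t=0.0
fake_now[0] = 0.6
tracker.touch("a.bin")          # still being written at t=0.6
fake_now[0] = 1.2
early = tracker.pop_stable()    # only 0.6s quiet: not stable yet
fake_now[0] = 1.7
late = tracker.pop_stable()     # 1.1s quiet: stable now
print(early, late)
```

Each write resets the timer, so a file is only released once a full min_stable_time has passed with no further events.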

You'd run the stability check in a separate thread:

import threading

def run_stability_checker(handler):
    """Run stability checker in background thread"""
    while True:
        handler.process_stable_files()
        time.sleep(0.5)

# In your main monitoring function, after creating event_handler as a
# RobustFileHashHandler and before the blocking while-loop:
stability_thread = threading.Thread(
    target=run_stability_checker, args=(event_handler,), daemon=True
)
stability_thread.start()

What You Can Build With This

This file-hashing foundation scales to bigger workflows. You could pipe the hashes to a blockchain anchoring service like ProofLedger, compare against known-good checksums for backup verification, or trigger alerts when unexpected files appear.
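As an illustration of the checksum-comparison and unexpected-file ideas, here is a minimal sketch that checks a directory against a dictionary of known-good digests. The verify function, its return shape, and the sample files are all hypothetical; in practice the known-good map could be built from the hash log.

```python
import hashlib
import tempfile
from pathlib import Path

def sha256_of(path, chunk_size=4096):
    """Chunked SHA-256 of a file's contents."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def verify(directory, known_good):
    """Return (mismatched, unexpected) filename lists for directory.

    known_good maps filename -> expected hex digest.
    """
    mismatched, unexpected = [], []
    for path in sorted(Path(directory).iterdir()):
        if not path.is_file():
            continue
        expected = known_good.get(path.name)
        if expected is None:
            unexpected.append(path.name)      # not in the known-good set
        elif sha256_of(path) != expected:
            mismatched.append(path.name)      # contents changed
    return mismatched, unexpected

with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)
    (tmp_dir / "ok.txt").write_bytes(b"unchanged")
    (tmp_dir / "changed.txt").write_bytes(b"tampered")
    (tmp_dir / "surprise.txt").write_bytes(b"new arrival")

    known_good = {
        "ok.txt": hashlib.sha256(b"unchanged").hexdigest(),
        "changed.txt": hashlib.sha256(b"original").hexdigest(),
    }
    mismatched, unexpected = verify(tmp_dir, known_good)

print(mismatched, unexpected)
```

This sketch only reports changed and unexpected files; detecting files that are listed in known_good but missing from the directory would need one more pass over the dictionary.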

The CSV log format makes it easy to integrate with existing tools. Load it into a database, parse it with pandas, or feed it to any system that needs file integrity tracking.

I've used variations of this pattern for evidence intake pipelines, backup monitoring, and development workflows where file changes trigger automated builds. The core principle stays the same: catch files the moment they arrive, hash them reliably, and log everything for later use.