Resume System¶
VariantCentrifuge includes a robust checkpoint and resume system that allows you to recover from interrupted pipeline runs, significantly reducing computational waste and improving the user experience for long-running analyses.
Overview¶
The resume system automatically saves pipeline state to disk as stages complete, allowing you to:
Resume interrupted pipelines without losing progress
Skip completed stages and continue from where you left off
Recover from system failures or accidental interruptions
Optimize development workflows by resuming from specific stages
Quick Start¶
Basic Resume¶
# Run initial pipeline
variantcentrifuge --gene-name PKD1 --vcf-file input.vcf.gz --enable-checkpoint
# If interrupted, resume with the same command + --resume
variantcentrifuge --gene-name PKD1 --vcf-file input.vcf.gz --enable-checkpoint --resume
Resume from Specific Stage¶
# Resume from a specific stage (advanced usage)
variantcentrifuge --gene-name PKD1 --vcf-file input.vcf.gz --enable-checkpoint --resume-from dataframe_loading
How It Works¶
Checkpoint State File¶
The resume system creates a .variantcentrifuge_state.json file in your output directory that tracks:
Stage completion status (pending, running, finalizing, completed, failed)
Execution times for performance monitoring
Input/output files with optional checksums for validation
Configuration hash to detect parameter changes
Pipeline version to ensure compatibility
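As an illustration, a state file for a partially completed run might look roughly like this. The field names here are assumptions inferred from the descriptions above and from the PipelineState snippet later on this page; the actual schema may differ:

```json
{
  "version": "1.0",
  "pipeline_version": "0.5.0",
  "configuration_hash": "<sha256-hex>",
  "start_time": "2025-07-14T12:59:31",
  "steps": {
    "dataframe_loading": {
      "status": "completed",
      "duration_seconds": 0.1,
      "output_files": [
        {"path": "intermediate/cohort.tsv.gz", "size": 123456, "checksum": "<sha256-hex>"}
      ]
    }
  }
}
```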
Stage Lifecycle¶
Each pipeline stage follows this checkpoint lifecycle:
Start: Stage begins execution, marked as running
Process: Stage performs its main work
Finalize (optional): Stage enters finalizing state while moving temp files to final locations
Complete: Stage finishes, marked as completed with output files recorded
Skip: On resume, completed stages are skipped and their state is restored
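The lifecycle above can be sketched as a small state machine. This is a hypothetical illustration of the legal transitions, not the actual checkpoint implementation:

```python
# Hypothetical sketch of the stage lifecycle; the real checkpoint
# module may model transitions differently.
VALID_TRANSITIONS = {
    "pending": {"running"},
    "running": {"finalizing", "completed", "failed"},
    "finalizing": {"completed", "failed"},
}

def transition(current: str, new: str) -> str:
    """Return the new state if the transition is legal, else raise."""
    if new not in VALID_TRANSITIONS.get(current, set()):
        raise ValueError(f"Illegal transition: {current} -> {new}")
    return new
```

Note that a stage can never jump straight from pending to completed: it must pass through running, which is what lets the recovery logic detect interrupted work.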
Atomic File Operations¶
For reliability, stages use atomic file operations to prevent partial file corruption:
from variantcentrifuge.checkpoint import AtomicFileOperation

# Safe file creation
with AtomicFileOperation('/path/to/final/output.tsv') as temp_path:
    # Write to temporary file
    with open(temp_path, 'w') as f:
        f.write("data")
# File is atomically moved to final location on success
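Under the hood, an atomic write of this kind is typically built on a temporary file plus os.replace, which is atomic on POSIX filesystems when source and destination live on the same volume. A minimal sketch under those assumptions (not the actual AtomicFileOperation implementation):

```python
import os
import tempfile
from contextlib import contextmanager

@contextmanager
def atomic_file_operation(final_path: str):
    """Yield a temp path; atomically move it to final_path on success."""
    # Create the temp file in the same directory so the final rename
    # stays on one filesystem (a requirement for atomicity).
    dir_name = os.path.dirname(os.path.abspath(final_path)) or "."
    fd, temp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    os.close(fd)
    try:
        yield temp_path
        os.replace(temp_path, final_path)  # atomic on POSIX
    except BaseException:
        # On any failure, discard the partial temp file and re-raise,
        # so the final path is never left half-written.
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise
```

A reader at the final path therefore only ever sees either the previous version of the file or the complete new one, never a partial write.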
Configuration¶
Enabling Checkpoints¶
# Enable checkpoint system
variantcentrifuge --enable-checkpoint [other options]
# Enable with checksum validation (RECOMMENDED FOR PRODUCTION)
variantcentrifuge --enable-checkpoint --checkpoint-checksum [other options]
# Enable with custom output directory
variantcentrifuge --enable-checkpoint --output-dir /path/to/output [other options]
Checksum Validation (Production Recommendation)¶
For maximum reliability, especially in production environments, enable checksum validation:
# Production-grade checkpoint validation
variantcentrifuge --enable-checkpoint --checkpoint-checksum [other options]
Benefits of checksum validation:
File integrity verification: Detects corrupted or truncated files
Reliable recovery: Only recovers stages with verified complete outputs
Production safety: Prevents silent failures from partial files
Without checksum validation:
Faster checkpoint operations (size/time-based validation only)
Interrupted stages are always re-executed for safety
Suitable for development or when performance is critical
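Checksum validation of large genomics files is usually done by streaming the file through the hash in chunks rather than loading it into memory. A minimal sketch of such a helper (hypothetical; not VariantCentrifuge's internal function):

```python
import hashlib

def file_sha256(path: str, chunk_size: int = 1 << 20) -> str:
    """Compute a SHA256 checksum in 1 MiB chunks to keep memory flat."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Streaming keeps memory usage constant regardless of file size, which matters when validating multi-gigabyte compressed TSV outputs.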
Resume Options¶
| Option | Description | Example |
|---|---|---|
| `--resume` | Resume from last completed stage | `--enable-checkpoint --resume` |
| `--resume-from STAGE` | Restart from specific stage (re-execute stage and all subsequent stages) | `--resume-from dataframe_loading` |
| `--enable-checkpoint` | Enable checkpoint system | `--enable-checkpoint` |
| `--checkpoint-checksum` | Enable checksum validation (recommended for production) | `--enable-checkpoint --checkpoint-checksum` |
Stage Types and Resume Behavior¶
File-Based Stages¶
Stages that produce output files (e.g., parallel_complete_processing, genotype_replacement):
# Example: GenotypeReplacementStage
def _handle_checkpoint_skip(self, context: PipelineContext) -> PipelineContext:
    """Restore file paths when stage is skipped."""
    output_tsv = context.workspace.get_intermediate_path(
        f"{context.workspace.base_name}.genotype_replaced.tsv.gz"
    )
    if output_tsv.exists():
        context.genotype_replaced_tsv = output_tsv
        context.data = output_tsv
    return context
Memory-Based Stages¶
Stages that work with DataFrames (e.g., dataframe_loading, custom_annotation):
# Example: DataFrameLoadingStage
def _handle_checkpoint_skip(self, context: PipelineContext) -> PipelineContext:
    """Restore DataFrame from TSV file when stage is skipped."""
    input_file = self._find_input_file(context)
    df = pd.read_csv(input_file, sep="\t", dtype=str, compression="gzip")
    context.current_dataframe = df
    return context
Composite Stages¶
Stages that represent multiple operations (e.g., parallel_complete_processing):
# Example: ParallelCompleteProcessingStage
def _handle_checkpoint_skip(self, context: PipelineContext) -> PipelineContext:
    """Mark constituent stages as complete when composite stage is skipped."""
    # Restore output file (merged_tsv: path reconstructed from the
    # workspace; derivation elided in this excerpt)
    context.extracted_tsv = merged_tsv
    # Mark constituent stages as complete
    context.mark_complete("variant_extraction")
    context.mark_complete("field_extraction")
    context.mark_complete("snpsift_filtering")
    return context
Resume Validation¶
The resume system performs several validation checks:
Configuration Consistency¶
# Configuration hash validation
current_hash = self._hash_configuration(configuration)
stored_hash = self.state.get("configuration_hash")
if current_hash != stored_hash:
    logger.warning("Configuration has changed, cannot resume")
    return False
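A deterministic configuration hash can be produced by serializing the configuration with sorted keys before hashing, so that dict ordering never affects the result. A sketch of how _hash_configuration might work (the actual implementation may normalize the configuration differently):

```python
import hashlib
import json

def hash_configuration(config: dict) -> str:
    """Hash a configuration dict deterministically.

    Serializing with sort_keys=True makes the hash independent of the
    order in which options were supplied; default=str handles values
    (e.g. Paths) that are not natively JSON-serializable.
    """
    canonical = json.dumps(config, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Any change to a parameter value produces a different hash, which is what lets the resume check refuse to continue after the configuration has drifted.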
Pipeline Version Compatibility¶
# Version compatibility check
if self.state.get("pipeline_version") != pipeline_version:
    logger.warning("Pipeline version mismatch, cannot resume")
    return False
File Existence Validation¶
# Output file validation
for step_name, step_info in self.state["steps"].items():
    if step_info.status == "completed":
        for file_info in step_info.output_files:
            if not file_info.validate():
                logger.warning(f"Output file validation failed: {file_info.path}")
                return False
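A sketch of what a file-info record with such a validate() method might look like. The field names are hypothetical; the real class may track additional metadata such as modification times:

```python
import hashlib
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class FileInfo:
    """Hypothetical record of a stage output file."""
    path: str
    size: int
    checksum: Optional[str] = None  # hex SHA256, set when checksums are enabled

    def validate(self, calculate_checksum: bool = False) -> bool:
        """Cheap checks first (existence, size); checksum only on request."""
        if not os.path.exists(self.path):
            return False
        if os.path.getsize(self.path) != self.size:
            return False
        if calculate_checksum and self.checksum:
            with open(self.path, "rb") as fh:
                digest = hashlib.sha256(fh.read()).hexdigest()
            return digest == self.checksum
        return True
```

Ordering the checks from cheap to expensive is why the default (size-only) mode is fast, while `--checkpoint-checksum` trades speed for a stronger integrity guarantee.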
Stale State Recovery¶
The system handles interrupted pipeline runs with robust recovery logic that prioritizes data integrity:
Improved Recovery Strategy¶
Safety-First Approach: For maximum reliability, interrupted stages (running or finalizing) are always re-executed unless checksum validation confirms complete files.
# Enhanced recovery logic
if step_info.status in ("running", "finalizing"):
    logger.warning(f"Found stale {step_info.status} stage, marking for re-execution")
    # Only attempt recovery with checksum validation enabled
    if self.enable_checksum and step_info.output_files:
        logger.info("Attempting checksum-based recovery")
        # Validate all output files with checksums
        all_valid = True
        for file_info in step_info.output_files:
            if not file_info.validate(calculate_checksum=True):
                all_valid = False
                break
        if all_valid:
            # Files verified - mark as completed
            step_info.status = "completed"
            return True
    # Mark for re-execution (safe default)
    step_info.status = "failed"
    step_info.error = "Pipeline was interrupted - stage will be re-executed for safety"
Recovery Modes¶
1. Checksum Validation Mode (--checkpoint-checksum):
Validates file integrity using SHA256 checksums
Recovers stages only when files are verified as complete
Recommended for production pipelines
2. Conservative Mode (default):
Always re-executes interrupted stages for safety
Faster checkpoint operations (no checksum calculation)
Prevents potential issues from partial/corrupt files
Finalizing State¶
The finalizing state indicates stages that are moving temporary files to final locations:
# Stage progression with finalizing state
context.checkpoint_state.start_step("example_stage") # running
context.checkpoint_state.finalize_step("example_stage") # finalizing
context.checkpoint_state.complete_step("example_stage") # completed
This intermediate state ensures that:
Partial files are never considered complete
Atomic file operations are properly tracked
Recovery logic can distinguish between main processing and file finalization
Advanced Usage¶
Resume from Specific Stage¶
# Resume from dataframe_loading stage
variantcentrifuge --gene-name PKD1 --vcf-file input.vcf.gz \
--enable-checkpoint --resume-from dataframe_loading
# Resume from variant_analysis stage
variantcentrifuge --gene-name PKD1 --vcf-file input.vcf.gz \
--enable-checkpoint --resume-from variant_analysis
Development Workflow¶
# Initial run with checkpoint
variantcentrifuge --gene-name PKD1 --vcf-file input.vcf.gz --enable-checkpoint
# Modify scoring configuration and resume from scoring stage
variantcentrifuge --gene-name PKD1 --vcf-file input.vcf.gz --enable-checkpoint \
--scoring-config-path new_scoring --resume-from variant_scoring
# Resume from report generation
variantcentrifuge --gene-name PKD1 --vcf-file input.vcf.gz --enable-checkpoint \
--resume-from tsv_output
Pipeline State Summary¶
The resume system provides detailed state information:
Pipeline State Summary:
  State file: /path/to/output/.variantcentrifuge_state.json
  Pipeline version: 0.5.0
  Started: 2025-07-14 12:59:31
  Steps:
    ✓ parallel_complete_processing (22.0s)
    ✓ genotype_replacement (2.9s)
    ✓ phenotype_integration (0.1s)
    ✓ dataframe_loading (0.1s)
    ◐ custom_annotation (finalizing)
    → inheritance_analysis (running)
    ✗ variant_scoring
        Error: Pipeline was interrupted - stage will be re-executed for safety
Performance Benefits¶
Time Savings¶
Skip completed stages: Avoid re-running expensive operations
Parallel stage optimization: Resume from parallel processing points
Memory efficiency: Restore only necessary data structures
Development Efficiency¶
Iterative development: Test changes without full pipeline runs
Parameter tuning: Resume from analysis stages with new parameters
Report generation: Re-generate reports without reprocessing data
Troubleshooting¶
Common Issues¶
Configuration Mismatch¶
Cannot resume - configuration or version mismatch
Solution: Ensure all parameters match the original run, or start fresh without --resume.
Missing Output Files¶
Output file validation failed for step 'stage_name': /path/to/file.tsv
Solution: Check that intermediate files weren’t manually deleted. Remove checkpoint file to start fresh.
Corrupted Checkpoint¶
Checkpoint appears corrupted (all stages were stale)
Solution: Delete the .variantcentrifuge_state.json file and start fresh.
Manual Checkpoint Management¶
# Remove checkpoint file to start fresh
rm /path/to/output/.variantcentrifuge_state.json
# Check checkpoint status
variantcentrifuge --enable-checkpoint --resume --dry-run
Best Practices¶
1. Production Environments¶
# Production-grade configuration with checksum validation
variantcentrifuge --vcf-file large_cohort.vcf.gz \
--enable-checkpoint --checkpoint-checksum \
--output-dir /reliable/path [options]
Production recommendations:
Always use --checkpoint-checksum for file integrity verification
Use dedicated output directories with sufficient disk space
Monitor checkpoint file sizes (larger with checksums enabled)
Consider backup strategies for checkpoint files in critical workflows
2. Development Environments¶
# Development configuration optimized for speed
variantcentrifuge --vcf-file test_data.vcf.gz \
--enable-checkpoint \
--resume-from analysis_stage [options]
Development recommendations:
Checksum validation optional (faster iteration)
Use --resume-from for rapid testing of specific stages
Clean checkpoint files when changing major parameters
3. Consistent Output Directories¶
# Use same output directory for resume
variantcentrifuge --output-dir /consistent/path --enable-checkpoint [options]
4. Parameter Validation¶
# Verify parameters before resuming
variantcentrifuge --resume --dry-run [options]
5. File Safety Best Practices¶
# Use atomic file operations in custom stages
from variantcentrifuge.checkpoint import AtomicFileOperation
def write_output_safely(data, output_path):
    with AtomicFileOperation(output_path) as temp_path:
        # Write to temp file
        with open(temp_path, 'w') as f:
            f.write(data)
    # Automatically moved to final location on success
Technical Implementation¶
State Management¶
The PipelineState class manages checkpoint state:
class PipelineState:
    def __init__(self, output_dir: str, enable_checksum: bool = False):
        self.state_file = os.path.join(output_dir, ".variantcentrifuge_state.json")
        self.state = {
            "version": "1.0",
            "pipeline_version": None,
            "start_time": None,
            "steps": {},
            "configuration_hash": None,
        }
Stage Integration¶
Stages integrate with the checkpoint system through:
class Stage(ABC):
    def __call__(self, context: PipelineContext) -> PipelineContext:
        # Check if stage should be skipped
        if context.checkpoint_state.should_skip_step(self.name):
            context.mark_complete(self.name)
            # Restore state if needed
            if hasattr(self, '_handle_checkpoint_skip'):
                context = self._handle_checkpoint_skip(context)
            return context

        # Execute stage normally
        result = self._process(context)

        # Mark complete with output files
        context.checkpoint_state.complete_step(
            self.name,
            output_files=self.get_output_files(context)
        )
        return result
Future Enhancements¶
Planned Features¶
Incremental updates: Resume with modified input files
Parallel resume: Resume multiple branches simultaneously
Cloud storage: Store checkpoints in cloud storage
Checkpoint compression: Reduce checkpoint file size
Visual resume interface: GUI for selecting resume points
API Extensions¶
# Future API for programmatic resume control
from variantcentrifuge.checkpoint import PipelineCheckpoint

checkpoint = PipelineCheckpoint("/path/to/output")
if checkpoint.can_resume():
    pipeline.resume_from(checkpoint.get_last_stage())