checkpoint
Pipeline checkpoint and resume system for VariantCentrifuge.
This module provides a robust checkpoint system that tracks pipeline execution state, allowing resumption from the last successful step in case of interruption or failure.
- class variantcentrifuge.checkpoint.AtomicFileOperation(final_path, suffix='.tmp')
Bases: object
Context manager for atomic file operations with automatic temp file cleanup.
This ensures that an output file appears at its final location only after it has been completely written, so an interrupted pipeline never leaves a partial file where downstream steps expect a finished one.
Example
>>> with AtomicFileOperation('/path/to/final/output.tsv') as temp_path:
...     # Write to temp_path
...     with open(temp_path, 'w') as f:
...         f.write("data")
...     # File is automatically moved to final location on successful exit
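The write-then-rename pattern described above can be sketched with the standard library alone: write to a temporary path next to the target, then move it into place with `os.replace`, which is atomic on POSIX when both paths are on the same filesystem. This is a minimal sketch under that assumption, not the module's actual implementation; the name `atomic_output` is hypothetical.

```python
import contextlib
import os
from typing import Iterator


@contextlib.contextmanager
def atomic_output(final_path: str, suffix: str = ".tmp") -> Iterator[str]:
    """Hypothetical sketch: yield a temp path and move it to final_path on success."""
    temp_path = final_path + suffix
    try:
        yield temp_path
    except BaseException:
        # On any error inside the with-block, discard the partial file so it
        # never reaches final_path, then let the exception propagate.
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise
    else:
        # os.replace overwrites an existing destination and is atomic when
        # source and destination live on the same filesystem.
        if os.path.exists(temp_path):
            os.replace(temp_path, final_path)
```

Placing the temp file beside the target (same directory) rather than in a system temp directory is what keeps the final rename on one filesystem.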
- class variantcentrifuge.checkpoint.FileInfo(path, size, mtime, checksum=None)
Bases: object
Information about a file for validation.
- classmethod from_file(filepath, calculate_checksum=False)
Create FileInfo from an existing file.
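A sketch of how a FileInfo-style record could be built from `os.stat` and later used to detect changed files. The class name `FileInfoSketch`, the `validate` method, and the choice of SHA-256 for the optional checksum are assumptions for illustration; the module's real hashing algorithm and validation rules are not documented here.

```python
import hashlib
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class FileInfoSketch:
    """Hypothetical stand-in for FileInfo: path, size, mtime, optional checksum."""
    path: str
    size: int
    mtime: float
    checksum: Optional[str] = None

    @classmethod
    def from_file(cls, filepath: str, calculate_checksum: bool = False) -> "FileInfoSketch":
        st = os.stat(filepath)
        checksum = _sha256(filepath) if calculate_checksum else None
        return cls(path=filepath, size=st.st_size, mtime=st.st_mtime, checksum=checksum)

    def validate(self) -> bool:
        """Return True if the file still exists and matches the recorded metadata."""
        if not os.path.exists(self.path):
            return False
        st = os.stat(self.path)
        if st.st_size != self.size or st.st_mtime != self.mtime:
            return False
        # Only re-hash when a checksum was recorded, since hashing is the costly path.
        return self.checksum is None or _sha256(self.path) == self.checksum


def _sha256(filepath: str, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large VCFs need not fit in memory."""
    digest = hashlib.sha256()
    with open(filepath, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Size and mtime checks are cheap and catch most changes; the checksum is opt-in, which matches the `calculate_checksum=False` default in the signature above.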
- class variantcentrifuge.checkpoint.StepInfo(name, status, start_time=None, end_time=None, command_hash=None, input_files=<factory>, output_files=<factory>, parameters=<factory>, error=None)
Bases: object
Information about a pipeline step.
- __init__(name, status, start_time=None, end_time=None, command_hash=None, input_files=<factory>, output_files=<factory>, parameters=<factory>, error=None)
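The `<factory>` defaults in this signature are how Python renders dataclass fields declared with `field(default_factory=...)`. The sketch below shows a comparable dataclass; the class name `StepInfoSketch` and the field types (lists of dicts, a dict of parameters) are assumptions, since the real annotations are not shown on this page.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StepInfoSketch:
    """Hypothetical mirror of StepInfo; field types are assumptions."""
    name: str
    status: str
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    command_hash: Optional[str] = None
    # default_factory gives every instance its own fresh list/dict; a plain
    # `= []` default is rejected by dataclasses as a shared mutable default.
    input_files: List[Dict[str, Any]] = field(default_factory=list)
    output_files: List[Dict[str, Any]] = field(default_factory=list)
    parameters: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None
```

`dataclasses.asdict(step)` turns an instance into a plain dict with the same keys as a per-step entry in the state file, which `json.dumps` can then serialize.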
- class variantcentrifuge.checkpoint.PipelineState(output_dir, enable_checksum=False)
Bases: object
Manages pipeline execution state for checkpoint/resume functionality.
- STATE_FILE_NAME = '.variantcentrifuge_state.json'
- STATE_VERSION = '1.0'
- load()
Load existing state from file.
- Returns:
True if state was loaded successfully, False otherwise
- Return type:
bool
- can_resume(configuration, pipeline_version)
Check if pipeline can be resumed with given configuration.
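The state file stores a `configuration_hash` and `pipeline_version`, which is what a resume check can compare against. One way to get a stable configuration hash is to serialize with sorted keys and digest the result. This sketch assumes JSON serialization and SHA-256; `config_hash` and `can_resume_sketch` are hypothetical names, and whether the real module excludes any keys (for example output paths) from the hash is not documented here.

```python
import hashlib
import json
from typing import Any, Dict


def config_hash(configuration: Dict[str, Any]) -> str:
    """Hypothetical: order-independent hash of a JSON-serializable config."""
    # sort_keys makes the hash independent of dict insertion order;
    # default=str lets values such as pathlib.Path serialize.
    payload = json.dumps(configuration, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def can_resume_sketch(saved_state: Dict[str, Any], configuration: Dict[str, Any],
                      pipeline_version: str) -> bool:
    """Hypothetical resume check against an already-loaded state dictionary."""
    if saved_state.get("pipeline_version") != pipeline_version:
        return False
    return saved_state.get("configuration_hash") == config_hash(configuration)
```

Any change to a configuration value, or a different pipeline version, makes the check fail, so a resumed run cannot silently mix outputs from two different settings.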
- get_resume_point()
Get the name of the last successfully completed step.
- Returns:
Name of last completed step, or None if no steps completed
- Return type:
Optional[str]
- finalize_step(step_name)
Mark a step as finalizing (transitioning from running to completed).
This intermediate state indicates that the step’s main work is done and it’s in the process of finalizing outputs (e.g., moving temp files to final locations).
- clear_step_completion(step_name)
Clear completion status for a step to force re-execution.
This supports restarts in which a previously completed step must be run again.
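finalize_step and clear_step_completion imply a small status lifecycle for each step. The sketch below models it as an explicit transition table. The transition set is an assumption inferred from the statuses named on this page (pending, running, finalizing, completed, failed), not taken from the module, and `ALLOWED` and `transition` are hypothetical names.

```python
from typing import Dict, Set

# Hypothetical transition table; the real module may permit other moves.
ALLOWED: Dict[str, Set[str]] = {
    "pending": {"running"},
    "running": {"finalizing", "failed"},
    "finalizing": {"completed", "failed"},  # e.g. moving temp files into place
    "completed": {"pending"},               # like clear_step_completion: force a re-run
    "failed": {"running"},
}


def transition(statuses: Dict[str, str], step: str, new_status: str) -> None:
    """Move `step` to `new_status`, rejecting moves not listed in ALLOWED."""
    current = statuses.get(step, "pending")
    if new_status not in ALLOWED[current]:
        raise ValueError(f"{step}: cannot go from {current!r} to {new_status!r}")
    statuses[step] = new_status
```

An explicit table makes illegal jumps (such as marking a never-started step as completed) fail loudly instead of corrupting the saved state.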
- get_completed_stages()
Get all completed stages sorted by completion time.
- Returns:
List of (stage_name, StepInfo) tuples sorted by completion time
- Return type:
List[tuple]
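get_completed_stages and get_resume_point can be sketched over the `steps` mapping stored in the state file: keep steps whose status is `completed`, sort them by `end_time`, and take the last one as the resume point. Treating "last successfully completed" as "latest end_time" is an assumption, as are the hypothetical names `completed_stages` and `resume_point`.

```python
from typing import Dict, List, Optional, Tuple


def completed_stages(steps: Dict[str, dict]) -> List[Tuple[str, dict]]:
    """Hypothetical: completed steps sorted by end_time, oldest first."""
    done = [(name, info) for name, info in steps.items()
            if info.get("status") == "completed"]
    # Steps missing an end_time sort first instead of raising on None.
    return sorted(done, key=lambda item: item[1].get("end_time") or 0.0)


def resume_point(steps: Dict[str, dict]) -> Optional[str]:
    """Hypothetical: name of the most recently completed step, or None."""
    done = completed_stages(steps)
    return done[-1][0] if done else None
```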
- get_available_resume_points()
Get stage names that can be used as resume points.
- Returns:
List of stage names that are safe resume points
- Return type:
List[str]
- validate_resume_from_stage(stage_name, available_stages)
Validate that resuming from a specific stage is possible.
- get_stages_to_execute(resume_from, all_stages)
Get the ordered list of stages to execute when resuming from a specific stage.
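A sketch of what validating a resume point and computing the stages to run could look like for a linear pipeline: every stage before the resume point must already be completed, and execution continues from that stage to the end. The `completed` parameter and the strictly linear ordering are assumptions (the real pipeline may track dependencies between stages), and `stages_to_execute` is a hypothetical name.

```python
from typing import List, Set


def stages_to_execute(resume_from: str, all_stages: List[str],
                      completed: Set[str]) -> List[str]:
    """Hypothetical: ordered stages to run when restarting at `resume_from`."""
    if resume_from not in all_stages:
        raise ValueError(f"Unknown stage {resume_from!r}")
    start = all_stages.index(resume_from)
    # Every earlier stage must have finished, otherwise later stages
    # would read inputs that were never produced.
    missing = [s for s in all_stages[:start] if s not in completed]
    if missing:
        raise ValueError(f"Cannot resume from {resume_from!r}; not completed: {missing}")
    return all_stages[start:]
```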
- get_detailed_status()
Get detailed status information for enhanced display.
- Returns:
Detailed status information including suggestions
- Return type:
Dict[str, Any]
- can_resume_from_stage(stage_name, available_stages)
Check if resuming from a specific stage is possible.
- variantcentrifuge.checkpoint.checkpoint(step_name, input_files=None, output_files=None, parameters=None)
Add checkpoint functionality to pipeline steps.
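Taking a step name plus optional file lists and parameters suggests that `checkpoint` is used as a decorator factory around step functions. The sketch below shows that general pattern only: unlike the real signature, it takes the state as an explicit dict argument, it ignores files and parameters, and it returns None when skipping a completed step. `checkpoint_sketch` is a hypothetical name.

```python
import functools
import time
from typing import Any, Callable, Dict


def checkpoint_sketch(state: Dict[str, Dict[str, Any]], step_name: str) -> Callable:
    """Hypothetical decorator factory: skip the wrapped step if already completed."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if state.get(step_name, {}).get("status") == "completed":
                return None  # finished in a previous run; skip the work
            state[step_name] = {"status": "running", "start_time": time.time()}
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                # Record the failure, then re-raise so the caller still sees it.
                state[step_name].update(status="failed", error=str(exc))
                raise
            state[step_name].update(status="completed", end_time=time.time())
            return result
        return wrapper
    return decorator
```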
- class variantcentrifuge.checkpoint.CheckpointContext(pipeline_state, step_name, command_hash=None, parameters=None)
Bases: object
Context manager for checkpoint operations within a code block.
- __exit__(exc_type, exc_val, exc_tb)
Exit the checkpoint context and save state if successful.
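The context-manager behavior described for CheckpointContext (record start and end times, collect files via `add_input_file` / `add_output_file`, mark success only on a clean exit) can be sketched as follows. This stand-in writes to a plain dict instead of a PipelineState, and `CheckpointContextSketch` is a hypothetical name; a real implementation would also persist the state at the point marked in the comments.

```python
import time
from types import TracebackType
from typing import Any, Dict, List, Optional, Type


class CheckpointContextSketch:
    """Hypothetical context manager mirroring the behavior described for CheckpointContext."""

    def __init__(self, state: Dict[str, Dict[str, Any]], step_name: str) -> None:
        self.state = state
        self.step_name = step_name
        self.inputs: List[str] = []
        self.outputs: List[str] = []

    def add_input_file(self, path: str) -> None:
        self.inputs.append(path)

    def add_output_file(self, path: str) -> None:
        self.outputs.append(path)

    def __enter__(self) -> "CheckpointContextSketch":
        self.state[self.step_name] = {"status": "running", "start_time": time.time()}
        return self

    def __exit__(self, exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> bool:
        entry = self.state[self.step_name]
        entry.update(end_time=time.time(), input_files=self.inputs,
                     output_files=self.outputs)
        if exc_type is None:
            entry["status"] = "completed"  # real code would save state to disk here
        else:
            entry.update(status="failed", error=str(exc_val))
        return False  # never swallow the exception
```

Returning False from `__exit__` lets the original exception propagate, so a failed step stops the pipeline while its failure is still recorded.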
Overview
The checkpoint module provides pipeline state tracking and resume functionality for VariantCentrifuge. It enables robust handling of long-running analyses by saving progress at key pipeline steps and allowing resumption after interruptions.
Key Classes
PipelineState
The main class for managing pipeline checkpoints. It tracks:
- Pipeline steps and their completion status
- Input/output files for each step
- Configuration hash to ensure consistency
- Pipeline version for compatibility checks
CheckpointContext
A context manager that automatically tracks step execution:
- Records start and end times
- Captures input and output files
- Handles errors gracefully
- Saves state on successful completion
FileInfo
Tracks file metadata for validation:
- File path, size, and modification time
- Optional checksum calculation
- Validation to ensure files haven’t changed
StepInfo
Stores information about each pipeline step:
- Step name and status (pending, running, finalizing, completed, failed)
- Start and end times
- Input/output files
- Step-specific parameters
- Error information if the step failed
Usage Example
import logging

from variantcentrifuge.checkpoint import PipelineState, CheckpointContext

logger = logging.getLogger(__name__)

# output_dir, config, pipeline_version, input_vcf and output_path
# are assumed to be defined by the caller
pipeline_state = PipelineState(output_dir, enable_checksum=False)

# Resume from a previous run if possible; otherwise start a fresh state
if pipeline_state.load() and pipeline_state.can_resume(config, pipeline_version):
    logger.info("Resuming from checkpoint")
else:
    pipeline_state.initialize(config, pipeline_version)

# Use checkpoint context for a pipeline step
if not pipeline_state.should_skip_step("variant_extraction"):
    with CheckpointContext(pipeline_state, "variant_extraction") as ctx:
        # Record input files
        ctx.add_input_file(input_vcf)
        # Perform the step
        output_file = extract_variants(input_vcf, output_path)
        # Record output files
        ctx.add_output_file(output_file)
    # State is automatically saved on successful completion
Parallel Processing Support
The checkpoint system is designed to work with parallel processing:
- Thread-safe state updates
- Tracks individual chunks in parallel runs
- Proper ordering of parallel step completion
- Handles worker failures gracefully
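Thread-safe state updates are commonly achieved by guarding every mutation with a lock and persisting the whole state with a write-then-rename, so concurrent workers never interleave partial writes. The sketch below shows that general technique; `ThreadSafeStateSketch`, its `mark` method, and the `chunk_N` step names in the usage are hypothetical and not the module's API.

```python
import json
import os
import threading
from typing import Any, Dict


class ThreadSafeStateSketch:
    """Hypothetical: serialize step updates with a lock and persist atomically."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.steps: Dict[str, Dict[str, Any]] = {}
        self._lock = threading.Lock()

    def mark(self, step_name: str, status: str) -> None:
        # Only one thread may mutate and save the state at a time.
        with self._lock:
            self.steps[step_name] = {"status": status}
            self._save_locked()

    def _save_locked(self) -> None:
        # Write the full document to a temp file, then rename it into place,
        # so a reader never sees half-written JSON. Caller must hold the lock.
        tmp = self.path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as fh:
            json.dump({"steps": self.steps}, fh)
        os.replace(tmp, self.path)
```

With a `concurrent.futures.ThreadPoolExecutor`, each worker can call `mark(f"chunk_{i}", "completed")` as it finishes, and the file on disk always holds a complete snapshot.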
State File Format
The checkpoint state is stored as JSON in .variantcentrifuge_state.json:
{
  "version": "1.0",
  "pipeline_version": "0.5.0",
  "start_time": 1234567890.0,
  "configuration_hash": "abc123...",
  "steps": {
    "gene_bed_creation": {
      "name": "gene_bed_creation",
      "status": "completed",
      "start_time": 1234567890.0,
      "end_time": 1234567891.0,
      "input_files": [],
      "output_files": [
        {
          "path": "output/genes.bed",
          "size": 12345,
          "mtime": 1234567891.0,
          "checksum": null
        }
      ],
      "parameters": {},
      "error": null
    }
  }
}
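Reading this file back robustly means treating a missing, truncated, or wrong-version file as "no usable state", which matches load() returning False rather than raising. The sketch below reuses the documented STATE_FILE_NAME and STATE_VERSION values; the function `load_state`, and the assumption that the file lives directly inside the output directory, are hypothetical.

```python
import json
import os
from typing import Any, Dict, Optional

STATE_FILE_NAME = ".variantcentrifuge_state.json"
STATE_VERSION = "1.0"


def load_state(output_dir: str) -> Optional[Dict[str, Any]]:
    """Hypothetical: return the parsed state, or None if it is unusable."""
    path = os.path.join(output_dir, STATE_FILE_NAME)
    try:
        with open(path, encoding="utf-8") as fh:
            state = json.load(fh)
    except (OSError, json.JSONDecodeError):
        return None  # missing file, or JSON truncated by a crash
    if not isinstance(state, dict) or state.get("version") != STATE_VERSION:
        return None  # written by an incompatible state format
    return state
```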