checkpoint

Pipeline checkpoint and resume system for VariantCentrifuge.

This module provides a robust checkpoint system that tracks pipeline execution state, allowing resumption from the last successful step in case of interruption or failure.

class variantcentrifuge.checkpoint.AtomicFileOperation(final_path, suffix='.tmp')[source]

Bases: object

Context manager for atomic file operations with automatic temp file cleanup.

This ensures that output files are only visible in their final location after being completely written, preventing issues with partial files during pipeline interruption.

Example

>>> with AtomicFileOperation('/path/to/final/output.tsv') as temp_path:
...     # Write to temp_path
...     with open(temp_path, 'w') as f:
...         f.write("data")
...     # File is automatically moved to final location on successful exit
Parameters:
  • final_path (str)

  • suffix (str)

__init__(final_path, suffix='.tmp')[source]

Initialize atomic file operation.

Parameters:
  • final_path (str) – Final path where the file should end up

  • suffix (str) – Suffix for temporary file (default: “.tmp”)

__enter__()[source]

Enter context and return temporary file path.

Return type:

Path

__exit__(exc_type, exc_val, exc_tb)[source]

Exit context and move temp file to final location if successful.
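The write-to-temp-then-rename pattern that AtomicFileOperation implements can be sketched with the standard library alone. `atomic_write` below is a hypothetical helper for illustration, not part of the module:

```python
import os

def atomic_write(final_path, data, suffix=".tmp"):
    # Hypothetical sketch of the atomic-write pattern: write everything to a
    # temp file next to the target, then rename it into place in one step.
    tmp_path = final_path + suffix
    try:
        with open(tmp_path, "w") as f:
            f.write(data)
        # os.replace is atomic on POSIX when both paths share a filesystem,
        # so readers never observe a partially written final file.
        os.replace(tmp_path, final_path)
    except Exception:
        # Clean up the temp file so an interrupted run leaves no debris.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

On failure only the temporary file is removed; any previously existing final file is left untouched.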

class variantcentrifuge.checkpoint.FileInfo(path, size, mtime, checksum=None)[source]

Bases: object

Information about a file for validation.

Parameters:
  • path (str)

  • size (int)

  • mtime (float)

  • checksum (Optional[str]) – default: None
classmethod from_file(filepath, calculate_checksum=False)[source]

Create FileInfo from an existing file.

Parameters:
  • filepath (str)

  • calculate_checksum (bool)

Return type:

FileInfo

validate(calculate_checksum=False)[source]

Validate that the file matches the stored info.

Parameters:

calculate_checksum (bool)

Return type:

bool

__init__(path, size, mtime, checksum=None)

Parameters:
  • path (str)

  • size (int)

  • mtime (float)

  • checksum (Optional[str])
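
The size/mtime validation described above can be illustrated with a minimal stand-in. `FileRecord` below is a hypothetical sketch, not the actual FileInfo class (checksum support is omitted for brevity):

```python
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class FileRecord:
    # Illustrative stand-in for checkpoint.FileInfo
    path: str
    size: int
    mtime: float
    checksum: Optional[str] = None

    @classmethod
    def from_file(cls, filepath):
        # Snapshot the file's current size and modification time
        st = os.stat(filepath)
        return cls(path=filepath, size=st.st_size, mtime=st.st_mtime)

    def validate(self):
        # The file is considered unchanged if size and mtime still match
        st = os.stat(self.path)
        return st.st_size == self.size and st.st_mtime == self.mtime
```
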
class variantcentrifuge.checkpoint.StepInfo(name, status, start_time=None, end_time=None, command_hash=None, input_files=<factory>, output_files=<factory>, parameters=<factory>, error=None)[source]

Bases: object

Information about a pipeline step.

Parameters:
  • name (str)

  • status (str)

  • start_time (Optional[float]) – default: None

  • end_time (Optional[float]) – default: None

  • command_hash (Optional[str]) – default: None

  • input_files (List[FileInfo])

  • output_files (List[FileInfo])

  • parameters (Dict[str, Any])

  • error (Optional[str]) – default: None
property duration: float | None

Calculate step duration in seconds.

to_dict()[source]

Convert to dictionary for JSON serialization.

Return type:

Dict[str, Any]

classmethod from_dict(data)[source]

Create StepInfo from dictionary.

Parameters:

data (Dict[str, Any])

Return type:

StepInfo

__init__(name, status, start_time=None, end_time=None, command_hash=None, input_files=<factory>, output_files=<factory>, parameters=<factory>, error=None)

Parameters:
  • name (str)

  • status (str)

  • start_time (Optional[float])

  • end_time (Optional[float])

  • command_hash (Optional[str])

  • input_files (List[FileInfo])

  • output_files (List[FileInfo])

  • parameters (Dict[str, Any])

  • error (Optional[str])
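
The to_dict/from_dict round trip and the duration property can be sketched with dataclasses. `StepRecord` is an illustrative stand-in, not the actual StepInfo class (file tracking is omitted for brevity):

```python
import dataclasses
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class StepRecord:
    # Illustrative stand-in for checkpoint.StepInfo
    name: str
    status: str
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    parameters: Dict[str, Any] = field(default_factory=dict)

    @property
    def duration(self):
        # Seconds between start and end, or None if either is missing
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

    def to_dict(self):
        # Plain dict, ready for json.dump
        return dataclasses.asdict(self)

    @classmethod
    def from_dict(cls, data):
        return cls(**data)
```
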
class variantcentrifuge.checkpoint.PipelineState(output_dir, enable_checksum=False)[source]

Bases: object

Manages pipeline execution state for checkpoint/resume functionality.

Parameters:
  • output_dir (str)

  • enable_checksum (bool)

STATE_FILE_NAME = '.variantcentrifuge_state.json'
STATE_VERSION = '1.0'
__init__(output_dir, enable_checksum=False)[source]

Initialize pipeline state manager.

Parameters:
  • output_dir (str) – Directory where pipeline outputs and state file are stored

  • enable_checksum (bool) – Whether to calculate file checksums (slower but more reliable)

initialize(configuration, pipeline_version)[source]

Initialize a new pipeline run.

Parameters:
  • configuration (Dict[str, Any]) – Pipeline configuration

  • pipeline_version (str) – Current pipeline version

Return type:

None

load()[source]

Load existing state from file.

Returns:

True if state was loaded successfully, False otherwise

Return type:

bool

save()[source]

Save current state to file (thread-safe).

Return type:

None

can_resume(configuration, pipeline_version)[source]

Check if pipeline can be resumed with given configuration.

Parameters:
  • configuration (Dict[str, Any]) – Current pipeline configuration

  • pipeline_version (str) – Current pipeline version

Returns:

True if resume is possible, False otherwise

Return type:

bool

get_resume_point()[source]

Get the name of the last successfully completed step.

Returns:

Name of last completed step, or None if no steps completed

Return type:

Optional[str]
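
One plausible way to compute such a resume point from a steps mapping, sketched as a standalone helper (`resume_point` is hypothetical; the real method may use different criteria):

```python
def resume_point(steps):
    # Return the name of the most recently completed step, or None.
    # `steps` maps step names to dicts shaped like the state file entries.
    completed = [(info["end_time"], name)
                 for name, info in steps.items()
                 if info["status"] == "completed" and info["end_time"] is not None]
    if not completed:
        return None
    # max over (end_time, name) tuples picks the latest completion
    return max(completed)[1]
```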

should_skip_step(step_name)[source]

Check if a step should be skipped (already completed).

Parameters:

step_name (str) – Name of the step to check

Returns:

True if step should be skipped, False otherwise

Return type:

bool

start_step(step_name, command_hash=None, parameters=None)[source]

Mark a step as started.

Parameters:
  • step_name (str) – Unique name for this pipeline step

  • command_hash (Optional[str]) – Hash of the command being run

  • parameters (Optional[Dict[str, Any]]) – Additional parameters to record

Return type:

None

complete_step(step_name, input_files=None, output_files=None)[source]

Mark a step as completed.

Parameters:
  • step_name (str) – Name of the step to mark completed

  • input_files – Input file(s) to record for later validation

  • output_files – Output file(s) to record for later validation

Return type:

None

finalize_step(step_name)[source]

Mark a step as finalizing (transitioning from running to completed).

This intermediate state indicates that the step’s main work is done and it’s in the process of finalizing outputs (e.g., moving temp files to final locations).

Parameters:

step_name (str) – Name of the step to mark as finalizing

Return type:

None

fail_step(step_name, error)[source]

Mark a step as failed.

Parameters:
  • step_name (str) – Name of the step that failed

  • error (str) – Error message to record

Return type:

None

clear_step_completion(step_name)[source]

Clear completion status for a step to force re-execution.

This method supports restart functionality, where a previously completed step must be run again.

Parameters:

step_name (str) – Name of the step to clear completion status for

Return type:

None

get_summary()[source]

Get a human-readable summary of the pipeline state.

Return type:

str

get_completed_stages()[source]

Get all completed stages sorted by completion time.

Returns:

List of (stage_name, StepInfo) tuples sorted by completion time

Return type:

List[tuple]

get_available_resume_points()[source]

Get stage names that can be used as resume points.

Returns:

List of stage names that are safe resume points

Return type:

List[str]

validate_resume_from_stage(stage_name, available_stages)[source]

Validate that resuming from a specific stage is possible.

Parameters:
  • stage_name (str) – Name of the stage to resume from

  • available_stages (List[str]) – List of all available stage names in current configuration

Returns:

(is_valid: bool, error_message: str)

Return type:

tuple

get_stages_to_execute(resume_from, all_stages)[source]

Get the ordered list of stages to execute when resuming from a specific stage.

Parameters:
  • resume_from (str) – Name of the stage to resume from

  • all_stages (List[str]) – List of all stage names in execution order

Returns:

Ordered list of stages to execute starting from resume_from

Return type:

List[str]
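
The slicing behavior this describes can be sketched as follows (`stages_to_execute` is a hypothetical helper):

```python
def stages_to_execute(resume_from, all_stages):
    # Everything from the resume point onward, preserving execution order.
    # Raises ValueError if resume_from is not a known stage.
    start = all_stages.index(resume_from)
    return all_stages[start:]
```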

get_detailed_status()[source]

Get detailed status information for enhanced display.

Returns:

Detailed status information including suggestions

Return type:

Dict[str, Any]

can_resume_from_stage(stage_name, available_stages)[source]

Check if resuming from a specific stage is possible.

Parameters:
  • stage_name (str) – Name of the stage to resume from

  • available_stages (List[str]) – List of all available stage names

Returns:

True if resume is possible, False otherwise

Return type:

bool

cleanup_stale_stages()[source]

Clean up stages that were left in ‘running’ state from previous pipeline runs.

Return type:

None

__getstate__()[source]

Exclude the lock from serialization.

__setstate__(state)[source]

Recreate the lock after unpickling.

variantcentrifuge.checkpoint.checkpoint(step_name, input_files=None, output_files=None, parameters=None)[source]

Decorator that adds checkpoint functionality to pipeline steps.

Parameters:
  • step_name (str) – Unique name for this pipeline step

  • input_files (Optional[Union[str, List[str]]]) – Input file(s) to track

  • output_files (Optional[Union[str, List[str]]]) – Output file(s) to track

  • parameters (Optional[Dict[str, Any]]) – Additional parameters to record
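
The general shape of such a decorator factory can be sketched as below. `checkpoint_step` and its dict-based state are illustrative only, not the module's implementation:

```python
import functools

def checkpoint_step(state, step_name):
    # Hypothetical sketch: skip the wrapped step when the shared state
    # records it as already completed, otherwise run it and mark it done.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if state.get(step_name) == "completed":
                return None  # completed in a previous run; skip the work
            result = func(*args, **kwargs)
            state[step_name] = "completed"
            return result
        return wrapper
    return decorator
```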

class variantcentrifuge.checkpoint.CheckpointContext(pipeline_state, step_name, command_hash=None, parameters=None)[source]

Bases: object

Context manager for checkpoint operations within a code block.

Parameters:
  • pipeline_state (PipelineState)

  • step_name (str)

  • command_hash (Optional[str])

  • parameters (Optional[Dict[str, Any]])

__init__(pipeline_state, step_name, command_hash=None, parameters=None)[source]
Parameters:
  • pipeline_state (PipelineState) – Pipeline state manager

  • step_name (str) – Name of this step

  • command_hash (Optional[str]) – Hash of the command being run

  • parameters (Optional[Dict[str, Any]]) – Additional parameters to record
__enter__()[source]

Enter the checkpoint context.

__exit__(exc_type, exc_val, exc_tb)[source]

Exit the checkpoint context and save state if successful.

add_input_file(filepath)[source]

Add an input file to track.

Parameters:

filepath (str)

Return type:

None

add_output_file(filepath)[source]

Add an output file to track.

Parameters:

filepath (str)

Return type:

None

Overview

The checkpoint module provides pipeline state tracking and resume functionality for VariantCentrifuge. It enables robust handling of long-running analyses by saving progress at key pipeline steps and allowing resumption after interruptions.

Key Classes

PipelineState

The main class for managing pipeline checkpoints. It tracks:

  • Pipeline steps and their completion status

  • Input/output files for each step

  • Configuration hash to ensure consistency

  • Pipeline version for compatibility checks
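
A configuration hash of this kind is commonly computed over a canonical JSON encoding so that logically equal configurations hash identically. `config_hash` below is an illustrative sketch, not the module's actual function:

```python
import hashlib
import json

def config_hash(config):
    # Canonical JSON (sorted keys) makes the hash independent of dict ordering
    blob = json.dumps(config, sort_keys=True).encode("utf-8")
    return hashlib.sha256(blob).hexdigest()
```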

CheckpointContext

A context manager that automatically tracks step execution:

  • Records start and end times

  • Captures input and output files

  • Handles errors gracefully

  • Saves state on successful completion

FileInfo

Tracks file metadata for validation:

  • File path, size, and modification time

  • Optional checksum calculation

  • Validation to ensure files haven’t changed

StepInfo

Stores information about each pipeline step:

  • Step name and status (pending, running, completed, failed)

  • Execution times

  • Input/output files

  • Step-specific parameters

  • Error information if failed

Usage Example

from variantcentrifuge.checkpoint import PipelineState, CheckpointContext

# Initialize pipeline state
pipeline_state = PipelineState(output_dir, enable_checksum=False)
pipeline_state.initialize(config, pipeline_version)

# Check if we can resume from a previous run
if pipeline_state.load() and pipeline_state.can_resume(config, pipeline_version):
    logger.info("Resuming from checkpoint")

# Use checkpoint context for a pipeline step
if not pipeline_state.should_skip_step("variant_extraction"):
    with CheckpointContext(pipeline_state, "variant_extraction") as ctx:
        # Add input files
        ctx.add_input_file(input_vcf)

        # Perform the step
        output_file = extract_variants(input_vcf, output_path)

        # Add output files
        ctx.add_output_file(output_file)

        # State is automatically saved on successful completion

Parallel Processing Support

The checkpoint system is designed to work with parallel processing:

  • Thread-safe state updates

  • Tracks individual chunks in parallel runs

  • Proper ordering of parallel step completion

  • Handles worker failures gracefully
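
Lock-guarded state updates of this kind can be sketched as follows; `StateSaver` and `mark_completed` are hypothetical names, not the module's API:

```python
import json
import threading

class StateSaver:
    # Minimal sketch of thread-safe checkpoint persistence: one lock
    # guards both the in-memory step map and the state file on disk.
    def __init__(self, path):
        self.path = path
        self.steps = {}
        self._lock = threading.Lock()

    def mark_completed(self, step_name):
        with self._lock:
            self.steps[step_name] = "completed"
            self._save_locked()

    def _save_locked(self):
        # Caller must hold the lock; writes the whole state each time
        with open(self.path, "w") as f:
            json.dump(self.steps, f)
```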

State File Format

The checkpoint state is stored as JSON in .variantcentrifuge_state.json:

{
  "version": "1.0",
  "pipeline_version": "0.5.0",
  "start_time": 1234567890.0,
  "configuration_hash": "abc123...",
  "steps": {
    "gene_bed_creation": {
      "name": "gene_bed_creation",
      "status": "completed",
      "start_time": 1234567890.0,
      "end_time": 1234567891.0,
      "input_files": [],
      "output_files": [
        {
          "path": "output/genes.bed",
          "size": 12345,
          "mtime": 1234567891.0,
          "checksum": null
        }
      ],
      "parameters": {},
      "error": null
    }
  }
}
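
Because the state file is plain JSON, it can be inspected directly with the standard library. `summarize_state` below is a hypothetical helper that lists completed steps from a state file shaped like the example above:

```python
import json

def summarize_state(path):
    # Load the JSON state file and return the names of completed steps
    with open(path) as f:
        state = json.load(f)
    return [name for name, step in state["steps"].items()
            if step["status"] == "completed"]
```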