Part 6·6.5·30 min read

In Practice: VCF Files with Python

Parse, filter, annotate, and analyze VCF files with Python — from basic manipulation to somatic variant analysis and mutational signature decomposition.

VCFvariant analysispythonpracticalgenomics

VCF (Variant Call Format) files are the universal format for genomic variant data — the output of every variant caller, the input to every annotation pipeline, and the central data type in clinical genomics and cancer biology. Every bioinformatics practitioner who works with genomic data will spend significant time manipulating VCF files.

This chapter covers the practical skills: parsing VCFs, filtering variants, computing statistics, working with gnomAD frequencies, and performing basic mutational signature analysis.

Setup

bash
pip install cyvcf2 pandas numpy matplotlib seaborn requests
# cyvcf2 is faster than PyVCF for large files; PyVCF is also fine for learning
python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from collections import Counter, defaultdict

The VCF Format in Detail

Before writing code, understand the structure:

##fileformat=VCFv4.2                        ← format version
##reference=GRCh38                          ← reference genome
##FILTER=<ID=PASS,Description="...">        ← filter definitions
##INFO=<ID=AF,Number=A,Type=Float,...>      ← INFO field definitions
##FORMAT=<ID=GT,Number=1,Type=String,...>   ← FORMAT field definitions
#CHROM  POS   ID    REF  ALT  QUAL  FILTER  INFO              FORMAT  SAMPLE
chr17   7674220  .  G   T   100   PASS   AF=0.001;DP=50   GT:AD:DP  0/1:25,20:45

Key concepts:

  • POS is 1-based (not 0-based like Python strings)
  • REF/ALT describe the change at that position
  • INFO contains per-variant annotations (;-delimited key=value pairs)
  • FORMAT specifies the per-sample fields; SAMPLE columns contain values in that order
  • GT=0/1 means heterozygous (one ref, one alt allele); 0/0=homref; 1/1=homalt

Parsing VCF Files

python
import cyvcf2

def parse_vcf(vcf_path: str) -> list[dict]:
    """Parse a VCF file into a list of variant dictionaries."""
    variants = []
    vcf = cyvcf2.VCF(vcf_path)

    for variant in vcf:
        record = {
            "chrom": variant.CHROM,
            "pos": variant.POS,
            "ref": variant.REF,
            "alt": ",".join(variant.ALT),
            "qual": variant.QUAL,
            "filter": list(variant.FILTER),
            "id": variant.ID or ".",
        }

        # INFO fields
        for key in variant.INFO:
            record[f"info_{key}"] = variant.INFO.get(key)

        # Per-sample genotypes
        for i, sample in enumerate(vcf.samples):
            gt = variant.genotypes[i]
            record[f"gt_{sample}"] = f"{gt[0]}/{gt[1]}"
            # AD = allele depths, DP = depth
            ad = variant.format("AD")
            if ad is not None:
                record[f"ad_{sample}"] = list(ad[i])

        variants.append(record)

    vcf.close()
    return variants

Simple Parser (No Dependencies)

For learning or small files:

python
def parse_vcf_simple(vcf_path: str) -> tuple[list, list]:
    """Returns (header_lines, variant_rows)."""
    header_lines = []
    column_headers = []
    variants = []

    with open(vcf_path) as f:
        for line in f:
            line = line.rstrip()
            if line.startswith("##"):
                header_lines.append(line)
            elif line.startswith("#CHROM"):
                column_headers = line[1:].split("\t")  # remove leading #
            else:
                fields = line.split("\t")
                record = dict(zip(column_headers, fields))
                variants.append(record)

    return header_lines, column_headers, variants

def parse_info_field(info_str: str) -> dict:
    """Parse semicolon-delimited INFO field into a dict."""
    result = {}
    if info_str == ".":
        return result
    for item in info_str.split(";"):
        if "=" in item:
            key, val = item.split("=", 1)
            result[key] = val
        else:
            result[item] = True  # flag field
    return result

def parse_genotype(format_str: str, sample_str: str) -> dict:
    """Parse FORMAT + SAMPLE fields."""
    keys = format_str.split(":")
    values = sample_str.split(":")
    return dict(zip(keys, values))

Working with a VCF DataFrame

Converting to pandas makes filtering and analysis easier:

python
def vcf_to_dataframe(vcf_path: str) -> pd.DataFrame:
    """Parse VCF to a pandas DataFrame."""
    rows = []
    with open(vcf_path) as f:
        headers = []
        samples = []
        for line in f:
            line = line.rstrip()
            if line.startswith("##"):
                continue
            elif line.startswith("#CHROM"):
                headers = line[1:].split("\t")
                samples = headers[9:]  # samples start at column 9
            else:
                fields = line.split("\t")
                row = dict(zip(headers[:9], fields[:9]))

                # Parse INFO
                info = parse_info_field(row.get("INFO", "."))
                row.update({f"info_{k}": v for k, v in info.items()})

                # Parse per-sample FORMAT
                fmt = row.get("FORMAT", "")
                for i, sample in enumerate(samples):
                    if 9 + i < len(fields):
                        gt_data = parse_genotype(fmt, fields[9 + i])
                        row.update({f"{sample}_{k}": v for k, v in gt_data.items()})

                rows.append(row)

    return pd.DataFrame(rows)

Filtering Variants

Standard quality filters:

python
def filter_variants(df: pd.DataFrame, sample: str = "TUMOR") -> pd.DataFrame:
    filtered = df.copy()

    # 1. Only PASS variants
    filtered = filtered[filtered["FILTER"] == "PASS"]

    # 2. Minimum depth
    depth_col = f"{sample}_DP"
    if depth_col in filtered.columns:
        filtered[depth_col] = pd.to_numeric(filtered[depth_col], errors="coerce")
        filtered = filtered[filtered[depth_col] >= 20]

    # 3. Minimum variant allele frequency (VAF) for somatic calling
    ad_col = f"{sample}_AD"
    if ad_col in filtered.columns:
        def compute_vaf(ad_str):
            try:
                parts = str(ad_str).split(",")
                ref_count, alt_count = int(parts[0]), int(parts[1])
                total = ref_count + alt_count
                return alt_count / total if total > 0 else 0
            except:
                return 0

        filtered["vaf"] = filtered[ad_col].apply(compute_vaf)
        filtered = filtered[filtered["vaf"] >= 0.05]  # ≥5% VAF

    return filtered

print(f"Variants before filtering: {len(df)}")
df_filtered = filter_variants(df)
print(f"Variants after filtering: {len(df_filtered)}")

Computing Variant Statistics

python
def variant_statistics(df: pd.DataFrame) -> dict:
    """Compute a summary of variant types and metrics."""

    def classify_variant(row):
        ref = str(row["REF"])
        alts = str(row["ALT"]).split(",")
        alt = alts[0]  # take first alt allele

        if len(ref) == 1 and len(alt) == 1:
            return "SNV"
        elif len(ref) > len(alt):
            return "DEL"
        elif len(ref) < len(alt):
            return "INS"
        else:
            return "MNV"

    df = df.copy()
    df["variant_type"] = df.apply(classify_variant, axis=1)

    # Transition/Transversion ratio (Ti/Tv)
    def classify_substitution(row):
        if row["variant_type"] != "SNV":
            return None
        ref, alt = row["REF"], row["ALT"].split(",")[0]
        purines = {"A", "G"}
        if (ref in purines) == (alt in purines):
            return "transition"  # A↔G, C↔T
        else:
            return "transversion"  # A↔C, A↔T, G↔C, G↔T

    df["sub_type"] = df.apply(classify_substitution, axis=1)
    snvs = df[df["variant_type"] == "SNV"]
    ti = (snvs["sub_type"] == "transition").sum()
    tv = (snvs["sub_type"] == "transversion").sum()

    return {
        "total_variants": len(df),
        "variant_types": df["variant_type"].value_counts().to_dict(),
        "ti_count": int(ti),
        "tv_count": int(tv),
        "ti_tv_ratio": round(ti / tv, 3) if tv > 0 else float("inf"),
    }

stats = variant_statistics(df_filtered)
for key, val in stats.items():
    print(f"{key}: {val}")
Ti/Tv ratio as quality metric

In the human genome, the expected Ti/Tv ratio for exomes is ~2.8–3.5 (due to CpG deamination preferentially causing transitions). For whole genomes it's ~2.0–2.1. A ratio significantly below 2 suggests variant calling artifacts (false positives tend to be random → equal ti/tv). A ratio well above 3 in an exome is also worth investigating. Ti/Tv is one of the first QC checks in any variant calling pipeline.

Mutational Spectrum Analysis

python
COMPLEMENT = {"A": "T", "T": "A", "C": "G", "G": "C"}

def reverse_complement(seq: str) -> str:
    return "".join(COMPLEMENT[b] for b in reversed(seq))

def normalize_mutation(ref: str, alt: str, context: str) -> tuple[str, str]:
    """Normalize mutation to pyrimidine reference (C or T)."""
    if ref in ("C", "T"):
        return f"{context[0]}[{ref}>{alt}]{context[2]}", ref
    else:  # ref is A or G — use complement
        ref_c = COMPLEMENT[ref]
        alt_c = COMPLEMENT[alt]
        ctx_c = reverse_complement(context)
        return f"{ctx_c[0]}[{ref_c}>{alt_c}]{ctx_c[2]}", ref_c

def compute_mutation_spectrum(
    df: pd.DataFrame,
    reference_fasta_path: str = None
) -> dict:
    """
    Compute the 96-trinucleotide mutation spectrum from SNVs.
    If reference_fasta_path is None, uses simplified context from position.
    """
    spectrum = defaultdict(int)

    snvs = df[df["ALT"].str.len() == 1][df["REF"].str.len() == 1].copy()

    for _, row in snvs.iterrows():
        ref = str(row["REF"]).upper()
        alt = str(row["ALT"]).split(",")[0].upper()

        if ref not in "ACGT" or alt not in "ACGT" or ref == alt:
            continue

        # Without reference, use simplified mutation type
        mut_type = f"{ref}>{alt}"
        spectrum[mut_type] += 1

    return dict(spectrum)

spectrum = compute_mutation_spectrum(df_filtered)
print("\nMutation spectrum:")
for mut_type, count in sorted(spectrum.items()):
    bar = "█" * (count // max(1, max(spectrum.values()) // 30))
    print(f"  {mut_type}: {count:5d} {bar}")

Visualizing the SBS Spectrum

python
def plot_sbs_spectrum(spectrum: dict, title: str = "Mutation Spectrum"):
    """Plot a simplified SBS spectrum bar chart."""
    # Group by substitution type
    sub_types = ["C>A", "C>G", "C>T", "T>A", "T>C", "T>G"]
    colors = ["steelblue", "black", "red", "lightgray", "green", "pink"]

    fig, ax = plt.subplots(figsize=(10, 5))

    x_positions = range(len(sub_types))
    counts = [spectrum.get(s, 0) for s in sub_types]
    total = sum(counts)
    fractions = [c / total if total > 0 else 0 for c in counts]

    bars = ax.bar(x_positions, fractions, color=colors, edgecolor="white", linewidth=0.5)

    ax.set_xticks(x_positions)
    ax.set_xticklabels(sub_types, rotation=45, ha="right")
    ax.set_ylabel("Fraction of mutations")
    ax.set_title(title)
    ax.set_ylim(0, 1)

    for bar, count in zip(bars, counts):
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                str(count), ha="center", va="bottom", fontsize=9)

    plt.tight_layout()
    plt.savefig("mutation_spectrum.png", dpi=150)
    plt.show()

plot_sbs_spectrum(spectrum)

Querying gnomAD for Population Frequencies

python
def query_gnomad_v4(chrom: str, pos: int, ref: str, alt: str) -> dict:
    """Query gnomAD v4 GraphQL API for variant allele frequency."""
    query = """
    query VariantDetails($variantId: String!) {
        variant(variantId: $variantId, dataset: gnomad_r4) {
            variantId
            exome {
                ac
                an
                af
                filters
            }
            genome {
                ac
                an
                af
                filters
            }
        }
    }
    """
    import requests

    # gnomAD variant ID format: chrom-pos-ref-alt
    variant_id = f"{chrom.replace('chr', '')}-{pos}-{ref}-{alt}"

    response = requests.post(
        "https://gnomad.broadinstitute.org/api",
        json={"query": query, "variables": {"variantId": variant_id}},
        headers={"Content-Type": "application/json"},
        timeout=10
    )

    if response.status_code != 200:
        return {"error": f"HTTP {response.status_code}"}

    data = response.json()
    if "errors" in data:
        return {"error": data["errors"]}

    variant_data = data.get("data", {}).get("variant", {})
    if not variant_data:
        return {"ac": 0, "an": 0, "af": 0, "found": False}

    exome = variant_data.get("exome") or {}
    genome = variant_data.get("genome") or {}

    return {
        "found": True,
        "exome_af": exome.get("af", 0),
        "exome_ac": exome.get("ac", 0),
        "genome_af": genome.get("af", 0),
        "genome_ac": genome.get("ac", 0),
        "combined_af": (exome.get("af", 0) or 0) + (genome.get("af", 0) or 0)
    }

# Example: look up a known variant
tp53_variant = query_gnomad_v4("chr17", 7674220, "G", "T")
print(f"TP53 R248W in gnomAD: {tp53_variant}")

Complete Analysis Pipeline

python
def analyze_tumor_vcf(vcf_path: str, sample_name: str = "TUMOR") -> dict:
    """Full pipeline: load VCF → filter → compute statistics → check gnomAD."""
    import time

    print(f"Loading {vcf_path}...")
    df = vcf_to_dataframe(vcf_path)
    print(f"  Loaded {len(df)} raw variants")

    df_filt = filter_variants(df, sample_name)
    print(f"  After filtering: {len(df_filt)} variants")

    stats = variant_statistics(df_filt)
    spectrum = compute_mutation_spectrum(df_filt)

    # Check gnomAD for top variants
    print("  Querying gnomAD for top variants...")
    gnomad_results = []
    for _, row in df_filt.head(10).iterrows():
        try:
            result = query_gnomad_v4(row["CHROM"], int(row["POS"]), row["REF"], row["ALT"])
            gnomad_results.append({
                "variant": f"{row['CHROM']}:{row['POS']} {row['REF']}>{row['ALT']}",
                "gnomad_af": result.get("combined_af", 0),
                "in_gnomad": result.get("found", False)
            })
            time.sleep(0.3)  # rate limiting
        except Exception as e:
            gnomad_results.append({"variant": f"{row['POS']}", "error": str(e)})

    return {
        "stats": stats,
        "spectrum": spectrum,
        "gnomad_check": gnomad_results
    }

Creating a Synthetic VCF for Testing

python
def create_synthetic_vcf(output_path: str, n_variants: int = 100):
    """Create a synthetic VCF for testing pipelines."""
    import random
    random.seed(42)

    header = [
        "##fileformat=VCFv4.2",
        "##reference=GRCh38",
        "##FILTER=<ID=PASS,Description='All filters passed'>",
        "##FORMAT=<ID=GT,Number=1,Type=String,Description='Genotype'>",
        "##FORMAT=<ID=DP,Number=1,Type=Integer,Description='Total depth'>",
        "##FORMAT=<ID=AD,Number=R,Type=Integer,Description='Allele depths'>",
        "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tTUMOR",
    ]

    bases = list("ACGT")
    chroms = [f"chr{i}" for i in range(1, 23)] + ["chrX"]

    with open(output_path, "w") as f:
        f.write("\n".join(header) + "\n")
        for _ in range(n_variants):
            chrom = random.choice(chroms)
            pos = random.randint(1000000, 200000000)
            ref = random.choice(bases)
            alt = random.choice([b for b in bases if b != ref])
            qual = random.randint(50, 200)
            depth = random.randint(30, 200)
            alt_count = random.randint(5, depth // 2)
            ref_count = depth - alt_count
            line = f"{chrom}\t{pos}\t.\t{ref}\t{alt}\t{qual}\tPASS\t.\tGT:DP:AD\t0/1:{depth}:{ref_count},{alt_count}"
            f.write(line + "\n")

    print(f"Created {output_path} with {n_variants} synthetic variants")

# Create test data
create_synthetic_vcf("test.vcf", 200)
df = vcf_to_dataframe("test.vcf")
df_filt = filter_variants(df)
stats = variant_statistics(df_filt)
print(f"\nTest VCF stats: {stats}")

Summary

TaskTool/Approach
Parse VCFcyvcf2 (fast), PyVCF (pure Python), or manual parsing
Filter variantsFILTER=PASS, depth, VAF thresholds
Compute statisticsTi/Tv ratio, variant type distribution
AnnotateVEP, ANNOVAR, SnpEff (command-line)
Query population freqgnomAD GraphQL API or local tabix index
Mutational signaturesSigProfilerExtractor, MutationalPatterns (R), or custom

For large-scale variant analysis (whole genome cohorts), use command-line tools: bcftools for manipulation, VEP or ANNOVAR for annotation, GATK or DeepVariant for variant calling. Python scripts wrap these tools and parse their outputs.