VCF (Variant Call Format) files are the universal format for genomic variant data — the output of every variant caller, the input to every annotation pipeline, and the central data type in clinical genomics and cancer biology. Every bioinformatics practitioner who works with genomic data will spend significant time manipulating VCF files.
This chapter covers the practical skills: parsing VCFs, filtering variants, computing statistics, working with gnomAD frequencies, and performing basic mutational signature analysis.
Setup
pip install cyvcf2 pandas numpy matplotlib seaborn requests
# cyvcf2 is faster than PyVCF for large files; PyVCF (now unmaintained) is still fine for learning
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from collections import Counter, defaultdict
The VCF Format in Detail
Before writing code, understand the structure:
##fileformat=VCFv4.2 ← format version
##reference=GRCh38 ← reference genome
##FILTER=<ID=PASS,Description="..."> ← filter definitions
##INFO=<ID=AF,Number=A,Type=Float,...> ← INFO field definitions
##FORMAT=<ID=GT,Number=1,Type=String,...> ← FORMAT field definitions
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT SAMPLE
chr17 7674220 . G T 100 PASS AF=0.001;DP=50 GT:AD:DP 0/1:25,20:45
Key concepts:
- POS is 1-based (not 0-based like Python strings)
- REF/ALT describe the change at that position
- INFO contains per-variant annotations (;-delimited key=value pairs)
- FORMAT specifies the per-sample fields; SAMPLE columns contain values in that order
- GT=0/1 means heterozygous (one ref, one alt allele); 0/0=homref; 1/1=homalt
Parsing VCF Files
Using cyvcf2 (Recommended for Large Files)
import cyvcf2
def parse_vcf(vcf_path: str) -> list[dict]:
    """Parse a VCF file into a list of per-variant dictionaries.

    Args:
        vcf_path: Path to a .vcf or .vcf.gz file readable by cyvcf2.

    Returns:
        One dict per record with the core columns, ``info_*`` keys for
        each INFO annotation, and per-sample ``gt_*`` / ``ad_*`` keys.
    """
    variants = []
    vcf = cyvcf2.VCF(vcf_path)
    for variant in vcf:
        record = {
            "chrom": variant.CHROM,
            "pos": variant.POS,
            "ref": variant.REF,
            "alt": ",".join(variant.ALT),
            "qual": variant.QUAL,
            # cyvcf2 reports FILTER as None for PASS records, otherwise a
            # ";"-joined string. list(None) would raise, and
            # list("q10;LowQual") would split into single characters.
            "filter": (variant.FILTER or "PASS").split(";"),
            "id": variant.ID or ".",
        }
        # INFO fields: cyvcf2's INFO object iterates as (key, value) pairs,
        # so unpack both rather than treating each item as a key.
        for key, value in variant.INFO:
            record[f"info_{key}"] = value
        # AD (allele depths) is one per-variant array; fetch it once
        # instead of once per sample.
        ad = variant.format("AD")
        # Per-sample genotypes
        for i, sample in enumerate(vcf.samples):
            gt = variant.genotypes[i]  # [allele_a, allele_b, phased_flag]
            record[f"gt_{sample}"] = f"{gt[0]}/{gt[1]}"
            if ad is not None:
                record[f"ad_{sample}"] = list(ad[i])
        variants.append(record)
    vcf.close()
    return variants
Simple Parser (No Dependencies)
For learning or small files:
def parse_vcf_simple(vcf_path: str) -> tuple[list, list, list]:
    """Parse a VCF with no third-party dependencies.

    Args:
        vcf_path: Path to an uncompressed VCF file.

    Returns:
        A 3-tuple ``(header_lines, column_headers, variants)``:
        the raw ``##`` meta lines, the ``#CHROM`` header row split on
        tabs, and one ``{column: value}`` dict per variant record.
        (The original docstring claimed a 2-tuple; the function has
        always returned three values.)
    """
    header_lines = []
    column_headers = []
    variants = []
    with open(vcf_path) as f:
        for line in f:
            line = line.rstrip()
            if not line:
                # Skip blank lines (e.g. a trailing newline), which would
                # otherwise produce a garbage record.
                continue
            if line.startswith("##"):
                header_lines.append(line)
            elif line.startswith("#CHROM"):
                column_headers = line[1:].split("\t")  # drop leading '#'
            else:
                fields = line.split("\t")
                variants.append(dict(zip(column_headers, fields)))
    return header_lines, column_headers, variants
def parse_info_field(info_str: str) -> dict:
    """Turn a semicolon-delimited INFO string into a dict.

    ``key=value`` items map to string values; bare flag items map to True.
    A missing INFO field (".") yields an empty dict.
    """
    if info_str == ".":
        return {}
    parsed = {}
    for token in info_str.split(";"):
        key, sep, value = token.partition("=")
        parsed[key] = value if sep else True
    return parsed
def parse_genotype(format_str: str, sample_str: str) -> dict:
    """Map the colon-delimited FORMAT keys onto one sample's colon-delimited values."""
    return dict(zip(format_str.split(":"), sample_str.split(":")))
Working with a VCF DataFrame
Converting to pandas makes filtering and analysis easier:
def vcf_to_dataframe(vcf_path: str) -> pd.DataFrame:
    """Parse a VCF into a pandas DataFrame.

    Produces one row per record with the eight fixed columns plus FORMAT,
    ``info_*`` columns for each INFO key, and ``{sample}_{key}`` columns
    for each per-sample FORMAT field. All values are kept as strings;
    convert with ``pd.to_numeric`` where needed.

    Args:
        vcf_path: Path to an uncompressed VCF file.

    Returns:
        DataFrame with one row per variant record.
    """
    rows = []
    with open(vcf_path) as f:
        headers = []
        samples = []
        for line in f:
            line = line.rstrip()
            if not line or line.startswith("##"):
                # Skip meta lines AND blank lines (e.g. trailing newline);
                # a blank line would otherwise become a garbage row.
                continue
            elif line.startswith("#CHROM"):
                headers = line[1:].split("\t")
                samples = headers[9:]  # sample columns follow FORMAT (index 9+)
            else:
                fields = line.split("\t")
                row = dict(zip(headers[:9], fields[:9]))
                # Expand INFO key=value pairs into info_* columns
                info = parse_info_field(row.get("INFO", "."))
                row.update({f"info_{k}": v for k, v in info.items()})
                # Expand per-sample FORMAT fields into {sample}_{key} columns
                fmt = row.get("FORMAT", "")
                for i, sample in enumerate(samples):
                    if 9 + i < len(fields):
                        gt_data = parse_genotype(fmt, fields[9 + i])
                        row.update({f"{sample}_{k}": v for k, v in gt_data.items()})
                rows.append(row)
    return pd.DataFrame(rows)
Filtering Variants
Standard quality filters:
def filter_variants(
    df: pd.DataFrame,
    sample: str = "TUMOR",
    min_depth: int = 20,
    min_vaf: float = 0.05,
) -> pd.DataFrame:
    """Apply standard somatic quality filters to a VCF DataFrame.

    Keeps rows that (1) have FILTER == "PASS", (2) meet the per-sample
    depth threshold when a ``{sample}_DP`` column exists, and (3) meet
    the variant allele frequency threshold computed from ``{sample}_AD``
    ("ref_count,alt_count"). Adds a "vaf" column when AD is available.

    Args:
        df: DataFrame as produced by vcf_to_dataframe.
        sample: Sample column prefix (default "TUMOR").
        min_depth: Minimum read depth (default 20).
        min_vaf: Minimum VAF (default 0.05, i.e. >=5%).

    Returns:
        A filtered copy of ``df`` (the input is not modified).
    """
    filtered = df.copy()
    # 1. Only PASS variants
    filtered = filtered[filtered["FILTER"] == "PASS"]
    # 2. Minimum depth
    depth_col = f"{sample}_DP"
    if depth_col in filtered.columns:
        filtered[depth_col] = pd.to_numeric(filtered[depth_col], errors="coerce")
        filtered = filtered[filtered[depth_col] >= min_depth]
    # 3. Minimum variant allele frequency (VAF) for somatic calling
    ad_col = f"{sample}_AD"
    if ad_col in filtered.columns:
        def compute_vaf(ad_str):
            # AD is "ref_count,alt_count"; treat malformed values as VAF 0.
            # Catch only parse errors — a bare except would hide real bugs.
            try:
                parts = str(ad_str).split(",")
                ref_count, alt_count = int(parts[0]), int(parts[1])
            except (ValueError, IndexError):
                return 0
            total = ref_count + alt_count
            return alt_count / total if total > 0 else 0
        filtered["vaf"] = filtered[ad_col].apply(compute_vaf)
        filtered = filtered[filtered["vaf"] >= min_vaf]
    return filtered
# Example usage — requires `df` from vcf_to_dataframe (e.g. the synthetic
# test VCF built at the end of this chapter).
print(f"Variants before filtering: {len(df)}")
df_filtered = filter_variants(df)
print(f"Variants after filtering: {len(df_filtered)}")
Computing Variant Statistics
def variant_statistics(df: pd.DataFrame) -> dict:
    """Compute a summary of variant types and Ti/Tv metrics.

    Args:
        df: DataFrame with at least REF and ALT columns (strings).

    Returns:
        dict with total_variants, variant_types counts, ti_count,
        tv_count and ti_tv_ratio (``inf`` when there are no transversions).
    """
    # pandas .apply(axis=1) on an empty frame returns a DataFrame, which
    # cannot be assigned to a single column — handle the empty case up front.
    if df.empty:
        return {
            "total_variants": 0,
            "variant_types": {},
            "ti_count": 0,
            "tv_count": 0,
            "ti_tv_ratio": float("inf"),
        }

    def classify_variant(row):
        # Classify by REF/ALT lengths: SNV, DEL, INS, or MNV.
        ref = str(row["REF"])
        alt = str(row["ALT"]).split(",")[0]  # first ALT allele only
        if len(ref) == 1 and len(alt) == 1:
            return "SNV"
        elif len(ref) > len(alt):
            return "DEL"
        elif len(ref) < len(alt):
            return "INS"
        else:
            return "MNV"

    df = df.copy()
    df["variant_type"] = df.apply(classify_variant, axis=1)

    # Transition/Transversion ratio (Ti/Tv)
    def classify_substitution(row):
        # Transition when REF and ALT are in the same purine/pyrimidine
        # class (A<->G or C<->T); transversion otherwise; None for non-SNVs.
        if row["variant_type"] != "SNV":
            return None
        ref, alt = row["REF"], row["ALT"].split(",")[0]
        purines = {"A", "G"}
        if (ref in purines) == (alt in purines):
            return "transition"  # A<->G, C<->T
        return "transversion"  # A<->C, A<->T, G<->C, G<->T

    df["sub_type"] = df.apply(classify_substitution, axis=1)
    snvs = df[df["variant_type"] == "SNV"]
    ti = (snvs["sub_type"] == "transition").sum()
    tv = (snvs["sub_type"] == "transversion").sum()
    return {
        "total_variants": len(df),
        "variant_types": df["variant_type"].value_counts().to_dict(),
        "ti_count": int(ti),
        "tv_count": int(tv),
        "ti_tv_ratio": round(ti / tv, 3) if tv > 0 else float("inf"),
    }
# Example usage — requires `df_filtered` from the filtering step above.
stats = variant_statistics(df_filtered)
for key, val in stats.items():
    print(f"{key}: {val}")
In the human genome, the expected Ti/Tv ratio for exomes is ~2.8–3.5 (due to CpG deamination preferentially causing transitions). For whole genomes it's ~2.0–2.1. A ratio significantly below 2 suggests variant calling artifacts: random sequencing errors have a Ti/Tv near 0.5, because each base has two possible transversions but only one transition, so false positives drag the ratio down. A ratio well above 3 in an exome is also worth investigating. Ti/Tv is one of the first QC checks in any variant calling pipeline.
Mutational Spectrum Analysis
# Watson–Crick pairing for uppercase DNA bases.
COMPLEMENT = {"A": "T", "T": "A", "C": "G", "G": "C"}

def reverse_complement(seq: str) -> str:
    """Return the reverse complement of an uppercase DNA sequence."""
    return "".join(map(COMPLEMENT.__getitem__, reversed(seq)))

def normalize_mutation(ref: str, alt: str, context: str) -> tuple[str, str]:
    """Express a mutation with a pyrimidine (C or T) reference base.

    Signature-style mutation catalogs report every substitution on the
    strand where the reference base is a pyrimidine; purine references
    are flipped to the opposite strand.

    Args:
        ref: Reference base.
        alt: Alternate base.
        context: Trinucleotide context centered on the mutated base.

    Returns:
        ("5'[REF>ALT]3'" string, normalized reference base).
    """
    if ref not in ("C", "T"):
        # Purine reference: complement both alleles and flip the context.
        ref, alt = COMPLEMENT[ref], COMPLEMENT[alt]
        context = reverse_complement(context)
    return f"{context[0]}[{ref}>{alt}]{context[2]}", ref
def compute_mutation_spectrum(
df: pd.DataFrame,
reference_fasta_path: str = None
) -> dict:
"""
Compute the 96-trinucleotide mutation spectrum from SNVs.
If reference_fasta_path is None, uses simplified context from position.
"""
spectrum = defaultdict(int)
snvs = df[df["ALT"].str.len() == 1][df["REF"].str.len() == 1].copy()
for _, row in snvs.iterrows():
ref = str(row["REF"]).upper()
alt = str(row["ALT"]).split(",")[0].upper()
if ref not in "ACGT" or alt not in "ACGT" or ref == alt:
continue
# Without reference, use simplified mutation type
mut_type = f"{ref}>{alt}"
spectrum[mut_type] += 1
return dict(spectrum)
# Example usage — requires `df_filtered` from the filtering step above.
spectrum = compute_mutation_spectrum(df_filtered)
print("\nMutation spectrum:")
for mut_type, count in sorted(spectrum.items()):
    # Scale the text bar so the largest count renders ~30 blocks wide
    bar = "█" * (count // max(1, max(spectrum.values()) // 30))
    print(f" {mut_type}: {count:5d} {bar}")
Visualizing the SBS Spectrum
def plot_sbs_spectrum(
    spectrum: dict,
    title: str = "Mutation Spectrum",
    outfile: str = "mutation_spectrum.png",
):
    """Plot the 6-class substitution spectrum as a bar chart.

    Bars show the fraction of mutations in each substitution class, with
    the raw count printed above each bar. The figure is saved to
    ``outfile`` and then shown.

    Args:
        spectrum: Mapping of "X>Y" substitution type to count.
        title: Plot title.
        outfile: Output path for the saved PNG. Parameterized (with the
            original hard-coded path as default) so callers can plot
            several spectra without overwriting the file.
    """
    # The conventional ordering of the six pyrimidine-referenced classes
    sub_types = ["C>A", "C>G", "C>T", "T>A", "T>C", "T>G"]
    colors = ["steelblue", "black", "red", "lightgray", "green", "pink"]
    fig, ax = plt.subplots(figsize=(10, 5))
    x_positions = range(len(sub_types))
    counts = [spectrum.get(s, 0) for s in sub_types]
    total = sum(counts)
    # Guard against an empty spectrum (all-zero fractions, no div-by-zero)
    fractions = [c / total if total > 0 else 0 for c in counts]
    bars = ax.bar(x_positions, fractions, color=colors, edgecolor="white", linewidth=0.5)
    ax.set_xticks(x_positions)
    ax.set_xticklabels(sub_types, rotation=45, ha="right")
    ax.set_ylabel("Fraction of mutations")
    ax.set_title(title)
    ax.set_ylim(0, 1)
    # Annotate each bar with its raw count just above the bar top
    for bar, count in zip(bars, counts):
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                str(count), ha="center", va="bottom", fontsize=9)
    plt.tight_layout()
    plt.savefig(outfile, dpi=150)
    plt.show()
# Example usage — requires `spectrum` from compute_mutation_spectrum above.
plot_sbs_spectrum(spectrum)
Querying gnomAD for Population Frequencies
def query_gnomad_v4(chrom: str, pos: int, ref: str, alt: str) -> dict:
    """Query the gnomAD v4 GraphQL API for a variant's allele frequency.

    Args:
        chrom: Chromosome ("chr17" or "17"; any "chr" prefix is stripped,
            since gnomAD variant IDs omit it).
        pos: 1-based position (GRCh38 for the gnomad_r4 dataset).
        ref: Reference allele.
        alt: Alternate allele.

    Returns:
        On success: dict with "found" plus exome/genome AC and AF and a
        pooled "combined_af". On HTTP or GraphQL failure: {"error": ...}.
        When the variant is absent: {"ac": 0, "an": 0, "af": 0, "found": False}.
    """
    query = """
    query VariantDetails($variantId: String!) {
      variant(variantId: $variantId, dataset: gnomad_r4) {
        variantId
        exome {
          ac
          an
          af
          filters
        }
        genome {
          ac
          an
          af
          filters
        }
      }
    }
    """
    import requests
    # gnomAD variant ID format: chrom-pos-ref-alt (no "chr" prefix)
    variant_id = f"{chrom.replace('chr', '')}-{pos}-{ref}-{alt}"
    response = requests.post(
        "https://gnomad.broadinstitute.org/api",
        json={"query": query, "variables": {"variantId": variant_id}},
        headers={"Content-Type": "application/json"},
        timeout=10,
    )
    if response.status_code != 200:
        return {"error": f"HTTP {response.status_code}"}
    data = response.json()
    if "errors" in data:
        return {"error": data["errors"]}
    variant_data = data.get("data", {}).get("variant", {})
    if not variant_data:
        return {"ac": 0, "an": 0, "af": 0, "found": False}
    exome = variant_data.get("exome") or {}
    genome = variant_data.get("genome") or {}
    # The API can return explicit JSON nulls, which .get(key, 0) does NOT
    # replace — normalize every field with `or 0`.
    exome_ac = exome.get("ac") or 0
    exome_an = exome.get("an") or 0
    genome_ac = genome.get("ac") or 0
    genome_an = genome.get("an") or 0
    total_an = exome_an + genome_an
    return {
        "found": True,
        "exome_af": exome.get("af") or 0,
        "exome_ac": exome_ac,
        "genome_af": genome.get("af") or 0,
        "genome_ac": genome_ac,
        # Pooled frequency: simply adding the two AFs overstates the
        # frequency; the correct combined estimate is
        # (AC_exome + AC_genome) / (AN_exome + AN_genome).
        "combined_af": (exome_ac + genome_ac) / total_an if total_an > 0 else 0,
    }
# Example: look up a known variant
# NOTE(review): the "R248W" label for chr17:7674220 G>T is unverified here —
# confirm the protein consequence with a transcript-aware annotator (e.g. VEP).
tp53_variant = query_gnomad_v4("chr17", 7674220, "G", "T")
print(f"TP53 R248W in gnomAD: {tp53_variant}")
Complete Analysis Pipeline
def analyze_tumor_vcf(vcf_path: str, sample_name: str = "TUMOR") -> dict:
    """Full pipeline: load VCF → filter → compute statistics → check gnomAD."""
    import time

    print(f"Loading {vcf_path}...")
    df = vcf_to_dataframe(vcf_path)
    print(f" Loaded {len(df)} raw variants")

    df_filt = filter_variants(df, sample_name)
    print(f" After filtering: {len(df_filt)} variants")

    stats = variant_statistics(df_filt)
    spectrum = compute_mutation_spectrum(df_filt)

    # Look up the first few surviving variants in gnomAD
    print(" Querying gnomAD for top variants...")
    gnomad_results = []
    for _, row in df_filt.head(10).iterrows():
        try:
            hit = query_gnomad_v4(row["CHROM"], int(row["POS"]), row["REF"], row["ALT"])
            gnomad_results.append({
                "variant": f"{row['CHROM']}:{row['POS']} {row['REF']}>{row['ALT']}",
                "gnomad_af": hit.get("combined_af", 0),
                "in_gnomad": hit.get("found", False),
            })
            time.sleep(0.3)  # stay polite to the public API
        except Exception as e:
            gnomad_results.append({"variant": f"{row['POS']}", "error": str(e)})

    return {
        "stats": stats,
        "spectrum": spectrum,
        "gnomad_check": gnomad_results,
    }
Creating a Synthetic VCF for Testing
def create_synthetic_vcf(output_path: str, n_variants: int = 100):
    """Write a synthetic single-sample (TUMOR) VCF for pipeline testing.

    Generates ``n_variants`` random SNVs across chr1–22/X with GT:DP:AD
    sample data. The RNG is seeded, so output is reproducible run-to-run.

    Args:
        output_path: Destination .vcf path (overwritten if it exists).
        n_variants: Number of variant records to write (default 100).
    """
    import random
    random.seed(42)  # reproducible output
    header = [
        "##fileformat=VCFv4.2",
        "##reference=GRCh38",
        # The VCF spec requires Description values in DOUBLE quotes;
        # single-quoted descriptions make strict parsers (e.g. bcftools)
        # reject the header.
        '##FILTER=<ID=PASS,Description="All filters passed">',
        '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">',
        '##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Total depth">',
        '##FORMAT=<ID=AD,Number=R,Type=Integer,Description="Allele depths">',
        "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tTUMOR",
    ]
    bases = list("ACGT")
    chroms = [f"chr{i}" for i in range(1, 23)] + ["chrX"]
    with open(output_path, "w") as f:
        f.write("\n".join(header) + "\n")
        for _ in range(n_variants):
            chrom = random.choice(chroms)
            pos = random.randint(1000000, 200000000)
            ref = random.choice(bases)
            alt = random.choice([b for b in bases if b != ref])
            qual = random.randint(50, 200)
            depth = random.randint(30, 200)
            alt_count = random.randint(5, depth // 2)
            ref_count = depth - alt_count
            line = (
                f"{chrom}\t{pos}\t.\t{ref}\t{alt}\t{qual}\tPASS\t.\t"
                f"GT:DP:AD\t0/1:{depth}:{ref_count},{alt_count}"
            )
            f.write(line + "\n")
    print(f"Created {output_path} with {n_variants} synthetic variants")
# Create test data
# End-to-end smoke test: build a synthetic VCF and run it through the
# chapter's parse/filter/statistics functions.
create_synthetic_vcf("test.vcf", 200)
df = vcf_to_dataframe("test.vcf")
df_filt = filter_variants(df)
stats = variant_statistics(df_filt)
print(f"\nTest VCF stats: {stats}")
Summary
| Task | Tool/Approach |
|---|---|
| Parse VCF | cyvcf2 (fast), PyVCF (pure Python), or manual parsing |
| Filter variants | FILTER=PASS, depth, VAF thresholds |
| Compute statistics | Ti/Tv ratio, variant type distribution |
| Annotate | VEP, ANNOVAR, SnpEff (command-line) |
| Query population freq | gnomAD GraphQL API or local tabix index |
| Mutational signatures | SigProfilerExtractor, MutationalPatterns (R), or custom |
For large-scale variant analysis (whole genome cohorts), use command-line tools: bcftools for manipulation, VEP or ANNOVAR for annotation, GATK or DeepVariant for variant calling. Python scripts wrap these tools and parse their outputs.