#!/usr/bin/env python
import os
import sys
import shutil
import tempfile
import polars as pl
from changeo.Gene import getGene
from pathlib import Path
from scanpy import logging as logg
from subprocess import run
from typing import Literal
from dandelion.polars.core._core import (
DandelionPolars,
load_polars,
write_airr,
)
from dandelion.utilities._utilities import (
set_germline_env,
set_igblast_env,
write_fasta,
NO_DS,
)
[docs]
def assigngenes_igblast(
fasta: Path | str,
igblast_db: Path | str | None = None,
org: Literal["human", "mouse"] = "human",
loci: Literal["ig", "tr"] = "ig",
additional_args: list[str] = [],
):
"""
Reannotate with IgBLASTn.
Parameters
----------
fasta : Path | str
path to fasta file for reannotation.
igblast_db : Path | str | None, optional
path to igblast database.
org : Literal["human", "mouse"], optional
organism for germline sequences.
loci : Literal["ig", "tr"], optional
`ig` or `tr` mode for running igblastn.
additional_args : list[str], optional
Additional arguments to pass to `AssignGenes.py`.
"""
env, igdb, fasta = set_igblast_env(igblast_db=igblast_db, input_file=fasta)
outfolder = fasta.parent / "tmp"
outfolder.mkdir(parents=True, exist_ok=True)
informat_dict = {"blast": "_igblast.fmt7", "airr": "_igblast.tsv"}
for fileformat in ["blast", "airr"]:
outfile = fasta.stem + informat_dict[fileformat]
cmd = [
"AssignGenes.py",
"igblast",
"-s",
str(fasta),
"-b",
str(igdb),
"--organism",
org,
"--loci",
loci,
"--format",
fileformat,
"-o",
str(outfolder / outfile),
]
cmd += additional_args
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd, env=env) # logs are printed to terminal
[docs]
def makedb_igblast(
fasta: Path | str,
igblast_output: Path | str | None = None,
germline: str | None = None,
org: Literal["human", "mouse"] = "human",
db: Literal["imgt", "ogrdb"] = "imgt",
extended: bool = True,
additional_args: list[str] = [],
loci: Literal["ig", "tr"] = "ig",
):
"""
Parse IgBLAST output to AIRR format.
Parameters
----------
fasta : Path | str
path to fasta file used for reannotation.
igblast_output : Path | str | None, optional
path to igblast output file.
germline : str | None, optional
path to germline database.
org : Literal["human", "mouse"], optional
organism of germline sequences.
db : Literal["imgt", "ogrdb"], optional
`imgt` or `ogrdb` reference database for running igblastn.
extended : bool, optional
whether or not to parse extended 10x annotations.
additional_args: list[str], optional
Additional arguments to pass to `MakeDb.py`.
"""
env, gml, fasta = set_germline_env(
germline=germline,
org=org,
input_file=fasta,
db=db,
)
if igblast_output is None:
indir = fasta.parent / "tmp"
infile = fasta.stem + "_igblast.fmt7"
igbo = indir / infile
else:
igbo = Path(igblast_output)
cellranger_annotation = fasta.parent / (fasta.stem + "_annotations.csv")
if (org == "mouse") and (loci.lower() == "tr"):
cmd = [
"MakeDb_gentle",
"igblast",
"-i",
str(igbo),
"-s",
str(fasta),
"-r",
str(gml),
"--10x",
str(cellranger_annotation),
]
else:
cmd = [
"MakeDb.py",
"igblast",
"-i",
str(igbo),
"-s",
str(fasta),
"-r",
str(gml),
"--10x",
str(cellranger_annotation),
]
if extended:
cmd = cmd + ["--extended"]
for add_cmd in [[], ["--failed"]]:
cmd = cmd + add_cmd + additional_args
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd, env=env) # logs are printed to terminal
[docs]
def parsedb_heavy(airr_file: Path | str):
"""
Parse AIRR tsv file (heavy chain contigs only).
Parameters
----------
airr_file : Path | str
path to AIRR tsv file.
"""
outname = Path(airr_file).stem + "_heavy"
cmd = [
"ParseDb.py",
"select",
"-d",
str(airr_file),
"-f",
"locus",
"-u",
"IGH",
"--logic",
"all",
"--regex",
"--outname",
outname,
]
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd) # logs are printed to terminal
[docs]
def parsedb_light(airr_file: Path | str):
"""
Parse AIRR tsv file (light chain contigs only).
Parameters
----------
airr_file : Path | str
path to AIRR tsv file.
"""
outname = Path(airr_file).stem + "_light"
cmd = [
"ParseDb.py",
"select",
"-d",
str(airr_file),
"-f",
"locus",
"-u",
"IG[LK]",
"--logic",
"all",
"--regex",
"--outname",
outname,
]
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd) # logs are printed to terminal
[docs]
def creategermlines(
airr_file: Path | str,
germline: list[str] | None = None,
org: Literal["human", "mouse"] = "human",
genotyped_fasta: str | None = None,
mode: Literal["heavy", "light"] | None = None,
db: Literal["imgt", "ogrdb"] = "imgt",
strain: (
Literal[
"c57bl6",
"balbc",
"129S1_SvImJ",
"AKR_J",
"A_J",
"BALB_c_ByJ",
"BALB_c",
"C3H_HeJ",
"C57BL_6J",
"C57BL_6",
"CAST_EiJ",
"CBA_J",
"DBA_1J",
"DBA_2J",
"LEWES_EiJ",
"MRL_MpJ",
"MSM_MsJ",
"NOD_ShiLtJ",
"NOR_LtJ",
"NZB_BlNJ",
"PWD_PhJ",
"SJL_J",
]
| None
) = None,
additional_args: list[str] = [],
):
"""
Wrapper for CreateGermlines.py for reconstructing germline sequences.
Parameters
----------
airr_file : Path | str
path to AIRR tsv file.
germline : list[str] | None, optional
location to germline fasta files as a list.
org : Literal["human", "mouse"], optional
organism for germline sequences.
genotyped_fasta : str | None, optional
location to V genotyped fasta file.
mode : Literal["heavy", "light"] | None, optional
whether to run on heavy or light mode. If left as None, heavy and
light will be run together.
db : Literal["imgt", "ogrdb"], optional
`imgt` or `ogrdb` reference database.
strain : Literal["c57bl6", "balbc", "129S1_SvImJ", "AKR_J", "A_J", "BALB_c_ByJ", "BALB_c", "C3H_HeJ", "C57BL_6J", "C57BL_6", "CAST_EiJ", "CBA_J", "DBA_1J", "DBA_2J", "LEWES_EiJ", "MRL_MpJ", "MSM_MsJ", "NOD_ShiLtJ", "NOR_LtJ", "NZB_BlNJ", "PWD_PhJ", "SJL_J"] | None, optional
strain of mouse to use for germline sequences. Only for `db="ogrdb"`. Note that only "c57bl6", "balbc", "CAST_EiJ", "LEWES_EiJ", "MSM_MsJ", "NOD_ShiLt_J" and "PWD_PhJ" contains both heavy chain and light chain germline sequences as a set.
The rest will not allow igblastn and MakeDB.py to generate a successful airr table (check the failed file). "c57bl6" and "balbc" are merged databases of "C57BL_6" with "C57BL_6J" and "BALB_c" with "BALB_c_ByJ" respectively. None defaults to all combined.
additional_args : list[str], optional
Additional arguments to pass to `CreateGermlines.py`.
"""
env, gml, airr_file = set_germline_env(
germline=germline,
org=org,
input_file=airr_file,
db=db,
)
_strain = "_" + strain if strain is not None else ""
if germline is None:
if mode == "heavy":
if genotyped_fasta is None:
gml_ref = [
str(gml / ("imgt_" + org + _strain + "_IGHV.fasta")),
]
else:
gml_ref = [
str(genotyped_fasta),
]
gml_ref += [
str(gml / ("imgt_" + org + _strain + "_IGHJ.fasta")),
]
_strainD = "" if strain not in NO_DS else _strain
gml_ref += [
str(gml / ("imgt_" + org + _strainD + "_IGHD.fasta")),
]
elif mode == "light":
gml_ref = [
str(gml / ("imgt_" + org + _strain + "_IGKV.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGKJ.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGLV.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGLJ.fasta")),
]
elif mode is None:
if genotyped_fasta is None:
gml_ref = [
str(gml / ("imgt_" + org + _strain + "_IGHV.fasta")),
]
else:
gml_ref = [
str(genotyped_fasta),
]
gml_ref += [
str(gml / ("imgt_" + org + _strain + "_IGHJ.fasta")),
]
_strainD = "" if strain not in NO_DS else _strain
gml_ref += [
str(gml / ("imgt_" + org + _strainD + "_IGHD.fasta")),
]
gml_ref += [
str(gml / ("imgt_" + org + _strain + "_IGKV.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGKJ.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGLV.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGLJ.fasta")),
]
else:
if not isinstance(germline, list):
germline = [str(germline)]
gml_ref = germline
cmd = [
"CreateGermlines.py",
"-d",
str(airr_file),
"-r",
]
cmd = cmd + gml_ref + additional_args
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd, env=env) # logs are printed to terminal
[docs]
def define_clones(
vdj: DandelionPolars,
dist: float,
action: Literal["first", "set"] = "set",
model: Literal[
"ham",
"aa",
"hh_s1f",
"hh_s5f",
"mk_rs1nf",
"mk_rs5nf",
"hs1f_compat",
"m1n_compat",
] = "ham",
norm: Literal["len", "mut", "none"] = "len",
doublets: Literal["drop", "count"] = "drop",
fileformat: Literal["changeo", "airr"] = "airr",
n_cpus: int | None = None,
outFilePrefix: str | None = None,
key_added: str | None = None,
out_dir: Path | str | None = None,
additional_args: list[str] = [],
) -> DandelionPolars:
"""
Find clones using changeo's `DefineClones.py <https://changeo.readthedocs.io/en/stable/tools/DefineClones.html>`__.
Only callable for BCR data at the moment.
Parameters
----------
vdj : DandelionPolars
DandelionPolars object.
dist : float
The distance threshold for clonal grouping.
action : Literal["first", "set"], optional
Specifies how to handle multiple V(D)J assignments for initial grouping. Default is 'set'.
The "first" action will use only the first gene listed. The "set" action will use all gene assignments and
construct a larger gene grouping composed of any sequences sharing an assignment or linked to another sequence
by a common assignment (similar to single-linkage).
model : Literal["ham", "aa", "hh_s1f", "hh_s5f", "mk_rs1nf", "mk_rs5nf", "hs1f_compat", "m1n_compat", ], optional
Specifies which substitution model to use for calculating distance between sequences. Default is 'ham'.
The "ham" model is nucleotide Hamming distance and "aa" is amino acid Hamming distance. The "hh_s1f" and
"hh_s5f" models are human specific single nucleotide and 5-mer content models, respectively, from Yaari et al,
2013. The "mk_rs1nf" and "mk_rs5nf" models are mouse specific single nucleotide and 5-mer content models,
respectively, from Cui et al, 2016. The "m1n_compat" and "hs1f_compat" models are deprecated models provided
backwards compatibility with the "m1n" and "hs1f" models in Change-O v0.3.3 and SHazaM v0.1.4. Both 5-mer
models should be considered experimental.
norm : Literal["len", "mut", "none"], optional
Specifies how to normalize distances. Default is 'len'. 'none' (do not normalize), 'len' (normalize by length),
or 'mut' (normalize by number of mutations between sequences).
doublets : Literal["drop", "count"], optional
Option to control behaviour when dealing with heavy chain 'doublets'. Default is 'drop'. 'drop' will filter out
the doublets while 'count' will retain only the highest umi count contig.
fileformat : Literal["changeo", "airr"], optional
Format of V(D)J file/objects. Default is 'airr'. Also accepts 'changeo'.
n_cpus : int | None, optional
Number of cpus for parallelization. Default is 1, no parallelization.
outFilePrefix : str | None, optional
If specified, the out file name will have this prefix. `None` defaults to 'dandelion_define_clones'
key_added : str | None, optional
Column name to add for define_clones.
out_dir : Path | str | None, optional
If specified, the files will be written to this directory.
additional_args : list[str], optional
Additional arguments to pass to `DefineClones.py`.
Returns
-------
DandelionPolars
DandelionPolars object with clone_id annotated in `.data` slot and `.metadata` initialized.
"""
start = logg.info("Finding clones")
if n_cpus is None:
nproc = 1
else:
nproc = n_cpus
clone_key = key_added if key_added is not None else "clone_id"
# Load data as Polars
dat_ = load_polars(vdj._data)
# Ensure we have eager DataFrame for operations
if isinstance(dat_, pl.LazyFrame):
dat_ = dat_.collect(engine="streaming")
# Filter ambiguous sequences
if "ambiguous" in dat_.columns:
dat = dat_.filter(pl.col("ambiguous") == "F")
else:
dat = dat_
# Split into heavy and light chains
dat_h = dat.filter(pl.col("locus") == "IGH")
dat_l = dat.filter(pl.col("locus").is_in(["IGK", "IGL"]))
# Setup output directories
if os.path.isfile(str(vdj)):
vdj_path = Path(vdj)
tmpFolder = vdj_path.parent / "tmp"
outFolder = vdj_path.parent
elif out_dir is not None:
vdj_path = Path(out_dir)
tmpFolder = vdj_path / "tmp"
outFolder = vdj_path
else:
outFolder = Path(tempfile.TemporaryDirectory().name)
tmpFolder = outFolder / "tmp"
for folder in [outFolder, tmpFolder]:
folder.mkdir(parents=True, exist_ok=True)
# Setup file paths
if "vdj_path" in locals():
h_file1 = tmpFolder / (vdj_path.stem + "_heavy-clone.tsv")
h_file2 = outFolder / (vdj_path.stem + "_heavy-clone.tsv")
l_file = tmpFolder / (vdj_path.stem + "_light.tsv")
outfile = outFolder / (vdj_path.stem + "_clone.tsv")
else:
out_FilePrefix = (
"dandelion_define_clones"
if outFilePrefix is None
else outFilePrefix
)
h_file1 = tmpFolder / (out_FilePrefix + "_heavy-clone.tsv")
h_file2 = outFolder / (out_FilePrefix + "_heavy-clone.tsv")
l_file = tmpFolder / (out_FilePrefix + "_light.tsv")
outfile = outFolder / (out_FilePrefix + "_clone.tsv")
# Write files
write_airr(dat_h, h_file1)
write_airr(dat_l, l_file)
# Determine v_call field
v_field = (
"v_call_genotyped" if "v_call_genotyped" in dat.columns else "v_call"
)
# Build DefineClones.py command
cmd = [
"DefineClones.py",
"-d",
str(h_file1),
"-o",
str(h_file2),
"--act",
action,
"--model",
model,
"--norm",
norm,
"--dist",
str(dist),
"--nproc",
str(nproc),
"--vf",
v_field,
]
cmd = cmd + additional_args
def clusterLinkage(cell_series, group_series):
"""
Return a dictionary of {cell_id : cluster_id}.
that identifies clusters of cells by analyzing their shared
features (group_series) using single linkage.
Arguments:
cell_series (iter): iter of cell ids.
group_series (iter): iter of group ids.
Returns:
dict: dictionary of {cell_id : cluster_id}.
"""
# assign initial clusters
# initial_dict = {cluster1: [cell1], cluster2: [cell1]}
initial_dict = {}
for cell, group in zip(cell_series, group_series):
try:
initial_dict[group].append(cell)
except KeyError:
initial_dict[group] = [cell]
# naive single linkage clustering (ON^2 best case, ON^3 worst case) ...ie for cells with multiple light chains
# cluster_dict = {cluster1: [cell1, cell2]}, 2 cells belong in same group if they share 1 light chain
while True:
cluster_dict = {}
for i, group in enumerate(initial_dict.keys()):
cluster_dict[i] = initial_dict[group]
for cluster in cluster_dict:
# if initial_dict[group] and cluster_dict[cluster] share common cells, add initial_dict[group] to
# cluster
if cluster != i and any(
cell in initial_dict[group]
for cell in cluster_dict[cluster]
):
cluster_dict[cluster] = (
cluster_dict[cluster] + initial_dict[group]
)
del cluster_dict[i]
break
# break if clusters stop changing, otherwise restart
if len(cluster_dict.keys()) == len(initial_dict.keys()):
break
else:
initial_dict = cluster_dict.copy()
# invert cluster_dict for return
assign_dict = {
cell: k for k, v in cluster_dict.items() for cell in set(v)
}
return assign_dict
def _lightCluster(heavy_file, light_file, out_file, doublets, fileformat):
"""
Split heavy chain clones based on light chains using Polars.
Arguments:
heavy_file (str): heavy chain input file.
light_file (str): light chain input file.
out_file (str): heavy chain output file.
doublets (str): method for handling multiple heavy chains per cell. one of 'drop' or 'count'.
fileformat (str): file format. one of 'changeo' or 'airr'.
"""
# Set column names
if fileformat == "changeo":
cell_id = "cell_id"
clone_id = "clone_id"
v_call = "v_call"
j_call = "j_call"
junction_length = "junction_length"
umi_count = "umicount"
elif fileformat == "airr":
cell_id = "cell_id"
clone_id = "clone_id"
v_call = "v_call"
j_call = "j_call"
junction_length = "junction_length"
umi_count = "umi_count"
else:
sys.exit("Invalid format %s" % fileformat)
# Read in heavy and light DataFrames using Polars
heavy_df = pl.read_csv(
heavy_file,
separator="\t",
null_values=["", "None", "NA"],
infer_schema_length=10000,
)
light_df = pl.read_csv(
light_file,
separator="\t",
null_values=["", "None", "NA"],
infer_schema_length=10000,
)
# Column checking
expected_heavy_columns = [
cell_id,
clone_id,
v_call,
j_call,
junction_length,
umi_count,
]
if not set(expected_heavy_columns).issubset(heavy_df.columns):
raise ValueError(
"Missing one or more columns in heavy chain file: "
+ ", ".join(expected_heavy_columns)
)
expected_light_columns = [
cell_id,
v_call,
j_call,
junction_length,
umi_count,
]
if not set(expected_light_columns).issubset(light_df.columns):
raise ValueError(
"Missing one or more columns in light chain file: "
+ ", ".join(expected_light_columns)
)
# Fix types for junction_length
try:
heavy_df = heavy_df.with_columns(
pl.col(junction_length).cast(pl.Int64)
)
light_df = light_df.with_columns(
pl.col(junction_length).cast(pl.Int64)
)
except Exception:
# Already nullable integer type
pass
# Filter multiple heavy chains
if doublets == "drop":
# Keep only cells with exactly one heavy chain
cell_counts = heavy_df.group_by(cell_id).len()
single_cells = cell_counts.filter(pl.col("len") == 1)[cell_id]
heavy_df = heavy_df.filter(pl.col(cell_id).is_in(single_cells))
if len(heavy_df) == 0:
raise ValueError(
"Empty heavy chain data, after doublets drop. Are you combining experiments "
"in a single file? If so, split your data into multiple files."
)
elif doublets == "count":
# Keep only the highest UMI count contig per cell
heavy_df = heavy_df.with_columns(pl.col(umi_count).cast(pl.Int64))
heavy_df = (
heavy_df.sort(umi_count, descending=True)
.group_by(cell_id, maintain_order=True)
.first()
)
# Transfer clone IDs from heavy chain df to light chain df
# Ensure clone_id is string type for consistent handling
heavy_df = heavy_df.with_columns(pl.col(clone_id).cast(pl.String))
clone_dict = dict(
zip(
heavy_df[cell_id].to_list(),
heavy_df[clone_id].to_list(),
)
)
light_df = light_df.filter(
pl.col(cell_id).is_in(list(clone_dict.keys()))
)
light_df = light_df.with_columns(
pl.col(cell_id)
.map_elements(lambda x: clone_dict.get(x), return_dtype=pl.String)
.alias(clone_id)
)
# Generate a "cluster_dict" of CELL:CLONE dictionary from light df
# Build grouping key from v_call, j_call, junction_length, and clone_id
light_grouping = light_df.with_columns(
(
pl.col(v_call).map_elements(
lambda x: getGene(x) if x else "", return_dtype=pl.String
)
+ pl.lit(",")
+ pl.col(j_call).map_elements(
lambda x: getGene(x) if x else "", return_dtype=pl.String
)
+ pl.lit(",")
+ pl.col(junction_length).cast(pl.String)
+ pl.lit(",")
+ pl.col(clone_id)
).alias("grouping_key")
)
cluster_dict = clusterLinkage(
light_grouping[cell_id].to_list(),
light_grouping["grouping_key"].to_list(),
)
# Add assignments to heavy_df
heavy_df = heavy_df.filter(
pl.col(cell_id).is_in(list(cluster_dict.keys()))
)
heavy_df = heavy_df.with_columns(
(
pl.col(clone_id)
+ pl.lit("_")
+ pl.col(cell_id).map_elements(
lambda x: str(cluster_dict.get(x, "")),
return_dtype=pl.String,
)
).alias(clone_id)
)
# Write heavy chains
write_airr(heavy_df, out_file)
return (heavy_df, light_df)
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd)
h_df, l_df = _lightCluster(
h_file2, l_file, outfile, doublets=doublets, fileformat=fileformat
)
h_df = load_polars(h_df)
if isinstance(h_df, pl.LazyFrame):
h_df = h_df.collect(engine="streaming")
# Create a dictionary for cell_id : clone_id from h_df
linked_clones = dict(
zip(h_df["cell_id"].to_list(), h_df["clone_id"].to_list())
)
# Create a clone_reference
clone_ref = list(set(h_df["clone_id"].to_list()))
clone_ref = [
c.split("_")[1] if c and str(c) != "nan" else c for c in clone_ref
]
l_df = load_polars(l_df)
if isinstance(l_df, pl.LazyFrame):
l_df = l_df.collect(engine="streaming")
# Update light chain clone_ids based on linkage
def update_light_clone(cell_id, clone_id):
if clone_id in clone_ref:
return linked_clones.get(cell_id, cell_id + "_notlinked")
else:
return cell_id + "_notlinked"
l_df = l_df.with_columns(
pl.struct(["cell_id", "clone_id"])
.map_elements(
lambda row: update_light_clone(row["cell_id"], row["clone_id"]),
return_dtype=pl.String,
)
.alias("clone_id")
)
# Concatenate heavy and light chains
cloned_ = pl.concat([h_df, l_df], how="diagonal_relaxed")
# Transfer the new clone_id to the original data
clone_mapping = dict(
zip(cloned_["sequence_id"].to_list(), cloned_["clone_id"].to_list())
)
dat_ = dat_.with_columns(
pl.col("sequence_id")
.map_elements(
lambda x: clone_mapping.get(x, ""),
return_dtype=pl.String,
)
.alias(str(clone_key))
)
# Ensure clone_key has empty strings instead of nulls
dat_ = dat_.with_columns(pl.col(str(clone_key)).fill_null(""))
if isinstance(vdj, DandelionPolars):
if vdj._lazy:
vdj._data = dat_.lazy()
else:
vdj._data = dat_
vdj.update_metadata(clone_key=str(clone_key))
logg.info(
" finished",
time=start,
deep=(
"Updated DandelionPolars object: \n"
" 'data', contig-indexed AIRR table\n"
" 'metadata', cell-indexed observations table\n"
),
)
return vdj
else:
out = DandelionPolars(
data=dat_,
verbose=False,
)
out.update_metadata(clone_key=str(clone_key))
logg.info(
" finished",
time=start,
deep=(
"Returning DandelionPolars object: \n"
" 'data', contig-indexed AIRR table\n"
" 'metadata', cell-indexed observations table\n"
),
)
return out
def create_germlines(
vdj: DandelionPolars | pl.DataFrame | pl.LazyFrame | str,
germline: str | None = None,
org: Literal["human", "mouse"] = "human",
db: Literal["imgt", "ogrdb"] = "imgt",
strain: (
Literal[
"c57bl6",
"balbc",
"129S1_SvImJ",
"AKR_J",
"A_J",
"BALB_c_ByJ",
"BALB_c",
"C3H_HeJ",
"C57BL_6J",
"C57BL_6",
"CAST_EiJ",
"CBA_J",
"DBA_1J",
"DBA_2J",
"LEWES_EiJ",
"MRL_MpJ",
"MSM_MsJ",
"NOD_ShiLtJ",
"NOR_LtJ",
"NZB_BlNJ",
"PWD_PhJ",
"SJL_J",
]
| None
) = None,
genotyped_fasta: str | None = None,
additional_args: list[str] = [],
save: str | None = None,
) -> DandelionPolars:
"""
Run CreateGermlines.py to reconstruct the germline V(D)J sequence.
Parameters
----------
vdj : DandelionPolars | pl.DataFrame | pl.LazyFrame | str
DandelionPolars object, polars DataFrame/LazyFrame in changeo/airr format, or file path to changeo/airr
file after clones have been determined.
germline : str | None, optional
path to germline database folder. `None` defaults to environmental variable.
org : Literal["human", "mouse"], optional
organism of germline database.
db : Literal["imgt", "ogrdb"], optional
`imgt` or `ogrdb` reference database.
strain : Literal["c57bl6", "balbc", "129S1_SvImJ", "AKR_J", "A_J", "BALB_c_ByJ", "BALB_c", "C3H_HeJ", "C57BL_6J", "C57BL_6", "CAST_EiJ", "CBA_J", "DBA_1J", "DBA_2J", "LEWES_EiJ", "MRL_MpJ", "MSM_MsJ", "NOD_ShiLtJ", "NOR_LtJ", "NZB_BlNJ", "PWD_PhJ", "SJL_J"] | None, optional
strain of mouse to use for germline sequences. Only for `db="ogrdb"`. Note that only "c57bl6", "balbc", "CAST_EiJ", "LEWES_EiJ", "MSM_MsJ", "NOD_ShiLt_J" and "PWD_PhJ" contains both heavy chain and light chain germline sequences as a set.
The rest will not allow igblastn and MakeDB.py to generate a successful airr table (check the failed file). "c57bl6" and "balbc" are merged databases of "C57BL_6" with "C57BL_6J" and "BALB_c" with "BALB_c_ByJ" respectively. None defaults to all combined.
genotyped_fasta : str | None, optional
location to corrected v genotyped fasta file.
additional_args : list[str], optional
additional arguments to pass to `CreateGermlines.py.`
save : str | None, optional
if provided, saves to specified file path.
Returns
-------
DandelionPolars
DandelionPolars object with `.germlines` slot populated.
"""
start = logg.info("Reconstructing germline sequences")
if not isinstance(vdj, DandelionPolars):
tmpfile = (
Path(vdj)
if os.path.isfile(vdj)
else Path(tempfile.TemporaryDirectory().name) / "tmp.tsv"
)
if isinstance(vdj, (pl.DataFrame, pl.LazyFrame)):
write_airr(data=vdj, save=tmpfile)
creategermlines(
airr_file=tmpfile,
germline=germline,
org=org,
genotyped_fasta=genotyped_fasta,
db=db,
strain=strain,
additional_args=additional_args,
)
else:
tmppath = Path(tempfile.TemporaryDirectory().name)
tmppath.mkdir(parents=True, exist_ok=True)
tmpfile = tmppath / "tmp.tsv"
vdj.write_airr(filename=tmpfile)
if len(vdj.germline) > 0:
tmpgmlfile = tmppath / "germ.fasta"
write_fasta(fasta_dict=vdj.germline, out_fasta=tmpgmlfile)
creategermlines(
airr_file=tmpfile,
germline=tmpgmlfile,
org=org,
db=db,
strain=strain,
additional_args=additional_args,
)
else:
creategermlines(
airr_file=tmpfile,
germline=germline,
org=org,
genotyped_fasta=genotyped_fasta,
db=db,
strain=strain,
additional_args=additional_args,
)
# return as DandelionPolars object
germpass_outfile = tmpfile.parent / (tmpfile.stem + "_germ-pass.tsv")
if isinstance(vdj, DandelionPolars):
vdj._reinitialize_attributes(
data=load_polars(germpass_outfile),
metadata=vdj._metadata,
germline=vdj.germline,
layout=vdj.layout,
graph=vdj.graph,
)
out_vdj = vdj.copy()
else:
out_vdj = DandelionPolars(germpass_outfile, verbose=False)
out_vdj.store_germline_reference(
corrected=genotyped_fasta, germline=germline, org=org
)
if save is not None:
shutil.move(germpass_outfile, save)
logg.info(
" finished",
time=start,
deep=("Returning DandelionPolars object: \n"),
)
return out_vdj