import os
import sys
import shutil
import tempfile
import numpy as np
import pandas as pd
from pathlib import Path
from scanpy import logging as logg
from subprocess import run
from typing import Literal
from changeo.Gene import getGene
from dandelion.base.core._core import Dandelion, load_data
from dandelion.base.io._io import write_airr
from dandelion.utilities._utilities import (
set_germline_env,
set_igblast_env,
write_fasta,
NO_DS,
)
[docs]
def assigngenes_igblast(
fasta: Path | str,
igblast_db: Path | str | None = None,
org: Literal["human", "mouse"] = "human",
loci: Literal["ig", "tr"] = "ig",
additional_args: list[str] = [],
):
"""
Reannotate with IgBLASTn.
Parameters
----------
fasta : Path | str
path to fasta file for reannotation.
igblast_db : Path | str | None, optional
path to igblast database.
org : Literal["human", "mouse"], optional
organism for germline sequences.
loci : Literal["ig", "tr"], optional
`ig` or `tr` mode for running igblastn.
additional_args : list[str], optional
Additional arguments to pass to `AssignGenes.py`.
"""
env, igdb, fasta = set_igblast_env(igblast_db=igblast_db, input_file=fasta)
outfolder = fasta.parent / "tmp"
outfolder.mkdir(parents=True, exist_ok=True)
informat_dict = {"blast": "_igblast.fmt7", "airr": "_igblast.tsv"}
for fileformat in ["blast", "airr"]:
outfile = fasta.stem + informat_dict[fileformat]
cmd = [
"AssignGenes.py",
"igblast",
"-s",
str(fasta),
"-b",
str(igdb),
"--organism",
org,
"--loci",
loci,
"--format",
fileformat,
"-o",
str(outfolder / outfile),
]
cmd += additional_args
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd, env=env) # logs are printed to terminal
[docs]
def makedb_igblast(
fasta: Path | str,
igblast_output: Path | str | None = None,
germline: str | None = None,
org: Literal["human", "mouse"] = "human",
db: Literal["imgt", "ogrdb"] = "imgt",
extended: bool = True,
additional_args: list[str] = [],
loci: Literal["ig", "tr"] = "ig",
):
"""
Parse IgBLAST output to AIRR format.
Parameters
----------
fasta : Path | str
path to fasta file used for reannotation.
igblast_output : Path | str | None, optional
path to igblast output file.
germline : str | None, optional
path to germline database.
org : Literal["human", "mouse"], optional
organism of germline sequences.
db : Literal["imgt", "ogrdb"], optional
`imgt` or `ogrdb` reference database for running igblastn.
extended : bool, optional
whether or not to parse extended 10x annotations.
additional_args: list[str], optional
Additional arguments to pass to `MakeDb.py`.
"""
env, gml, fasta = set_germline_env(
germline=germline,
org=org,
input_file=fasta,
db=db,
)
if igblast_output is None:
indir = fasta.parent / "tmp"
infile = fasta.stem + "_igblast.fmt7"
igbo = indir / infile
else:
igbo = Path(igblast_output)
cellranger_annotation = fasta.parent / (fasta.stem + "_annotations.csv")
if (org == "mouse") and (loci.lower() == "tr"):
cmd = [
"MakeDb_gentle",
"igblast",
"-i",
str(igbo),
"-s",
str(fasta),
"-r",
str(gml),
"--10x",
str(cellranger_annotation),
]
else:
cmd = [
"MakeDb.py",
"igblast",
"-i",
str(igbo),
"-s",
str(fasta),
"-r",
str(gml),
"--10x",
str(cellranger_annotation),
]
if extended:
cmd = cmd + ["--extended"]
for add_cmd in [[], ["--failed"]]:
cmd = cmd + add_cmd + additional_args
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd, env=env) # logs are printed to terminal
[docs]
def parsedb_heavy(airr_file: Path | str):
"""
Parse AIRR tsv file (heavy chain contigs only).
Parameters
----------
airr_file : Path | str
path to AIRR tsv file.
"""
outname = Path(airr_file).stem + "_heavy"
cmd = [
"ParseDb.py",
"select",
"-d",
str(airr_file),
"-f",
"locus",
"-u",
"IGH",
"--logic",
"all",
"--regex",
"--outname",
outname,
]
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd) # logs are printed to terminal
[docs]
def parsedb_light(airr_file: Path | str):
"""
Parse AIRR tsv file (light chain contigs only).
Parameters
----------
airr_file : Path | str
path to AIRR tsv file.
"""
outname = Path(airr_file).stem + "_light"
cmd = [
"ParseDb.py",
"select",
"-d",
str(airr_file),
"-f",
"locus",
"-u",
"IG[LK]",
"--logic",
"all",
"--regex",
"--outname",
outname,
]
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd) # logs are printed to terminal
[docs]
def creategermlines(
airr_file: Path | str,
germline: list[str] | None = None,
org: Literal["human", "mouse"] = "human",
genotyped_fasta: str | None = None,
mode: Literal["heavy", "light"] | None = None,
db: Literal["imgt", "ogrdb"] = "imgt",
strain: (
Literal[
"c57bl6",
"balbc",
"129S1_SvImJ",
"AKR_J",
"A_J",
"BALB_c_ByJ",
"BALB_c",
"C3H_HeJ",
"C57BL_6J",
"C57BL_6",
"CAST_EiJ",
"CBA_J",
"DBA_1J",
"DBA_2J",
"LEWES_EiJ",
"MRL_MpJ",
"MSM_MsJ",
"NOD_ShiLtJ",
"NOR_LtJ",
"NZB_BlNJ",
"PWD_PhJ",
"SJL_J",
]
| None
) = None,
additional_args: list[str] = [],
):
"""
Wrapper for CreateGermlines.py for reconstructing germline sequences.
Parameters
----------
airr_file : Path | str
path to AIRR tsv file.
germline : list[str] | None, optional
location to germline fasta files as a list.
org : Literal["human", "mouse"], optional
organism for germline sequences.
genotyped_fasta : str | None, optional
location to V genotyped fasta file.
mode : Literal["heavy", "light"] | None, optional
whether to run on heavy or light mode. If left as None, heavy and
light will be run together.
db : Literal["imgt", "ogrdb"], optional
`imgt` or `ogrdb` reference database.
strain : Literal["c57bl6", "balbc", "129S1_SvImJ", "AKR_J", "A_J", "BALB_c_ByJ", "BALB_c", "C3H_HeJ", "C57BL_6J", "C57BL_6", "CAST_EiJ", "CBA_J", "DBA_1J", "DBA_2J", "LEWES_EiJ", "MRL_MpJ", "MSM_MsJ", "NOD_ShiLtJ", "NOR_LtJ", "NZB_BlNJ", "PWD_PhJ", "SJL_J"] | None, optional
strain of mouse to use for germline sequences. Only for `db="ogrdb"`. Note that only "c57bl6", "balbc", "CAST_EiJ", "LEWES_EiJ", "MSM_MsJ", "NOD_ShiLt_J" and "PWD_PhJ" contains both heavy chain and light chain germline sequences as a set.
The rest will not allow igblastn and MakeDB.py to generate a successful airr table (check the failed file). "c57bl6" and "balbc" are merged databases of "C57BL_6" with "C57BL_6J" and "BALB_c" with "BALB_c_ByJ" respectively. None defaults to all combined.
additional_args : list[str], optional
Additional arguments to pass to `CreateGermlines.py`.
"""
env, gml, airr_file = set_germline_env(
germline=germline,
org=org,
input_file=airr_file,
db=db,
)
_strain = "_" + strain if strain is not None else ""
if germline is None:
if mode == "heavy":
if genotyped_fasta is None:
gml_ref = [
str(gml / ("imgt_" + org + _strain + "_IGHV.fasta")),
]
else:
gml_ref = [
str(genotyped_fasta),
]
gml_ref += [
str(gml / ("imgt_" + org + _strain + "_IGHJ.fasta")),
]
_strainD = "" if strain not in NO_DS else _strain
gml_ref += [
str(gml / ("imgt_" + org + _strainD + "_IGHD.fasta")),
]
elif mode == "light":
gml_ref = [
str(gml / ("imgt_" + org + _strain + "_IGKV.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGKJ.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGLV.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGLJ.fasta")),
]
elif mode is None:
if genotyped_fasta is None:
gml_ref = [
str(gml / ("imgt_" + org + _strain + "_IGHV.fasta")),
]
else:
gml_ref = [
str(genotyped_fasta),
]
gml_ref += [
str(gml / ("imgt_" + org + _strain + "_IGHJ.fasta")),
]
_strainD = "" if strain not in NO_DS else _strain
gml_ref += [
str(gml / ("imgt_" + org + _strainD + "_IGHD.fasta")),
]
gml_ref += [
str(gml / ("imgt_" + org + _strain + "_IGKV.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGKJ.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGLV.fasta")),
str(gml / ("imgt_" + org + _strain + "_IGLJ.fasta")),
]
else:
if not isinstance(germline, list):
germline = [str(germline)]
gml_ref = germline
cmd = [
"CreateGermlines.py",
"-d",
str(airr_file),
"-r",
]
cmd = cmd + gml_ref + additional_args
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd, env=env) # logs are printed to terminal
[docs]
def define_clones(
vdj: Dandelion | pd.DataFrame | str,
dist: float,
action: Literal["first", "set"] = "set",
model: Literal[
"ham",
"aa",
"hh_s1f",
"hh_s5f",
"mk_rs1nf",
"mk_rs5nf",
"hs1f_compat",
"m1n_compat",
] = "ham",
norm: Literal["len", "mut", "none"] = "len",
doublets: Literal["drop", "count"] = "drop",
fileformat: Literal["changeo", "airr"] = "airr",
n_cpus: int | None = None,
outFilePrefix: int | None = None,
key_added: int | None = None,
out_dir: Path | str | None = None,
additional_args: list[str] = [],
) -> Dandelion:
"""
Find clones using changeo's `DefineClones.py <https://changeo.readthedocs.io/en/stable/tools/DefineClones.html>`__.
Only callable for BCR data at the moment.
Parameters
----------
vdj : Dandelion | pd.DataFrame | str
Dandelion object, pandas DataFrame in changeo/airr format, or file path to changeo/airr file after
clones have been determined.
dist : float
The distance threshold for clonal grouping.
action : Literal["first", "set"], optional
Specifies how to handle multiple V(D)J assignments for initial grouping. Default is 'set'.
The "first" action will use only the first gene listed. The "set" action will use all gene assignments and
construct a larger gene grouping composed of any sequences sharing an assignment or linked to another sequence
by a common assignment (similar to single-linkage).
model : Literal["ham", "aa", "hh_s1f", "hh_s5f", "mk_rs1nf", "mk_rs5nf", "hs1f_compat", "m1n_compat", ], optional
Specifies which substitution model to use for calculating distance between sequences. Default is 'ham'.
The "ham" model is nucleotide Hamming distance and "aa" is amino acid Hamming distance. The "hh_s1f" and
"hh_s5f" models are human specific single nucleotide and 5-mer content models, respectively, from Yaari et al,
2013. The "mk_rs1nf" and "mk_rs5nf" models are mouse specific single nucleotide and 5-mer content models,
respectively, from Cui et al, 2016. The "m1n_compat" and "hs1f_compat" models are deprecated models provided
backwards compatibility with the "m1n" and "hs1f" models in Change-O v0.3.3 and SHazaM v0.1.4. Both 5-mer
models should be considered experimental.
norm : Literal["len", "mut", "none"], optional
Specifies how to normalize distances. Default is 'len'. 'none' (do not normalize), 'len' (normalize by length),
or 'mut' (normalize by number of mutations between sequences).
doublets : Literal["drop", "count"], optional
Option to control behaviour when dealing with heavy chain 'doublets'. Default is 'drop'. 'drop' will filter out
the doublets while 'count' will retain only the highest umi count contig.
fileformat : Literal["changeo", "airr"], optional
Format of V(D)J file/objects. Default is 'airr'. Also accepts 'changeo'.
n_cpus : int | None, optional
Number of cpus for parallelization. Default is 1, no parallelization.
outFilePrefix : int | None, optional
If specified, the out file name will have this prefix. `None` defaults to 'dandelion_define_clones'
key_added : int | None, optional
Column name to add for define_clones.
out_dir : Path | str | None, optional
If specified, the files will be written to this directory.
additional_args : list[str], optional
Additional arguments to pass to `DefineClones.py`.
Returns
-------
Dandelion
Dandelion object with clone_id annotated in `.data` slot and `.metadata` initialized.
"""
start = logg.info("Finding clones")
if n_cpus is None:
nproc = 1
else:
nproc = n_cpus
clone_key = key_added if key_added is not None else "clone_id"
if isinstance(vdj, Dandelion):
dat_ = load_data(vdj._data)
else:
dat_ = load_data(vdj)
if "ambiguous" in dat_:
dat = dat_[dat_["ambiguous"] == "F"].copy()
else:
dat = dat_.copy()
dat_h = dat[dat["locus"] == "IGH"]
dat_l = dat[dat["locus"].isin(["IGK", "IGL"])]
if os.path.isfile(str(vdj)):
vdj_path = Path(vdj)
tmpFolder = vdj_path.parent / "tmp"
outFolder = vdj_path.parent
elif out_dir is not None:
vdj_path = Path(out_dir)
tmpFolder = vdj_path / "tmp"
outFolder = vdj_path
else:
import tempfile
outFolder = Path(tempfile.TemporaryDirectory().name)
tmpFolder = outFolder / "tmp"
for _ in [outFolder, tmpFolder]:
_.mkdir(parents=True, exist_ok=True)
if "vdj_path" in locals():
h_file1 = tmpFolder / (vdj_path.stem + "_heavy-clone.tsv")
h_file2 = outFolder / (vdj_path.stem + "_heavy-clone.tsv")
l_file = tmpFolder / (vdj_path.stem + "_light.tsv")
outfile = outFolder / (vdj_path.stem + "_clone.tsv")
else:
out_FilePrefix = (
"dandelion_define_clones"
if outFilePrefix is None
else outFilePrefix
)
h_file1 = tmpFolder / (out_FilePrefix + "_heavy-clone.tsv")
h_file2 = outFolder / (out_FilePrefix + "_heavy-clone.tsv")
l_file = tmpFolder / (out_FilePrefix + "_light.tsv")
outfile = outFolder / (out_FilePrefix + "_clone.tsv")
write_airr(dat_h, h_file1)
write_airr(dat_l, l_file)
v_field = (
"v_call_genotyped" if "v_call_genotyped" in dat.columns else "v_call"
)
cmd = [
"DefineClones.py",
"-d",
str(h_file1),
"-o",
str(h_file2),
"--act",
action,
"--model",
model,
"--norm",
norm,
"--dist",
str(dist),
"--nproc",
str(nproc),
"--vf",
v_field,
]
cmd = cmd + additional_args
def clusterLinkage(cell_series, group_series):
"""
Return a dictionary of {cell_id : cluster_id}.
that identifies clusters of cells by analyzing their shared
features (group_series) using single linkage.
Arguments:
cell_series (iter): iter of cell ids.
group_series (iter): iter of group ids.
Returns:
dict: dictionary of {cell_id : cluster_id}.
"""
# assign initial clusters
# initial_dict = {cluster1: [cell1], cluster2: [cell1]}
initial_dict = {}
for cell, group in zip(cell_series, group_series):
try:
initial_dict[group].append(cell)
except KeyError:
initial_dict[group] = [cell]
# naive single linkage clustering (ON^2 best case, ON^3 worst case) ...ie for cells with multiple light chains
# cluster_dict = {cluster1: [cell1, cell2]}, 2 cells belong in same group if they share 1 light chain
while True:
cluster_dict = {}
for i, group in enumerate(initial_dict.keys()):
cluster_dict[i] = initial_dict[group]
for cluster in cluster_dict:
# if initial_dict[group] and cluster_dict[cluster] share common cells, add initial_dict[group] to
# cluster
if cluster != i and any(
cell in initial_dict[group]
for cell in cluster_dict[cluster]
):
cluster_dict[cluster] = (
cluster_dict[cluster] + initial_dict[group]
)
del cluster_dict[i]
break
# break if clusters stop changing, otherwise restart
if len(cluster_dict.keys()) == len(initial_dict.keys()):
break
else:
initial_dict = cluster_dict.copy()
# invert cluster_dict for return
assign_dict = {
cell: k for k, v in cluster_dict.items() for cell in set(v)
}
return assign_dict
# TODO: might need to remove this function to drop requirement to maintain this as a dependency internally
def _lightCluster(heavy_file, light_file, out_file, doublets, fileformat):
"""
Split heavy chain clones based on light chains.
Arguments:
heavy_file (str): heavy chain input file.
light_file (str): light chain input file.
out_file (str): heavy chain output file.
doublets (str): method for handling multiple heavy chains per cell. one of 'drop' or 'count'.
format (str): file format. one of 'changeo' or 'airr'.
"""
# Set column names
if fileformat == "changeo":
cell_id = "cell_id"
clone_id = "clone_id"
v_call = "v_call"
j_call = "j_call"
junction_length = "junction_length"
umi_count = "umicount"
elif fileformat == "airr":
cell_id = "cell_id"
clone_id = "clone_id"
v_call = "v_call"
j_call = "j_call"
junction_length = "junction_length"
umi_count = "umi_count"
else:
sys.exit("Invalid format %s" % fileformat)
# read in heavy and light DFs
heavy_df = pd.read_csv(
heavy_file, dtype="object", na_values=["", "None", "NA"], sep="\t"
)
light_df = pd.read_csv(
light_file, dtype="object", na_values=["", "None", "NA"], sep="\t"
)
# column checking
expected_heavy_columns = [
cell_id,
clone_id,
v_call,
j_call,
junction_length,
umi_count,
]
if set(expected_heavy_columns).issubset(heavy_df.columns) is False:
raise ValueError(
"Missing one or more columns in heavy chain file: "
+ ", ".join(expected_heavy_columns)
)
expected_light_columns = [
cell_id,
v_call,
j_call,
junction_length,
umi_count,
]
if set(expected_light_columns).issubset(light_df.columns) is False:
raise ValueError(
"Missing one or more columns in light chain file: "
+ ", ".join(expected_light_columns)
)
# Fix types
try:
heavy_df[junction_length] = heavy_df[junction_length].astype("int")
light_df[junction_length] = light_df[junction_length].astype("int")
except:
heavy_df[junction_length] = heavy_df[junction_length].replace(
np.nan, pd.NA
)
light_df[junction_length] = light_df[junction_length].replace(
np.nan, pd.NA
)
heavy_df[junction_length] = heavy_df[junction_length].astype(
"Int64"
)
light_df[junction_length] = light_df[junction_length].astype(
"Int64"
)
# filter multiple heavy chains
if doublets == "drop":
heavy_df = heavy_df.drop_duplicates(cell_id, keep=False)
if heavy_df.empty is True:
raise ValueError(
"Empty heavy chain data, after doublets drop. Are you combining experiments "
"in a single file? If so, split your data into multiple files."
)
elif doublets == "count":
heavy_df[umi_count] = heavy_df[umi_count].astype("int")
heavy_df = heavy_df.groupby(cell_id, sort=False).apply(
lambda x: x.nlargest(1, umi_count)
)
# transfer clone IDs from heavy chain df to light chain df
clone_dict = {
v[cell_id]: v[clone_id]
for k, v in heavy_df[[clone_id, cell_id]].T.to_dict().items()
}
light_df = light_df.loc[
light_df[cell_id].apply(lambda x: x in clone_dict.keys()),
]
light_df[clone_id] = light_df.apply(
lambda row: clone_dict[row[cell_id]], axis=1
)
# generate a "cluster_dict" of CELL:CLONE dictionary from light df (TODO: use receptor object V/J gene names)
cluster_dict = clusterLinkage(
light_df[cell_id],
light_df.apply(
lambda row: getGene(row[v_call])
+ ","
+ getGene(row[j_call])
+ ","
+ str(row[junction_length])
+ ","
+ row[clone_id],
axis=1,
),
)
# add assignments to heavy_df
heavy_df = heavy_df.loc[
heavy_df[cell_id].apply(lambda x: x in cluster_dict.keys()), :
]
heavy_df[clone_id] = (
heavy_df[clone_id]
+ "_"
+ heavy_df.apply(
lambda row: str(cluster_dict[row[cell_id]]), axis=1
)
)
# write heavy chains
write_airr(heavy_df, out_file)
return (heavy_df, light_df)
logg.info("Running command: %s\n" % (" ".join(cmd)))
run(cmd)
h_df, l_df = _lightCluster(
h_file2, l_file, outfile, doublets=doublets, fileformat=fileformat
)
h_df = load_data(h_df)
# create a dictionary for cell_id : clone_id from h_df
linked_clones = dict(zip(h_df["cell_id"], h_df["clone_id"]))
# create a clone_reference
clone_ref = list(set(h_df["clone_id"]))
clone_ref = [c.split("_")[1] if c is not np.nan else c for c in clone_ref]
l_df = load_data(l_df)
for x in l_df.index:
if l_df.loc[x, "clone_id"] in clone_ref:
l_df.at[x, "clone_id"] = linked_clones[l_df.loc[x, "cell_id"]]
else:
try:
l_df.at[x, "clone_id"] = l_df.loc[x, "cell_id"] + "_notlinked"
except:
pass
cloned_ = pd.concat([h_df, l_df])
# transfer the new clone_id to the heavy + light file
dat_[str(clone_key)] = pd.Series(cloned_["clone_id"])
dat_[str(clone_key)] = dat_[str(clone_key)].fillna("")
if isinstance(vdj, Dandelion):
vdj._data[str(clone_key)] = dat_[str(clone_key)]
vdj.update_metadata(clone_key=str(clone_key))
else:
out = Dandelion(
data=dat_,
clone_key=clone_key,
verbose=False,
)
return out
logg.info(
" finished",
time=start,
deep=(
"Updated Dandelion object: \n"
" 'data', contig-indexed AIRR table\n"
" 'metadata', cell-indexed observations table\n"
),
)
def create_germlines(
vdj: Dandelion | pd.DataFrame | str,
germline: str | None = None,
org: Literal["human", "mouse"] = "human",
db: Literal["imgt", "ogrdb"] = "imgt",
strain: (
Literal[
"c57bl6",
"balbc",
"129S1_SvImJ",
"AKR_J",
"A_J",
"BALB_c_ByJ",
"BALB_c",
"C3H_HeJ",
"C57BL_6J",
"C57BL_6",
"CAST_EiJ",
"CBA_J",
"DBA_1J",
"DBA_2J",
"LEWES_EiJ",
"MRL_MpJ",
"MSM_MsJ",
"NOD_ShiLtJ",
"NOR_LtJ",
"NZB_BlNJ",
"PWD_PhJ",
"SJL_J",
]
| None
) = None,
genotyped_fasta: str | None = None,
additional_args: list[str] = [],
save: str | None = None,
) -> Dandelion:
"""
Run CreateGermlines.py to reconstruct the germline V(D)J sequence.
Parameters
----------
vdj : Dandelion | pd.DataFrame | str
Dandelion object, pandas DataFrame in changeo/airr format, or file path to changeo/airr
file after clones have been determined.
germline : str | None, optional
path to germline database folder. `None` defaults to environmental variable.
org : Literal["human", "mouse"], optional
organism of germline database.
db : Literal["imgt", "ogrdb"], optional
`imgt` or `ogrdb` reference database.
strain : Literal["c57bl6", "balbc", "129S1_SvImJ", "AKR_J", "A_J", "BALB_c_ByJ", "BALB_c", "C3H_HeJ", "C57BL_6J", "C57BL_6", "CAST_EiJ", "CBA_J", "DBA_1J", "DBA_2J", "LEWES_EiJ", "MRL_MpJ", "MSM_MsJ", "NOD_ShiLtJ", "NOR_LtJ", "NZB_BlNJ", "PWD_PhJ", "SJL_J"] | None, optional
strain of mouse to use for germline sequences. Only for `db="ogrdb"`. Note that only "c57bl6", "balbc", "CAST_EiJ", "LEWES_EiJ", "MSM_MsJ", "NOD_ShiLt_J" and "PWD_PhJ" contains both heavy chain and light chain germline sequences as a set.
The rest will not allow igblastn and MakeDB.py to generate a successful airr table (check the failed file). "c57bl6" and "balbc" are merged databases of "C57BL_6" with "C57BL_6J" and "BALB_c" with "BALB_c_ByJ" respectively. None defaults to all combined.
genotyped_fasta : str | None, optional
location to corrected v genotyped fasta file.
additional_args : list[str], optional
additional arguments to pass to `CreateGermlines.py.`
save : str | None, optional
if provided, saves to specified file path.
Returns
-------
Dandelion
Dandelion object with `.germlines` slot populated.
"""
start = logg.info("Reconstructing germline sequences")
if not isinstance(vdj, Dandelion):
tmpfile = (
Path(vdj)
if os.path.isfile(vdj)
else Path(tempfile.TemporaryDirectory().name) / "tmp.tsv"
)
if isinstance(vdj, pd.DataFrame):
write_airr(data=vdj.germline, save=tmpfile)
creategermlines(
airr_file=tmpfile,
germline=germline,
org=org,
genotyped_fasta=genotyped_fasta,
db=db,
strain=strain,
additional_args=additional_args,
)
else:
tmppath = Path(tempfile.TemporaryDirectory().name)
tmppath.mkdir(parents=True, exist_ok=True)
tmpfile = tmppath / "tmp.tsv"
vdj.write_airr(filename=tmpfile)
if len(vdj.germline) > 0:
tmpgmlfile = tmppath / "germ.fasta"
write_fasta(fasta_dict=vdj.germline, out_fasta=tmpgmlfile)
creategermlines(
airr_file=tmpfile,
germline=tmpgmlfile,
org=org,
db=db,
strain=strain,
additional_args=additional_args,
)
else:
creategermlines(
airr_file=tmpfile,
germline=germline,
org=org,
genotyped_fasta=genotyped_fasta,
db=db,
strain=strain,
additional_args=additional_args,
)
# return as Dandelion object
germpass_outfile = tmpfile.parent / (tmpfile.stem + "_germ-pass.tsv")
if isinstance(vdj, Dandelion):
vdj.__init__(
data=germpass_outfile,
metadata=vdj._metadata,
germline=vdj.germline,
layout=vdj.layout,
graph=vdj.graph,
verbose=False,
)
out_vdj = vdj.copy()
else:
out_vdj = Dandelion(germpass_outfile, verbose=False)
out_vdj.store_germline_reference(
corrected=genotyped_fasta, germline=germline, org=org
)
if save is not None:
shutil.move(germpass_outfile, save)
logg.info(
" finished",
time=start,
deep=("Returning Dandelion object: \n"),
)
return out_vdj