Source code for dandelion.tools._tools

#!/usr/bin/env python
import math
import os
import re
import sys

import networkx as nx
import numpy as np
import pandas as pd
import scanpy as sc

from anndata import AnnData
from changeo.Gene import getGene
from collections import defaultdict, Counter
from distance import hamming
from itertools import product
from scanpy import logging as logg
from scipy.sparse import csr_matrix
from scipy.spatial.distance import pdist, squareform
from subprocess import run
from tqdm import tqdm
from typing import Literal

from dandelion.tools._network import *
from dandelion.utilities._core import *
from dandelion.utilities._io import *
from dandelion.utilities._utilities import *


[docs] def find_clones( vdj_data: Dandelion | pd.DataFrame, identity: dict[str, float] | float = 0.85, key: str | None = None, by_alleles: bool = False, key_added: str | None = None, recalculate_length: bool = True, verbose: bool = True, **kwargs, ) -> Dandelion: """ Find clones based on VDJ chain and VJ chain CDR3 junction hamming distance. Parameters ---------- vdj_data : Dandelion | pd.DataFrame `Dandelion` object, pandas `DataFrame` in changeo/airr format, or file path to changeo/airr file after clones have been determined. identity : dict[str, float] | float, optional junction similarity parameter. Default 0.85. If provided as a dictionary, please use the following keys:'ig', 'tr-ab', 'tr-gd'. key : str | None, optional column name for performing clone clustering. `None` defaults to 'junction_aa'. by_alleles : bool, optional whether or not to collapse alleles to genes. `None` defaults to False. key_added : str | None, optional If specified, this will be the column name for clones. `None` defaults to 'clone_id' recalculate_length : bool, optional whether or not to re-calculate junction length, rather than rely on parsed assignment (which occasionally is wrong). Default is True verbose : bool, optional whether or not to print progress. **kwargs Additional arguments to pass to `Dandelion.update_metadata`. Returns ------- Dandelion `Dandelion` object with clone_id annotated in `.data` slot and `.metadata` initialized. Raises ------ ValueError if `key` not found in Dandelion.data. """ start = logg.info("Finding clonotypes") pd.set_option("mode.chained_assignment", None) if isinstance(vdj_data, Dandelion): dat_ = load_data(vdj_data.data) else: dat_ = load_data(vdj_data) clone_key = key_added if key_added is not None else "clone_id" dat_[clone_key] = "" dat = dat_.copy() if "ambiguous" in dat_: dat = dat_[dat_["ambiguous"] == "F"].copy() locus_log = {"ig": "B", "tr-ab": "abT", "tr-gd": "gdT"} locus_dict1 = {"ig": ["IGH"], "tr-ab": ["TRB"], "tr-gd": ["TRD"]} locus_dict2 = {"ig": ["IGK", "IGL"], "tr-ab": ["TRA"], "tr-gd": ["TRG"]} DEFAULTIDENTITY = {"ig": 0.85, "tr-ab": 1, "tr-gd": 1} key_ = key if key is not None else "junction_aa" # default if key_ not in dat.columns: raise ValueError("key {} not found in input table.".format(key_)) locuses = ["ig", "tr-ab", "tr-gd"] # quick check for locus in locuses: locus_1 = locus_dict1[locus] locus_2 = locus_dict2[locus] dat_vj = dat[dat["locus"].isin(locus_2)].copy() dat_vdj = dat[dat["locus"].isin(locus_1)].copy() chain_check = check_chains(dat_vdj=dat_vdj, dat_vj=dat_vj) if all(~chain_check["All VDJ"]) and all(~chain_check["All VJ"]): locuses.remove(locus) if len(locuses) > 0: for locusx in locuses: locus_1 = locus_dict1[locusx] locus_2 = locus_dict2[locusx] if isinstance(identity, dict): if locusx not in identity: identity.update({locusx: DEFAULTIDENTITY[locusx]}) warnings.warn( UserWarning( "Identity value for {} chains ".format(locusx) + "not specified in provided dictionary. " + "Defaulting to {} for {} chains.".format( DEFAULTIDENTITY[locusx], locusx ) ) ) identity_ = identity[locusx] else: identity_ = identity dat_vj = dat[dat["locus"].isin(locus_2)].copy() dat_vdj = dat[dat["locus"].isin(locus_1)].copy() chain_check = check_chains(dat_vdj=dat_vdj, dat_vj=dat_vj) if dat_vdj.shape[0] > 0: vj_len_grp_vdj, seq_grp_vdj = group_sequences( dat_vdj, junction_key=key_, recalculate_length=recalculate_length, by_alleles=by_alleles, locus=locusx, ) cid_vdj = group_pairwise_hamming_distance( clonotype_vj_len_group=vj_len_grp_vdj, clonotype_sequence_group=seq_grp_vdj, identity=identity_, locus=locus_log[locusx], chain="VDJ", verbose=verbose, ) clone_dict_vdj = rename_clonotype_ids( clonotype_groups=cid_vdj, cell_type=locus_log[locusx] + "_VDJ_", ) # add it to the original dataframes dat_vdj[clone_key] = pd.Series(clone_dict_vdj) # dat[clone_key].update(pd.Series(dat_vdj[clone_key])) for i, row in dat_vdj.iterrows(): if i in dat.index: dat.at[i, clone_key] = row[clone_key] if dat_vj.shape[0] > 0: vj_len_grp_vj, seq_grp_vj = group_sequences( dat_vj, junction_key=key_, recalculate_length=recalculate_length, by_alleles=by_alleles, locus=locusx, ) cid_vj = group_pairwise_hamming_distance( clonotype_vj_len_group=vj_len_grp_vj, clonotype_sequence_group=seq_grp_vj, identity=identity_, locus=locus_log[locusx], chain="VJ", verbose=verbose, ) clone_dict_vj = rename_clonotype_ids( clonotype_groups=cid_vj, cell_type=locus_log[locusx] + "_VJ_", ) refine_clone_assignment( dat=dat, clone_key=clone_key, clone_dict_vj=clone_dict_vj, verbose=verbose, ) # dat_[clone_key].update(pd.Series(dat[clone_key])) for i, row in dat.iterrows(): if i in dat_.index: dat_.at[i, clone_key] = row[clone_key] # dat_[clone_key].replace('', 'unassigned') if os.path.isfile(str(vdj_data)): data_path = Path(vdj_data) write_airr(dat_, data_path.parent / (data_path.stem + "_clone.tsv")) logg.info( " finished", time=start, deep=( "Updated Dandelion object: \n" " 'data', contig-indexed AIRR table\n" " 'metadata', cell-indexed observations table\n" ), ) if isinstance(vdj_data, Dandelion): if vdj_data.germline is not None: germline_ = vdj_data.germline else: germline_ = None if vdj_data.layout is not None: layout_ = vdj_data.layout else: layout_ = None if vdj_data.graph is not None: graph_ = vdj_data.graph else: graph_ = None if vdj_data.threshold is not None: threshold_ = vdj_data.threshold else: threshold_ = None if ("clone_id" in vdj_data.data.columns) and (key_added is None): # TODO: need to check the following bits if it works properly if only heavy chain tables are provided vdj_data.__init__( data=dat_, germline=germline_, layout=layout_, graph=graph_, ) vdj_data.update_metadata(reinitialize=True, **kwargs) elif ("clone_id" in vdj_data.data.columns) and (key_added is not None): vdj_data.__init__( data=dat_, germline=germline_, layout=layout_, graph=graph_, ) vdj_data.update_metadata( reinitialize=True, clone_key="clone_id", retrieve=clone_key, retrieve_mode="merge and unique only", **kwargs, ) else: vdj_data.__init__( data=dat_, germline=germline_, layout=layout_, graph=graph_, clone_key=clone_key, ) vdj_data.update_metadata( reinitialize=True, clone_key=clone_key, **kwargs ) vdj_data.threshold = threshold_ else: out = Dandelion( data=dat_, clone_key=clone_key, retrieve=clone_key, retrieve_mode="merge and unique only", **kwargs, ) return out
[docs] def transfer( adata: AnnData, dandelion: Dandelion, expanded_only: bool = False, neighbors_key: str | None = None, rna_key: str | None = None, vdj_key: str | None = None, clone_key: str | None = None, collapse_nodes: bool = False, overwrite: bool | list[str] | str | None = None, ) -> None: """ Transfer data in `Dandelion` slots to `AnnData` object, updating the `.obs`, `.uns`, `.obsm` and `.obsp`slots. Parameters ---------- adata : AnnData `AnnData` object. dandelion : Dandelion `Dandelion` object. expanded_only : bool, optional Whether or not to transfer the embedding with all cells with BCR (False) or only for expanded clones (True). neighbors_key : str | None, optional key for 'neighbors' slot in `.uns`. rna_key : str | None, optional prefix for stashed RNA connectivities and distances. vdj_key : str | None, optional prefix for stashed VDJ connectivities and distances. clone_key : str | None, optional column name of clone/clonotype ids. Only used for integration with scirpy. collapse_nodes : bool, optional Whether or not to transfer a cell x cell or clone x clone connectivity matrix into `.uns`. Only used for integration with scirpy. overwrite : bool | list[str] | str | None, optional Whether or not to overwrite existing anndata columns. Specifying a string indicating column name or list of column names will overwrite that specific column(s). """ start = logg.info("Transferring network") # always overwrite with whatever columns are in dandelion's metadata: for x in dandelion.metadata.columns: if x not in adata.obs.columns: adata.obs[x] = pd.Series(dandelion.metadata[x]) elif overwrite is True: adata.obs[x] = pd.Series(dandelion.metadata[x]) if type_check(dandelion.metadata, x): adata.obs[x] = adata.obs[x].replace(np.nan, "No_contig") if adata.obs[x].dtype == "bool": adata.obs[x] = [str(x) for x in adata.obs[x]] if (overwrite is not None) and (overwrite is not True): if not type(overwrite) is list: overwrite = [overwrite] for ow in overwrite: adata.obs[ow] = pd.Series(dandelion.metadata[ow]) if type_check(dandelion.metadata, ow): adata.obs[ow] = adata.obs[ow].replace(np.nan, "No_contig") if dandelion.graph is not None: if expanded_only: G = dandelion.graph[1] else: G = dandelion.graph[0] logg.info("converting matrices") distances = nx.to_pandas_adjacency( G, dtype=np.float32, weight="weight", nonedge=np.nan ) df_distances = distances.reindex( index=adata.obs_names, columns=adata.obs_names ) # convert to connectivity distances = distances.apply(lambda x: np.maximum(1e-45, 1 / np.exp(x))) df_connectivities = distances.reindex( index=adata.obs_names, columns=adata.obs_names ) df_connectivities = df_connectivities.values df_connectivities[np.isnan(df_connectivities)] = 0 df_distances = df_distances.values df_distances[np.isnan(df_distances)] = 0 df_connectivities_ = csr_matrix(df_connectivities, dtype=np.float32) df_distances = csr_matrix(df_distances, dtype=np.float32) logg.info("Updating anndata slots") if neighbors_key is None: neighbors_key = "neighbors" if neighbors_key not in adata.uns: skip_stash = True adata.uns[neighbors_key] = {} rna_neighbors_key = "rna_" + neighbors_key vdj_neighbors_key = "vdj_" + neighbors_key if rna_neighbors_key not in adata.uns: adata.uns[rna_neighbors_key] = adata.uns[neighbors_key].copy() if rna_key is None: r_connectivities_key = "rna_connectivities" r_distances_key = "rna_distances" else: r_connectivities_key = rna_key + "_connectivitites" r_distances_key = rna_key + "_distances" if vdj_key is None: b_connectivities_key = "vdj_connectivities" b_distances_key = "vdj_distances" else: b_connectivities_key = vdj_key + "_connectivitites" b_distances_key = vdj_key + "_distances" # stash_rna_connectivities: if r_connectivities_key not in adata.obsp: if "skip_stash" not in locals(): try: adata.obsp[r_connectivities_key] = adata.obsp[ "connectivities" ].copy() adata.obsp[r_distances_key] = adata.obsp["distances"].copy() except: adata.obsp[r_connectivities_key] = adata.uns[neighbors_key][ "connectivities" ] adata.obsp[r_distances_key] = adata.uns[neighbors_key][ "distances" ] # always overwrite the bcr slots adata.obsp["connectivities"] = df_connectivities_.copy() adata.obsp["distances"] = df_distances.copy() adata.obsp[b_connectivities_key] = adata.obsp["connectivities"].copy() adata.obsp[b_distances_key] = adata.obsp["distances"].copy() # create the dictionary that will enable the use of scirpy's plotting. clonekey = clone_key if clone_key is not None else "clone_id" if not collapse_nodes: df_connectivities_[df_connectivities_.nonzero()] = 1 cell_indices = { str(i): np.array([k]) for i, k in zip(range(0, len(adata.obs_names)), adata.obs_names) } else: cell_indices = Tree() for x, y in adata.obs[clonekey].items(): if y not in [ "", "unassigned", np.nan, "NaN", "NA", "nan", "None", None, "none", ]: cell_indices[y][x].value = 1 cell_indices = { str(x): np.array(list(r)) for x, r in zip( range(0, len(cell_indices)), cell_indices.values() ) } df_connectivities_ = np.zeros( [len(cell_indices), len(cell_indices)] ) np.fill_diagonal(df_connectivities_, 1) df_connectivities_ = csr_matrix(df_connectivities_) adata.uns[clonekey] = { # this is a symmetrical, pairwise, sparse distance matrix of clonotypes # the matrix is offset by 1, i.e. 0 = no connection, 1 = distance 0 "distances": df_connectivities_, # '0' refers to the row/col index in the `distances` matrix # (numeric index, but needs to be strbecause of h5py) # np.array(["cell1", "cell2"]) points to the rows in `adata.obs` "cell_indices": cell_indices, } tmp = adata.obs.copy() if dandelion.graph is not None: if dandelion.layout is not None: if expanded_only: coord = pd.DataFrame.from_dict( dandelion.layout[1], orient="index" ) else: coord = pd.DataFrame.from_dict( dandelion.layout[0], orient="index" ) for x in coord.columns: tmp[x] = coord[x] X_vdj = np.array(tmp[[0, 1]], dtype=np.float32) adata.obsm["X_vdj"] = X_vdj logg.info( " finished", time=start, deep=( "updated `.obs` with `.metadata`\n" "added to `.uns['" + neighbors_key + "']` and `.uns['" + clonekey + "']`\n" "and `.obsp`\n" " 'distances', clonotype-weighted adjacency matrix\n" " 'connectivities', clonotype-weighted adjacency matrix" ), ) else: logg.info( " finished", time=start, deep=("updated `.obs` with `.metadata`\n") )
[docs] def define_clones( vdj_data: Dandelion | pd.DataFrame | str, dist: float | None = None, action: Literal["first", "set"] = "set", model: Literal[ "ham", "aa", "hh_s1f", "hh_s5f", "mk_rs1nf", "mk_rs5nf", "hs1f_compat", "m1n_compat", ] = "ham", norm: Literal["len", "mut", "none"] = "len", doublets: Literal["drop", "count"] = "drop", fileformat: Literal["changeo", "airr"] = "airr", ncpu: int | None = None, outFilePrefix: int | None = None, key_added: int | None = None, out_dir: Path | str | None = None, additional_args: list[str] = [], ) -> Dandelion: """ Find clones using changeo's `DefineClones.py <https://changeo.readthedocs.io/en/stable/tools/DefineClones.html>`__. Only callable for BCR data at the moment. Parameters ---------- vdj_data : Dandelion | pd.DataFrame | str `Dandelion` object, pandas `DataFrame` in changeo/airr format, or file path to changeo/airr file after clones have been determined. dist : float | None, optional The distance threshold for clonal grouping. If None, the value will be retrieved from the Dandelion class `.threshold` slot. action : Literal["first", "set"], optional Specifies how to handle multiple V(D)J assignments for initial grouping. Default is 'set'. The “first” action will use only the first gene listed. The “set” action will use all gene assignments and construct a larger gene grouping composed of any sequences sharing an assignment or linked to another sequence by a common assignment (similar to single-linkage). model : Literal["ham", "aa", "hh_s1f", "hh_s5f", "mk_rs1nf", "mk_rs5nf", "hs1f_compat", "m1n_compat", ], optional Specifies which substitution model to use for calculating distance between sequences. Default is 'ham'. The “ham” model is nucleotide Hamming distance and “aa” is amino acid Hamming distance. The “hh_s1f” and “hh_s5f” models are human specific single nucleotide and 5-mer content models, respectively, from Yaari et al, 2013. The “mk_rs1nf” and “mk_rs5nf” models are mouse specific single nucleotide and 5-mer content models, respectively, from Cui et al, 2016. The “m1n_compat” and “hs1f_compat” models are deprecated models provided backwards compatibility with the “m1n” and “hs1f” models in Change-O v0.3.3 and SHazaM v0.1.4. Both 5-mer models should be considered experimental. norm : Literal["len", "mut", "none"], optional Specifies how to normalize distances. Default is 'len'. 'none' (do not normalize), 'len' (normalize by length), or 'mut' (normalize by number of mutations between sequences). doublets : Literal["drop", "count"], optional Option to control behaviour when dealing with heavy chain 'doublets'. Default is 'drop'. 'drop' will filter out the doublets while 'count' will retain only the highest umi count contig. fileformat : Literal["changeo", "airr"], optional Format of V(D)J file/objects. Default is 'airr'. Also accepts 'changeo'. ncpu : int | None, optional Number of cpus for parallelization. Default is 1, no parallelization. outFilePrefix : int | None, optional If specified, the out file name will have this prefix. `None` defaults to 'dandelion_define_clones' key_added : int | None, optional Column name to add for define_clones. out_dir : Path | str | None, optional If specified, the files will be written to this directory. additional_args : list[str], optional Additional arguments to pass to `DefineClones.py`. Returns ------- Dandelion `Dandelion` object with clone_id annotated in `.data` slot and `.metadata` initialized. Raises ------ ValueError if .threshold not found in `Dandelion`. """ start = logg.info("Finding clones") if ncpu is None: nproc = 1 else: nproc = ncpu if key_added is None: clone_key = "clone_id" else: clone_key = key_added if isinstance(vdj_data, Dandelion): dat_ = load_data(vdj_data.data) else: dat_ = load_data(vdj_data) if "ambiguous" in dat_: dat = dat_[dat_["ambiguous"] == "F"].copy() else: dat = dat_.copy() dat_h = dat[dat["locus"] == "IGH"] dat_l = dat[dat["locus"].isin(["IGK", "IGL"])] if os.path.isfile(str(vdj_data)): vdj_path = Path(vdj_data) tmpFolder = vdj_path.parent / "tmp" outFolder = vdj_path.parent elif out_dir is not None: vdj_path = Path(out_dir) tmpFolder = vdj_path / "tmp" outFolder = vdj_path else: import tempfile outFolder = Path(tempfile.TemporaryDirectory().name) tmpFolder = outFolder / "tmp" for _ in [outFolder, tmpFolder]: _.mkdir(parents=True, exist_ok=True) if "vdj_path" in locals(): h_file1 = tmpFolder / (vdj_path.stem + "_heavy-clone.tsv") h_file2 = outFolder / (vdj_path.stem + "_heavy-clone.tsv") l_file = tmpFolder / (vdj_path.stem + "_light.tsv") outfile = outFolder / (vdj_path.stem + "_clone.tsv") else: out_FilePrefix = ( "dandelion_define_clones" if outFilePrefix is None else outFilePrefix ) h_file1 = tmpFolder / (out_FilePrefix + "_heavy-clone.tsv") h_file2 = outFolder / (out_FilePrefix + "_heavy-clone.tsv") l_file = tmpFolder / (out_FilePrefix + "_light.tsv") outfile = outFolder / (out_FilePrefix + "_clone.tsv") write_airr(dat_h, h_file1) write_airr(dat_l, l_file) v_field = ( "v_call_genotyped" if "v_call_genotyped" in dat.columns else "v_call" ) if dist is None: if isinstance(vdj_data, Dandelion): if vdj_data.threshold is not None: dist_ = vdj_data.threshold else: raise ValueError( "Threshold value in Dandelion object is None. Please run calculate_threshold first" ) else: raise ValueError( "Distance value is None. Please provide a distance value (float)" ) else: dist_ = dist cmd = [ "DefineClones.py", "-d", str(h_file1), "-o", str(h_file2), "--act", action, "--model", model, "--norm", norm, "--dist", str(dist_), "--nproc", str(nproc), "--vf", v_field, ] cmd = cmd + additional_args def clusterLinkage(cell_series, group_series): """ Return a dictionary of {cell_id : cluster_id}. that identifies clusters of cells by analyzing their shared features (group_series) using single linkage. Arguments: cell_series (iter): iter of cell ids. group_series (iter): iter of group ids. Returns: dict: dictionary of {cell_id : cluster_id}. """ # assign initial clusters # initial_dict = {cluster1: [cell1], cluster2: [cell1]} initial_dict = {} for cell, group in zip(cell_series, group_series): try: initial_dict[group].append(cell) except KeyError: initial_dict[group] = [cell] # naive single linkage clustering (ON^2 best case, ON^3 worst case) ...ie for cells with multiple light chains # cluster_dict = {cluster1: [cell1, cell2]}, 2 cells belong in same group if they share 1 light chain while True: cluster_dict = {} for i, group in enumerate(initial_dict.keys()): cluster_dict[i] = initial_dict[group] for cluster in cluster_dict: # if initial_dict[group] and cluster_dict[cluster] share common cells, add initial_dict[group] to # cluster if cluster != i and any( cell in initial_dict[group] for cell in cluster_dict[cluster] ): cluster_dict[cluster] = ( cluster_dict[cluster] + initial_dict[group] ) del cluster_dict[i] break # break if clusters stop changing, otherwise restart if len(cluster_dict.keys()) == len(initial_dict.keys()): break else: initial_dict = cluster_dict.copy() # invert cluster_dict for return assign_dict = { cell: k for k, v in cluster_dict.items() for cell in set(v) } return assign_dict # TODO: might need to remove this function to drop requirement to maintain this as a dependency internally def _lightCluster(heavy_file, light_file, out_file, doublets, fileformat): """ Split heavy chain clones based on light chains. Arguments: heavy_file (str): heavy chain input file. light_file (str): light chain input file. out_file (str): heavy chain output file. doublets (str): method for handling multiple heavy chains per cell. one of 'drop' or 'count'. format (str): file format. one of 'changeo' or 'airr'. """ # Set column names if fileformat == "changeo": cell_id = "cell_id" clone_id = "clone_id" v_call = "v_call" j_call = "j_call" junction_length = "junction_length" umi_count = "umicount" elif fileformat == "airr": cell_id = "cell_id" clone_id = "clone_id" v_call = "v_call" j_call = "j_call" junction_length = "junction_length" umi_count = "umi_count" else: sys.exit("Invalid format %s" % fileformat) # read in heavy and light DFs heavy_df = pd.read_csv( heavy_file, dtype="object", na_values=["", "None", "NA"], sep="\t" ) light_df = pd.read_csv( light_file, dtype="object", na_values=["", "None", "NA"], sep="\t" ) # column checking expected_heavy_columns = [ cell_id, clone_id, v_call, j_call, junction_length, umi_count, ] if set(expected_heavy_columns).issubset(heavy_df.columns) is False: raise ValueError( "Missing one or more columns in heavy chain file: " + ", ".join(expected_heavy_columns) ) expected_light_columns = [ cell_id, v_call, j_call, junction_length, umi_count, ] if set(expected_light_columns).issubset(light_df.columns) is False: raise ValueError( "Missing one or more columns in light chain file: " + ", ".join(expected_light_columns) ) # Fix types try: heavy_df[junction_length] = heavy_df[junction_length].astype("int") light_df[junction_length] = light_df[junction_length].astype("int") except: heavy_df[junction_length] = heavy_df[junction_length].replace( np.nan, pd.NA ) light_df[junction_length] = light_df[junction_length].replace( np.nan, pd.NA ) heavy_df[junction_length] = heavy_df[junction_length].astype( "Int64" ) light_df[junction_length] = light_df[junction_length].astype( "Int64" ) # filter multiple heavy chains if doublets == "drop": heavy_df = heavy_df.drop_duplicates(cell_id, keep=False) if heavy_df.empty is True: raise ValueError( "Empty heavy chain data, after doublets drop. Are you combining experiments " "in a single file? If so, split your data into multiple files." ) elif doublets == "count": heavy_df[umi_count] = heavy_df[umi_count].astype("int") heavy_df = heavy_df.groupby(cell_id, sort=False).apply( lambda x: x.nlargest(1, umi_count) ) # transfer clone IDs from heavy chain df to light chain df clone_dict = { v[cell_id]: v[clone_id] for k, v in heavy_df[[clone_id, cell_id]].T.to_dict().items() } light_df = light_df.loc[ light_df[cell_id].apply(lambda x: x in clone_dict.keys()), ] light_df[clone_id] = light_df.apply( lambda row: clone_dict[row[cell_id]], axis=1 ) # generate a "cluster_dict" of CELL:CLONE dictionary from light df (TODO: use receptor object V/J gene names) cluster_dict = clusterLinkage( light_df[cell_id], light_df.apply( lambda row: getGene(row[v_call]) + "," + getGene(row[j_call]) + "," + str(row[junction_length]) + "," + row[clone_id], axis=1, ), ) # add assignments to heavy_df heavy_df = heavy_df.loc[ heavy_df[cell_id].apply(lambda x: x in cluster_dict.keys()), : ] heavy_df[clone_id] = ( heavy_df[clone_id] + "_" + heavy_df.apply( lambda row: str(cluster_dict[row[cell_id]]), axis=1 ) ) # write heavy chains write_airr(heavy_df, out_file) return (heavy_df, light_df) logg.info("Running command: %s\n" % (" ".join(cmd))) run(cmd) h_df, l_df = _lightCluster( h_file2, l_file, outfile, doublets=doublets, fileformat=fileformat ) h_df = load_data(h_df) # create a dictionary for cell_id : clone_id from h_df linked_clones = dict(zip(h_df["cell_id"], h_df["clone_id"])) # create a clone_reference clone_ref = list(set(h_df["clone_id"])) clone_ref = [c.split("_")[1] if c is not np.nan else c for c in clone_ref] l_df = load_data(l_df) for x in l_df.index: if l_df.loc[x, "clone_id"] in clone_ref: l_df.at[x, "clone_id"] = linked_clones[l_df.loc[x, "cell_id"]] else: try: l_df.at[x, "clone_id"] = l_df.loc[x, "cell_id"] + "_notlinked" except: pass cloned_ = pd.concat([h_df, l_df]) # transfer the new clone_id to the heavy + light file dat_[str(clone_key)] = pd.Series(cloned_["clone_id"]) dat_[str(clone_key)] = dat_[str(clone_key)].fillna("") if isinstance(vdj_data, Dandelion): germline_ = vdj_data.germline if vdj_data.germline is not None else None layout_ = vdj_data.layout if vdj_data.layout is not None else None graph_ = vdj_data.graph if vdj_data.graph is not None else None threshold_ = ( vdj_data.threshold if vdj_data.threshold is not None else None ) if ("clone_id" in vdj_data.data) and (clone_key is not None): vdj_data.__init__( data=dat_, germline=germline_, layout=layout_, graph=graph_, initialize=True, retrieve=clone_key, retrieve_mode="merge and unique only", ) elif ("clone_id" not in vdj_data.data) and (clone_key is not None): vdj_data.__init__( data=dat_, germline=germline_, layout=layout_, graph=graph_, initialize=True, clone_key=clone_key, retrieve=clone_key, retrieve_mode="merge and unique only", ) else: vdj_data.__init__( data=dat_, germline=germline_, layout=layout_, graph=graph_, initialize=True, clone_key=clone_key, ) vdj_data.threshold = threshold_ else: if ("clone_id" in dat_.columns) and (clone_key is not None): out = Dandelion( data=dat_, retrieve=clone_key, retrieve_mode="merge and unique only", ) elif ("clone_id" not in dat_.columns) and (clone_key is not None): out = Dandelion(data=dat_, clone_key=clone_key) else: out = Dandelion(data=dat_) return out logg.info( " finished", time=start, deep=( "Updated Dandelion object: \n" " 'data', contig-indexed AIRR table\n" " 'metadata', cell-indexed observations table\n" ), )
def tabuluate_clone_sizes( metadata_: pd.DataFrame, clonesize_dict: dict, clonekey: str ) -> pd.Series: """Tabulate clone sizes.""" return pd.Series( dict( zip( metadata_.index, [ str(y) if pd.notnull(y) else str(0) for y in [ ( sorted( list( {clonesize_dict[c_] for c_ in c.split("|")} ), key=lambda x: ( int(x.split(">= ")[1]) if type(x) is str else int(x) ), reverse=True, )[0] if "|" in c else clonesize_dict[c] ) for c in metadata_[str(clonekey)] ] ], ) ) )
[docs] def clone_size( vdj_data: Dandelion, max_size: int | None = None, clone_key: str | None = None, key_added: str | None = None, ) -> None: """ Quantify size of clones. Parameters ---------- vdj_data : Dandelion `Dandelion` object max_size : int | None, optional The maximum size before value gets clipped. If None, the value will be returned as a numerical value. clone_key : str | None, optional Column name specifying the clone_id column in metadata. key_added : str | None, optional column name where clone size is tabulated into. """ start = logg.info("Quantifying clone sizes") metadata_ = vdj_data.metadata.copy() clonekey = "clone_id" if clone_key is None else clone_key tmp = metadata_[str(clonekey)].str.split("|", expand=True).stack() tmp = tmp.reset_index(drop=False) tmp.columns = ["cell_id", "tmp", str(clonekey)] clonesize = tmp[str(clonekey)].value_counts() if "None" in clonesize.index: clonesize.drop("None", inplace=True) if max_size is not None: clonesize_ = clonesize.astype("object") for i in clonesize.index: if clonesize.loc[i] >= max_size: clonesize_.at[i] = ">= " + str(max_size) clonesize_ = clonesize_.astype("category") else: clonesize_ = clonesize.copy() clonesize_dict = dict(clonesize_) clonesize_dict.update({"None": np.nan}) col_key, col_key_suffix = "", "" col_key = str(clonekey) if key_added is None else key_added col_key_suffix = ( "_size" if max_size is None else "_size_max_" + str(max_size) ) vdj_data.metadata[col_key + col_key_suffix] = tabuluate_clone_sizes( metadata_, clonesize_dict, clonekey ) if max_size is not None: vdj_data.metadata[col_key + col_key_suffix] = vdj_data.metadata[ col_key + col_key_suffix ].astype("category") else: try: vdj_data.metadata[col_key + col_key_suffix] = [ float(x) for x in vdj_data.metadata[col_key + col_key_suffix] ] except ValueError: # this happens if there are multiple clonotypes associated to the cell pass logg.info( " finished", time=start, deep=( "Updated Dandelion object: \n" " 'metadata', cell-indexed clone table" ), )
[docs] def clone_overlap( vdj_data: Dandelion | AnnData, groupby: str, min_clone_size: int | None = None, weighted_overlap: bool = False, clone_key: str | None = None, ) -> pd.DataFrame: """ A function to tabulate clonal overlap for input as a circos-style plot. Parameters ---------- vdj_data : Dandelion | AnnData `Dandelion` or `AnnData` object. groupby : str column name in obs/metadata for collapsing to columns in the clone_id x groupby data frame. min_clone_size : int | None, optional minimum size of clone for plotting connections. Defaults to 2 if left as None. weighted_overlap : bool, optional if True, instead of collapsing to overlap to binary, overlap will be returned as the number of cells. In the future, there will be the option to use something like a jaccard index. clone_key : str | None, optional column name for clones. `None` defaults to 'clone_id'. Returns ------- pd.DataFrame clone_id x groupby overlap :class:`pandas.core.frame.DataFrame'. Raises ------ ValueError if min_clone_size is 0. """ start = logg.info("Finding clones") if isinstance(vdj_data, Dandelion): data = vdj_data.metadata.copy() elif isinstance(vdj_data, AnnData): data = vdj_data.obs.copy() if min_clone_size is None: min_size = 2 else: min_size = int(min_clone_size) if clone_key is None: clone_ = "clone_id" else: clone_ = clone_key # get rid of problematic rows that appear because of category conversion? allgroups = list(data[groupby].unique()) data = data[ ~( data[clone_].isin( [np.nan, "nan", "NaN", "No_contig", "unassigned", "None", None] ) ) ] # prepare a summary table datc_ = data[clone_].str.split("|", expand=True).stack() datc_ = pd.DataFrame(datc_) datc_.reset_index(drop=False, inplace=True) datc_.columns = ["cell_id", "tmp", clone_] datc_.drop("tmp", inplace=True, axis=1) datc_ = datc_[ ~( datc_[clone_].isin( [ "", np.nan, "nan", "NaN", "No_contig", "unassigned", "None", None, ] ) ) ] dictg_ = dict(data[groupby]) datc_[groupby] = [dictg_[l] for l in datc_["cell_id"]] overlap = pd.crosstab(datc_[clone_], datc_[groupby]) for x in allgroups: if x not in overlap: overlap[x] = 0 if min_size == 0: raise ValueError("min_size must be greater than 0.") if not weighted_overlap: if min_size > 2: overlap[overlap < min_size] = 0 overlap[overlap >= min_size] = 1 elif min_size == 2: overlap[overlap >= min_size] = 1 overlap.index.name = None overlap.columns.name = None if isinstance(vdj_data, AnnData): vdj_data.uns["clone_overlap"] = overlap.copy() logg.info( " finished", time=start, deep=("Updated AnnData: \n" " 'uns', clone overlap table"), ) else: return overlap
def clustering( distance_dict: dict, threshold: float, sequences_dict: dict[str, str] ) -> dict: """Clustering the sequences.""" out_dict = {} # find out the unique indices in this subset i_unique = list(set(flatten(distance_dict))) # for every pair of i1,i2 is their dictance smaller than the thresholdeshold? i_pair_d = { (i1, i2): ( distance_dict[(i1, i2)] <= threshold if (i1, i2) in distance_dict else False ) for i1, i2 in product(i_unique, repeat=2) } i_pair_d.update( { (i2, i1): ( distance_dict[(i2, i1)] <= threshold if (i2, i1) in distance_dict else False ) for i1, i2 in product(i_unique, repeat=2) } ) # so which indices should not be part of a clone? canbetogether = defaultdict(list) for ii1, ii2 in product(i_unique, repeat=2): if i_pair_d[(ii1, ii2)] or i_pair_d[(ii2, ii1)]: if (ii1, ii2) in distance_dict: canbetogether[ii1].append((ii1, ii2)) canbetogether[ii2].append((ii1, ii2)) elif (ii2, ii1) in distance_dict: canbetogether[ii2].append((ii2, ii1)) canbetogether[ii1].append((ii2, ii1)) else: if (ii1, ii2) or (ii2, ii1) in distance_dict: canbetogether[ii1].append(()) canbetogether[ii2].append(()) for x in canbetogether: canbetogether[x] = list({y for y in canbetogether[x] if len(y) > 0}) # convert the indices to sequences for x in canbetogether: if len(canbetogether[x]) > 0: out_dict[sequences_dict[x]] = tuple( sorted( set( list( [ sequences_dict[y] for y in flatten(canbetogether[x]) ] ) + [sequences_dict[x]] ) ) ) else: out_dict[sequences_dict[x]] = tuple([sequences_dict[x]]) return out_dict
[docs] def productive_ratio( adata: AnnData, vdj: Dandelion, groupby: str, groups: list[str] | None = None, locus: Literal["TRB", "TRA", "TRD", "TRG", "IGH", "IGK", "IGL"] = "TRB", ): """ Compute the cell-level productive/non-productive contig ratio. Only the contig with the highest umi count in a cell will be used for this tabulation. Returns inplace `AnnData` with `.uns['productive_ratio']`. Parameters ---------- adata : AnnData AnnData object holding the cell level metadata (`.obs`). vdj : Dandelion Dandelion object holding the repertoire data (`.data`). groupby : str Name of column in `AnnData.obs` to return the row tabulations. groups : list[str] | None, optional Optional list of categories to return. locus : Literal["TRB", "TRA", "TRD", "TRG", "IGH", "IGK", "IGL"], optional One of the accepted locuses to perform the tabulation """ start = logg.info("Tabulating productive ratio") vdjx = vdj[(vdj.data.cell_id.isin(adata.obs_names))].copy() if "ambiguous" in vdjx.data: tmp = vdjx[ (vdjx.data.locus == locus) & (vdjx.data.ambiguous == "F") ].copy() else: tmp = vdjx[(vdjx.data.locus == locus)].copy() if groups is None: if is_categorical(adata.obs[groupby]): groups = list(adata.obs[groupby].cat.categories) else: groups = list(set(adata.obs[groupby])) df = tmp.data.drop_duplicates(subset="cell_id") dict_df = dict(zip(df.cell_id, df.productive)) res = pd.DataFrame( columns=["productive", "non-productive", "total"], index=groups, ) adata.obs[locus + "_productive"] = pd.Series(dict_df) for i in range(res.shape[0]): cell = res.index[i] res.loc[cell, "total"] = sum(adata.obs[groupby] == cell) if res.loc[cell, "total"] > 0: res.loc[cell, "productive"] = ( sum( adata.obs.loc[ adata.obs[groupby] == cell, locus + "_productive" ].isin(["T"]) ) / res.loc[cell, "total"] * 100 ) res.loc[cell, "non-productive"] = ( sum( adata.obs.loc[ adata.obs[groupby] == cell, locus + "_productive" ].isin( [ "F", ] ) ) / res.loc[cell, "total"] * 100 ) res[groupby] = res.index res["productive+non-productive"] = res["productive"] + res["non-productive"] out = {"results": res, "locus": locus, "groupby": groupby} adata.uns["productive_ratio"] = out logg.info( " finished", time=start, deep=("Updated AnnData: \n" " 'uns', productive_ratio"), )
[docs] def vj_usage_pca( adata: AnnData, groupby: str, min_size: int = 20, mode: Literal["B", "abT", "gdT"] = "abT", transfer_mapping=None, n_comps: int = 30, groups: list[str] | None = None, allowed_chain_status: list[str] | None = [ "Single pair", "Extra pair", "Extra pair-exception", "Orphan VDJ-exception", ], verbose=False, **kwargs, ) -> AnnData: """ Extract productive V/J gene usage from single cell data and compute PCA. Parameters ---------- adata : AnnData AnnData object holding the cell level metadata with Dandelion VDJ info transferred. groupby : str Column name in `adata.obs` to groupby as observations for PCA. min_size : int, optional Minimum cell size numbers to keep for computing the final matrix. Defaults to 20. mode : Literal["B", "abT", "gdT"], optional Mode for extract the V/J genes. transfer_mapping : None, optional If provided, the columns will be mapped to the output AnnData from the original AnnData. n_comps : int, optional Number of principal components to compute. Defaults to 30. groups : list[str] | None, optional If provided, only the following groups/categories will be used for computing the PCA. allowed_chain_status : list[str] | None, optional If provided, only the ones in this list are kept from the `chain_status` column. Defaults to ["Single pair", "Extra pair", "Extra pair-exception", "Orphan VDJ-exception"]. verbose : bool, optional Whether to display progress **kwargs Additional keyword arguments passed to `scanpy.pp.pca`. Returns ------- AnnData AnnData object with obs as groups and V/J genes as features. """ start = logg.info("Computing PCA for V/J gene usage") if allowed_chain_status is not None: adata_ = adata[ adata.obs["chain_status"].isin(allowed_chain_status) ].copy() if groups is not None: adata_ = adata_[adata_.obs[group].isin(groups)].copy() if "v_call_genotyped_VDJ" in adata_.obs: v_call = "v_call_genotyped_" else: v_call = "v_call_" # prep data adata_.obs[v_call + mode + "_VJ_main"] = [ x.split("|")[0] for x in adata_.obs[v_call + mode + "_VJ"] ] adata_.obs["j_call_" + mode + "_VJ_main"] = [ x.split("|")[0] for x in adata_.obs["j_call_" + mode + "_VJ"] ] adata_.obs[v_call + mode + "_VDJ_main"] = [ x.split("|")[0] for x in adata_.obs[v_call + mode + "_VDJ"] ] adata_.obs["j_call_" + mode + "_VDJ_main"] = [ x.split("|")[0] for x in adata_.obs["j_call_" + mode + "_VDJ"] ] df1 = pd.DataFrame( { groupby: Counter(adata_.obs[groupby]).keys(), "cellcount": Counter(adata_.obs[groupby]).values(), } ) new_list = df1.loc[df1["cellcount"] >= min_size, groupby] vj_v_list = [ x for x in list(set(adata_.obs[v_call + mode + "_VJ_main"])) if x not in ["None", "No_contig"] ] vj_j_list = [ x for x in list(set(adata_.obs["j_call_" + mode + "_VJ_main"])) if x not in ["None", "No_contig"] ] vdj_v_list = [ x for x in list(set(adata_.obs[v_call + mode + "_VDJ_main"])) if x not in ["None", "No_contig"] ] vdj_j_list = [ x for x in list(set(adata_.obs["j_call_" + mode + "_VDJ_main"])) if x not in ["None", "No_contig"] ] new_list = df1.loc[ df1["cellcount"] > min_size, groupby ] # smp_celltype of at least 20 cells vdj_list = vj_v_list + vj_j_list + vdj_v_list + vdj_j_list vdj_df = pd.DataFrame(columns=vdj_list, index=new_list) # this is a bit slow for i in tqdm( range(vdj_df.shape[0]), desc="Tabulating V/J gene usage", bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", disable=not verbose, ): cell = vdj_df.index[i] counter1 = Counter( adata_.obs.loc[adata_.obs[groupby] == cell, v_call + mode + "_VJ"] ) for vj_v in vj_v_list: vdj_df.loc[cell, vj_v] = counter1[vj_v] counter2 = Counter( adata_.obs.loc[ adata_.obs[groupby] == cell, "j_call_" + mode + "_VJ" ] ) for vj_j in vj_j_list: vdj_df.loc[cell, vj_j] = counter2[vj_j] counter3 = Counter( adata_.obs.loc[adata_.obs[groupby] == cell, v_call + mode + "_VDJ"] ) for vdj_v in vdj_v_list: vdj_df.loc[cell, vdj_v] = counter3[vdj_v] counter5 = Counter( adata_.obs.loc[ adata_.obs[groupby] == cell, "j_call_" + mode + "_VDJ" ] ) for vdj_j in vdj_j_list: vdj_df.loc[cell, vdj_j] = counter5[vdj_j] # normalise vdj_df.loc[cell, vdj_df.columns.isin(vj_v_list)] = ( vdj_df.loc[cell, vdj_df.columns.isin(vj_v_list)] / np.sum(vdj_df.loc[cell, vdj_df.columns.isin(vj_v_list)]) * 100 ) vdj_df.loc[cell, vdj_df.columns.isin(vj_j_list)] = ( vdj_df.loc[cell, vdj_df.columns.isin(vj_j_list)] / np.sum(vdj_df.loc[cell, vdj_df.columns.isin(vj_j_list)]) * 100 ) vdj_df.loc[cell, vdj_df.columns.isin(vdj_v_list)] = ( vdj_df.loc[cell, vdj_df.columns.isin(vdj_v_list)] / np.sum(vdj_df.loc[cell, vdj_df.columns.isin(vdj_v_list)]) * 100 ) vdj_df.loc[cell, vdj_df.columns.isin(vdj_j_list)] = ( vdj_df.loc[cell, vdj_df.columns.isin(vdj_j_list)] / np.sum(vdj_df.loc[cell, vdj_df.columns.isin(vdj_j_list)]) * 100 ) df2 = pd.DataFrame(index=vdj_df.index, columns=["cell_type"]) df2["cell_type"] = list(vdj_df.index) vdj_df_adata = sc.AnnData( X=vdj_df.values, obs=df2, var=pd.DataFrame(index=vdj_df.columns) ) vdj_df_adata.obs["cell_count"] = pd.Series( dict(zip(df1[groupby], df1["cellcount"])) ) sc.pp.pca( vdj_df_adata, n_comps=n_comps, use_highly_variable=False, **kwargs ) if transfer_mapping is not None: collapsed_obs = adata_.obs.drop_duplicates(subset=groupby) for to in transfer_mapping: transfer_dict = dict(zip(collapsed_obs[groupby], collapsed_obs[to])) vdj_df_adata.obs[to] = [ transfer_dict[x] for x in vdj_df_adata.obs_names ] logg.info( " finished", time=start, deep=("Returned AnnData: \n" " 'obsm', X_pca for V/J gene usage"), ) return vdj_df_adata
def group_sequences( input_vdj: pd.DataFrame, junction_key: str, recalculate_length: bool = True, by_alleles: bool = False, locus: Literal["ig", "tr-ab", "tr-gd"] = "ig", ): """ Groups sequence IDs with the same V and J genes, and then splits the groups based on the lengths of the sequences. Parameters ---------- input_vdj : pd.DataFrame The input data frame. junction_key : str The name of the column in the input data that contains the junction sequences. recalculate_length : bool, optional Whether to recalculate the lengths of the junction sequences. by_alleles : bool, optional Whether to group sequence IDs by their V and J gene alleles, rather than by only the V and J genes. locus : Literal["ig", "tr-ab", "tr-gd"], optional The locus of the input data. One of "ig", "tr-ab", or "tr-gd". Returns ------- vj_len_grp : Tree A nested dictionary that groups sequence IDs by V and J gene, and then by sequence length. seq_grp : Tree A nested dictionary that groups sequence IDs by V and J gene, and then by sequence. Raises ------ ValueError Raised when the sequence length column is not found in the input table. """ locus_log1_dict = {"ig": "IGH", "tr-ab": "TRB", "tr-gd": "TRD"} # retrieve the J genes and J genes if not by_alleles: if VCALLG in input_vdj.columns: V = [re.sub(STRIPALLELENUM, "", str(v)) for v in input_vdj[VCALLG]] else: V = [re.sub(STRIPALLELENUM, "", str(v)) for v in input_vdj[VCALL]] J = [re.sub(STRIPALLELENUM, "", str(j)) for j in input_vdj[JCALL]] else: if VCALLG in input_vdj.columns: V = [str(v) for v in input_vdj[VCALLG]] else: V = [str(v) for v in input_vdj[VCALL]] J = [str(j) for j in input_vdj[JCALL]] # collapse the alleles to just genes V = [",".join(list(set(v.split(",")))) for v in V] J = [",".join(list(set(j.split(",")))) for j in J] seq = dict(zip(input_vdj.index, input_vdj[junction_key])) if recalculate_length: seq_length = [len(str(l)) for l in input_vdj[junction_key]] else: try: seq_length = [l for l in input_vdj[junction_key + "_length"]] except: raise ValueError( "{} not found in {} input table.".format( junction_key + "_length", locus_log1_dict[locus] ) ) seq_length_dict = dict(zip(input_vdj.index, seq_length)) # Create a dictionary and group sequence ids with same V and J genes V_J = dict(zip(input_vdj.index, zip(V, J))) vj_grp = defaultdict(list) for key, val in sorted(V_J.items()): vj_grp[val].append(key) # and now we split the groups based on lengths of the seqs vj_len_grp = Tree() seq_grp = Tree() for g in vj_grp: # first, obtain what's the unique lengths jlen = [] for contig_id in vj_grp[g]: jlen.append(seq_length_dict[contig_id]) setjlen = list(set(jlen)) # then for each unique length, we add the contigs to a new tree if it matches the length # and also the actual seq sequences into a another one for s in setjlen: for contig_id in vj_grp[g]: jlen_ = seq_length_dict[contig_id] if jlen_ == s: vj_len_grp[g][s][contig_id].value = 1 seq_grp[g][s][seq[contig_id]].value = 1 for c in [contig_id]: vj_len_grp[g][s][c] = seq[c] return vj_len_grp, seq_grp def group_pairwise_hamming_distance( clonotype_vj_len_group: Tree, clonotype_sequence_group: Tree, identity: float, locus: str, chain: Literal["VDJ", "VJ"], verbose: bool = True, ) -> Tree: """ Group clonotypes by pairwise hamming distance and generate a nested dictionary of clonotype groups. Parameters ---------- clonotype_vj_len_group : Tree A nested dictionary that groups sequence IDs by V and J gene, and then by sequence length. clonotype_sequence_group : Tree A nested dictionary that groups sequence IDs by V and J gene, and then by sequence. identity : float The identity threshold for grouping sequences. locus : str The locus of the chain. chain : Literal["VDJ", "VJ"] The chain type. verbose : bool, optional Whether to show the progress bar, by default True. Returns ------- Tree A nested dictionary containing contigs assigned to clonotype groups. """ clones = Tree() # for each seq group, calculate the hamming distance matrix for g in tqdm( clonotype_sequence_group, desc=f"Finding clones based on {locus} cell {chain} chains ".format(), bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", disable=not verbose, ): for l in clonotype_sequence_group[g]: seq_ = list(clonotype_sequence_group[g][l]) tdarray = np.array(seq_).reshape(-1, 1) d_mat = squareform(pdist(tdarray, lambda x, y: hamming(x[0], y[0]))) # then calculate what the acceptable threshold is for each length of sequence tr = math.floor(int(l) * (1 - identity)) # convert diagonal and upper triangle to zeroes d_mat = np.tril(d_mat) np.fill_diagonal(d_mat, 0) # get the coordinates/indices of seqs to match against the threshold later indices_temp = [] indices = [] indices_temp = [list(x) for x in np.tril_indices_from(d_mat)] indices = list(zip(indices_temp[0], indices_temp[1])) # if there's more than 1 contig, remove the diagonal if len(indices) > 1: for pairs in indices: # remove diagonals if pairs[0] == pairs[1]: indices.remove(pairs) indices_j = [] # use the coordinates/indices to retrieve the seq sequences for p in range(0, len(indices)): a1, b1 = indices[p] indices_j.append(seq_[a1]) indices_j.append(seq_[b1]) # convert the distance matrix to coordinate (source) and distance (target) and create it # as a dictionary source, target = d_mat.nonzero() source_target = list(zip(source.tolist(), target.tolist())) if len(source) == 0 & len(target) == 0: source_target = list([(0, 0)]) dist = {} for st in source_target: dist.update({st: d_mat[st]}) if d_mat.shape[0] > 1: seq_tmp_dict = clustering(dist, tr, seq_) else: seq_tmp_dict = {seq_[0]: tuple([seq_[0]])} # sort the list so that clones that are larger have a smaller number clones_tmp = sorted( list(set(seq_tmp_dict.values())), key=len, reverse=True, ) for x in range(0, len(clones_tmp)): clones[g][l][x + 1] = clones_tmp[x] # now to retrieve the contig ids that are grouped together cid = Tree() for g in clones: for l in clones[g]: # retrieve the clone 'numbers' for c in clones[g][l]: grp_seq = clones[g][l][c] for key, value in clonotype_vj_len_group[g][l].items(): if value in grp_seq: cid[g][l][c][key].value = 1 return cid def rename_clonotype_ids( clonotype_groups: Tree, cell_type: str = "", suffix: str = "", prefix: str = "", ) -> dict[str, str]: """ Renames clonotype IDs to numerical barcode system. Parameters ---------- clonotype_groups : Tree A nested dictionary that containing clonotype groups of contigs. cell_type : str, optional Cell type name to append to front, if necessary. suffix : str, optional Suffix to append to the end of the clonotype ID. prefix : str, optional Prefix to append to the beginning of the clonotype ID. Returns ------- dict[str, str] A dictionary that maps sequence IDs to clonotype IDs. """ clone_dict = {} first_key = [] for k1 in clonotype_groups.keys(): first_key.append(k1) first_key = list(set(first_key)) first_key_dict = dict(zip(first_key, range(1, len(first_key) + 1))) for g in clonotype_groups: second_key = [] for k2 in clonotype_groups[g].keys(): second_key.append(k2) second_key = list(set(second_key)) second_key_dict = dict(zip(second_key, range(1, len(second_key) + 1))) for l in clonotype_groups[g]: third_key = [] for k3 in clonotype_groups[g][l].keys(): third_key.append(k3) third_key = list(set(third_key)) third_key_dict = dict(zip(third_key, range(1, len(third_key) + 1))) for key, value in dict(clonotype_groups[g][l]).items(): for v in value: if type(v) is int: break clone_dict[v] = ( cell_type + prefix + str(first_key_dict[g]) + "_" + str(second_key_dict[l]) + "_" + str(third_key_dict[key]) + suffix ) return clone_dict def refine_clone_assignment( dat: pd.DataFrame, clone_key: str, clone_dict_vj: dict, verbose: bool = True, ) -> None: """ Refines the clone assignment of VDJ sequences based on their VJ chain pairing. Parameters ---------- dat : pd.DataFrame The input data frame containing the full VDJ/VJ data. clone_key : str The name of the column in the input data that contains the clone IDs. clone_dict_vj : dict A dictionary for the VJ chains for us to map sequence IDs to clone IDs. verbose : bool, optional Whether to print progress messages. """ cellclonetree = Tree() seqcellclonetree = Tree() for c, s, z in zip(dat["cell_id"], dat["sequence_id"], dat[clone_key]): seqcellclonetree[c][s].value = 1 if pd.notnull(z): cellclonetree[c][z].value = 1 for c in cellclonetree: cellclonetree[c] = list(cellclonetree[c]) fintree = Tree() for c in tqdm( cellclonetree, desc="Refining clone assignment based on VJ chain pairing ", bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", disable=not verbose, ): suffix = [ clone_dict_vj[x] for x in seqcellclonetree[c] if x in clone_dict_vj ] fintree[c] = [] if len(suffix) > 0: for cl in cellclonetree[c]: if present(cl): for s in suffix: if check_same_celltype(cl, s): fintree[c].append( cl + "_" + "".join(s.split("_", 1)[1]) ) else: fintree[c].append(cl + "|" + s) else: for cl in cellclonetree[c]: if present(cl): fintree[c].append(cl) fintree[c] = "|".join(fintree[c]) dat[clone_key] = [fintree[x] for x in dat["cell_id"]] for i, row in dat.iterrows(): # is this going to be slow...? if not present(row[clone_key]): if i in clone_dict_vj and i in dat.index: dat.at[i, clone_key] = clone_dict_vj[i] def check_chains(dat_vdj: pd.DataFrame, dat_vj: pd.DataFrame) -> pd.DataFrame: """ Generate a summary table of whether the chain is orphan or not. Parameters ---------- dat_vdj : pd.DataFrame Input dataframe containing VDJ chains of a particular locus. dat_vj : pd.DataFrame Input dataframe containing VJ chains of a particular locus. Returns ------- pd.DataFrame Output dataframe containing chain status. """ vj_check = pd.crosstab(dat_vj.cell_id, dat_vj.locus).apply(sum, axis=1) vj_check[vj_check > 1] = 1 vdj_check = pd.crosstab(dat_vdj.cell_id, dat_vdj.locus).apply(sum, axis=1) vdj_check[vdj_check > 1] = 1 chain_check = pd.concat([vdj_check, vj_check], axis=1) chain_check.columns = ["VDJ", "VJ"] chain_check["Orphan VDJ"] = pd.notnull(chain_check["VDJ"]) & ( pd.isnull(chain_check["VJ"]) ) chain_check["Orphan VJ"] = pd.notnull(chain_check["VJ"]) & ( pd.isnull(chain_check["VDJ"]) ) chain_check["All VDJ"] = pd.notnull(chain_check["VDJ"]) chain_check["All VJ"] = pd.notnull(chain_check["VJ"]) chain_check["VDJ"] = ( (~chain_check["Orphan VDJ"]) & (~chain_check["Orphan VJ"]) & (pd.notnull(chain_check["VDJ"])) ) chain_check["VJ"] = ( (~chain_check["Orphan VDJ"]) & (~chain_check["Orphan VJ"]) & (pd.notnull(chain_check["VJ"])) ) return chain_check