Source code for dandelion.base.tools._network

from __future__ import annotations

import multiprocessing
import re
import time

import networkx as nx
import numpy as np
import pandas as pd

from collections import defaultdict
from joblib import Parallel, delayed
from pathlib import Path
from polyleven import levenshtein
from scanpy import logging as logg

from scipy.sparse.csgraph import csgraph_from_dense
from scipy.sparse import csr_matrix
from tqdm import tqdm
from typing import Callable, Literal, TYPE_CHECKING

if TYPE_CHECKING:
    from anndata import AnnData


from dandelion.base.tools._tools import vdj_sample
from dandelion.base.core._core import Dandelion, Query
from dandelion.utilities._layout import generate_layout
from dandelion.utilities._distances import (
    Metric,
    prepare_sequences_with_separator,
    resolve_metric,
)
from dandelion.utilities._utilities import (
    flatten,
    present,
    sanitize_data,
    Tree,
    FALSES,
)


[docs] def generate_network( vdj: Dandelion, adata: AnnData | None = None, key: str | None = None, clone_key: str | None = None, min_size: int = 2, productive_only: bool = True, sample: int | None = None, force_replace: bool = False, verbose: bool = True, compute_graph: bool = True, compute_layout: bool = True, layout_method: Literal[ "mod_fr", "mod_fr2", "mod_fr2_gpu", "mod_fr_bh", "mod_fr_bh_gpu", "fa2", ] = "mod_fr", singleton_mass: float = 0.5, expanded_only: bool = False, use_existing_graph: bool = True, n_cpus: int = 1, sequential_chain: bool = False, distance_mode: Literal["clone", "full"] = "clone", dist_func: Callable | str | None = None, pad_to_max: bool = False, lazy: bool = False, zarr_path: Path | str | None = None, chunk_size: int | None = None, chunk_clone_limit: int | None = None, memory_limit_gb: float | None = None, memory_safety_fraction: float = 0.3, compress: bool = True, random_state: int | np.random.RandomState | None = None, **kwargs, ) -> Dandelion | tuple[Dandelion, AnnData]: """ Generate a Levenshtein distance network based on VDJ and VJ sequences. The distance matrices are then combined into a singular matrix. Parameters ---------- vdj : Dandelion Dandelion object. key : str | None, optional column name for distance calculations. None defaults to 'sequence_alignment_aa'. clone_key : str | None, optional column name to build network on. min_size : int, optional For visualization purposes, two graphs are created where one contains all cells and a trimmed second graph. This value specifies the minimum number of edges required otherwise node will be trimmed in the secondary graph. productive_only : bool, optional whether or not to only consider productive sequences for distance calculation. sample : int | None, optional If specified, cells will be randomly sampled to the integer provided. If the integer is larger than the number of cells, sampling with replacement is used and the same cell may appear multiple times with different sequence and cell ids. If None, no resampling is performed. A new Dandelion class will be returned. force_replace : bool, optional whether or not to sample with replacement when `sample` is smaller or equal to than the number of cells. verbose : bool, optional whether or not to print the progress bars. compute_graph : bool, optional whether or not to generate the graph after distance matrix calculation. compute_layout : bool, optional whether or not to generate the layout. May be time consuming if too many cells. layout_method : Literal["mod_fr", "mod_fr2", "mod_fr2_gpu", "mod_fr_bh", "mod_fr_bh_gpu", "fa2"], optional Layout algorithm. Options: - 'mod_fr': Original python FR layout - 'mod_fr2': Numba-accelerated FR (faster CPU) - 'mod_fr2_gpu': PyTorch GPU FR (auto-tiles for >100K nodes) - 'mod_fr_bh': Barnes-Hut O(N log N) CPU layout (scalable) - 'mod_fr_bh_gpu': Barnes-Hut O(N log N) GPU layout (scalable) - 'fa2': ForceAtlas2 (requires fa2-modified) singleton_mass : float, optional Mass assigned to singleton nodes (no edges) in Barnes-Hut layouts. Lower values reduce their impact on pushing connected components apart. Default 0.5. Only used with 'mod_fr_bh' and 'mod_fr_bh_gpu'. expanded_only : bool, optional whether or not to only compute layout on expanded clonotypes. use_existing_graph : bool, optional whether or not to just compute the layout using the existing graph if it exists in the object. n_cpus : int, optional number of cores to use for parallelizable steps. -1 uses all available cores. sequential_chain : bool, optional whether or not to use the original method for distance calculation method where each chain is calculated separately and sequentially added to the total distance matrix. This method is slower but would be more precise calculation. If False, concatenated sequences with a long separator are used for distance calculation. Ignored if lazy=True as the lazy method always uses the long separator approach. The long separator approach inserts a long string of consistent characters on a per-chain basis to ensure that distances between chains are large and do not interfere with intra-chain distances. distance_mode : Literal["clone", "full"], optional method to compute distance matrix. 'clone' refers to the original membership-based distance calculation where only distances within clones are calculated. Whereas 'full' computes the full pairwise distance matrix. dist_func : Callable | str | None, optional distance function to use. If None, `polyleven.levenshtein` is used. If a string is provided, it will use Bio.Align's substitution matrices (e.g., 'BLOSUM62', 'PAM250'). See `Bio.Align.substitution_matrices.load` for available options. pad_to_max : bool, optional whether or not to pad sequences to the maximum length in the dataset before distance calculation. This will allow for distance calculations that need sequences of the same length (e.g., Hamming distance). Note that this may increase memory usage and computation time. lazy: bool, optional If True, computation will be performed lazily using Dask/Zarr arrays. True will also return a Dask array view of the distance matrix stored on disk instead of a numpy array stored in memory. zarr_path: Path | str | None, optional Path to store Zarr array when using lazy mode. If None, "distance_matrix.zarr" will be created in the current working directory. chunk_size: int | None, optional Chunk size for distance matrix computation when using lazy mode. If None, chunk size is automatically computed based on available memory and number of cores. The automatic chunk size can be further adjusted using `memory_limit_gb` and `memory_safety_fraction` parameters. chunk_clone_limit: int | None, optional Maximum number of clones to process per chunk when using lazy mode and distance method = "clone". If None, chunk sizes will be automatically determined based on available memory and number of cores. memory_limit_gb: float | None, optional Memory limit per worker in GB for Dask. None defaults to all available memory/cores. memory_safety_fraction: float, optional Fraction of available memory to use. Defaults to 0.3 (i.e., 30% of available memory will be used for chunk size calculation). compress: bool, optional Whether to compress the Zarr array using Blosc with zstd. random_state : int | np.random.RandomState | None, optional Random state for reproducible sampling. **kwargs additional kwargs passed layout generation in `generate_layout`. Returns ------- Dandelion | tuple[Dandelion, AnnData] Dandelion object with `.edges`, `.layout`, `.graph` initialized. Raises ------ ValueError if any errors with dandelion input. """ # normalize n_cpus convention (-1 => use all CPUs) if n_cpus == -1: n_cpus = multiprocessing.cpu_count() n_cpus = max(1, int(n_cpus)) clone_key = clone_key if clone_key is not None else "clone_id" dist_func = levenshtein if dist_func is None else dist_func metric = resolve_metric(dist_func) if not compute_graph: compute_layout = False if distance_mode == "clone" or compute_graph or compute_layout: if clone_key not in vdj._data: raise ValueError( "Data does not contain clone information. Please run ddl.tl.find_clones." ) regenerate = True if vdj.graph is not None: if (min_size != 2) or (sample is not None): pass elif use_existing_graph: start = logg.info( "Generating network layout from pre-computed network" ) if isinstance(vdj, Dandelion): regenerate = False g, g_, lyt, lyt_ = generate_layout( vertices=None, edges=None, min_size=min_size, weight=None, verbose=verbose, compute_layout=compute_layout, layout_method=layout_method, expanded_only=expanded_only, graphs=(vdj.graph[0], vdj.graph[1]), singleton_mass=singleton_mass, **kwargs, ) if regenerate: start = logg.info("Generating network") key_ = key if key is not None else "sequence_alignment_aa" if key_ not in vdj._data: raise ValueError(f"key {key_} not found in data.") if lazy: from dandelion.base.tools._lazydistances import ( dask_safe_slice_square, ) if sample is not None: if adata is not None: vdj, adata = vdj_sample( vdj_data=vdj, size=sample, adata=adata, force_replace=force_replace, random_state=random_state, ) else: vdj = vdj_sample( vdj_data=vdj, size=sample, force_replace=force_replace, random_state=random_state, ) dat = vdj._data.copy() else: if "ambiguous" in vdj._data: dat = vdj._data[vdj._data["ambiguous"].isin(FALSES)].copy() else: dat = vdj._data.copy() dat_h = dat[dat["locus"].isin(["IGH", "TRB", "TRD"])].copy() dat_l = dat[dat["locus"].isin(["IGK", "IGL", "TRA", "TRG"])].copy() dat_ = pd.concat([dat_h, dat_l], ignore_index=True) dat_ = sanitize_data(dat_, ignore=clone_key) # retrieve sequence columns and clone info (unchanged) querier = Query(dat_, productive_only=productive_only, verbose=verbose) dat_seq = querier.retrieve(query=key_, retrieve_mode="split") # ensure that dat_seq matches order of vdj.metadata dat_seq = dat_seq.reindex(vdj._metadata.index) dat_seq.columns = [re.sub(key_ + "_", "", i) for i in dat_seq.columns] if compute_graph or compute_layout or distance_mode == "clone": dat_clone = querier.retrieve( query=clone_key, retrieve_mode="merge and unique only" ) dat_clone = dat_clone[clone_key].str.split("|", expand=True) membership = Tree() for i, j in dat_clone.iterrows(): jjj = [jj for jj in j if present(jj)] for ij in jjj: membership[ij][i].value = 1 membership = {i: list(j) for i, j in dict(membership).items()} # compute total_dist using chosen mode (original uses membership) logg.info( f"Calculating distance matrix {'lazily' if lazy else ''} with distance_mode = '{distance_mode}'\n" ) if distance_mode == "clone": if lazy: from dandelion.base.tools._lazydistances import ( calculate_distance_matrix_zarr, ) total_dist = calculate_distance_matrix_zarr( dat_seq, metric=metric, pad_to_max=pad_to_max, membership=membership, zarr_path=zarr_path, chunk_size=chunk_size, max_clones_per_chunk=chunk_clone_limit, n_cpus=n_cpus, memory_limit_gb=memory_limit_gb, memory_safety_fraction=memory_safety_fraction, compress=compress, lazy=lazy, verbose=verbose, ) else: if sequential_chain: total_dist = calculate_distance_matrix_original( dat_seq, membership, metric=metric, pad_to_max=pad_to_max, verbose=verbose, ) else: total_dist = calculate_distance_matrix_long( dat_seq, membership=membership, metric=metric, pad_to_max=pad_to_max, n_cpus=n_cpus, verbose=verbose, ) elif distance_mode == "full": if lazy: from dandelion.base.tools._lazydistances import ( calculate_distance_matrix_zarr, ) total_dist = calculate_distance_matrix_zarr( dat_seq, metric=metric, pad_to_max=pad_to_max, membership=None, zarr_path=zarr_path, chunk_size=chunk_size, n_cpus=n_cpus, memory_limit_gb=memory_limit_gb, memory_safety_fraction=memory_safety_fraction, compress=compress, lazy=lazy, verbose=verbose, ) else: if sequential_chain: total_dist = calculate_distance_matrix_original_full( dat_seq, metric=metric, pad_to_max=pad_to_max, n_cpus=n_cpus, verbose=verbose, ) else: total_dist = calculate_distance_matrix_long( dat_seq, membership=None, metric=metric, pad_to_max=pad_to_max, n_cpus=n_cpus, verbose=verbose, ) if compute_graph: tmp_clusterdist = Tree() cluster_dist = {} overlap = [] for i in vdj._metadata.index: cx_ = vdj._metadata.loc[i, str(clone_key)] if cx_ is None or cx_ == "None": continue if len(cx_.split("|")) > 1: overlap.append( [ c for c in cx_.split("|") if c is not None and c != "None" ] ) for c in cx_.split("|"): if c is not None and c != "None": tmp_clusterdist[c][i].value = 1 else: tmp_clusterdist[cx_][i].value = 1 tmp_clusterdist2 = {} for x in tmp_clusterdist: tmp_clusterdist2[x] = list(tmp_clusterdist[x]) cluster_dist = {} for c_ in tqdm( tmp_clusterdist2, desc="Sorting into clusters ", disable=not verbose, bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", ): idxs = [ i for i in tmp_clusterdist2[c_] if i in vdj._metadata.index ] if c_ in list(flatten(overlap)): for ol in overlap: if c_ in ol: idx = list( set( flatten( [tmp_clusterdist2[c_x] for c_x in ol] ) ) ) if len(list(set(idx))) > 1: pos = [dat_seq.index.get_loc(i) for i in idxs] # dist_mat_ = tmp_totaldist.loc[idx, idx] if lazy: dist_mat_ = pd.DataFrame( dask_safe_slice_square( total_dist, pos ).compute(), index=idxs, columns=idxs, ) else: dist_mat_ = pd.DataFrame( total_dist[np.ix_(pos, pos)], index=idxs, columns=idxs, ) s1, s2 = dist_mat_.shape if s1 > 1 and s2 > 1: cluster_dist["|".join(ol)] = dist_mat_ else: pos = [ dat_seq.index.get_loc(i) for i in tmp_clusterdist2[c_] ] if lazy: dist_mat_ = pd.DataFrame( dask_safe_slice_square(total_dist, pos).compute(), index=tmp_clusterdist2[c_], columns=tmp_clusterdist2[c_], ) else: dist_mat_ = pd.DataFrame( total_dist[np.ix_(pos, pos)], index=idxs, columns=idxs, ) s1, s2 = dist_mat_.shape if s1 > 1 and s2 > 1: cluster_dist[c_] = dist_mat_ # Minimum spanning trees (unchanged) # mst_tree = mst(cluster_dist, n_cpus=n_cpus, verbose=verbose) mst_tree = mst(cluster_dist, n_cpus=1, verbose=verbose) # generate edge list edge_list = Tree() for c in tqdm( mst_tree, desc="Generating edge list ", disable=not verbose, bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", ): edge_list[c] = nx.to_pandas_edgelist(mst_tree[c]) if edge_list[c].shape[0] > 0: edge_list[c]["weight"] = edge_list[c]["weight"] - 1 edge_list[c].loc[edge_list[c]["weight"] < 0, "weight"] = 0 clone_ref = dict(vdj._metadata[clone_key]) clone_ref = { k: r for k, r in clone_ref.items() if r is not None and r != "None" } tmp_clone_tree = Tree() for x in vdj._metadata.index: if x in clone_ref: if "|" in clone_ref[x]: for x_ in clone_ref[x].split("|"): if x_ is not None and x_ != "None": tmp_clone_tree[x_][x].value = 1 else: tmp_clone_tree[clone_ref[x]][x].value = 1 tmp_clone_tree2 = Tree() for x in tmp_clone_tree: tmp_clone_tree2[x] = list(tmp_clone_tree[x]) tmp_clone_tree3 = Tree() tmp_clone_tree3_overlap = Tree() for x in tqdm( tmp_clone_tree2, desc="Computing overlap ", disable=not verbose, bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", ): # this is to catch all possible cells that may potentially match up with this clone that's joined together if x in list(flatten(overlap)): for ol in overlap: if x in ol: if len(tmp_clone_tree2[x]) > 1: for x_ in tmp_clone_tree2[x]: tmp_clone_tree3_overlap["|".join(ol)][ "".join(x_) ].value = 1 else: tmp_clone_tree3_overlap["|".join(ol)][ "".join(tmp_clone_tree2[x]) ].value = 1 else: tmp_ = pd.DataFrame( index=tmp_clone_tree2[x], columns=tmp_clone_tree2[x] ) tmp_ = pd.DataFrame( np.tril(tmp_) + 1, index=tmp_clone_tree2[x], columns=tmp_clone_tree2[x], ) tmp_ = tmp_.astype(float).fillna(0) tmp_clone_tree3[x] = tmp_ for x in tqdm( tmp_clone_tree3_overlap, desc="Adjust overlap ", disable=not verbose, bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", ): # repeat for the overlap clones tmp_ = pd.DataFrame( index=tmp_clone_tree3_overlap[x], columns=tmp_clone_tree3_overlap[x], ) tmp_ = pd.DataFrame( np.tril(tmp_) + 1, index=tmp_clone_tree3_overlap[x], columns=tmp_clone_tree3_overlap[x], ) tmp_ = tmp_.astype(float).fillna(0) tmp_clone_tree3[x] = tmp_ # free up memory del tmp_clone_tree2 # this chunk doesn't scale well? # here I'm using a temporary edge list to catch all cells that were identified as clones to forcefully # link them up if they were identical but clipped off during the mst step # create a data frame to recall the actual distance quickly # convert total_dist to DataFrame to become adjacency matrix # tmp_totaldist = pd.DataFrame( # total_dist.compute() if lazy else total_dist, # index=vdj.metadata.index, # columns=vdj.metadata.index, # ) # tmp_totaldiststack = adjacency_to_edge_list( # tmp_totaldist, rename_index=True # ) # tmp_totaldiststack["keep"] = [ # False if len(set(i.split("|"))) == 1 else True # for i in tmp_totaldiststack.index # ] # tmp_totaldiststack = tmp_totaldiststack[ # tmp_totaldiststack.keep # ].drop("keep", axis=1) # convert total_dist to sparse graph tmp_g = csgraph_from_dense( total_dist.compute() + 1 if lazy else total_dist + 1 # +1 to keep zeros as infinite distance ) # construct edge list as a dictionary Gcoo = tmp_g.tocoo() # remove self-loops mask = Gcoo.row != Gcoo.col rows = Gcoo.row[mask] cols = Gcoo.col[mask] weights = ( Gcoo.data[mask] - 1 ) # -1 to revert back to original distance tmp_totaldiststack = { vdj._metadata.index[r] + "|" + vdj._metadata.index[c]: w for r, c, w in zip(rows, cols, weights) } tmp_totaldiststack = pd.DataFrame( pd.Series(tmp_totaldiststack, name="weight") ) # convert tmp_totaldist to edge list and rename the index tmp_edge_list = Tree() for c in tqdm( tmp_clone_tree3, desc="Linking edges ", disable=not verbose, bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", ): if len(tmp_clone_tree3[c]) > 1: G = create_networkx_graph( tmp_clone_tree3[c], drop_zero=True, ) G.remove_edges_from(nx.selfloop_edges(G)) tmp_edge_list[c] = nx.to_pandas_edgelist(G) set_edge_list_index(tmp_edge_list[c]) tmp_edge_list[c].update( {"weight": tmp_totaldiststack["weight"]} ) # keep only edges when there is 100% identity, to minimise crowding tmp_edge_list[c] = tmp_edge_list[c][ tmp_edge_list[c]["weight"] == 0 ] tmp_edge_list[c].reset_index(inplace=True) # try to catch situations where there's no edge (only singletons) try: edge_listx = pd.concat([edge_list[x] for x in edge_list]) set_edge_list_index(edge_listx) tmp_edge_listx = pd.concat( [tmp_edge_list[x] for x in tmp_edge_list] ) tmp_edge_listx = tmp_edge_listx.drop("index", axis=1) set_edge_list_index(tmp_edge_listx) edge_list_final = edge_listx.combine_first(tmp_edge_listx) common_idx = edge_list_final.index.intersection( tmp_totaldiststack.index ) edge_list_final.loc[common_idx, "weight"] = ( tmp_totaldiststack.loc[common_idx, "weight"] ) # for i, row in tmp_totaldiststack.iterrows(): # if i in edge_list_final.index: # edge_list_final.at[i, "weight"] = row["weight"] # return the edge list edge_list_final = edge_list_final.reset_index(drop=True) except Exception: edge_list_final = None # final layout + graph creation (unchanged) g, g_, lyt, lyt_ = generate_layout( vertices=vdj._metadata.index.tolist(), edges=edge_list_final, min_size=min_size, weight=None, verbose=verbose, compute_layout=compute_layout, layout_method=layout_method, expanded_only=expanded_only, singleton_mass=singleton_mass, **kwargs, ) logg.info( " finished.\n Updated Dandelion object\n", time=start, deep=( " 'layout', graph layout\n" if compute_layout else ( "" " 'graph', network constructed from distance matrices of VDJ- and VJ- chains\n" if compute_graph else ( "" " 'distances', VDJ + VJ distance matrix\n" if regenerate else "" ) ) ), ) # return or re-initialize vdj germline = getattr(vdj, "germline", None) if regenerate: distances = total_dist if lazy else csr_matrix(total_dist) if not lazy: distances._index_names = vdj.metadata_names else: distances = vdj.distances graph = (g, g_) if compute_graph else None layout = (lyt, lyt_) if compute_graph and compute_layout else None if sample is not None: out = Dandelion( data=dat_, metadata=vdj._metadata, clone_key=clone_key, layout=layout, graph=graph, distances=distances, germline=germline, verbose=False, ) if adata is None: return out else: return out, adata else: vdj.__init__( data=vdj._data, metadata=vdj._metadata, clone_key=clone_key, layout=layout, graph=graph, distances=distances, germline=germline, initialize=False, verbose=False, )
def mst( mat: dict, n_cpus: int | None = None, verbose: bool = True, ) -> Tree: """ Construct minimum spanning tree based on supplied matrix in dictionary. Parameters ---------- mat : dict Dictionary containing numpy ndarrays. n_cpus: int, optional Number of cores to run this step. Parallelise using `sklearn.metrics.pairwise_distances` if n_cpus > 1.. verbose : bool, optional Whether or not to show logging information. Returns ------- Tree Dandelion `Tree` object holding DataFrames of constructed minimum spanning trees. """ mst_tree = Tree() if n_cpus == 1: for c in tqdm( mat, desc="Calculating minimum spanning tree ", disable=not verbose, bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", ): _, mst_tree[c] = process_mst_per_clonotype(mat=mat, c=c) else: results = Parallel(n_jobs=n_cpus)( delayed(process_mst_per_clonotype)(mat, c) for c in tqdm( mat, desc=f"Calculating minimum spanning tree, parallelized across {n_cpus} cores ", disable=not verbose, bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", ) ) for result in results: mst_tree[result[0]] = result[1] return mst_tree def process_mst_per_clonotype( mat: dict[str, pd.DataFrame], c: str ) -> tuple[str, nx.Graph]: """ Function to calculate minimum spanning tree. Parameters ---------- mat : dict[str, pd.DataFrame] Dictionary holding distance matrices. c : str Name of clonotype. Returns ------- tuple[str, nx.Graph] Graph holding minimum spanning tree. """ tmp = mat[c] + 1 tmp[np.isnan(tmp)] = 0 G = create_networkx_graph(tmp, drop_zero=True) return c, nx.minimum_spanning_tree(G) def create_networkx_graph( adjacency: pd.DataFrame, drop_zero: bool = True ) -> nx.Graph: """ Create a networkx graph from an adjacency matrix in chunks. Parameters ---------- adjacency : pd.DataFrame Adjacency matrix. drop_zero : bool, optional Whether or not to drop edges with zero weight, by default True. Returns ------- nx.Graph NetworkX graph. """ G = nx.Graph() # add nodes nodes = list(adjacency.index) G.add_nodes_from(nodes) # convert adjacency matrix to edge list edge_list = adjacency_to_edge_list(adjacency, drop_zero=drop_zero) G.add_weighted_edges_from(ebunch_to_add=edge_list.values) return G def set_edge_list_index(edge_list: pd.DataFrame) -> None: """ Set the index of the edge list in-place. Parameters ---------- edge_list : pd.DataFrame Edge list. """ # edge_list.index = [ # str(s) + "|" + str(t) # for s, t in zip(edge_list["source"], edge_list["target"]) # ] edge_list.index = ( edge_list["source"].astype(str) + "|" + edge_list["target"].astype(str) ) def adjacency_to_edge_list( adjacency: pd.DataFrame, drop_zero: bool = False, rename_index: bool = False ) -> pd.DataFrame: """ Convert adjacency matrix to edge list that excludes self-loops. Parameters ---------- adjacency : pd.DataFrame Adjacency matrix. drop_zero : bool, optional Whether or not to drop edges with zero weight, by default False. rename_index : bool, optional Whether or not to rename the index, by default False. Returns ------- pd.DataFrame Edge list. """ edge_list = adjacency.stack().reset_index() edge_list.columns = ["source", "target", "weight"] if rename_index: set_edge_list_index(edge_list) if drop_zero: edge_list = edge_list[edge_list["weight"] != 0] return edge_list
[docs] def clone_degree(vdj: Dandelion, weight: str | None = None) -> Dandelion: """ Calculate node degree in BCR/TCR network. Parameters ---------- vdj : Dandelion Dandelionect after `tl.generate_network` has been run. weight : str | None, optional Attribute name for retrieving edge weight in graph. None defaults to ignoring this. See `networkx.Graph.degree`. Raises ------ AttributeError if graph not found. TypeError if input is not Dandelion class. """ if isinstance(vdj, Dandelion): if vdj.graph is None: raise AttributeError( "Graph not found. Please run tl.generate_network." ) else: G = vdj.graph[0] cd = pd.DataFrame.from_dict(G.degree(weight=weight)) cd.set_index(0, inplace=True) vdj._metadata["clone_degree"] = pd.Series(cd[1]) else: raise TypeError("Input object must be of {}".format(Dandelion))
[docs] def clone_centrality(vdj: Dandelion): """ Calculate node closeness centrality in BCR/TCR network. Parameters ---------- vdj : Dandelion Dandelion object after `tl.generate_network` has been run. Raises ------ AttributeError if graph not found. TypeError if input is not Dandelion class. """ if isinstance(vdj, Dandelion): if vdj.graph is None: raise AttributeError( "Graph not found. Please run tl.generate_network." ) else: G = vdj.graph[0] cc = nx.closeness_centrality(G) cc = pd.DataFrame.from_dict( cc, orient="index", columns=["clone_centrality"] ) vdj._metadata["clone_centrality"] = pd.Series( cc["clone_centrality"] ) else: raise TypeError("Input object must be of {}".format(Dandelion))
def calculate_distance_matrix_original( dat_seq: pd.DataFrame, membership: dict, metric: Metric, pad_to_max: bool = False, verbose: bool = True, ) -> np.ndarray: """ Re-implementation of original membership-based distance calculation. Parameters ---------- dat_seq : pd.DataFrame DataFrame with sequence columns; index corresponds to cell IDs (or whatever unique ids you use). membership : dict Mapping from clone_id -> list of indices (these indices must be present in dat_seq.index). metric : Metric Distance metric to use. pad_to_max : bool, optional Whether to pad sequences to maximum length before distance calculation. verbose : bool, optional Whether to show progress. Returns ------- total_dist : np.ndarray Aggregated distance matrix across all columns; diagonal set to NaN by caller. """ n = dat_seq.shape[0] index_list = list(dat_seq.index) # dmat_per_column will collect for each column a list of DataFrames (one per clone) to concat dmat_per_column = defaultdict(list) # iterate clones (membership) exactly like original for clone in tqdm( membership, disable=not verbose, bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", ): tmp = dat_seq.loc[membership[clone]] if tmp.shape[0] > 1: tmp = ( tmp.replace("[.]", "", regex=True) .fillna("") .replace("None", "") ) for col in tmp.columns: seqs_raw = [[s] for s in tmp[col].values] prepared_seqs = prepare_sequences_with_separator( seqs_raw, metric=metric, pad_to_max=pad_to_max, sep="" if not pad_to_max else "#", ) # Vectorized computation d_mat_tmp = metric.compute_vectorized(prepared_seqs) df_block = pd.DataFrame( d_mat_tmp, index=tmp.index, columns=tmp.index ) dmat_per_column[col].append(df_block) # For each column, concat its blocks, resolve duplicates (sum), reindex to full index dist_matrices = [] for col, blocks in dmat_per_column.items(): if not blocks: continue full = pd.concat(blocks) # If duplicates occur at the index-level, group & sum them (same as original) if any(full.index.duplicated()): dup_indices = full.index[full.index.duplicated()] tmp1 = full.drop(dup_indices) tmp2 = full.loc[dup_indices] tmp2 = tmp2.groupby(level=0).apply(lambda df: df.sum(axis=0)) full = pd.concat([tmp1, tmp2]) # reindex to full matrix indices (missing rows/cols -> fill with zeros) full = full.reindex(index=index_list, columns=index_list).fillna(0.0) dist_matrices.append(full.values) if len(dist_matrices) == 0: total_dist = np.zeros((n, n)) else: total_dist = np.sum(dist_matrices, axis=0) np.fill_diagonal(total_dist, np.nan) return total_dist def calculate_distance_matrix_original_full( dat_seq: pd.DataFrame, metric: Metric, pad_to_max: bool = False, n_cpus: int = 1, verbose: bool = True, ) -> np.ndarray: """ Re-implementation of original membership-based distance calculation. Parameters ---------- dat_seq : pd.DataFrame DataFrame with sequence columns; index corresponds to cell IDs (or whatever unique ids you use). metric : Metric Distance metric to use. pad_to_max : bool, optional Whether to pad sequences to maximum length before distance calculation. n_cpus : int, optional Number of cores to run this step. Parallelise using `sklearn.metrics.pairwise_distances` if n_cpus > 1. verbose : bool, optional Whether to show progress. Returns ------- total_dist : np.ndarray Aggregated distance matrix across all columns; diagonal set to NaN by caller. """ start_time = time.time() n = dat_seq.shape[0] total_dist = np.zeros((n, n), dtype=float) for col in dat_seq.columns: seq_series = ( dat_seq[col] .replace("[.]", "", regex=True) .fillna("") .replace("None", "") ) nonnull = seq_series.dropna() if nonnull.shape[0] <= 1: continue # Prepare sequences for single column (reshape to list of single-element lists) seqs_raw = [[s] for s in nonnull.to_numpy(dtype=object)] prepared_seqs = prepare_sequences_with_separator( seqs_raw, metric=metric, pad_to_max=pad_to_max, sep="" if not pad_to_max else "#", ) # Compute distance matrix using vectorized metric results = metric.compute_vectorized(prepared_seqs, n_cpus=n_cpus) total_dist += results np.fill_diagonal(total_dist, np.nan) if verbose: end_time = time.time() logg.info( f"Distances calculated in {end_time - start_time:.2f} seconds" ) return total_dist def calculate_distance_matrix_long( dat_seq: pd.DataFrame, membership: dict | None, metric: Metric, pad_to_max: bool = False, n_cpus: int = 1, verbose: bool = True, ) -> np.ndarray: """ Re-implementation of original membership-based distance calculation but using concatenated sequences using a long separator. Parameters ---------- dat_seq : pd.DataFrame DataFrame with sequence columns; index corresponds to cell IDs (or whatever unique ids you use). membership : dict | None Mapping from clone_id -> list of indices (these indices must be present in dat_seq.index). None indicates full pairwise distance calculation. metric : Metric Distance metric to use. pad_to_max : bool, optional whether or not to pad sequences to the maximum length in the dataset before distance calculation. This will allow for distance calculations that need sequences of the same length (e.g., Hamming distance). Note that this may increase memory usage and computation time. n_cpus : int, optional Number of cores to run this step. Parallelise using `sklearn.metrics.pairwise_distances` if n_cpus > 1.. verbose : bool, optional Whether to show progress. Returns ------- total_dist : np.ndarray (n x n) Aggregated distance matrix across all columns; diagonal set to NaN by caller. """ start_time = time.time() # Step 1: clean sequences dat_seq_clean = ( dat_seq.replace("[.]", "", regex=True).fillna("").replace("None", "") ) # Step 2: prepare sequences (concatenate with separators, apply padding) # This happens ONCE upfront, not per-pair seqs_raw = dat_seq_clean.values.tolist() prepared_seqs = prepare_sequences_with_separator( seqs_raw, metric=metric, pad_to_max=pad_to_max, sep="#", ) # Step 3: initialize distance matrix n = dat_seq_clean.shape[0] index_list = list(dat_seq_clean.index) total_dist = np.zeros((n, n)) if membership is None: # Step 4: compute full distance matrix at once using vectorized metric results = metric.compute_vectorized(prepared_seqs, n_cpus=n_cpus) total_dist += results else: # Step 4: iterate over clone memberships for clone in tqdm( membership, disable=not verbose, bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}", ): tmp = dat_seq_clean.loc[membership[clone]] if tmp.shape[0] > 1: # Get indices for this clone tmp_indices = [index_list.index(i) for i in tmp.index] # Extract prepared sequences for this clone clone_seqs = [prepared_seqs[i] for i in tmp_indices] # Compute distance matrix for this clone using vectorized metric d_mat_tmp = metric.compute_vectorized(clone_seqs, n_cpus=n_cpus) total_dist[np.ix_(tmp_indices, tmp_indices)] += d_mat_tmp np.fill_diagonal(total_dist, np.nan) if verbose: end_time = time.time() logg.info( f"Distances calculated in {end_time - start_time:.2f} seconds" ) return total_dist