#!/usr/bin/env python
import numpy as np
import networkx as nx
import pandas as pd
import warnings
from anndata import AnnData
from time import sleep
from tqdm import tqdm
from scanpy import logging as logg
from scipy.special import gammaln
from typing import Literal
from dandelion.external.skbio._chao1 import chao1
from dandelion.external.skbio._gini import gini_index
from dandelion.external.skbio._shannon import shannon
from dandelion.tools._network import (
clone_centrality,
clone_degree,
generate_network,
)
from dandelion.utilities._core import *
from dandelion.utilities._io import *
from dandelion.utilities._utilities import *
[docs]
def clone_rarefaction(
vdj_data: Dandelion | AnnData,
groupby: str,
clone_key: str | None = None,
diversity_key: str | None = None,
verbose: bool = False,
) -> dict:
"""
Return rarefaction predictions for cell numbers vs clone size.
Parameters
----------
vdj_data : Dandelion | AnnData
`Dandelion` or `AnnData` object.
groupby : str
Column name to split the calculation of clone numbers for a given number of cells for e.g. sample, patient etc.
clone_key : str | None, optional
Column name specifying the clone_id column in metadata/obs.
diversity_key : str | None, optional
key for 'diversity' results in AnnData's `.uns`.
verbose : bool, optional
whether to print progress.
Returns
-------
dict
Rarefaction predictions for cell numbers vs clone size.
"""
start = logg.info("Constructing rarefaction curve")
if isinstance(vdj_data, AnnData):
_metadata = vdj_data.obs.copy()
elif isinstance(vdj_data, Dandelion):
_metadata = vdj_data.metadata.copy()
clonekey = clone_key if clone_key is not None else "clone_id"
groups = list(set(_metadata[groupby]))
if type(_metadata[clonekey]) == "category":
_metadata[clonekey] = _metadata[clonekey].cat.remove_unused_categories()
res = {}
for g in groups:
__metadata = _metadata[_metadata[groupby] == g]
res[g] = __metadata[clonekey].value_counts()
res_ = pd.DataFrame.from_dict(res, orient="index")
# remove those with no counts
logg.info(
"removing due to zero counts: "
", ".join(
[res_.index[i] for i, x in enumerate(res_.sum(axis=1) == 0) if x]
),
)
res_ = res_[~(res_.sum(axis=1) == 0)]
# set up for calculating rarefaction
tot = res_.apply(sum, axis=1)
# S = res_.apply(lambda x: x[x > 0].shape[0], axis=1)
nr = res_.shape[0]
# append the results to a dictionary
rarecurve = {}
sleep(0.5)
for i in tqdm(
range(0, nr),
desc="Calculating rarefaction curve ",
bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}",
disable=not verbose,
):
n = np.arange(1, tot[i], step=10)
if n[-1:] != tot[i]:
n = np.append(n, tot[i])
rarecurve[res_.index[i]] = [
rarefun(
np.array(res_.iloc[i,]),
z,
)
for z in n
]
y = pd.DataFrame([rarecurve[c] for c in rarecurve]).T
pred = pd.DataFrame(
[np.append(np.arange(1, s, 10), s) for s in res_.sum(axis=1)],
index=res_.index,
).T
diversitykey = diversity_key if diversity_key is not None else "diversity"
if isinstance(vdj_data, AnnData):
if diversitykey not in vdj_data.uns:
vdj_data.uns[diversitykey] = {}
vdj_data.uns[diversitykey] = {
"rarefaction_cells_x": pred,
"rarefaction_clones_y": y,
}
logg.info(
" finished",
time=start,
deep=("updated `.uns` with rarefaction curves.\n"),
)
if isinstance(vdj_data, Dandelion):
return {"rarefaction_cells_x": pred, "rarefaction_clones_y": y}
[docs]
def clone_diversity(
vdj_data: Dandelion | AnnData,
groupby: str,
method: Literal["gini", "chao1", "shannon"] = "gini",
metric: Literal["clone_network", "clone_degree", "clone_centrality"] = None,
clone_key: str | None = None,
return_table: bool = False,
diversity_key: str | None = None,
resample: bool = False,
downsample: int | None = None,
n_resample: int = 50,
normalize: bool = True,
reconstruct_network: bool = True,
expanded_only: bool = False,
use_contracted: bool = False,
key_added: str | None = None,
verbose: bool = False,
**kwargs,
) -> pd.DataFrame:
"""
Compute B cell clones diversity : Gini indices, Chao1 estimates, or Shannon entropy.
Parameters
----------
vdj_data : Dandelion | AnnData
`Dandelion` or `AnnData` object.
groupby : str
Column name to calculate the gini indices on, for e.g. sample, patient etc.
method : Literal["gini", "chao1", "shannon"], optional
Method for diversity estimation. Either one of ['gini', 'chao1', 'shannon'].
metric : Literal["clone_network", "clone_degree", "clone_centrality"], optional
Metric to use for calculating Gini indices of clones.
Accepts one of ['clone_network', 'clone_degree', 'clone_centrality'].
`None` defaults to 'clone_network'.
clone_key : str | None, optional
Column name specifying the clone_id column in metadata.
return_table : bool, optional
If True, a `pandas` data frame is returned.
If False, function will try to populate the input object's metadata/obs slot.
diversity_key : str | None, optional
key for 'diversity' results in `.uns`.
resample : bool, optional
Whether or not to randomly sample cells without replacement to
the minimum size of groups for the diversity calculation. Default is False.
downsample : int | None, optional
number of cells to downsample to. If None, defaults to size of smallest group.
n_resample : int, optional
Number of times to perform resampling. Default is 50.
normalize : bool, optional
Whether or not to return normalized Shannon Entropy according to https://math.stackexchange.com/a/945172.
Default is True.
reconstruct_network : bool, optional
Whether or not to reconstruct the network for Gini Index based measures.
Default is True and will reconstruct for each group specified by groupby option.
expanded_only : bool, optional
Whether or not to calculate gini indices using expanded clones only. Default is False i.e. use all cells/clones.
use_contracted : bool, optional
Whether or not to perform the gini calculation after contraction of clone network.
Only applies to calculation of clone size gini index. Default is False.
This is to try and preserve the single-cell properties of the network.
key_added : str | None, optional
column names for output.
verbose : bool, optional
whether to print progress.
**kwargs
passed to dandelion.tl.generate_network
Returns
-------
pd.DataFrame
Pandas DataFrame holding diversity information.
"""
if downsample is not None:
resample = True
if method == "gini":
if return_table:
return diversity_gini(
vdj_data,
groupby=groupby,
metric=metric,
clone_key=clone_key,
return_table=return_table,
diversity_key=diversity_key,
resample=resample,
n_resample=n_resample,
downsample=downsample,
reconstruct_network=reconstruct_network,
expanded_only=expanded_only,
use_contracted=use_contracted,
key_added=key_added,
verbose=verbose,
**kwargs,
)
else:
diversity_gini(
vdj_data,
groupby=groupby,
metric=metric,
clone_key=clone_key,
return_table=return_table,
diversity_key=diversity_key,
resample=resample,
n_resample=n_resample,
downsample=downsample,
reconstruct_network=reconstruct_network,
expanded_only=expanded_only,
use_contracted=use_contracted,
key_added=key_added,
verbose=verbose,
**kwargs,
)
if method == "chao1":
if return_table:
return diversity_chao1(
vdj_data,
groupby=groupby,
clone_key=clone_key,
return_table=return_table,
diversity_key=diversity_key,
resample=resample,
n_resample=n_resample,
downsample=downsample,
key_added=key_added,
verbose=verbose,
)
else:
diversity_chao1(
vdj_data,
groupby=groupby,
clone_key=clone_key,
return_table=return_table,
diversity_key=diversity_key,
resample=resample,
n_resample=n_resample,
downsample=downsample,
key_added=key_added,
verbose=verbose,
)
if method == "shannon":
if return_table:
return diversity_shannon(
vdj_data,
groupby=groupby,
clone_key=clone_key,
return_table=return_table,
diversity_key=diversity_key,
resample=resample,
n_resample=n_resample,
normalize=normalize,
downsample=downsample,
key_added=key_added,
verbose=verbose,
)
else:
diversity_shannon(
vdj_data,
groupby=groupby,
clone_key=clone_key,
return_table=return_table,
diversity_key=diversity_key,
resample=resample,
n_resample=n_resample,
normalize=normalize,
downsample=downsample,
key_added=key_added,
verbose=verbose,
)
def clone_networkstats(
vdj_data: Dandelion,
expanded_only: bool = False,
network_clustersize: bool = False,
verbose: bool = False,
) -> tuple[defaultdict, defaultdict, defaultdict]:
"""Retrieve network stats.
Parameters
----------
vdj_data : Dandelion
input object
expanded_only : bool, optional
whether or not to calculate only on expanded clones.
network_clustersize : bool, optional
depends on metric.
verbose : bool, optional
whether to print progress.
Returns
-------
tuple[defaultdict, defaultdict, defaultdict]
output nodes names, vertex sizes and cluster sizes.
Raises
------
AttributeError
if graph not found.
TypeError
if input object is not Dandelion.
"""
start = logg.info("Calculating vertex size of nodes after contraction")
if isinstance(vdj_data, Dandelion):
if vdj_data.graph is None:
raise AttributeError(
"Graph not found. Please run tl.generate_network."
)
else:
if expanded_only:
G = vdj_data.graph[1]
else:
G = vdj_data.graph[0]
remove_edges = defaultdict(list)
vertexsizes = defaultdict(list)
clustersizes = defaultdict(list)
nodes_names = defaultdict(list)
for subg in tqdm(
nx.connected_components(G),
desc="Reducing graph ",
disable=not verbose,
bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}",
):
nodes = sorted(list(subg))
# just assign the value in a single cell, because this will be representative of the clone
tmp = nodes[0]
for n in nodes:
nodes_names[n] = tmp # keep so i can reference later
if len(nodes) > 1:
G_ = G.subgraph(nodes).copy()
remove_edges[tmp] = [
(e[0], e[1])
for e in G_.edges(data=True)
if e[2]["weight"] > 0
]
if len(remove_edges[tmp]) > 0:
G_.remove_edges_from(remove_edges[tmp])
for connected in nx.connected_components(G_):
vertexsizes[tmp].append(len(connected))
vertexsizes[tmp] = sorted(
vertexsizes[tmp], reverse=True
)
else:
vertexsizes[tmp] = [
1 for i in range(len(G_.edges(data=True)))
]
if network_clustersize:
clustersizes[tmp] = len(vertexsizes[tmp])
else:
clustersizes[tmp] = len(nodes)
else:
vertexsizes[tmp] = [1]
clustersizes[tmp] = [1]
return (nodes_names, vertexsizes, clustersizes)
else:
raise TypeError("Input object must be of {}".format(Dandelion))
def diversity_gini(
vdj_data: Dandelion | AnnData,
groupby: str,
metric: str | None = None,
clone_key: str | None = None,
return_table: bool = False,
resample: bool = False,
n_resample: int = 50,
downsample: int | None = None,
reconstruct_network: bool = True,
expanded_only: bool = False,
use_contracted: bool = False,
key_added: str | None = None,
verbose: bool = False,
**kwargs,
) -> pd.DataFrame:
"""
Compute clones Gini indices.
Parameters
----------
vdj_data : Dandelion | AnnData
`Dandelion` or `AnnData` object.
groupby : str
Column name to calculate the Gini indices on, for e.g. sample, patient etc.
metric : str | None, optional
Metric to use for calculating Gini indices of clones.
Accepts one of ['clone_network', 'clone_degree', 'clone_centrality'].
Defaults to 'clone_centrality'.
clone_key : str | None, optional
Column name specifying the clone_id column in metadata.
return_table : bool, optional
If True, a `pandas` data frame is returned.
If False, function will try to populate the input object's metadata/obs slot.
resample : bool, optional
Whether or not to randomly sample cells without replacement to
the minimum size of groups for the diversity calculation.
Default is False. Resampling will automatically trigger reconstruction of network.
n_resample : int, optional
Number of times to perform resampling. Default is 50.
downsample : int | None, optional
number of cells to downsample to. If None, defaults to size of smallest group.
reconstruct_network : bool, optional
Whether or not to reconstruct the network for Gini Index based measures.
Default is True and will reconstruct for each group specified by groupby option.
expanded_only : bool, optional
Whether or not to calculate gini indices using expanded clones only. Default is False i.e. use all cells/clones.
use_contracted : bool, optional
Whether or not to perform the gini calculation after contraction of clone network.
Only applies to calculation of clone size gini index. Default is False.
This is to try and preserve the single-cell properties of the network.
key_added : str | None, optional
column names for output.
verbose : bool, optional
whether to print progress.
**kwargs
passed to dandelion.tl.generate_network
Returns
-------
pd.DataFrame
pandas DataFrame holding diversity information.
Raises
------
TypeError
if not Dandelion class.
ValueError
if columns names don't exist.
"""
start = logg.info("Calculating Gini indices")
res = gini_indices(
vdj_data,
groupby=groupby,
clone_key=clone_key,
metric=metric,
resample=resample,
n_resample=n_resample,
downsample=downsample,
reconstruct_network=reconstruct_network,
expanded_only=expanded_only,
contracted=use_contracted,
key_added=key_added,
verbose=verbose,
**kwargs,
)
if return_table:
logg.info(" finished", time=start)
return res
else:
transfer_diversity_results(vdj_data, res, groupby)
if isinstance(vdj_data, Dandelion):
logg.info(
" finished",
time=start,
deep=("updated `.metadata` with Gini indices.\n"),
)
def diversity_chao1(
vdj_data: Dandelion | AnnData,
groupby: str,
clone_key: str | None = None,
return_table: bool = False,
diversity_key: str | None = None,
resample: bool = False,
n_resample: int = 50,
downsample: int | None = None,
key_added: str | None = None,
verbose: bool = False,
) -> pd.DataFrame:
"""
Compute clones Chao1 estimates.
Parameters
----------
vdj_data : Dandelion | AnnData
`Dandelion` or `AnnData` object.
groupby : str
Column name to calculate the Chao1 estimates on, for e.g. sample, patient etc.
clone_key : str | None, optional
Column name specifying the clone_id column in metadata.
return_table : bool, optional
If True, a `pandas` dataframe is returned. If False, function will try to populate the input object's
metadata/obs slot.
diversity_key : str | None, optional
key for 'diversity' results in `.uns`.
resample : bool, optional
Whether or not to randomly sample cells without replacement to the minimum size of groups for the diversity
calculation. Default is False.
n_resample : int, optional
Number of times to perform resampling. Default is 50.
downsample : int | None, optional
number of cells to downsample to. If None, defaults to size of smallest group.
key_added : str | None, optional
column names for output.
verbose : bool, optional
whether to print progress.
Returns
-------
pd.DataFrame
pandas DataFrame holding diversity information.
"""
start = logg.info("Calculating Chao1 estimates")
res = chao1_estimates(
vdj_data,
groupby=groupby,
clone_key=clone_key,
resample=resample,
n_resample=n_resample,
downsample=downsample,
key_added=key_added,
verbose=verbose,
)
diversitykey = diversity_key if diversity_key is not None else "diversity"
if isinstance(vdj_data, AnnData):
if diversitykey not in vdj_data.uns:
vdj_data.uns[diversitykey] = {}
vdj_data.uns[diversitykey].update({"chao1": res})
if return_table:
if isinstance(vdj_data, AnnData):
logg.info(
" finished",
time=start,
deep=("updated `.uns` with Chao1 estimates.\n"),
)
else:
logg.info(" finished", time=start)
return res
else:
transfer_diversity_results(vdj_data, res, groupby)
if isinstance(vdj_data, Dandelion):
logg.info(
" finished",
time=start,
deep=("updated `.metadata` with Chao1 estimates.\n"),
)
elif isinstance(vdj_data, AnnData):
logg.info(
" finished",
time=start,
deep=("updated `.obs` and `.uns` with Chao1 estimates.\n"),
)
def diversity_shannon(
vdj_data: Dandelion | AnnData,
groupby: str,
clone_key: str | None = None,
return_table: bool = False,
diversity_key: str | None = None,
resample: bool = False,
n_resample: int = 50,
normalize: bool = True,
key_added: str | None = None,
downsample: int | None = None,
verbose: bool = False,
) -> pd.DataFrame:
"""
Compute clones Shannon entropy.
Parameters
----------
vdj_data : Dandelion | AnnData
`Dandelion` or `AnnData` object.
groupby : str
Column name to calculate the Shannon entropy on, for e.g. sample, patient etc.
clone_key : str | None, optional
Column name specifying the clone_id column in metadata.
return_table : bool, optional
If True, a `pandas` data frame is returned. If False, function will try to populate the input object's metadata/obs slot.
diversity_key : str | None, optional
key for 'diversity' results in `.uns`.
resample : bool, optional
Whether or not to randomly sample cells without replacement to the minimum size of groups for the diversity calculation. Default is False.
n_resample : int, optional
Number of times to perform resampling. Default is 50.
normalize : bool, optional
Whether or not to return normalized Shannon Entropy according to https://math.stackexchange.com/a/945172. Default is True.
key_added : str | None, optional
column names for output.
downsample : int | None, optional
number of cells to downsample to. If None, defaults to size of smallest group.
verbose : bool, optional
whether to print progress.
Returns
-------
pd.DataFrame
pandas DataFrame holding diversity information.
"""
start = logg.info("Calculating Shannon entropy")
res = shannon_entropy(
vdj_data,
groupby=groupby,
clone_key=clone_key,
resample=resample,
n_resample=n_resample,
normalize=normalize,
key_added=key_added,
downsample=downsample,
verbose=verbose,
)
diversitykey = diversity_key if diversity_key is not None else "diversity"
if isinstance(vdj_data, AnnData):
if diversitykey not in vdj_data.uns:
vdj_data.uns[diversitykey] = {}
vdj_data.uns[diversitykey].update({"shannon": res})
if return_table:
if isinstance(vdj_data, AnnData):
if normalize:
logg.info(
" finished",
time=start,
deep=("updated `.uns` with normalized Shannon entropy.\n"),
)
else:
logg.info(
" finished",
time=start,
deep=("updated `.uns` with Shannon entropy.\n"),
)
else:
logg.info(" finished", time=start)
return res
else:
transfer_diversity_results(vdj_data, res, groupby)
if isinstance(vdj_data, Dandelion):
if normalize:
logg.info(
" finished",
time=start,
deep=(
"updated `.metadata` with normalized Shannon entropy.\n"
),
)
else:
logg.info(
" finished",
time=start,
deep=("updated `.metadata` with Shannon entropy.\n"),
)
elif isinstance(vdj_data, AnnData):
if normalize:
logg.info(
" finished",
time=start,
deep=(
"updated `.obs` and `.uns` with normalized Shannon entropy.\n"
),
)
else:
logg.info(
" finished",
time=start,
deep=("updated `.obs` and `.uns` with Shannon entropy.\n"),
)
def chooseln(N, k) -> float:
"""
R's lchoose in python
from https://stackoverflow.com/questions/21767690/python-log-n-choose-k
"""
return gammaln(N + 1) - gammaln(N - k + 1) - gammaln(k + 1)
def rarefun(y, sample) -> float:
"""
Adapted from rarefun from vegan:
https://github.com/vegandevs/vegan/blob/master/R/rarefy.R
"""
res = []
y = y[y > 0]
J = np.sum(y)
ldiv = chooseln(J, sample)
for d in J - y:
if d < sample:
res.append(0)
else:
res.append(np.exp(chooseln(d, sample) - ldiv))
out = np.sum(1 - np.array(res))
return out
def drop_nan_values(df: pd.DataFrame) -> None:
"""
Drop NaN values from a pandas DataFrame.
Parameters
----------
df : pd.DataFrame
The DataFrame from which to drop NaN values.
"""
if "nan" in df.index or np.nan in df.index:
try:
df.drop("nan", inplace=True)
except:
df.drop(np.nan, inplace=True)
def safe_average_update(data_dict: dict, key: str, values: list[float]) -> None:
"""
Safely calculate the average of a list of values and update the dictionary.
Parameters
----------
data_dict : dict
The dictionary to update.
key : Any
The key to update in the dictionary.
values : list of float
The list of values to average.
"""
try:
average = sum(values) / len(values)
except ZeroDivisionError:
average = 0
data_dict.update({key: average})
def process_clone_network_stats(
ddl_dat: Dandelion, expanded_only: bool, contracted: bool, verbose: bool
) -> tuple[dict, dict, dict]:
"""
Process clone network statistics and calculate Gini indices.
Parameters
----------
ddl_dat : Any
The `Dandelion` object to process.
expanded_only : bool
Whether to consider only expanded clones.
contracted : bool
Whether to use contracted network clusters.
verbose : bool
Whether to display progress information.
Returns
-------
tuple[dict, dict, dict]
Tuple containing dictionaries for node names, vertex sizes, and cluster sizes.
"""
n_n, v_s, c_s = clone_networkstats(
ddl_dat,
expanded_only=expanded_only,
network_clustersize=contracted,
verbose=verbose,
)
g_c_v = defaultdict(dict)
g_c_v_res, g_c_c_res = {}, {}
for vs in v_s:
v_sizes = np.array(v_s[vs])
if len(v_sizes) > 1:
v_sizes = np.append(v_sizes, 0)
g_c_v[vs] = calculate_gini_index(v_sizes)
for cell in n_n:
g_c_v_res.update({cell: g_c_v[n_n[cell]]})
c_sizes = np.array(sorted(list(flatten(c_s.values())), reverse=True))
if len(c_sizes) > 1:
c_sizes = np.append(c_sizes, 0)
g_c_c = calculate_gini_index(c_sizes)
for cell in n_n:
g_c_c_res.update({cell: g_c_c})
return g_c_v_res, g_c_c_res
def gini_indices(
data: Dandelion,
groupby: str,
metric: str | None = None,
clone_key: str | None = None,
resample: bool = False,
n_resample: int = 50,
downsample: int | None = None,
reconstruct_network: bool = True,
expanded_only: bool = False,
contracted: bool = False,
key_added: str | None = None,
verbose: bool = False,
**kwargs,
) -> pd.DataFrame:
"""Gini indices."""
if isinstance(data, AnnData):
raise TypeError("Only Dandelion class object accepted.")
elif isinstance(data, Dandelion):
_metadata = data.metadata.copy()
clonekey = clone_key if clone_key is not None else "clone_id"
met = metric if metric is not None else "clone_network"
# split up the table by groupby
_metadata[groupby] = _metadata[groupby].astype("category")
_metadata[groupby] = _metadata[groupby].cat.remove_unused_categories()
groups = list(set(_metadata[groupby]))
minsize = calculate_minsize_and_log(
df=_metadata, col=groupby, downsample=downsample, resample=resample
)
res1 = {}
if met == "clone_network":
logg.info(
"Computing Gini indices for cluster and vertex size using network."
)
if not reconstruct_network:
n_n, v_s, c_s = clone_networkstats(
data,
expanded_only=expanded_only,
network_clustersize=contracted,
verbose=verbose,
)
g_c_v = defaultdict(dict)
g_c_v_res, g_c_c_res = {}, {}
for vs in v_s:
v_sizes = np.array(v_s[vs])
if len(v_sizes) > 1:
v_sizes = np.append(v_sizes, 0)
g_c_v[vs] = calculate_gini_index(v_sizes)
for cell in n_n:
g_c_v_res.update({cell: g_c_v[n_n[cell]]})
c_sizes = np.array(
np.array(sorted(list(flatten(c_s.values())), reverse=True))
)
if len(c_sizes) > 1:
c_sizes = np.append(c_sizes, 0)
g_c_c = calculate_gini_index(c_sizes)
for cell in n_n:
g_c_c_res.update({cell: g_c_c})
data.metadata["clone_network_vertex_size_gini"] = pd.Series(
g_c_v_res
)
data.metadata["clone_network_cluster_size_gini"] = pd.Series(
g_c_c_res
)
elif met == "clone_centrality":
logg.info(
"Computing gini indices for clone size using metadata and node closeness centrality using network."
)
clone_centrality(data)
elif met == "clone_degree":
logg.info(
"Computing gini indices for clone size using metadata and node degree using network."
)
clone_degree(data)
_metadata = data.metadata.copy()
_data = data.data.copy()
res2 = {}
if resample:
logg.info(
"Downsampling each group specified in `{}` to {} cells for calculating gini indices.".format(
groupby, minsize
)
)
for g in groups:
# clone size distribution
_dat = _metadata[_metadata[groupby] == g]
__data = _data[_data["cell_id"].isin(list(_dat.index))]
ddl_dat = Dandelion(__data, metadata=_dat)
if resample:
sizelist = []
if isinstance(data, Dandelion):
graphlist = []
for i in tqdm(
range(0, n_resample),
bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}",
disable=not verbose,
):
if isinstance(data, Dandelion):
resampled = generate_network(
ddl_dat,
clone_key=clonekey,
downsample=minsize,
verbose=verbose,
compute_layout=False,
**kwargs,
)
if met == "clone_network":
g_c_v_res, g_c_c_res = process_clone_network_stats(
resampled,
expanded_only=expanded_only,
contracted=contracted,
verbose=verbose,
)
resampled.metadata["clone_network_vertex_size_gini"] = (
pd.Series(g_c_v_res)
)
resampled.metadata[
"clone_network_cluster_size_gini"
] = pd.Series(g_c_c_res)
elif met == "clone_centrality":
clone_centrality(resampled)
elif met == "clone_degree":
clone_degree(resampled)
else:
raise ValueError(
"Unknown metric for calculating network stats. Please specify "
+ "one of `clone_network`, `clone_centrality` or `clone_degree`."
)
# clone size gini
_dat = resampled.metadata.copy()
_tab = _dat[clonekey].value_counts()
drop_nan_values(_tab)
if met == "clone_network":
sizelist.append(_dat[met + "_cluster_size_gini"].mean())
else:
clonesizecounts = np.array(_tab)
clonesizecounts = clonesizecounts[clonesizecounts > 0]
if len(clonesizecounts) > 1:
# append a single zero for lorenz curve calculation
clonesizecounts = np.append(clonesizecounts, 0)
g_c = (
calculate_gini_index(clonesizecounts)
if len(clonesizecounts) > 0
else 0
)
sizelist.append(g_c)
if met == "clone_network":
graphlist.append(_dat[met + "_vertex_size_gini"].mean())
else:
# vertex closeness centrality or weighted degree distribution
# only calculate for expanded clones. If including non-expanded clones, the centrality is just zero which doesn't help.
connectednodes = resampled.metadata[met][
resampled.metadata[met] > 0
]
graphcounts = np.array(connectednodes.value_counts())
# graphcounts = np.append(graphcounts, 0) # if I add a zero here, it will skew the results when the centrality measure is uniform.... so leave it out for now.
g_c = (
calculate_gini_index(graphcounts)
if len(graphcounts) > 0
else 0
)
graphlist.append(g_c)
safe_average_update(res1, g, sizelist)
if "graphlist" in locals():
safe_average_update(res2, g, graphlist)
else:
_tab = _dat[clonekey].value_counts()
drop_nan_values(_tab)
if met != "clone_network":
clonesizecounts = np.array(_tab)
clonesizecounts = clonesizecounts[clonesizecounts > 0]
if len(clonesizecounts) > 1:
# append a single zero for lorenz curve calculation
clonesizecounts = np.append(clonesizecounts, 0)
g_c = (
calculate_gini_index(clonesizecounts)
if len(clonesizecounts) > 0
else 0
)
res1.update({g: g_c})
if isinstance(data, Dandelion):
if met == "clone_network":
if reconstruct_network:
generate_network(
ddl_dat,
clone_key=clonekey,
verbose=verbose,
compute_layout=False,
**kwargs,
)
g_c_v_res, g_c_c_res = process_clone_network_stats(
ddl_dat,
expanded_only=expanded_only,
contracted=contracted,
verbose=verbose,
)
res2.update({g: pd.Series(g_c_v_res).mean()})
res1.update({g: pd.Series(g_c_c_res).mean()})
else:
res2.update({g: _dat[met + "_vertex_size_gini"].mean()})
res1.update(
{g: _dat[met + "_cluster_size_gini"].mean()}
)
else:
# vertex closeness centrality or weighted degree distribution
# only calculate for expanded clones. If including non-expanded clones, the centrality is
# just zero which doesn't help.
connectednodes = _dat[met][_dat[met] > 0]
graphcounts = np.array(connectednodes.value_counts())
# graphcounts = np.append(graphcounts, 0) # if I add a zero here, it will skew the results
# when the centrality measure is uniform.... so leave it out for now.
g_c = (
calculate_gini_index(graphcounts)
if len(graphcounts) > 0
else 0
)
res2.update({g: g_c})
if "res2" in locals():
res_df = pd.DataFrame.from_dict([res1, res2]).T
if key_added is None:
if met == "clone_network":
res_df.columns = [
met + "_cluster_size_gini",
met + "_vertex_size_gini",
]
else:
res_df.columns = ["clone_size_gini", met + "_gini"]
else:
if not type(key_added) is list:
key_added = [key_added]
if len(key_added) == len(res_df.columns):
res_df.columns = key_added
else:
raise ValueError(
"Please provide {} key(s) for new column names.".format(
len(res_df.columns)
)
)
else:
res_df = pd.DataFrame.from_dict([res1]).T
if key_added is None:
res_df.columns = ["clone_size_gini"]
else:
if not type(key_added) is list:
key_added = [key_added]
if len(key_added) == len(res_df.columns):
res_df.columns = key_added
else:
raise ValueError(
"Please provide {} key(s) for new column names.".format(
len(res_df.columns)
)
)
return res_df
def chao1_estimates(
data: Dandelion | AnnData,
groupby: str,
clone_key: str | None = None,
resample: bool = False,
n_resample: int = 50,
downsample: int | None = None,
key_added: str | None = None,
verbose: bool = False,
) -> pd.DataFrame:
"""Chao1 estimates."""
if isinstance(data, AnnData):
_metadata = data.obs.copy()
elif isinstance(data, Dandelion):
_metadata = data.metadata.copy()
clonekey = clone_key if clone_key is not None else "clone_id"
diversity_mode = "chao1"
# split up the table by groupby
_metadata[groupby] = _metadata[groupby].astype("category")
_metadata[groupby] = _metadata[groupby].cat.remove_unused_categories()
groups = list(set(_metadata[groupby]))
minsize = calculate_minsize_and_log(
df=_metadata, col=groupby, downsample=downsample, resample=resample
)
res1 = {}
for g in groups:
# clone size distribution
_dat = _metadata[_metadata[groupby] == g]
if resample:
sizelist = []
for i in tqdm(
range(0, n_resample),
bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}",
disable=not verbose,
):
_dat = _dat.sample(minsize)
_tab = _dat[clonekey].value_counts()
drop_nan_values(_tab)
clonesizecounts = np.array(_tab)
clonesizecounts = clonesizecounts[clonesizecounts > 0]
g_c = (
calculate_chao1(clonesizecounts)
if len(clonesizecounts) > 0
else 0
)
sizelist.append(g_c)
safe_average_update(res1, g, sizelist)
else:
_tab = _dat[clonekey].value_counts()
drop_nan_values(_tab)
clonesizecounts = np.array(_tab)
clonesizecounts = clonesizecounts[clonesizecounts > 0]
g_c = (
calculate_chao1(clonesizecounts)
if len(clonesizecounts) > 0
else 0
)
res1.update({g: g_c})
res_df = pd.DataFrame.from_dict([res1]).T
rename_result_column(res_df, diversity_mode, key_added)
return res_df
def shannon_entropy(
data: Dandelion | AnnData,
groupby: str,
clone_key: str | None = None,
resample: bool = False,
n_resample: int = 50,
normalize: bool = True,
downsample: int | None = None,
key_added: str | None = None,
verbose: bool = False,
) -> pd.DataFrame:
"""Shannon entropy."""
if isinstance(data, AnnData):
_metadata = data.obs.copy()
elif isinstance(data, Dandelion):
_metadata = data.metadata.copy()
clonekey = clone_key if clone_key is not None else "clone_id"
diversity_mode = "shannon" if not normalize else "normalized_shannon"
# split up the table by groupby
_metadata[groupby] = _metadata[groupby].astype("category")
_metadata[groupby] = _metadata[groupby].cat.remove_unused_categories()
groups = list(set(_metadata[groupby]))
minsize = calculate_minsize_and_log(
df=_metadata, col=groupby, downsample=downsample, resample=resample
)
res1 = {}
for g in groups:
# clone size distribution
_dat = _metadata[_metadata[groupby] == g]
if resample:
sizelist = []
for i in tqdm(
range(0, n_resample),
bar_format="{l_bar}{bar:10}{r_bar}{bar:-10b}",
disable=not verbose,
):
_dat = _dat.sample(minsize)
_tab = _dat[clonekey].value_counts()
drop_nan_values(_tab)
clonesizecounts = np.array(_tab)
clonesizecounts = clonesizecounts[clonesizecounts > 0]
g_c = (
calculate_shannon_entropy(clonesizecounts, normalize)
if len(clonesizecounts) > 0
else 0
)
sizelist.append(g_c)
safe_average_update(res1, g, sizelist)
else:
_tab = _dat[clonekey].value_counts()
drop_nan_values(_tab)
clonesizecounts = np.array(_tab)
clonesizecounts = clonesizecounts[clonesizecounts > 0]
g_c = (
calculate_shannon_entropy(clonesizecounts, normalize)
if len(clonesizecounts) > 0
else 0
)
res1.update({g: g_c})
res_df = pd.DataFrame.from_dict([res1]).T
rename_result_column(res_df, diversity_mode, key_added)
return res_df
def calculate_gini_index(
values: np.ndarray, method: str = "trapezoids"
) -> float:
"""
Calculate the Gini index for a given array of values.
Parameters
----------
values : np.ndarray
Array of values to calculate the Gini index for.
method : str, optional
Method to use for Gini index calculation, by default "trapezoids".
Returns
-------
float
The calculated Gini index, or 0 if the index is negative or NaN.
"""
gini = gini_index(values, method=method)
return 0 if gini < 0 or np.isnan(gini) else gini
def calculate_chao1(values: np.ndarray) -> float:
"""
Calculate the Chao1 estimate for a given array of values
Parameters
----------
values : np.ndarray
Array of values to calculate the Chao1 estimates for.
Returns
-------
float
The calculated Chao1 estimates, or 0 if the index is negative or NaN.
"""
chao1e = chao1(values)
return 0 if chao1e < 0 or np.isnan(chao1e) else chao1e
def calculate_shannon_entropy(values: np.ndarray, normalize: bool) -> float:
"""
Calculate the Shannon entropy for a given array of values
Parameters
----------
values : np.ndarray
Array of values to calculate the Shannon entropys for.
normalize : bool, optional
Whether or not to return normalized Shannon Entropy according to https://math.stackexchange.com/a/945172. Default is True.
Returns
-------
float
The calculated Shannon entropys, or 0 if the index is negative or NaN.
"""
if normalize:
if len(values) == 1:
return 0
else:
values_freqs = values / np.sum(values)
return -np.sum(
(values_freqs * np.log(values_freqs))
/ np.log(len(values_freqs))
)
else:
return shannon(values)
def transfer_diversity_results(
vdj_data: Dandelion | AnnData, diversity_results: pd.DataFrame, groupby: str
) -> None:
"""Transfer diversity results."""
if isinstance(vdj_data, AnnData):
_metadata = vdj_data.obs.copy()
elif isinstance(vdj_data, Dandelion):
_metadata = vdj_data.metadata.copy()
groups = list(set(_metadata[groupby]))
for c in diversity_results.columns:
_metadata[c] = np.nan
for g in groups:
for i in _metadata.index:
if _metadata.at[i, groupby] == g:
_metadata.at[i, c] = diversity_results[c][g]
if isinstance(vdj_data, AnnData):
vdj_data.obs = _metadata.copy()
elif isinstance(vdj_data, Dandelion):
vdj_data.metadata = _metadata.copy()
def rename_result_column(
res_df: pd.DataFrame,
diversity_mode: str,
key_added: list[str] | str | None = None,
) -> None:
"""Processes the output result"""
if key_added is None:
res_df.columns = ["clone_size_" + diversity_mode]
else:
if isinstance(key_added, list):
res_df.columns = key_added[0]
else:
res_df.columns = [key_added]
def calculate_minsize_and_log(
df: pd.DataFrame,
col: str,
downsample: int | None = None,
resample: int | bool | None = False,
):
"""
Calculate the minimum group size for downsampling and log related information.
Parameters
----------
_df : pd.DataFrame
Metadata DataFrame containing the grouping information.
col : str
Column name in `_df` to group by.
downsample : int | None, optional
Downsampling size. If None or larger than the smallest group size, defaults to the smallest group size.
resample : bool | int | None, optional
Whether to log a message about resampling.
logg : logging.Logger, optional
Logger instance for logging messages. If None, no logs will be emitted.
Returns
-------
int
The determined minimum group size (`minsize`).
Notes
-----
- If `downsample` is greater than the smallest group size, the function logs a warning and defaults
to the smallest group size.
- If `minsize` is less than 100, a cautionary message is logged about potential issues with
diversity measures.
- When `resample` is True, a message is logged about the downsampling process.
"""
# Determine minimum group size
if downsample is None:
minsize = df[col].value_counts().min()
else:
minsize = downsample
if minsize > df[col].value_counts().min():
logg.info(
"Downsampling size provided of {} was larger than the smallest group size. ".format(
downsample
)
+ "Defaulting to the smallest group size for downsampling."
)
minsize = df[col].value_counts().min()
# Log a warning if the minimum size is too small
if minsize < 100:
logg.info(
"The minimum cell numbers when grouped by {} is {}.".format(
col, minsize
)
+ " Exercise caution when interpreting diversity measures."
)
# Log information about resampling
if resample:
logg.info(
"Downsampling each group specified in `{}` to {} cells for calculating Shannon entropy.".format(
col, minsize
)
)
return minsize