import h5py
import json
import os
import pickle
import re
import networkx as nx
import numpy as np
import pandas as pd
from collections import defaultdict, OrderedDict
from pathlib import Path
from scanpy import logging as logg
from scipy.sparse import csr_matrix
from dandelion.base.core._core import Dandelion, load_data
from dandelion.utilities._utilities import (
DEFAULT_PREFIX,
CELLRANGER,
AIRR,
deprecated,
all_missing,
sanitize_blastn,
sanitize_data,
Contig,
fasta_iterator,
)
def _load_distance_zarr(distance_zarr: Path | str):
"""Load lazy distance array from a user/path-compatible Zarr location."""
import dask.array as da
base = str(distance_zarr).rstrip("/\\")
if base.lower().endswith(".zarr"):
candidates = [f"{base}/distance_matrix", base]
else:
candidates = [
f"{base}/distance_matrix.zarr/distance_matrix",
f"{base}/distance_matrix.zarr",
f"{base}/distance_matrix",
]
last_error = None
for candidate in candidates:
try:
return da.from_zarr(candidate)
except Exception as exc:
last_error = exc
raise ValueError(
f"Could not load distances from distance_zarr={distance_zarr}. "
f"Tried: {candidates}"
) from last_error
def decode(df):
"""Decode byte strings in a DataFrame."""
for col in df:
if df[col].dtype == object:
df[col] = df[col].apply(
lambda x: x.decode("utf-8") if isinstance(x, bytes) else x
)
return df
[docs]
def read_h5ddl(
filename: Path | str = "dandelion_data.h5ddl",
distance_zarr: Path | str | None = None,
verbose: bool = False,
) -> Dandelion:
"""
Read in and returns a Dandelion class from .h5ddl format.
Parameters
----------
filename : Path | str, optional
path to `.h5ddl` file
distance_zarr : Path | str | None, optional
path to Zarr array for distances if computed lazily.
verbose : bool, optional
whether or not to print messages during creation of the Dandelion object.
Returns
-------
Dandelion
Dandelion object.
Raises
------
AttributeError
if `data` not found in `.h5ddl` file.
"""
# Auto-detect alongside zarr written by write_h5ddl for dask distances
if distance_zarr is None:
auto_zarr = Path(filename).with_suffix(".zarr")
if auto_zarr.exists():
distance_zarr = auto_zarr
# Detect legacy (PyTables) vs new format
with h5py.File(filename, "r") as hf:
is_legacy = isinstance(hf["data"], h5py.Group)
if is_legacy: # pragma: no cover
data = decode(load_data(_read_h5_group_legacy(filename, group="data")))
metadata = _read_h5_group_legacy(filename, group="metadata")
try:
g_0 = _read_h5_group_legacy(filename, group="graph/graph_0")
g_1 = _read_h5_group_legacy(filename, group="graph/graph_1")
graph0 = _create_graph(g_0, adjust_adjacency=1, fillna=0)
graph1 = _create_graph(g_1, adjust_adjacency=1, fillna=0)
graph = (graph0, graph1)
except KeyError:
pass
try:
distances = _read_h5_csr_matrix(
filename, group="distances", as_df=False
)
distances._index_names = metadata.index
except KeyError:
if distance_zarr is not None: # pragma: no cover
distances = _load_distance_zarr(distance_zarr)
try:
layout0 = _read_h5_dict(filename, group="layout/layout_0")
layout1 = _read_h5_dict(filename, group="layout/layout_1")
layout = (layout0, layout1)
except KeyError:
pass
try:
germline = _read_h5_zip_legacy(filename, group="germline")
except KeyError:
pass
else:
data = decode(load_data(_read_h5_group(filename, group="data")))
# final decode to ensure all byte strings are converted to str
metadata = _read_h5_group(filename, group="metadata")
metadata_names = _read_h5_group(filename, group="metadata_names")
metadata.index = metadata_names
try:
g_0 = _read_h5_csr_matrix(
filename, group="graph/graph_0", as_df=True
)
g_1 = _read_h5_csr_matrix(
filename, group="graph/graph_1", as_df=True
)
graph0 = _create_graph(g_0, adjust_adjacency=1, fillna=0)
graph1 = _create_graph(g_1, adjust_adjacency=1, fillna=0)
graph = (graph0, graph1)
except KeyError:
pass
try:
distances = _read_h5_csr_matrix(
filename, group="distances", as_df=False
)
distances._index_names = metadata.index
except KeyError:
if distance_zarr is not None:
distances = _load_distance_zarr(distance_zarr)
try:
layout0 = _read_h5_dict(filename, group="layout/layout_0")
layout1 = _read_h5_dict(filename, group="layout/layout_1")
layout = (layout0, layout1)
except KeyError:
pass
try:
germline = _read_h5_zip(
filename,
group="germline",
key_group="keys",
value_group="values",
)
except KeyError:
pass
constructor = {}
constructor["data"] = data
if "metadata" in locals():
constructor["metadata"] = metadata
if "germline" in locals():
constructor["germline"] = germline
if "layout" in locals():
constructor["layout"] = layout
if "graph" in locals():
constructor["graph"] = graph
if "distances" in locals():
constructor["distances"] = distances
res = Dandelion(**constructor, verbose=verbose)
# ensure that the metadata is decoded
res._metadata = decode(res._metadata)
return res
read = read_ddl = read_h5ddl # alias
[docs]
def read_airr(
file: Path | str,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
) -> Dandelion:
"""
Reads a standard single-cell AIRR rearrangement file.
If you have non-single-cell data, use `.load_data` first to load the data and then pass it to `ddl.Dandelion`.
That will tell you what columns are missing and you can fill it out accordingly e.g. make up a `cell_id`.
Parameters
----------
file : Path | str
path to AIRR rearrangement .tsv file.
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the Dandelion object.
Returns
-------
Dandelion
Dandelion object from AIRR file.
"""
vdj = Dandelion(file, verbose=verbose)
if suffix is not None:
vdj.add_sequence_suffix(
suffix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
elif prefix is not None:
vdj.add_sequence_prefix(
prefix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
return vdj
[docs]
def read_bd_airr(
file: Path | str,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
) -> Dandelion:
"""
Read the TCR or BCR `_AIRR.tsv` produced from BD Rhapsody technology.
Parameters
----------
file : Path | str
path to `_AIRR.tsv`
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the Dandelion object.
Returns
-------
Dandelion
Dandelion object from BD AIRR file.
"""
vdj = Dandelion(file, verbose=verbose)
if suffix is not None:
vdj.add_sequence_suffix(
suffix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
elif prefix is not None:
vdj.add_sequence_prefix(
prefix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
return vdj
[docs]
def read_parse_airr(
file: Path | str,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
**kwargs,
) -> Dandelion:
"""
Read the TCR or BCR `_annotation_airr.tsv` produced from Parse Biosciences Evercode technology.
This is not to be used for any airr rearrangement file, but specifically for the one produced by Parse Biosciences.
For standard airr rearrangement files e.g. `all_contig_dandelion.tsv`, use `ddl.Dandelion` or `ddl.read_airr` directly.
Parameters
----------
file : Path | str
path to `_annotation_airr.tsv`
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the Dandelion object.
**kwargs
additional keyword arguments passed to Dandelion.
Returns
-------
Dandelion
Dandelion object from Parse AIRR file.
"""
data = load_data(file)
data.drop("cell_id", axis=1, inplace=True) # it's the wrong cell_id
data = data.rename(
columns={
"cell_barcode": "cell_id",
"read_count": "consensus_count",
"transcript_count": "umi_count",
"cdr3": "junction",
"cdr3_aa": "junction_aa",
}
)
vdj = Dandelion(data, verbose=verbose, **kwargs)
if suffix is not None:
vdj.add_sequence_suffix(
suffix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
elif prefix is not None:
vdj.add_sequence_prefix(
prefix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
return vdj
[docs]
def read_10x_airr(
file: Path | str,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
) -> Dandelion:
"""
Read the `airr_rearrangement.tsv` produced from Cell Ranger directly and returns a Dandelion object.
This is not to be used for any airr rearrangement file, but specifically for the one produced by 10x Genomics.
For standard airr rearrangement files e.g. `all_contig_dandelion.tsv`, use `ddl.Dandelion` or `ddl.read_airr` directly.
Parameters
----------
file : Path | str
path to `airr_rearrangement.tsv`
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the Dandelion object.
Returns
-------
Dandelion
Dandelion object from 10x AIRR file.
"""
dat = load_data(file)
# get all the v,d,j,c calls
if "locus" not in dat:
tmp = [
(v, d, j, c)
for v, d, j, c in zip(
dat["v_call"], dat["d_call"], dat["j_call"], dat["c_call"]
)
]
locus = []
for t in tmp:
if all("IGH" in x for x in t if pd.notnull(x)):
locus.append("IGH")
elif all("IGK" in x for x in t if pd.notnull(x)):
locus.append("IGK")
elif all("IGL" in x for x in t if pd.notnull(x)):
locus.append("IGL")
elif all("TRA" in x for x in t if pd.notnull(x)):
locus.append("TRA")
elif all("TRB" in x for x in t if pd.notnull(x)):
locus.append("TRB")
elif all("TRD" in x for x in t if pd.notnull(x)):
locus.append("TRD")
elif all("TRG" in x for x in t if pd.notnull(x)):
locus.append("TRG")
else:
locus.append(np.nan)
dat["locus"] = locus
null_columns = [col for col in dat.columns if all_missing(dat[col])]
if len(null_columns) > 0:
dat.drop(null_columns, inplace=True, axis=1)
vdj = Dandelion(dat, verbose=verbose)
if suffix is not None:
vdj.add_sequence_suffix(
suffix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
elif prefix is not None:
vdj.add_sequence_prefix(
prefix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
return vdj
[docs]
def read_10x_vdj(
data: Path | str | pd.DataFrame,
filename_prefix: str | None = None,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_malformed: bool = True,
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
) -> Dandelion:
"""
A parser to read .csv and .json files directly from folder containing 10x cellranger-outputs,
or parse an existing pandas DataFrame.
This function parses the 10x output files into an AIRR compatible format.
Minimum requirement is one of either {filename_prefix}_contig_annotations.csv or all_contig_annotations.json
when reading from file path.
If .fasta, .json files are found in the same folder, additional info will be appended to the final table.
Parameters
----------
data : Path | str | pandas.DataFrame
path to folder containing `.csv` and/or `.json` files, path to files directly, or a pandas
DataFrame containing the contig annotations data.
filename_prefix : str | None, optional
prefix of file name preceding '_contig'. None defaults to 'all'. Only used when data is a file/folder.
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_malformed : bool, optional
whether or not to remove malformed contigs.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the Dandelion object.
Returns
-------
Dandelion
Dandelion object holding the parsed data.
Raises
------
OSError
if contig_annotations.csv and all_contig_annotations.json file(s) not found in the input folder.
TypeError
if data is not a valid type (Path, str, or pandas.DataFrame).
"""
filename_pre = (
DEFAULT_PREFIX if filename_prefix is None else filename_prefix
)
# Handle DataFrame input (pandas)
if isinstance(data, pd.DataFrame):
logg.info("Parsing pandas DataFrame")
raw = data.copy()
raw.set_index("contig_id", drop=False, inplace=True)
out = parse_annotation(raw)
elif isinstance(data, (str, Path)):
# Handle file path inputs
if os.path.isdir(str(data)):
files = os.listdir(data)
filelist = []
for fx in files:
if re.search(filename_pre + "_contig", fx):
if fx.endswith(".fasta") or fx.endswith(".csv"):
filelist.append(fx)
if re.search(
f"{filename_pre.replace('filtered', 'all')}_contig_annotations",
fx,
):
if fx.endswith(".json"):
filelist.append(fx)
csv_idx = [i for i, j in enumerate(filelist) if j.endswith(".csv")]
json_idx = [
i for i, j in enumerate(filelist) if j.endswith(".json")
]
if len(csv_idx) == 1:
file = str(data) + "/" + str(filelist[csv_idx[0]])
logg.info("Reading {}".format(str(file)))
raw = pd.read_csv(str(file))
raw.set_index("contig_id", drop=False, inplace=True)
fasta_file = str(file).split("_annotations.csv")[0] + ".fasta"
json_file = re.sub(
filename_pre + "_contig_annotations",
f"{filename_pre.replace('filtered', 'all')}_contig_annotations",
str(file).split(".csv")[0] + ".json",
)
if os.path.exists(json_file):
logg.info(
"Found {} file. Extracting extra information.".format(
str(json_file)
)
)
out = parse_annotation(raw)
with open(json_file) as f:
raw_json = json.load(f)
out_json = parse_json(raw_json)
out.update(out_json)
elif os.path.exists(fasta_file):
logg.info(
"Found {} file. Extracting extra information.".format(
str(fasta_file)
)
)
seqs = {}
fh = open(fasta_file)
for header, sequence in fasta_iterator(fh):
seqs[header] = sequence
raw["sequence"] = pd.Series(seqs)
out = parse_annotation(raw)
else:
out = parse_annotation(raw)
elif len(csv_idx) < 1:
if len(json_idx) == 1:
json_file = str(data) + "/" + str(filelist[json_idx[0]])
logg.info("Reading {}".format(json_file))
if os.path.exists(json_file):
with open(json_file) as f:
raw = json.load(f)
out = parse_json(raw)
else:
raise OSError(
"{}_contig_annotations.csv and {}_contig_annotations.json file(s) not found in {} folder.".format(
str(filename_pre),
filename_pre.replace("filtered", "all"),
str(data),
)
)
elif len(csv_idx) > 1:
raise OSError(
"There are multiple input .csv files with the same filename prefix {} in {} folder.".format(
str(filename_pre), str(data)
)
)
elif os.path.isfile(str(data)):
file = data
if str(file).endswith(".csv"):
logg.info("Reading {}.".format(str(file)))
raw = pd.read_csv(str(file))
raw.set_index("contig_id", drop=False, inplace=True)
fasta_file = str(file).split("_annotations.csv")[0] + ".fasta"
json_file = re.sub(
filename_pre + "_contig_annotations",
f"{filename_pre.replace('filtered', 'all')}_contig_annotations",
str(file).split(".csv")[0] + ".json",
)
if os.path.exists(json_file):
logg.info(
"Found {} file. Extracting extra information.".format(
str(json_file)
)
)
out = parse_annotation(raw)
with open(json_file) as f:
raw_json = json.load(f)
out_json = parse_json(raw_json)
out.update(out_json)
elif os.path.exists(fasta_file):
logg.info(
"Found {} file. Extracting extra information.".format(
str(fasta_file)
)
)
seqs = {}
fh = open(fasta_file)
for header, sequence in fasta_iterator(fh):
seqs[header] = sequence
raw["sequence"] = pd.Series(seqs)
out = parse_annotation(raw)
else:
out = parse_annotation(raw)
elif str(file).endswith(".json"):
if os.path.exists(file):
logg.info("Reading {}".format(file))
with open(file) as f:
raw = json.load(f)
out = parse_json(raw)
else:
raise OSError("{} not found.".format(file))
else:
raise OSError("{} not found.".format(data))
else:
raise TypeError(
f"data must be a Path, str, or pandas.DataFrame, got {type(data)}"
)
res = pd.DataFrame.from_dict(out, orient="index")
# quick check if locus is malformed
if remove_malformed:
res = res[~res["locus"].str.contains("[|]")]
# change all unknowns to blanks
res.replace("unknown", "", inplace=True)
vdj = Dandelion(res, verbose=verbose)
if suffix is not None:
vdj.add_sequence_suffix(
suffix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
elif prefix is not None:
vdj.add_sequence_prefix(
prefix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
return vdj
[docs]
def read_seekgene_vdj(
data: Path | str | pd.DataFrame,
filename_prefix: str | None = None,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_malformed: bool = True,
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
) -> Dandelion:
"""
A parser to read .csv and .json files directly from folder containing SeekGene VDJ outputs,
or parse an existing pandas DataFrame.
SeekGene produces contig annotation files in the same format as 10x CellRanger VDJ output.
This function is a convenience wrapper around :func:`read_10x_vdj` with SeekGene-specific
naming for clarity.
Minimum requirement is one of either {filename_prefix}_contig_annotations.csv or
all_contig_annotations.json when reading from a file path.
If .fasta, .json files are found in the same folder, additional info will be appended to
the final table.
Parameters
----------
data : Path | str | pandas.DataFrame
path to folder containing `.csv` and/or `.json` files, path to files directly, or a pandas
DataFrame containing the contig annotations data.
filename_prefix : str | None, optional
prefix of file name preceding '_contig'. None defaults to 'all'. Only used when data is a file/folder.
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_malformed : bool, optional
whether or not to remove malformed contigs.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the Dandelion object.
Returns
-------
Dandelion
Dandelion object holding the parsed data.
Raises
------
OSError
if contig_annotations.csv and all_contig_annotations.json file(s) not found in the input folder.
TypeError
if data is not a valid type (Path, str, or pandas.DataFrame).
"""
ddl = read_10x_vdj(
data=data,
filename_prefix=filename_prefix,
prefix=prefix,
suffix=suffix,
sep=sep,
remove_malformed=remove_malformed,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
verbose=verbose,
)
# SeekGene VDJ files share the same CSV/JSON format as 10x CellRanger, but the
# resulting internal columns should not carry a _10x suffix.
_10x_rename = {
"is_cell_10x": "is_cell",
"high_confidence_10x": "high_confidence",
"sequence_length_10x": "sequence_length",
"raw_consensus_id_10x": "raw_consensus_id",
"exact_subclonotype_id_10x": "exact_subclonotype_id",
"filtered_10x": "filtered",
"is_asm_cell_10x": "is_asm_cell",
}
rename_map = {k: v for k, v in _10x_rename.items() if k in ddl.data.columns}
if rename_map:
ddl._data = ddl.data.rename(columns=rename_map)
return ddl
def parse_json(data: list) -> defaultdict:
"""Parse json file."""
main_dict1 = {
"barcode": "cell_id",
"contig_name": "sequence_id",
"sequence": "sequence",
"aa_sequence": "sequence_aa",
"productive": "productive",
"full_length": "complete_vdj",
"frame": "vj_in_frame",
"cdr3_seq": "junction",
"cdr3": "junction_aa",
}
main_dict2 = {
"read_count": "consensus_count",
"umi_count": "umi_count",
# "duplicate_count": "umi_count",
"cdr3_start": "cdr3_start",
"cdr3_stop": "cdr3_end",
}
main_dict3 = {
"high_confidence": "high_confidence_10x",
"filtered": "filtered_10x",
"is_gex_cell": "is_cell_10x",
"is_asm_cell": "is_asm_cell_10x",
}
info_dict = {
"raw_clonotype_id": "clone_id",
"raw_consensus_id": "raw_consensus_id_10x",
"exact_subclonotype_id": "exact_subclonotype_id_10x",
}
region_type_dict = {
"L-REGION+V-REGION": "v_call",
"D-REGION": "d_call",
"J-REGION": "j_call",
"C-REGION": "c_call",
}
required_calls = ["v_call", "d_call", "j_call", "c_call"]
region_keys = ["fwr1", "cdr1", "fwr2", "cdr2", "fwr3", "fwr4"]
out = defaultdict(OrderedDict)
for i in range(len(data)):
if data[i]["contig_name"] is not None:
key = data[i]["contig_name"]
else:
continue
for k in main_dict1.keys():
if k in data[i]:
if data[i][k] is not None:
out[key].update({main_dict1[k]: data[i][k]})
else:
out[key].update({main_dict1[k]: ""})
else:
out[key].update({main_dict1[k]: ""})
if data[i]["annotations"] is not None:
chains = []
for j in range(len(data[i]["annotations"])):
chains.append(data[i]["annotations"][j]["feature"]["chain"])
out[key].update({"locus": "|".join(list(set(chains)))})
for j in range(len(data[i]["annotations"])):
rtype = data[i]["annotations"][j]["feature"]["region_type"]
if rtype in region_type_dict.keys():
call = region_type_dict[rtype]
else:
continue
if (
data[i]["annotations"][j]["feature"]["gene_name"]
is not None
):
out[key].update(
{
call: data[i]["annotations"][j]["feature"][
"gene_name"
]
}
)
else:
out[key].update({call: ""})
for rc in required_calls:
if rc not in out[key]:
out[key].update({rc: ""})
for k in main_dict2.keys():
if k in data[i]:
if data[i][k] is not None:
out[key].update({main_dict2[k]: data[i][k]})
else:
out[key].update({main_dict2[k]: np.nan})
else:
out[key].update({main_dict2[k]: np.nan})
for rk in region_keys:
if rk in data[i]:
if data[i][rk] is not None:
for k in data[i][rk]:
if k == "start":
ka = rk + "_start"
elif k == "stop":
ka = rk + "_end"
elif k == "nt_seq":
ka = rk + ""
elif k == "aa_seq":
ka = rk + "_aa"
else:
continue
out[key].update({ka: data[i][rk][k]})
else:
for k in region_keys:
out[key].update({k + "_start": np.nan})
out[key].update({k + "_end": np.nan})
out[key].update({k + "": ""})
out[key].update({k + "_aa": ""})
else:
for k in region_keys:
out[key].update({k + "_start": np.nan})
out[key].update({k + "_end": np.nan})
out[key].update({k + "": ""})
out[key].update({k + "_aa": ""})
if data[i]["info"] is not None:
for info in data[i]["info"]:
if data[i]["info"][info] is not None:
out[key].update({info_dict[info]: data[i]["info"][info]})
else:
out[key].update({info_dict[info]: ""})
for k in main_dict3.keys():
if k in data[i]:
if data[i][k] is not None:
out[key].update({main_dict3[k]: data[i][k]})
else:
out[key].update({main_dict3[k]: ""})
else:
out[key].update({main_dict3[k]: ""})
return out
def parse_annotation(data: pd.DataFrame) -> defaultdict:
"""Parse annotation file."""
out = defaultdict(OrderedDict)
swap_dict = dict(zip(CELLRANGER, AIRR))
for _, row in data.iterrows():
contig = Contig(row, swap_dict).contig["sequence_id"]
out[contig] = Contig(row, swap_dict).contig
if pd.isna(out[contig]["locus"]) or out[contig]["locus"] in [
"None",
"none",
"",
]:
calls = []
for call in ["v_call", "d_call", "j_call", "c_call"]:
if pd.notna(out[contig][call]) and out[contig][call] not in [
"None",
"none",
"",
]:
calls.append(out[contig][call])
out[contig]["locus"] = "|".join(list({str(c)[:3] for c in calls}))
if pd.isna(out[contig]["locus"]) or out[contig]["locus"] in [
"None",
"",
]:
out[contig]["locus"] = "|"
return out
def _read_h5_group(filename: Path | str, group: str) -> pd.DataFrame:
"""
Read a specific group from an H5 file.
Parameters
----------
filename : Path | str
The path to the H5 file.
group : str
The group to read from the H5 file.
Returns
-------
pd.DataFrame
The data from the specified group as a pandas dataframe.
Raises
------
KeyError
If the specified group is not found in the H5 file.
"""
with h5py.File(filename, "r") as hf:
data_group = hf[group]
structured_data_array = data_group[:]
decoded = {}
if structured_data_array.dtype.names is not None:
for col in structured_data_array.dtype.names:
dtype = structured_data_array.dtype[col]
if dtype.char == "S": # Check if dtype is byte strings
# Decode byte strings
decoded.update(
{
col: np.array(
[
(
x.astype(str)
if isinstance(x, bytes)
else x
)
for x in structured_data_array[col]
]
)
}
)
else:
decoded.update({col: structured_data_array[col]})
# Create a DataFrame from the structured array
data = pd.DataFrame(decoded)
else:
data = np.array(
[
x.astype(str) if isinstance(x, bytes) else x
for x in structured_data_array
]
)
return data
def _read_h5_csr_matrix(
filename: Path | str, group: str, as_df: bool = True
) -> pd.DataFrame:
"""
Read a group from an H5 file originally stored as a compressed sparse matrix.
Parameters
----------
filename : Path | str
The path to the H5 file.
group : str
The group to read from the H5 file.
Returns
-------
pd.DataFrame
The data from the specified group as a pandas dataframe.
"""
with h5py.File(filename, "r") as f:
data = f[f"{group}/data"][:]
indices = f[f"{group}/indices"][:]
indptr = f[f"{group}/indptr"][:]
shape = tuple(f[f"{group}/shape"][:])
# Reconstruct CSR matrix
loaded_matrix = csr_matrix((data, indices, indptr), shape=shape)
if not as_df:
return loaded_matrix
df = pd.DataFrame(loaded_matrix.toarray())
df_col = _read_h5_group(filename, f"{group}/column")
df_index = _read_h5_group(filename, f"{group}/index")
df.columns = df_col
df.index = df_index
return df
def _create_graph(
adj: pd.DataFrame,
adjust_adjacency: int | float = 0,
fillna: int | float = 0,
) -> nx.Graph:
"""
Create a networkx graph from the given adjacency matrix.
Parameters
----------
adj : pd.DataFrame
The adjacency matrix to create the graph from.
adjust_adjacency : int | float, optional
The value to add to the graph by as a way to adjust the adjacency matrix. Defaults to 0.
fillna : int | float, optional
The value to fill NaN values with. Defaults to 0.
Returns
-------
nx.Graph
The created networkx graph.
"""
if adjust_adjacency != 0:
adj += adjust_adjacency
adj = adj.fillna(fillna)
g = nx.from_pandas_adjacency(adj)
if adjust_adjacency != 0:
for u, v, d in g.edges(data=True):
d["weight"] -= adjust_adjacency
return g
def _read_h5_dict(filename: Path | str, group: str) -> dict:
"""
Read a dictionary from an H5 file.
Parameters
----------
filename : Path | str
The path to the H5 file.
group : str
The group to read from the H5 file.
Returns
-------
dict
The dictionary data from the specified group.
"""
out_dict = {}
with h5py.File(filename, "r") as hf:
for k in hf[group].keys():
out_dict[k] = hf[group][k][:]
return out_dict
def _read_h5_zip(
filename: Path | str, group: str, key_group: str, value_group: str
) -> dict:
"""
Read two groups from an H5 file and return them as a dictionary.
Parameters
----------
filename : Path | str
The path to the H5 file.
group : str
The group to read from the H5 file.
key_group : str
The name of the group containing the keys.
value_group : str
The name of the group containing the values.
Returns
-------
dict
The dictionary data from the specified groups.
"""
with h5py.File(filename, "r") as hf:
keys = [key.decode("utf-8") for key in hf[f"{group}/{key_group}"][:]]
values = [
value.decode("utf-8") for value in hf[f"{group}/{value_group}"][:]
]
# Reconstruct the dictionary
out_dict = dict(zip(keys, values))
return out_dict
@deprecated(
deprecated_in="1.0.0",
removed_in="1.1.0",
details="legacy .h5ddl format will no longer be supported.",
)
def read_h5ddl_legacy(
filename: Path | str = "dandelion_data.h5ddl",
) -> Dandelion: # pragma: no cover
"""
Read in and returns a Dandelion class from .h5ddl format saved in legacy (version 3) format.
Parameters
----------
filename : Path | str, optional
path to `.h5ddl` file
Returns
-------
Dandelion
Dandelion object.
Raises
------
AttributeError
if `data` not found in `.h5ddl` file.
"""
data = decode(load_data(_read_h5_group_legacy(filename, group="data")))
# final decode to ensure all byte strings are converted to str
metadata = _read_h5_group_legacy(filename, group="metadata")
try:
g_0 = _read_h5_group_legacy(filename, group="graph/graph_0")
g_1 = _read_h5_group_legacy(filename, group="graph/graph_1")
graph0 = _create_graph(g_0, adjust_adjacency=1, fillna=0)
graph1 = _create_graph(g_1, adjust_adjacency=1, fillna=0)
graph = (graph0, graph1)
except KeyError:
pass
try:
layout0 = _read_h5_dict(filename, group="layout/layout_0")
layout1 = _read_h5_dict(filename, group="layout/layout_1")
layout = (layout0, layout1)
except KeyError:
pass
try:
germline = _read_h5_zip_legacy(filename, group="germline")
except KeyError:
pass
constructor = {}
constructor["data"] = data
if "metadata" in locals():
constructor["metadata"] = metadata
if "germline" in locals():
constructor["germline"] = germline
if "layout" in locals():
constructor["layout"] = layout
if "graph" in locals():
constructor["graph"] = graph
res = Dandelion(**constructor, verbose=False)
# ensure that the metadata is decoded
res._metadata = decode(res._metadata)
return res
def _read_h5_group_legacy(
filename: Path | str, group: str
) -> pd.DataFrame: # pragma: no cover
"""
Read a specific group from an H5 file in legacy (PyTables) format.
Supports both 'fixed' format (block-based with axis0/axis1/blockN_*)
and 'table' format (structured array with column metadata in attrs).
Parameters
----------
filename : Path | str
The path to the H5 file.
group : str
The group to read from the H5 file.
Returns
-------
pd.DataFrame
The data from the specified group as a pandas dataframe.
Raises
------
KeyError
If the specified group is not found in the H5 file.
"""
with h5py.File(filename, "r") as hf:
grp = hf[group]
if "table" in grp:
# Table format: structured array with column names in attrs
return _read_pytables_table_format(grp)
elif "axis0" in grp:
# Fixed format: block-based with axis0/axis1/blockN_*
return _read_pytables_fixed_format(grp)
else:
raise KeyError(f"Unrecognized legacy format for group '{group}'")
@deprecated(
deprecated_in="1.0.0",
removed_in="1.1.0",
details="legacy .h5ddl format will no longer be supported.",
)
def _read_pytables_fixed_format(
grp: h5py.Group,
) -> pd.DataFrame: # pragma: no cover
"""Read PyTables fixed format (block-based) from h5py Group."""
_decode_bytes = np.vectorize(
lambda x: x.decode("utf-8") if isinstance(x, bytes) else x
)
columns = _decode_bytes(grp["axis0"][:])
index = _decode_bytes(grp["axis1"][:])
# Collect all blocks
block_num = 0
frames = []
while f"block{block_num}_items" in grp:
items = _decode_bytes(grp[f"block{block_num}_items"][:])
vals_ds = grp[f"block{block_num}_values"]
if vals_ds.dtype == object:
# Object block: stored as pickled numpy array
raw = vals_ds[0]
values = pickle.loads(raw.tobytes())
else:
values = vals_ds[:]
df_block = pd.DataFrame(values, columns=items, index=index)
frames.append(df_block)
block_num += 1
if frames:
result = pd.concat(frames, axis=1)
# Reorder columns to match original order
result = result[columns]
else:
result = pd.DataFrame(index=index, columns=columns)
return result
@deprecated(
deprecated_in="1.0.0",
removed_in="1.1.0",
details="legacy .h5ddl format will no longer be supported.",
)
def _read_pytables_table_format(
grp: h5py.Group,
) -> pd.DataFrame: # pragma: no cover
"""Read PyTables table format (structured array) from h5py Group."""
table_ds = grp["table"]
table_data = table_ds[:]
attrs = dict(table_ds.attrs)
# Extract index
index_raw = table_data["index"]
index = np.array(
[x.decode("utf-8") if isinstance(x, bytes) else x for x in index_raw]
)
# Extract each values_block and its column names from attrs
frames = []
block_num = 0
while f"values_block_{block_num}_kind" in attrs:
kind_raw = attrs[f"values_block_{block_num}_kind"]
if isinstance(kind_raw, bytes):
kind_raw = kind_raw.decode("utf-8")
col_names = pickle.loads(kind_raw.encode("latin-1"))
field_name = f"values_block_{block_num}"
block_values = table_data[field_name]
# Decode byte strings if needed
if block_values.dtype.kind == "S" or block_values.dtype.kind == "O":
decoded = np.vectorize(
lambda x: x.decode("utf-8") if isinstance(x, bytes) else x
)(block_values)
df_block = pd.DataFrame(decoded, columns=col_names, index=index)
else:
df_block = pd.DataFrame(
block_values, columns=col_names, index=index
)
frames.append(df_block)
block_num += 1
if frames:
result = pd.concat(frames, axis=1)
else:
result = pd.DataFrame(index=index)
return result
@deprecated(
deprecated_in="1.0.0",
removed_in="1.1.0",
details="legacy .h5ddl format will no longer be supported.",
)
def _read_h5_zip_legacy(
filename: Path | str, group: str
) -> dict: # pragma: no cover
"""
Read two groups from an H5 file and return them as a dictionary.
Parameters
----------
filename : Path | str
The path to the H5 file.
group : str
The group to read from the H5 file.
key_group : str
The name of the group containing the keys.
value_group : str
The name of the group containing the values.
Returns
-------
dict
The dictionary data from the specified groups.
"""
with h5py.File(filename, "r") as hf:
out_dict = {}
for g in hf[group].attrs:
out_dict.update({g: hf[group].attrs[g]})
return out_dict
def write_airr(data: pd.DataFrame, save: Path | str) -> None:
"""Save as airr formatted file."""
data = sanitize_data(data)
data.to_csv(save, sep="\t", index=False)
def write_blastn(data: pd.DataFrame, save: Path | str) -> None:
"""Write blast output."""
data = sanitize_blastn(data)
data.to_csv(save, sep="\t", index=False)