from __future__ import annotations
import h5py
import json
import os
import re
import tempfile
import networkx as nx
import pandas as pd
import polars as pl
from collections import defaultdict, OrderedDict
from pathlib import Path
from scanpy import logging as logg
from scipy.sparse import csr_matrix
from dandelion.base.io._io import read_h5ddl as _read_h5ddl
from dandelion.polars.core._core import (
DandelionPolars,
load_polars,
SCHEMA_OVERRIDES,
)
from dandelion.utilities._utilities import (
DEFAULT_PREFIX,
CELLRANGER,
AIRR,
fasta_iterator,
open_zarr_group,
ZipStore,
)
[docs]
def read_zipddl(
filename: str,
distance_zarr: Path | str | None = None,
verbose: bool = False,
) -> DandelionPolars:
"""
Read a Dandelion object from a .zipddl file (hybrid Zarr v3 ZipStore container).
Parameters
----------
filename : str
path to `.zipddl` file.
distance_zarr : Path | str | None, optional
path to an external Zarr array for distances, if the distances were not
embedded in the .zipddl file. Auto-detected when not provided.
verbose : bool, optional
whether or not to print messages during creation of the DandelionPolars object.
Returns
-------
DandelionPolars
DandelionPolars object.
Raises
------
AttributeError
if `data` not found in the `.zipddl` file.
"""
store = ZipStore(filename, mode="r")
root = open_zarr_group(store, mode="r")
constructor = {}
# ---------------------------
# Tables: _data and _metadata as Polars LazyFrames
# ---------------------------
def load_parquet_lazy(
dataset_name: str,
) -> tuple[pl.LazyFrame, tempfile.NamedTemporaryFile]:
arr = root[f"tables/{dataset_name}"][:]
tmp = tempfile.NamedTemporaryFile(suffix=".parquet")
tmp.write(arr.tobytes())
tmp.flush()
# Polars lazy scan
return pl.scan_parquet(tmp.name), tmp # return tmp to keep file alive
cache_handles = {}
if "data.parquet" in root["tables"]:
data_lazy, data_tmp = load_parquet_lazy("data.parquet")
constructor["data"] = data_lazy
cache_handles["data"] = data_tmp
if "metadata.parquet" in root["tables"]:
metadata_lazy, metadata_tmp = load_parquet_lazy("metadata.parquet")
constructor["metadata"] = metadata_lazy
cache_handles["metadata"] = metadata_tmp
# ---------------------------
# Distances: Zarr arrays
# ---------------------------
embedded_distances_loaded = False
if "arrays" in root:
arr_grp = root["arrays"]
if "distances_data" in arr_grp:
distances = csr_matrix(
(
arr_grp["distances_data"][:],
arr_grp["distances_indices"][:],
arr_grp["distances_indptr"][:],
),
shape=tuple(arr_grp["distances_shape"][:]),
)
constructor["distances"] = distances
embedded_distances_loaded = True
elif "distances" in arr_grp:
# Wrap zarr array in dask for lazy access
import dask.array as da
zarr_arr = arr_grp["distances"]
constructor["distances"] = da.from_zarr(zarr_arr)
embedded_distances_loaded = True
if distance_zarr is not None:
import dask.array as da
if embedded_distances_loaded:
logg.warning(
f"Embedded distances found (in {filename}) and external Zarr "
f"path (distance_zarr={distance_zarr}) provided. "
f"Using external Zarr path to override embedded distances."
)
constructor["distances"] = da.from_zarr(
str(distance_zarr) + "/distance_matrix"
)
# ---------------------------
# Graphs: HDF5 blobs
# ---------------------------
if "graph" in root:
graph_group = root["graph"]
graphs = []
for key in sorted(graph_group.array_keys()):
arr = graph_group[key][:]
with tempfile.NamedTemporaryFile(suffix=".h5") as tmp_h5:
tmp_h5.write(arr.tobytes())
tmp_h5.flush()
# Use your helper
df = _read_h5_csr_matrix_zarr(tmp_h5.name, as_df=True)
g = _create_graph(df, adjust_adjacency=0, fillna=0)
graphs.append(g)
constructor["graph"] = tuple(graphs)
# ---------------------------
# Layout
# ---------------------------
if "layout" in root:
layout_grp = root["layout"]
layout = []
for key in sorted(layout_grp.keys()):
arr = layout_grp[key][:]
with tempfile.NamedTemporaryFile(suffix=".h5") as tmp_h5:
tmp_h5.write(arr.tobytes())
tmp_h5.flush()
with h5py.File(tmp_h5.name, "r") as hf:
layout_dict = {k: hf[k][:] for k in hf.keys()}
layout.append(layout_dict)
constructor["layout"] = tuple(layout)
# ---------------------------
# Germline
# ---------------------------
if "germline" in root:
arr = root["germline"]["germline.h5"][:]
with tempfile.NamedTemporaryFile(suffix=".h5") as tmp_h5:
tmp_h5.write(arr.tobytes())
tmp_h5.flush()
with h5py.File(tmp_h5.name, "r") as hf:
constructor["germline"] = _collect_datasets(hf)
# ---------------------------
# Construct Dandelion
# ---------------------------
res = DandelionPolars(
**constructor, verbose=verbose, cache_handles=cache_handles
)
return res
read = read_ddl = read_zipddl # alias
# helper function to read germline correctly
def _collect_datasets(hf: h5py.File, path=""):
datasets = {}
for key in hf.keys():
item = hf[key]
full_path = f"{path}/{key}" if path else key
if isinstance(item, h5py.Group):
datasets.update(_collect_datasets(item, full_path))
elif isinstance(item, h5py.Dataset):
if item.shape == ():
datasets[full_path] = item[()]
else:
datasets[full_path] = item[:]
return datasets
[docs]
def read_h5ddl(
filename: Path | str = "dandelion_data.h5ddl",
distance_zarr: Path | str | None = None,
verbose: bool = False,
) -> DandelionPolars:
"""
Read in and returns a Dandelion class from .h5ddl format.
All components (data, metadata, graph, distances, layout, germline) are
loaded and converted to their polars equivalents. If a companion .zarr
store exists alongside the .h5ddl file (same stem, .zarr extension) it is
picked up automatically as the distance array; this mirrors the behaviour
of write_h5ddl when distances are a dask array.
Parameters
----------
filename : Path | str, optional
path to `.h5ddl` file
distance_zarr : Path | str | None, optional
path to Zarr array for distances if computed lazily. Auto-detected
from a companion .zarr file when not provided.
verbose : bool, optional
whether or not to print messages during creation of the Dandelion object.
Returns
-------
DandelionPolars
DandelionPolars object.
Raises
------
AttributeError
if `data` not found in `.h5ddl` file.
"""
tmp = _read_h5ddl(filename, distance_zarr, verbose)
constructor = {}
# data: pandas → polars LazyFrame
# HDF5 stores missing float values as empty strings; replace before cast.
data_pd = tmp.data
for col in data_pd.columns:
if (
col in SCHEMA_OVERRIDES
and SCHEMA_OVERRIDES[col] == pl.Float64
and data_pd[col].dtype == object
):
data_pd[col] = data_pd[col].replace("", None)
constructor["data"] = pl.from_pandas(
data_pd, schema_overrides=SCHEMA_OVERRIDES
).lazy()
# metadata: pandas (index = cell_id barcodes) → polars DataFrame
# with cell_id as a regular column
if tmp.metadata is not None:
meta_pd = tmp.metadata.copy()
meta_pd.index.name = "cell_id"
constructor["metadata"] = pl.from_pandas(meta_pd.reset_index())
if tmp.graph is not None:
constructor["graph"] = tmp.graph
if tmp.distances is not None:
constructor["distances"] = tmp.distances
if tmp.layout is not None:
constructor["layout"] = tmp.layout
if tmp.germline is not None and len(tmp.germline) > 0:
constructor["germline"] = tmp.germline
return DandelionPolars(**constructor, verbose=verbose)
def _read_h5_csr_matrix_zarr(
filename: Path | str, as_df: bool = True
) -> pd.DataFrame:
"""
Read a group from an H5 file originally stored as a compressed sparse matrix.
Parameters
----------
filename : Path | str
The path to the H5 file.
Returns
-------
pd.DataFrame
The data from the specified group as a pandas dataframe.
"""
with h5py.File(filename, "r") as f:
data = f["data"][:]
indices = f["indices"][:]
indptr = f["indptr"][:]
shape = tuple(f["shape"][:])
# Reconstruct CSR matrix
loaded_matrix = csr_matrix((data, indices, indptr), shape=shape)
if not as_df:
return loaded_matrix
df = pd.DataFrame(loaded_matrix.toarray())
df_col = [
x.decode("utf-8") if isinstance(x, bytes) else x
for x in f["columns"][:]
]
df_index = [
x.decode("utf-8") if isinstance(x, bytes) else x
for x in f["index"][:]
]
df.columns = df_col
df.index = df_index
return df
def _create_graph(
adj: pd.DataFrame,
adjust_adjacency: int | float = 0,
fillna: int | float = 0,
) -> nx.Graph:
"""
Create a networkx graph from the given adjacency matrix.
Parameters
----------
adj : pd.DataFrame
The adjacency matrix to create the graph from.
adjust_adjacency : int | float, optional
The value to add to the graph by as a way to adjust the adjacency matrix. Defaults to 0.
fillna : int | float, optional
The value to fill NaN values with. Defaults to 0.
Returns
-------
nx.Graph
The created networkx graph.
"""
if adjust_adjacency != 0:
adj += adjust_adjacency
adj = adj.fillna(fillna)
g = nx.from_pandas_adjacency(adj)
if adjust_adjacency != 0:
for u, v, d in g.edges(data=True):
d["weight"] -= adjust_adjacency
return g
[docs]
def read_10x_vdj(
data: Path | str | pd.DataFrame | pl.DataFrame | pl.LazyFrame | None = None,
filename_prefix: str | None = None,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_malformed: bool = True,
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
) -> DandelionPolars:
"""
A parser to read .csv and .json files directly from folder containing 10x cellranger-outputs,
or parse an existing pandas/polars DataFrame.
This function parses the 10x output files into an AIRR compatible format using Polars.
Minimum requirement is one of either {filename_prefix}_contig_annotations.csv or all_contig_annotations.json
when reading from file path.
If .fasta, .json files are found in the same folder, additional info will be appended to the final table.
Parameters
----------
data : Path | str | pandas.DataFrame | polars.DataFrame | polars.LazyFrame | None
path to folder containing `.csv` and/or `.json` files, path to files directly, or a pandas/polars
DataFrame containing the contig annotations data.
filename_prefix : str | None, optional
prefix of file name preceding '_contig'. None defaults to 'all'. Only used when data is a file/folder.
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_malformed : bool, optional
whether or not to remove malformed contigs.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the DandelionPolars object.
Returns
-------
DandelionPolars
DandelionPolars object holding the parsed data.
Raises
------
OSError
if contig_annotations.csv and all_contig_annotations.json file(s) not found in the input folder.
TypeError
if data is not a valid type (Path, str, DataFrame, or LazyFrame).
"""
def parse_annotation_polars(data: pl.DataFrame) -> pl.DataFrame:
"""Parse annotation file using Polars - fully vectorized."""
swap_dict = dict(zip(CELLRANGER, AIRR))
# Rename columns based on swap_dict
rename_map = {
k: v for k, v in swap_dict.items() if k in data.collect_schema()
}
data = data.rename(rename_map)
# Fill null values with empty strings for string columns
# Also replace string representations of None/NaN
for col in data.collect_schema():
if data[col].dtype in (pl.Utf8, pl.String, pl.Categorical):
data = data.with_columns(
pl.col(col)
.fill_null("")
.str.replace_all("^(None|none|nan|NaN)$", "")
.alias(col)
)
# Ensure gene call columns exist
gene_call_cols = ["v_call", "d_call", "j_call", "c_call"]
for col in gene_call_cols:
if col not in data.collect_schema():
data = data.with_columns(pl.lit("").alias(col))
if "locus" not in data.collect_schema():
data = data.with_columns(pl.lit("").alias("locus"))
# Create derived locus: extract first 3 chars from each gene call, combine unique
def derive_locus_from_calls(v, d, j, c):
"""Extract locus from gene calls."""
calls = []
for val in [v, d, j, c]:
if val and val not in ["None", "none", "", "nan"]:
calls.append(str(val)[:3])
return "|".join(sorted(set(calls))) if calls else "|"
data = data.with_columns(
pl.when(
(pl.col("locus").is_in(["None", "none", "", "nan", None]))
| pl.col("locus").is_null()
)
.then(
pl.struct(gene_call_cols).map_elements(
lambda x: derive_locus_from_calls(
x["v_call"], x["d_call"], x["j_call"], x["c_call"]
),
return_dtype=pl.String,
)
)
.otherwise(pl.col("locus"))
.alias("locus")
)
# Replace empty locus with "|"
data = data.with_columns(
pl.when(pl.col("locus") == "")
.then(pl.lit("|"))
.otherwise(pl.col("locus"))
.alias("locus")
)
return data
def parse_json_polars(data: list) -> pl.DataFrame:
"""Parse json file and return DataFrame."""
main_dict1 = {
"barcode": "cell_id",
"contig_name": "sequence_id",
"sequence": "sequence",
"aa_sequence": "sequence_aa",
"productive": "productive",
"full_length": "complete_vdj",
"frame": "vj_in_frame",
"cdr3_seq": "junction",
"cdr3": "junction_aa",
}
main_dict2 = {
"read_count": "consensus_count",
"umi_count": "umi_count",
"cdr3_start": "cdr3_start",
"cdr3_stop": "cdr3_end",
}
main_dict3 = {
"high_confidence": "high_confidence_10x",
"filtered": "filtered_10x",
"is_gex_cell": "is_cell_10x",
"is_asm_cell": "is_asm_cell_10x",
}
info_dict = {
"raw_clonotype_id": "clone_id",
"raw_consensus_id": "raw_consensus_id_10x",
"exact_subclonotype_id": "exact_subclonotype_id_10x",
}
region_type_dict = {
"L-REGION+V-REGION": "v_call",
"D-REGION": "d_call",
"J-REGION": "j_call",
}
out = defaultdict(OrderedDict)
for d in data:
# main level
for k in main_dict1:
if k in d:
out[d["contig_name"]].update({main_dict1[k]: d[k]})
for k in main_dict2:
if k in d:
out[d["contig_name"]].update({main_dict2[k]: d[k]})
for k in main_dict3:
if k in d:
out[d["contig_name"]].update({main_dict3[k]: d[k]})
# info level
if "info" in d:
for k in info_dict:
if k in d["info"]:
out[d["contig_name"]].update(
{info_dict[k]: d["info"][k]}
)
# annotation level
if "annotations" in d:
for dat in d["annotations"]:
if "feature" in dat:
if "region_type" in dat["feature"]:
region = dat["feature"]["region_type"]
if region in region_type_dict:
gene_name = dat["feature"]["gene_name"]
chain = dat["feature"]["chain"]
out[d["contig_name"]].update(
{region_type_dict[region]: gene_name}
)
out[d["contig_name"]].update({"locus": chain})
if "chain" in dat["feature"]:
if dat["feature"]["chain"] != "Multi":
chain = dat["feature"]["chain"]
out[d["contig_name"]].update({"locus": chain})
if "cdr3_seq" not in d:
if dat["feature"]["region_type"] == "CDR3":
if "cdr3_start" in dat:
out[d["contig_name"]].update(
{"cdr3_start": dat["cdr3_start"]}
)
if "cdr3_stop" in dat:
out[d["contig_name"]].update(
{"cdr3_end": dat["cdr3_stop"]}
)
if dat["feature"]["feature_id"] == 0:
if dat["feature"]["region_type"] == "5'UTR":
if "contig_match_start" in dat:
out[d["contig_name"]].update(
{
"fwr1_start": dat[
"contig_match_start"
]
}
)
if "region_type" in dat["feature"]:
if dat["feature"]["region_type"] == "C-REGION":
c_gene = dat["feature"]["gene_name"]
out[d["contig_name"]].update({"c_call": c_gene})
# Convert dict to DataFrame
return pl.DataFrame([v for v in out.values()])
# Handle DataFrame inputs (pandas or polars)
if isinstance(data, pd.DataFrame):
logg.info("Converting pandas DataFrame to polars DataFrame")
res = pl.from_pandas(data, schema_overrides=SCHEMA_OVERRIDES)
res = parse_annotation_polars(res)
elif isinstance(data, pl.LazyFrame):
logg.info("Converting polars LazyFrame to polars DataFrame")
res = data.collect(engine="streaming")
res = parse_annotation_polars(res)
elif isinstance(data, pl.DataFrame):
logg.info("Parsing polars DataFrame")
res = parse_annotation_polars(data)
elif isinstance(data, (str, Path)):
# Handle file path inputs
filename_pre = (
DEFAULT_PREFIX if filename_prefix is None else filename_prefix
)
if os.path.isdir(str(data)):
files = os.listdir(data)
filelist = []
for fx in files:
if re.search(filename_pre + "_contig", fx):
if fx.endswith(".fasta") or fx.endswith(".csv"):
filelist.append(fx)
if re.search(
f"{filename_pre.replace('filtered', 'all')}_contig_annotations",
fx,
):
if fx.endswith(".json"):
filelist.append(fx)
csv_idx = [i for i, j in enumerate(filelist) if j.endswith(".csv")]
json_idx = [
i for i, j in enumerate(filelist) if j.endswith(".json")
]
if len(csv_idx) == 1:
file = str(data) + "/" + str(filelist[csv_idx[0]])
logg.info("Reading {}".format(str(file)))
raw = pl.read_csv(str(file))
fasta_file = str(file).split("_annotations.csv")[0] + ".fasta"
json_file = re.sub(
filename_pre + "_contig_annotations",
f"{filename_pre.replace('filtered', 'all')}_contig_annotations",
str(file).split(".csv")[0] + ".json",
)
if os.path.exists(json_file):
logg.info(
"Found {} file. Extracting extra information.".format(
str(json_file)
)
)
# Parse CSV to DataFrame
out_df = parse_annotation_polars(raw)
# Parse JSON to DataFrame
with open(json_file) as f:
raw_json = json.load(f)
out_json_df = parse_json_polars(raw_json)
# Merge DataFrames
res = out_df.join(
out_json_df,
on="sequence_id",
how="outer",
suffix="_json",
)
# Coalesce columns that appear in both (prefer json version)
for col in out_json_df.columns:
if (
col != "sequence_id"
and f"{col}_json" in res.columns
):
res = res.with_columns(
pl.coalesce(
[pl.col(f"{col}_json"), pl.col(col)]
).alias(col)
).drop(f"{col}_json")
elif os.path.exists(fasta_file):
logg.info(
"Found {} file. Extracting extra information.".format(
str(fasta_file)
)
)
seqs = {}
fh = open(fasta_file)
for header, sequence in fasta_iterator(fh):
seqs[header] = sequence
# Add sequences using Polars
raw = raw.with_columns(
pl.col("contig_id")
.map_elements(
lambda x: seqs.get(x, ""), return_dtype=pl.String
)
.alias("sequence")
)
res = parse_annotation_polars(raw)
else:
res = parse_annotation_polars(raw)
elif len(csv_idx) < 1:
if len(json_idx) == 1:
json_file = str(data) + "/" + str(filelist[json_idx[0]])
logg.info("Reading {}".format(json_file))
if os.path.exists(json_file):
with open(json_file) as f:
raw = json.load(f)
res = parse_json_polars(raw)
else:
raise OSError(
"{}_contig_annotations.csv and {}_contig_annotations.json file(s) not found in {} folder.".format(
str(filename_pre),
filename_pre.replace("filtered", "all"),
str(data),
)
)
elif len(csv_idx) > 1:
raise OSError(
"There are multiple input .csv files with the same filename prefix {} in {} folder.".format(
str(filename_pre), str(data)
)
)
elif os.path.isfile(str(data)):
file = data
if str(file).endswith(".csv"):
logg.info("Reading {}.".format(str(file)))
raw = pl.read_csv(str(file))
fasta_file = str(file).split("_annotations.csv")[0] + ".fasta"
json_file = re.sub(
filename_pre + "_contig_annotations",
f"{filename_pre.replace('filtered', 'all')}_contig_annotations",
str(file).split(".csv")[0] + ".json",
)
if os.path.exists(json_file):
logg.info(
"Found {} file. Extracting extra information.".format(
str(json_file)
)
)
# Parse CSV to DataFrame
out_df = parse_annotation_polars(raw)
# Parse JSON to DataFrame
with open(json_file) as f:
raw_json = json.load(f)
out_json_df = parse_json_polars(raw_json)
# Merge DataFrames
res = out_df.join(
out_json_df,
on="sequence_id",
how="outer",
suffix="_json",
)
# Coalesce columns that appear in both (prefer json version)
for col in out_json_df.columns:
if (
col != "sequence_id"
and f"{col}_json" in res.columns
):
res = res.with_columns(
pl.coalesce(
[pl.col(f"{col}_json"), pl.col(col)]
).alias(col)
).drop(f"{col}_json")
elif os.path.exists(fasta_file):
logg.info(
"Found {} file. Extracting extra information.".format(
str(fasta_file)
)
)
seqs = {}
fh = open(fasta_file)
for header, sequence in fasta_iterator(fh):
seqs[header] = sequence
# Add sequences using Polars
raw = raw.with_columns(
pl.col("contig_id")
.map_elements(
lambda x: seqs.get(x, ""), return_dtype=pl.String
)
.alias("sequence")
)
res = parse_annotation_polars(raw)
else:
res = parse_annotation_polars(raw)
elif str(file).endswith(".json"):
if os.path.exists(file):
logg.info("Reading {}".format(file))
with open(file) as f:
raw = json.load(f)
res = parse_json_polars(raw)
else:
raise OSError("{} not found.".format(file))
else:
raise OSError("{} not found.".format(data))
else:
raise TypeError(
f"data must be a Path, str, pandas.DataFrame, polars.DataFrame, or polars.LazyFrame, got {type(data)}"
)
# Quick check if locus is malformed
if remove_malformed:
res = res.filter(~pl.col("locus").str.contains(r"\|"))
# Change all unknowns to blanks
for col in res.columns:
if res[col].dtype in (pl.Utf8, pl.String):
res = res.with_columns(
pl.col(col).str.replace("unknown", "").alias(col)
)
vdj = DandelionPolars(res, verbose=verbose)
if suffix is not None:
vdj.add_sequence_suffix(
suffix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
elif prefix is not None:
vdj.add_sequence_prefix(
prefix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
return vdj
[docs]
def read_seekgene_vdj(
data: Path | str | pd.DataFrame | pl.DataFrame | pl.LazyFrame | None = None,
filename_prefix: str | None = None,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_malformed: bool = True,
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
) -> DandelionPolars:
"""
A parser to read .csv and .json files directly from folder containing SeekGene VDJ outputs,
or parse an existing pandas/polars DataFrame.
SeekGene produces contig annotation files in the same format as 10x CellRanger VDJ output.
This function is a convenience wrapper around :func:`read_10x_vdj` with SeekGene-specific
naming for clarity.
Minimum requirement is one of either {filename_prefix}_contig_annotations.csv or
all_contig_annotations.json when reading from a file path.
If .fasta, .json files are found in the same folder, additional info will be appended to
the final table.
Parameters
----------
data : Path | str | pandas.DataFrame | polars.DataFrame | polars.LazyFrame | None
path to folder containing `.csv` and/or `.json` files, path to files directly, or a pandas/polars
DataFrame containing the contig annotations data.
filename_prefix : str | None, optional
prefix of file name preceding '_contig'. None defaults to 'all'. Only used when data is a file/folder.
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_malformed : bool, optional
whether or not to remove malformed contigs.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the DandelionPolars object.
Returns
-------
DandelionPolars
DandelionPolars object holding the parsed data.
Raises
------
OSError
if contig_annotations.csv and all_contig_annotations.json file(s) not found in the input folder.
TypeError
if data is not a valid type (Path, str, DataFrame, or LazyFrame).
"""
ddl = read_10x_vdj(
data=data,
filename_prefix=filename_prefix,
prefix=prefix,
suffix=suffix,
sep=sep,
remove_malformed=remove_malformed,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
verbose=verbose,
)
# SeekGene VDJ files share the same CSV/JSON format as 10x CellRanger, but the
# resulting internal columns should not carry a _10x suffix.
_10x_rename = {
"is_cell_10x": "is_cell",
"high_confidence_10x": "high_confidence",
"sequence_length_10x": "sequence_length",
"raw_consensus_id_10x": "raw_consensus_id",
"exact_subclonotype_id_10x": "exact_subclonotype_id",
"filtered_10x": "filtered",
"is_asm_cell_10x": "is_asm_cell",
}
existing_cols = (
ddl._data.collect_schema().names()
if isinstance(ddl._data, pl.LazyFrame)
else ddl._data.columns
)
rename_map = {k: v for k, v in _10x_rename.items() if k in existing_cols}
if rename_map:
ddl._data = ddl._data.rename(rename_map)
return ddl
[docs]
def read_airr(
file: Path | str,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
) -> DandelionPolars:
"""
Reads a standard single-cell AIRR rearrangement file.
If you have non-single-cell data, use `.load_polars` first to load the data and then pass it to `DandelionPolars`.
That will tell you what columns are missing and you can fill it out accordingly e.g. make up a `cell_id`.
Parameters
----------
file : Path | str
path to AIRR rearrangement .tsv file.
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the Dandelion object.
Returns
-------
DandelionPolars
DandelionPolars object from AIRR file.
"""
vdj = DandelionPolars(file, verbose=verbose)
if suffix is not None:
vdj.add_sequence_suffix(
suffix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
elif prefix is not None:
vdj.add_sequence_prefix(
prefix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
return vdj
[docs]
def read_bd_airr(
file: Path | str,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
) -> DandelionPolars:
"""
Read the TCR or BCR `_AIRR.tsv` produced from BD Rhapsody technology.
Parameters
----------
file : Path | str
path to `_AIRR.tsv`
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the DandelionPolars object.
Returns
-------
DandelionPolars
DandelionPolars object from BD AIRR file.
"""
vdj = DandelionPolars(file, verbose=verbose)
if suffix is not None:
vdj.add_sequence_suffix(
suffix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
elif prefix is not None:
vdj.add_sequence_prefix(
prefix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
return vdj
[docs]
def read_parse_airr(
file: Path | str,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
**kwargs,
) -> DandelionPolars:
"""
Read the TCR or BCR `_annotation_airr.tsv` produced from Parse Biosciences Evercode technology.
This is not to be used for any airr rearrangement file, but specifically for the one produced by Parse Biosciences.
For standard airr rearrangement files e.g. `all_contig_dandelion.tsv`, use `ddl.Dandelion` or `ddl.read_airr` directly.
Parameters
----------
file : Path | str
path to `_annotation_airr.tsv`
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the DandelionPolars object.
**kwargs
additional keyword arguments passed to DandelionPolars.
Returns
-------
DandelionPolars
DandelionPolars object from Parse AIRR file.
"""
data = load_polars(file) # should return LazyFrame
# Drop the wrong cell_id column if present
if "cell_id" in data.collect_schema():
data = data.drop(["cell_id"])
# Rename columns using polars expressions
rename_dict = {
"cell_barcode": "cell_id",
"read_count": "consensus_count",
"transcript_count": "umi_count",
"cdr3": "junction",
"cdr3_aa": "junction_aa",
}
for old, new in rename_dict.items():
if old in data.collect_schema():
data = data.with_columns(pl.col(old).alias(new)).drop([old])
data = data.collect()
vdj = DandelionPolars(data, verbose=verbose, **kwargs)
if suffix is not None:
vdj.add_sequence_suffix(
suffix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
elif prefix is not None:
vdj.add_sequence_prefix(
prefix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
return vdj
[docs]
def read_10x_airr(
file: Path | str,
prefix: str | None = None,
suffix: str | None = None,
sep: str = "_",
remove_trailing_hyphen_number: bool = False,
verbose: bool = False,
) -> DandelionPolars:
"""
Read the `airr_rearrangement.tsv` produced from Cell Ranger directly and returns a DandelionPolars object.
This is not to be used for any airr rearrangement file, but specifically for the one produced by 10x Genomics.
For standard airr rearrangement files e.g. `all_contig_dandelion.tsv`, use `Dandelion/DandelionPolars` or `read_airr` directly.
Parameters
----------
file : Path | str
path to `airr_rearrangement.tsv`
prefix : str | None, optional
Prefix to append to sequence_id and cell_id.
suffix : str | None, optional
Suffix to append to sequence_id and cell_id.
sep : str, optional
the separator to append suffix/prefix.
remove_trailing_hyphen_number : bool, optional
whether or not to remove the trailing hyphen number e.g. '-1' from the
cell/contig barcodes.
verbose : bool, optional
whether or not to print messages during creation of the DandelionPolars object.
Returns
-------
DandelionPolars
DandelionPolars object from 10x AIRR file.
"""
data = load_polars(file) # should return LazyFrame
# If locus column is missing, derive it using polars vectorized expressions
if "locus" not in data.collect_schema():
# Extract first 3 chars of each gene call, ignore nulls/empties, get unique, join with '|'
data = data.with_columns(
[
pl.concat_str(
[
pl.col("v_call")
.str.slice(0, 3)
.fill_null("")
.str.strip_chars(),
pl.col("d_call")
.str.slice(0, 3)
.fill_null("")
.str.strip_chars(),
pl.col("j_call")
.str.slice(0, 3)
.fill_null("")
.str.strip_chars(),
pl.col("c_call")
.str.slice(0, 3)
.fill_null("")
.str.strip_chars(),
],
separator="|",
)
.str.split("|")
.list.unique()
.list.eval(pl.element().filter(pl.element() != ""))
.list.join("|")
.alias("locus")
]
)
# Drop columns that are all missing
if isinstance(data, pl.DataFrame):
data = data.lazy()
cols_to_drop = [
col
for col in data.collect_schema()
if data.select(pl.col(col).is_null().all()).collect().item()
]
if cols_to_drop:
data = data.drop(cols_to_drop)
data = data.collect()
vdj = DandelionPolars(data, verbose=verbose)
if suffix is not None:
vdj.add_sequence_suffix(
suffix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
elif prefix is not None:
vdj.add_sequence_prefix(
prefix,
sep=sep,
remove_trailing_hyphen_number=remove_trailing_hyphen_number,
)
return vdj