Source code for dandelion.base.io._io

import h5py
import json
import os
import pickle
import re

import networkx as nx
import numpy as np
import pandas as pd

from collections import defaultdict, OrderedDict
from pathlib import Path
from scanpy import logging as logg
from scipy.sparse import csr_matrix

from dandelion.base.core._core import Dandelion, load_data
from dandelion.utilities._utilities import (
    DEFAULT_PREFIX,
    CELLRANGER,
    AIRR,
    deprecated,
    all_missing,
    sanitize_blastn,
    sanitize_data,
    Contig,
    fasta_iterator,
)


def _load_distance_zarr(distance_zarr: Path | str):
    """Load lazy distance array from a user/path-compatible Zarr location."""
    import dask.array as da

    base = str(distance_zarr).rstrip("/\\")
    if base.lower().endswith(".zarr"):
        candidates = [f"{base}/distance_matrix", base]
    else:
        candidates = [
            f"{base}/distance_matrix.zarr/distance_matrix",
            f"{base}/distance_matrix.zarr",
            f"{base}/distance_matrix",
        ]

    last_error = None
    for candidate in candidates:
        try:
            return da.from_zarr(candidate)
        except Exception as exc:
            last_error = exc

    raise ValueError(
        f"Could not load distances from distance_zarr={distance_zarr}. "
        f"Tried: {candidates}"
    ) from last_error


def decode(df):
    """Decode byte strings in a DataFrame."""
    for col in df:
        if df[col].dtype == object:
            df[col] = df[col].apply(
                lambda x: x.decode("utf-8") if isinstance(x, bytes) else x
            )
    return df


[docs] def read_h5ddl( filename: Path | str = "dandelion_data.h5ddl", distance_zarr: Path | str | None = None, verbose: bool = False, ) -> Dandelion: """ Read in and returns a Dandelion class from .h5ddl format. Parameters ---------- filename : Path | str, optional path to `.h5ddl` file distance_zarr : Path | str | None, optional path to Zarr array for distances if computed lazily. verbose : bool, optional whether or not to print messages during creation of the Dandelion object. Returns ------- Dandelion Dandelion object. Raises ------ AttributeError if `data` not found in `.h5ddl` file. """ # Auto-detect alongside zarr written by write_h5ddl for dask distances if distance_zarr is None: auto_zarr = Path(filename).with_suffix(".zarr") if auto_zarr.exists(): distance_zarr = auto_zarr # Detect legacy (PyTables) vs new format with h5py.File(filename, "r") as hf: is_legacy = isinstance(hf["data"], h5py.Group) if is_legacy: # pragma: no cover data = decode(load_data(_read_h5_group_legacy(filename, group="data"))) metadata = _read_h5_group_legacy(filename, group="metadata") try: g_0 = _read_h5_group_legacy(filename, group="graph/graph_0") g_1 = _read_h5_group_legacy(filename, group="graph/graph_1") graph0 = _create_graph(g_0, adjust_adjacency=1, fillna=0) graph1 = _create_graph(g_1, adjust_adjacency=1, fillna=0) graph = (graph0, graph1) except KeyError: pass try: distances = _read_h5_csr_matrix( filename, group="distances", as_df=False ) distances._index_names = metadata.index except KeyError: if distance_zarr is not None: # pragma: no cover distances = _load_distance_zarr(distance_zarr) try: layout0 = _read_h5_dict(filename, group="layout/layout_0") layout1 = _read_h5_dict(filename, group="layout/layout_1") layout = (layout0, layout1) except KeyError: pass try: germline = _read_h5_zip_legacy(filename, group="germline") except KeyError: pass else: data = decode(load_data(_read_h5_group(filename, group="data"))) # final decode to ensure all byte strings are converted to str metadata = _read_h5_group(filename, group="metadata") metadata_names = _read_h5_group(filename, group="metadata_names") metadata.index = metadata_names try: g_0 = _read_h5_csr_matrix( filename, group="graph/graph_0", as_df=True ) g_1 = _read_h5_csr_matrix( filename, group="graph/graph_1", as_df=True ) graph0 = _create_graph(g_0, adjust_adjacency=1, fillna=0) graph1 = _create_graph(g_1, adjust_adjacency=1, fillna=0) graph = (graph0, graph1) except KeyError: pass try: distances = _read_h5_csr_matrix( filename, group="distances", as_df=False ) distances._index_names = metadata.index except KeyError: if distance_zarr is not None: distances = _load_distance_zarr(distance_zarr) try: layout0 = _read_h5_dict(filename, group="layout/layout_0") layout1 = _read_h5_dict(filename, group="layout/layout_1") layout = (layout0, layout1) except KeyError: pass try: germline = _read_h5_zip( filename, group="germline", key_group="keys", value_group="values", ) except KeyError: pass constructor = {} constructor["data"] = data if "metadata" in locals(): constructor["metadata"] = metadata if "germline" in locals(): constructor["germline"] = germline if "layout" in locals(): constructor["layout"] = layout if "graph" in locals(): constructor["graph"] = graph if "distances" in locals(): constructor["distances"] = distances res = Dandelion(**constructor, verbose=verbose) # ensure that the metadata is decoded res._metadata = decode(res._metadata) return res
read = read_ddl = read_h5ddl # alias
[docs] def read_airr( file: Path | str, prefix: str | None = None, suffix: str | None = None, sep: str = "_", remove_trailing_hyphen_number: bool = False, verbose: bool = False, ) -> Dandelion: """ Reads a standard single-cell AIRR rearrangement file. If you have non-single-cell data, use `.load_data` first to load the data and then pass it to `ddl.Dandelion`. That will tell you what columns are missing and you can fill it out accordingly e.g. make up a `cell_id`. Parameters ---------- file : Path | str path to AIRR rearrangement .tsv file. prefix : str | None, optional Prefix to append to sequence_id and cell_id. suffix : str | None, optional Suffix to append to sequence_id and cell_id. sep : str, optional the separator to append suffix/prefix. remove_trailing_hyphen_number : bool, optional whether or not to remove the trailing hyphen number e.g. '-1' from the cell/contig barcodes. verbose : bool, optional whether or not to print messages during creation of the Dandelion object. Returns ------- Dandelion Dandelion object from AIRR file. """ vdj = Dandelion(file, verbose=verbose) if suffix is not None: vdj.add_sequence_suffix( suffix, sep=sep, remove_trailing_hyphen_number=remove_trailing_hyphen_number, ) elif prefix is not None: vdj.add_sequence_prefix( prefix, sep=sep, remove_trailing_hyphen_number=remove_trailing_hyphen_number, ) return vdj
[docs] def read_bd_airr( file: Path | str, prefix: str | None = None, suffix: str | None = None, sep: str = "_", remove_trailing_hyphen_number: bool = False, verbose: bool = False, ) -> Dandelion: """ Read the TCR or BCR `_AIRR.tsv` produced from BD Rhapsody technology. Parameters ---------- file : Path | str path to `_AIRR.tsv` prefix : str | None, optional Prefix to append to sequence_id and cell_id. suffix : str | None, optional Suffix to append to sequence_id and cell_id. sep : str, optional the separator to append suffix/prefix. remove_trailing_hyphen_number : bool, optional whether or not to remove the trailing hyphen number e.g. '-1' from the cell/contig barcodes. verbose : bool, optional whether or not to print messages during creation of the Dandelion object. Returns ------- Dandelion Dandelion object from BD AIRR file. """ vdj = Dandelion(file, verbose=verbose) if suffix is not None: vdj.add_sequence_suffix( suffix, sep=sep, remove_trailing_hyphen_number=remove_trailing_hyphen_number, ) elif prefix is not None: vdj.add_sequence_prefix( prefix, sep=sep, remove_trailing_hyphen_number=remove_trailing_hyphen_number, ) return vdj
[docs] def read_parse_airr( file: Path | str, prefix: str | None = None, suffix: str | None = None, sep: str = "_", remove_trailing_hyphen_number: bool = False, verbose: bool = False, **kwargs, ) -> Dandelion: """ Read the TCR or BCR `_annotation_airr.tsv` produced from Parse Biosciences Evercode technology. This is not to be used for any airr rearrangement file, but specifically for the one produced by Parse Biosciences. For standard airr rearrangement files e.g. `all_contig_dandelion.tsv`, use `ddl.Dandelion` or `ddl.read_airr` directly. Parameters ---------- file : Path | str path to `_annotation_airr.tsv` prefix : str | None, optional Prefix to append to sequence_id and cell_id. suffix : str | None, optional Suffix to append to sequence_id and cell_id. sep : str, optional the separator to append suffix/prefix. remove_trailing_hyphen_number : bool, optional whether or not to remove the trailing hyphen number e.g. '-1' from the cell/contig barcodes. verbose : bool, optional whether or not to print messages during creation of the Dandelion object. **kwargs additional keyword arguments passed to Dandelion. Returns ------- Dandelion Dandelion object from Parse AIRR file. """ data = load_data(file) data.drop("cell_id", axis=1, inplace=True) # it's the wrong cell_id data = data.rename( columns={ "cell_barcode": "cell_id", "read_count": "consensus_count", "transcript_count": "umi_count", "cdr3": "junction", "cdr3_aa": "junction_aa", } ) vdj = Dandelion(data, verbose=verbose, **kwargs) if suffix is not None: vdj.add_sequence_suffix( suffix, sep=sep, remove_trailing_hyphen_number=remove_trailing_hyphen_number, ) elif prefix is not None: vdj.add_sequence_prefix( prefix, sep=sep, remove_trailing_hyphen_number=remove_trailing_hyphen_number, ) return vdj
[docs] def read_10x_airr( file: Path | str, prefix: str | None = None, suffix: str | None = None, sep: str = "_", remove_trailing_hyphen_number: bool = False, verbose: bool = False, ) -> Dandelion: """ Read the `airr_rearrangement.tsv` produced from Cell Ranger directly and returns a Dandelion object. This is not to be used for any airr rearrangement file, but specifically for the one produced by 10x Genomics. For standard airr rearrangement files e.g. `all_contig_dandelion.tsv`, use `ddl.Dandelion` or `ddl.read_airr` directly. Parameters ---------- file : Path | str path to `airr_rearrangement.tsv` prefix : str | None, optional Prefix to append to sequence_id and cell_id. suffix : str | None, optional Suffix to append to sequence_id and cell_id. sep : str, optional the separator to append suffix/prefix. remove_trailing_hyphen_number : bool, optional whether or not to remove the trailing hyphen number e.g. '-1' from the cell/contig barcodes. verbose : bool, optional whether or not to print messages during creation of the Dandelion object. Returns ------- Dandelion Dandelion object from 10x AIRR file. """ dat = load_data(file) # get all the v,d,j,c calls if "locus" not in dat: tmp = [ (v, d, j, c) for v, d, j, c in zip( dat["v_call"], dat["d_call"], dat["j_call"], dat["c_call"] ) ] locus = [] for t in tmp: if all("IGH" in x for x in t if pd.notnull(x)): locus.append("IGH") elif all("IGK" in x for x in t if pd.notnull(x)): locus.append("IGK") elif all("IGL" in x for x in t if pd.notnull(x)): locus.append("IGL") elif all("TRA" in x for x in t if pd.notnull(x)): locus.append("TRA") elif all("TRB" in x for x in t if pd.notnull(x)): locus.append("TRB") elif all("TRD" in x for x in t if pd.notnull(x)): locus.append("TRD") elif all("TRG" in x for x in t if pd.notnull(x)): locus.append("TRG") else: locus.append(np.nan) dat["locus"] = locus null_columns = [col for col in dat.columns if all_missing(dat[col])] if len(null_columns) > 0: dat.drop(null_columns, inplace=True, axis=1) vdj = Dandelion(dat, verbose=verbose) if suffix is not None: vdj.add_sequence_suffix( suffix, sep=sep, remove_trailing_hyphen_number=remove_trailing_hyphen_number, ) elif prefix is not None: vdj.add_sequence_prefix( prefix, sep=sep, remove_trailing_hyphen_number=remove_trailing_hyphen_number, ) return vdj
[docs] def read_10x_vdj( data: Path | str | pd.DataFrame, filename_prefix: str | None = None, prefix: str | None = None, suffix: str | None = None, sep: str = "_", remove_malformed: bool = True, remove_trailing_hyphen_number: bool = False, verbose: bool = False, ) -> Dandelion: """ A parser to read .csv and .json files directly from folder containing 10x cellranger-outputs, or parse an existing pandas DataFrame. This function parses the 10x output files into an AIRR compatible format. Minimum requirement is one of either {filename_prefix}_contig_annotations.csv or all_contig_annotations.json when reading from file path. If .fasta, .json files are found in the same folder, additional info will be appended to the final table. Parameters ---------- data : Path | str | pandas.DataFrame path to folder containing `.csv` and/or `.json` files, path to files directly, or a pandas DataFrame containing the contig annotations data. filename_prefix : str | None, optional prefix of file name preceding '_contig'. None defaults to 'all'. Only used when data is a file/folder. prefix : str | None, optional Prefix to append to sequence_id and cell_id. suffix : str | None, optional Suffix to append to sequence_id and cell_id. sep : str, optional the separator to append suffix/prefix. remove_malformed : bool, optional whether or not to remove malformed contigs. remove_trailing_hyphen_number : bool, optional whether or not to remove the trailing hyphen number e.g. '-1' from the cell/contig barcodes. verbose : bool, optional whether or not to print messages during creation of the Dandelion object. Returns ------- Dandelion Dandelion object holding the parsed data. Raises ------ OSError if contig_annotations.csv and all_contig_annotations.json file(s) not found in the input folder. TypeError if data is not a valid type (Path, str, or pandas.DataFrame). """ filename_pre = ( DEFAULT_PREFIX if filename_prefix is None else filename_prefix ) # Handle DataFrame input (pandas) if isinstance(data, pd.DataFrame): logg.info("Parsing pandas DataFrame") raw = data.copy() raw.set_index("contig_id", drop=False, inplace=True) out = parse_annotation(raw) elif isinstance(data, (str, Path)): # Handle file path inputs if os.path.isdir(str(data)): files = os.listdir(data) filelist = [] for fx in files: if re.search(filename_pre + "_contig", fx): if fx.endswith(".fasta") or fx.endswith(".csv"): filelist.append(fx) if re.search( f"{filename_pre.replace('filtered', 'all')}_contig_annotations", fx, ): if fx.endswith(".json"): filelist.append(fx) csv_idx = [i for i, j in enumerate(filelist) if j.endswith(".csv")] json_idx = [ i for i, j in enumerate(filelist) if j.endswith(".json") ] if len(csv_idx) == 1: file = str(data) + "/" + str(filelist[csv_idx[0]]) logg.info("Reading {}".format(str(file))) raw = pd.read_csv(str(file)) raw.set_index("contig_id", drop=False, inplace=True) fasta_file = str(file).split("_annotations.csv")[0] + ".fasta" json_file = re.sub( filename_pre + "_contig_annotations", f"{filename_pre.replace('filtered', 'all')}_contig_annotations", str(file).split(".csv")[0] + ".json", ) if os.path.exists(json_file): logg.info( "Found {} file. Extracting extra information.".format( str(json_file) ) ) out = parse_annotation(raw) with open(json_file) as f: raw_json = json.load(f) out_json = parse_json(raw_json) out.update(out_json) elif os.path.exists(fasta_file): logg.info( "Found {} file. Extracting extra information.".format( str(fasta_file) ) ) seqs = {} fh = open(fasta_file) for header, sequence in fasta_iterator(fh): seqs[header] = sequence raw["sequence"] = pd.Series(seqs) out = parse_annotation(raw) else: out = parse_annotation(raw) elif len(csv_idx) < 1: if len(json_idx) == 1: json_file = str(data) + "/" + str(filelist[json_idx[0]]) logg.info("Reading {}".format(json_file)) if os.path.exists(json_file): with open(json_file) as f: raw = json.load(f) out = parse_json(raw) else: raise OSError( "{}_contig_annotations.csv and {}_contig_annotations.json file(s) not found in {} folder.".format( str(filename_pre), filename_pre.replace("filtered", "all"), str(data), ) ) elif len(csv_idx) > 1: raise OSError( "There are multiple input .csv files with the same filename prefix {} in {} folder.".format( str(filename_pre), str(data) ) ) elif os.path.isfile(str(data)): file = data if str(file).endswith(".csv"): logg.info("Reading {}.".format(str(file))) raw = pd.read_csv(str(file)) raw.set_index("contig_id", drop=False, inplace=True) fasta_file = str(file).split("_annotations.csv")[0] + ".fasta" json_file = re.sub( filename_pre + "_contig_annotations", f"{filename_pre.replace('filtered', 'all')}_contig_annotations", str(file).split(".csv")[0] + ".json", ) if os.path.exists(json_file): logg.info( "Found {} file. Extracting extra information.".format( str(json_file) ) ) out = parse_annotation(raw) with open(json_file) as f: raw_json = json.load(f) out_json = parse_json(raw_json) out.update(out_json) elif os.path.exists(fasta_file): logg.info( "Found {} file. Extracting extra information.".format( str(fasta_file) ) ) seqs = {} fh = open(fasta_file) for header, sequence in fasta_iterator(fh): seqs[header] = sequence raw["sequence"] = pd.Series(seqs) out = parse_annotation(raw) else: out = parse_annotation(raw) elif str(file).endswith(".json"): if os.path.exists(file): logg.info("Reading {}".format(file)) with open(file) as f: raw = json.load(f) out = parse_json(raw) else: raise OSError("{} not found.".format(file)) else: raise OSError("{} not found.".format(data)) else: raise TypeError( f"data must be a Path, str, or pandas.DataFrame, got {type(data)}" ) res = pd.DataFrame.from_dict(out, orient="index") # quick check if locus is malformed if remove_malformed: res = res[~res["locus"].str.contains("[|]")] # change all unknowns to blanks res.replace("unknown", "", inplace=True) vdj = Dandelion(res, verbose=verbose) if suffix is not None: vdj.add_sequence_suffix( suffix, sep=sep, remove_trailing_hyphen_number=remove_trailing_hyphen_number, ) elif prefix is not None: vdj.add_sequence_prefix( prefix, sep=sep, remove_trailing_hyphen_number=remove_trailing_hyphen_number, ) return vdj
[docs] def read_seekgene_vdj( data: Path | str | pd.DataFrame, filename_prefix: str | None = None, prefix: str | None = None, suffix: str | None = None, sep: str = "_", remove_malformed: bool = True, remove_trailing_hyphen_number: bool = False, verbose: bool = False, ) -> Dandelion: """ A parser to read .csv and .json files directly from folder containing SeekGene VDJ outputs, or parse an existing pandas DataFrame. SeekGene produces contig annotation files in the same format as 10x CellRanger VDJ output. This function is a convenience wrapper around :func:`read_10x_vdj` with SeekGene-specific naming for clarity. Minimum requirement is one of either {filename_prefix}_contig_annotations.csv or all_contig_annotations.json when reading from a file path. If .fasta, .json files are found in the same folder, additional info will be appended to the final table. Parameters ---------- data : Path | str | pandas.DataFrame path to folder containing `.csv` and/or `.json` files, path to files directly, or a pandas DataFrame containing the contig annotations data. filename_prefix : str | None, optional prefix of file name preceding '_contig'. None defaults to 'all'. Only used when data is a file/folder. prefix : str | None, optional Prefix to append to sequence_id and cell_id. suffix : str | None, optional Suffix to append to sequence_id and cell_id. sep : str, optional the separator to append suffix/prefix. remove_malformed : bool, optional whether or not to remove malformed contigs. remove_trailing_hyphen_number : bool, optional whether or not to remove the trailing hyphen number e.g. '-1' from the cell/contig barcodes. verbose : bool, optional whether or not to print messages during creation of the Dandelion object. Returns ------- Dandelion Dandelion object holding the parsed data. Raises ------ OSError if contig_annotations.csv and all_contig_annotations.json file(s) not found in the input folder. TypeError if data is not a valid type (Path, str, or pandas.DataFrame). """ ddl = read_10x_vdj( data=data, filename_prefix=filename_prefix, prefix=prefix, suffix=suffix, sep=sep, remove_malformed=remove_malformed, remove_trailing_hyphen_number=remove_trailing_hyphen_number, verbose=verbose, ) # SeekGene VDJ files share the same CSV/JSON format as 10x CellRanger, but the # resulting internal columns should not carry a _10x suffix. _10x_rename = { "is_cell_10x": "is_cell", "high_confidence_10x": "high_confidence", "sequence_length_10x": "sequence_length", "raw_consensus_id_10x": "raw_consensus_id", "exact_subclonotype_id_10x": "exact_subclonotype_id", "filtered_10x": "filtered", "is_asm_cell_10x": "is_asm_cell", } rename_map = {k: v for k, v in _10x_rename.items() if k in ddl.data.columns} if rename_map: ddl._data = ddl.data.rename(columns=rename_map) return ddl
def parse_json(data: list) -> defaultdict: """Parse json file.""" main_dict1 = { "barcode": "cell_id", "contig_name": "sequence_id", "sequence": "sequence", "aa_sequence": "sequence_aa", "productive": "productive", "full_length": "complete_vdj", "frame": "vj_in_frame", "cdr3_seq": "junction", "cdr3": "junction_aa", } main_dict2 = { "read_count": "consensus_count", "umi_count": "umi_count", # "duplicate_count": "umi_count", "cdr3_start": "cdr3_start", "cdr3_stop": "cdr3_end", } main_dict3 = { "high_confidence": "high_confidence_10x", "filtered": "filtered_10x", "is_gex_cell": "is_cell_10x", "is_asm_cell": "is_asm_cell_10x", } info_dict = { "raw_clonotype_id": "clone_id", "raw_consensus_id": "raw_consensus_id_10x", "exact_subclonotype_id": "exact_subclonotype_id_10x", } region_type_dict = { "L-REGION+V-REGION": "v_call", "D-REGION": "d_call", "J-REGION": "j_call", "C-REGION": "c_call", } required_calls = ["v_call", "d_call", "j_call", "c_call"] region_keys = ["fwr1", "cdr1", "fwr2", "cdr2", "fwr3", "fwr4"] out = defaultdict(OrderedDict) for i in range(len(data)): if data[i]["contig_name"] is not None: key = data[i]["contig_name"] else: continue for k in main_dict1.keys(): if k in data[i]: if data[i][k] is not None: out[key].update({main_dict1[k]: data[i][k]}) else: out[key].update({main_dict1[k]: ""}) else: out[key].update({main_dict1[k]: ""}) if data[i]["annotations"] is not None: chains = [] for j in range(len(data[i]["annotations"])): chains.append(data[i]["annotations"][j]["feature"]["chain"]) out[key].update({"locus": "|".join(list(set(chains)))}) for j in range(len(data[i]["annotations"])): rtype = data[i]["annotations"][j]["feature"]["region_type"] if rtype in region_type_dict.keys(): call = region_type_dict[rtype] else: continue if ( data[i]["annotations"][j]["feature"]["gene_name"] is not None ): out[key].update( { call: data[i]["annotations"][j]["feature"][ "gene_name" ] } ) else: out[key].update({call: ""}) for rc in required_calls: if rc not in out[key]: out[key].update({rc: ""}) for k in main_dict2.keys(): if k in data[i]: if data[i][k] is not None: out[key].update({main_dict2[k]: data[i][k]}) else: out[key].update({main_dict2[k]: np.nan}) else: out[key].update({main_dict2[k]: np.nan}) for rk in region_keys: if rk in data[i]: if data[i][rk] is not None: for k in data[i][rk]: if k == "start": ka = rk + "_start" elif k == "stop": ka = rk + "_end" elif k == "nt_seq": ka = rk + "" elif k == "aa_seq": ka = rk + "_aa" else: continue out[key].update({ka: data[i][rk][k]}) else: for k in region_keys: out[key].update({k + "_start": np.nan}) out[key].update({k + "_end": np.nan}) out[key].update({k + "": ""}) out[key].update({k + "_aa": ""}) else: for k in region_keys: out[key].update({k + "_start": np.nan}) out[key].update({k + "_end": np.nan}) out[key].update({k + "": ""}) out[key].update({k + "_aa": ""}) if data[i]["info"] is not None: for info in data[i]["info"]: if data[i]["info"][info] is not None: out[key].update({info_dict[info]: data[i]["info"][info]}) else: out[key].update({info_dict[info]: ""}) for k in main_dict3.keys(): if k in data[i]: if data[i][k] is not None: out[key].update({main_dict3[k]: data[i][k]}) else: out[key].update({main_dict3[k]: ""}) else: out[key].update({main_dict3[k]: ""}) return out def parse_annotation(data: pd.DataFrame) -> defaultdict: """Parse annotation file.""" out = defaultdict(OrderedDict) swap_dict = dict(zip(CELLRANGER, AIRR)) for _, row in data.iterrows(): contig = Contig(row, swap_dict).contig["sequence_id"] out[contig] = Contig(row, swap_dict).contig if pd.isna(out[contig]["locus"]) or out[contig]["locus"] in [ "None", "none", "", ]: calls = [] for call in ["v_call", "d_call", "j_call", "c_call"]: if pd.notna(out[contig][call]) and out[contig][call] not in [ "None", "none", "", ]: calls.append(out[contig][call]) out[contig]["locus"] = "|".join(list({str(c)[:3] for c in calls})) if pd.isna(out[contig]["locus"]) or out[contig]["locus"] in [ "None", "", ]: out[contig]["locus"] = "|" return out def _read_h5_group(filename: Path | str, group: str) -> pd.DataFrame: """ Read a specific group from an H5 file. Parameters ---------- filename : Path | str The path to the H5 file. group : str The group to read from the H5 file. Returns ------- pd.DataFrame The data from the specified group as a pandas dataframe. Raises ------ KeyError If the specified group is not found in the H5 file. """ with h5py.File(filename, "r") as hf: data_group = hf[group] structured_data_array = data_group[:] decoded = {} if structured_data_array.dtype.names is not None: for col in structured_data_array.dtype.names: dtype = structured_data_array.dtype[col] if dtype.char == "S": # Check if dtype is byte strings # Decode byte strings decoded.update( { col: np.array( [ ( x.astype(str) if isinstance(x, bytes) else x ) for x in structured_data_array[col] ] ) } ) else: decoded.update({col: structured_data_array[col]}) # Create a DataFrame from the structured array data = pd.DataFrame(decoded) else: data = np.array( [ x.astype(str) if isinstance(x, bytes) else x for x in structured_data_array ] ) return data def _read_h5_csr_matrix( filename: Path | str, group: str, as_df: bool = True ) -> pd.DataFrame: """ Read a group from an H5 file originally stored as a compressed sparse matrix. Parameters ---------- filename : Path | str The path to the H5 file. group : str The group to read from the H5 file. Returns ------- pd.DataFrame The data from the specified group as a pandas dataframe. """ with h5py.File(filename, "r") as f: data = f[f"{group}/data"][:] indices = f[f"{group}/indices"][:] indptr = f[f"{group}/indptr"][:] shape = tuple(f[f"{group}/shape"][:]) # Reconstruct CSR matrix loaded_matrix = csr_matrix((data, indices, indptr), shape=shape) if not as_df: return loaded_matrix df = pd.DataFrame(loaded_matrix.toarray()) df_col = _read_h5_group(filename, f"{group}/column") df_index = _read_h5_group(filename, f"{group}/index") df.columns = df_col df.index = df_index return df def _create_graph( adj: pd.DataFrame, adjust_adjacency: int | float = 0, fillna: int | float = 0, ) -> nx.Graph: """ Create a networkx graph from the given adjacency matrix. Parameters ---------- adj : pd.DataFrame The adjacency matrix to create the graph from. adjust_adjacency : int | float, optional The value to add to the graph by as a way to adjust the adjacency matrix. Defaults to 0. fillna : int | float, optional The value to fill NaN values with. Defaults to 0. Returns ------- nx.Graph The created networkx graph. """ if adjust_adjacency != 0: adj += adjust_adjacency adj = adj.fillna(fillna) g = nx.from_pandas_adjacency(adj) if adjust_adjacency != 0: for u, v, d in g.edges(data=True): d["weight"] -= adjust_adjacency return g def _read_h5_dict(filename: Path | str, group: str) -> dict: """ Read a dictionary from an H5 file. Parameters ---------- filename : Path | str The path to the H5 file. group : str The group to read from the H5 file. Returns ------- dict The dictionary data from the specified group. """ out_dict = {} with h5py.File(filename, "r") as hf: for k in hf[group].keys(): out_dict[k] = hf[group][k][:] return out_dict def _read_h5_zip( filename: Path | str, group: str, key_group: str, value_group: str ) -> dict: """ Read two groups from an H5 file and return them as a dictionary. Parameters ---------- filename : Path | str The path to the H5 file. group : str The group to read from the H5 file. key_group : str The name of the group containing the keys. value_group : str The name of the group containing the values. Returns ------- dict The dictionary data from the specified groups. """ with h5py.File(filename, "r") as hf: keys = [key.decode("utf-8") for key in hf[f"{group}/{key_group}"][:]] values = [ value.decode("utf-8") for value in hf[f"{group}/{value_group}"][:] ] # Reconstruct the dictionary out_dict = dict(zip(keys, values)) return out_dict @deprecated( deprecated_in="1.0.0", removed_in="1.1.0", details="legacy .h5ddl format will no longer be supported.", ) def read_h5ddl_legacy( filename: Path | str = "dandelion_data.h5ddl", ) -> Dandelion: # pragma: no cover """ Read in and returns a Dandelion class from .h5ddl format saved in legacy (version 3) format. Parameters ---------- filename : Path | str, optional path to `.h5ddl` file Returns ------- Dandelion Dandelion object. Raises ------ AttributeError if `data` not found in `.h5ddl` file. """ data = decode(load_data(_read_h5_group_legacy(filename, group="data"))) # final decode to ensure all byte strings are converted to str metadata = _read_h5_group_legacy(filename, group="metadata") try: g_0 = _read_h5_group_legacy(filename, group="graph/graph_0") g_1 = _read_h5_group_legacy(filename, group="graph/graph_1") graph0 = _create_graph(g_0, adjust_adjacency=1, fillna=0) graph1 = _create_graph(g_1, adjust_adjacency=1, fillna=0) graph = (graph0, graph1) except KeyError: pass try: layout0 = _read_h5_dict(filename, group="layout/layout_0") layout1 = _read_h5_dict(filename, group="layout/layout_1") layout = (layout0, layout1) except KeyError: pass try: germline = _read_h5_zip_legacy(filename, group="germline") except KeyError: pass constructor = {} constructor["data"] = data if "metadata" in locals(): constructor["metadata"] = metadata if "germline" in locals(): constructor["germline"] = germline if "layout" in locals(): constructor["layout"] = layout if "graph" in locals(): constructor["graph"] = graph res = Dandelion(**constructor, verbose=False) # ensure that the metadata is decoded res._metadata = decode(res._metadata) return res def _read_h5_group_legacy( filename: Path | str, group: str ) -> pd.DataFrame: # pragma: no cover """ Read a specific group from an H5 file in legacy (PyTables) format. Supports both 'fixed' format (block-based with axis0/axis1/blockN_*) and 'table' format (structured array with column metadata in attrs). Parameters ---------- filename : Path | str The path to the H5 file. group : str The group to read from the H5 file. Returns ------- pd.DataFrame The data from the specified group as a pandas dataframe. Raises ------ KeyError If the specified group is not found in the H5 file. """ with h5py.File(filename, "r") as hf: grp = hf[group] if "table" in grp: # Table format: structured array with column names in attrs return _read_pytables_table_format(grp) elif "axis0" in grp: # Fixed format: block-based with axis0/axis1/blockN_* return _read_pytables_fixed_format(grp) else: raise KeyError(f"Unrecognized legacy format for group '{group}'") @deprecated( deprecated_in="1.0.0", removed_in="1.1.0", details="legacy .h5ddl format will no longer be supported.", ) def _read_pytables_fixed_format( grp: h5py.Group, ) -> pd.DataFrame: # pragma: no cover """Read PyTables fixed format (block-based) from h5py Group.""" _decode_bytes = np.vectorize( lambda x: x.decode("utf-8") if isinstance(x, bytes) else x ) columns = _decode_bytes(grp["axis0"][:]) index = _decode_bytes(grp["axis1"][:]) # Collect all blocks block_num = 0 frames = [] while f"block{block_num}_items" in grp: items = _decode_bytes(grp[f"block{block_num}_items"][:]) vals_ds = grp[f"block{block_num}_values"] if vals_ds.dtype == object: # Object block: stored as pickled numpy array raw = vals_ds[0] values = pickle.loads(raw.tobytes()) else: values = vals_ds[:] df_block = pd.DataFrame(values, columns=items, index=index) frames.append(df_block) block_num += 1 if frames: result = pd.concat(frames, axis=1) # Reorder columns to match original order result = result[columns] else: result = pd.DataFrame(index=index, columns=columns) return result @deprecated( deprecated_in="1.0.0", removed_in="1.1.0", details="legacy .h5ddl format will no longer be supported.", ) def _read_pytables_table_format( grp: h5py.Group, ) -> pd.DataFrame: # pragma: no cover """Read PyTables table format (structured array) from h5py Group.""" table_ds = grp["table"] table_data = table_ds[:] attrs = dict(table_ds.attrs) # Extract index index_raw = table_data["index"] index = np.array( [x.decode("utf-8") if isinstance(x, bytes) else x for x in index_raw] ) # Extract each values_block and its column names from attrs frames = [] block_num = 0 while f"values_block_{block_num}_kind" in attrs: kind_raw = attrs[f"values_block_{block_num}_kind"] if isinstance(kind_raw, bytes): kind_raw = kind_raw.decode("utf-8") col_names = pickle.loads(kind_raw.encode("latin-1")) field_name = f"values_block_{block_num}" block_values = table_data[field_name] # Decode byte strings if needed if block_values.dtype.kind == "S" or block_values.dtype.kind == "O": decoded = np.vectorize( lambda x: x.decode("utf-8") if isinstance(x, bytes) else x )(block_values) df_block = pd.DataFrame(decoded, columns=col_names, index=index) else: df_block = pd.DataFrame( block_values, columns=col_names, index=index ) frames.append(df_block) block_num += 1 if frames: result = pd.concat(frames, axis=1) else: result = pd.DataFrame(index=index) return result @deprecated( deprecated_in="1.0.0", removed_in="1.1.0", details="legacy .h5ddl format will no longer be supported.", ) def _read_h5_zip_legacy( filename: Path | str, group: str ) -> dict: # pragma: no cover """ Read two groups from an H5 file and return them as a dictionary. Parameters ---------- filename : Path | str The path to the H5 file. group : str The group to read from the H5 file. key_group : str The name of the group containing the keys. value_group : str The name of the group containing the values. Returns ------- dict The dictionary data from the specified groups. """ with h5py.File(filename, "r") as hf: out_dict = {} for g in hf[group].attrs: out_dict.update({g: hf[group].attrs[g]}) return out_dict def write_airr(data: pd.DataFrame, save: Path | str) -> None: """Save as airr formatted file.""" data = sanitize_data(data) data.to_csv(save, sep="\t", index=False) def write_blastn(data: pd.DataFrame, save: Path | str) -> None: """Write blast output.""" data = sanitize_blastn(data) data.to_csv(save, sep="\t", index=False)