This document is the implementation-accurate user guide for microbiome2function (M2F).
It is written against the current code under src/M2F.
M2F is a practical toolkit for turning protein identifiers and UniProt annotations into ML-ready inputs.
Primary use-cases:
- Graph datasets for node classification: ProteinGraphInMemoryDataset, ProteinGraphOnDiskDataset.
- Non-graph datasets for feed-forward models: ProteinDataset (features + labels, no edges).

Design goals:
Project metadata (pyproject.toml):
- Package name: microbiome2function
- Python: >=3.11,<3.13
- Source layout: src/

Install from repo root:
python -m venv .venv
source .venv/bin/activate
python -m pip install --upgrade pip
python -m pip install -e .
Why editable install:
- Imports (import M2F) pick up your edits while you iterate on code.

requirements.txt includes large ML packages:
- torch==2.8.0
- torch-geometric==2.7.0
- transformers==4.55.0
- zarr==3.1.1
- openai==1.99.3

Operational implications:
import logging
from M2F import configure_logging
configure_logging(
logs_dir="logs",
file_level=logging.DEBUG,
console_level=logging.INFO,
)
Why:
Top-level import path:
import M2F
Current exported API (M2F.__all__) includes:
- Logging: configure_logging.
- Mining: extract_accessions_from_humann, extract_all_accessions_from_dir, fetch_uniprotkb_fields, fetch_save_uniprotkb_batches.
- Cleaning: clean_col, clean_cols.
- Embedding and encoding: AAChainEmbedder, FreeTXTEmbedder, MultiHotEncoder, GOEncoder, ECEncoder, encode_multihot, get_GODag.
- Feature engineering: embed_ft_domains, embed_AAsequences, embed_freetxt_cols, encode_go, encode_ec, empty_tuples_to_NaNs, save_df, load_df.
- Models: FFNN, GraphConv, GraphConvNodeClassifier.
- Metrics: accuracy, recall, precision, f1.
- Dataset interfaces: DatasetInput, build_topology_from_DatasetInput, build_features_from_DatasetInput, ProteinGraphInMemoryDataset, ProteinGraphOnDiskDataset, ProteinDataset.
- Utilities: util.

M2F works well only when input schemas are strict. This is intentional.
Expected columns exactly:
- uniref
- i

Constraints enforced by DatasetInput.validate(...):
- i must be integer dtype.
- i values must be positive, 1-based IDs.
- i must not contain duplicates.
- uniref values must start with UniRef90_.

Why strict index requirements:
- Each 1-based ID maps directly to a 0-based row position (i - 1).

The edge CSV directory (path_to_edge_csv_dir) is required only when require_graph=True (graph interfaces).
Defaults:
- Chunk file name pattern: chunk_\d+\.csv
- edge_dst_column: j

Rules:
- Each chunk must contain edge_dst_column.
- edge_attr_columns must exist if explicitly provided.

Why one chunk per source node:
- The source node ID is carried by the file name (chunk_<i>.csv).

DatasetInput Query/Return Mapping

DatasetInput uses:
- X: dict[str, str] mapping UniProt query field -> return column name.
- Y: dict[str, str] singleton mapping UniProt query field -> return column name.

Important:
- Y must contain exactly one entry.
- Y cannot overlap with X keys or values.
- Y key cannot be accession.
- accession is always injected into X internally as "Entry".

Why mapping instead of plain list:
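The X/Y rules listed above can be expressed as a small standalone check. This is a minimal sketch, not M2F's actual validation code: check_xy_mapping is a hypothetical helper, and it reads "cannot overlap" broadly, so neither the Y field nor the Y column may appear among the X fields or X columns.

```python
def check_xy_mapping(X: dict[str, str], Y: dict[str, str]) -> dict[str, str]:
    """Hypothetical helper: validate X/Y and return X with accession injected."""
    if len(Y) != 1:
        raise ValueError("Y must contain exactly one entry")
    ((y_field, y_column),) = Y.items()
    if y_field == "accession":
        raise ValueError("Y key cannot be 'accession'")
    # Assumption: overlap is checked against both X keys and X values.
    x_names = set(X) | set(X.values())
    if y_field in x_names or y_column in x_names:
        raise ValueError("Y cannot overlap with X keys or values")
    # accession is always queried and comes back as the "Entry" column.
    return {"accession": "Entry", **X}


resolved = check_xy_mapping(
    {"sequence": "Sequence", "go_f": "go_mf"},
    {"ec": "target_ec"},
)
print(resolved)
```

Because each entry pairs a query field with its return column, a check like this can reason about both names at once, which a plain list of fields would not allow.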
from M2F import extract_accessions_from_humann, extract_all_accessions_from_dir
unirefs, uniclusts = extract_accessions_from_humann("sample_gene_families.tsv")
all_unirefs, all_uniclusts = extract_all_accessions_from_dir("humann_outputs/")
Notes:
- UNK/UPI are excluded before UniProt mining because they cannot be queried reliably.

from M2F import fetch_uniprotkb_fields
df = fetch_uniprotkb_fields(
uniref_ids=["A0A1B2C3D4", "Q9XYZ1"],
fields=["accession", "sequence", "go_f", "ec"],
request_size=50,
rps=5,
max_retry=20,
)
Field-name note:
- fields values must be valid UniProt API field identifiers.

Recommended defaults for stability:
- Keep request_size moderate (25-100).
- Keep rps conservative if the network is noisy.

Why this matters:
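A rough way to see the effect of request_size and rps is to count the requests they imply and the minimum time spent pacing them. The sketch below assumes fixed-size batches sent at no more than rps requests per second; plan_requests is a hypothetical helper, not part of M2F, and it ignores response time and retries.

```python
import math


def plan_requests(n_ids: int, request_size: int, rps: float) -> tuple[int, float]:
    """Hypothetical helper: request count and minimum pacing time in seconds."""
    if request_size <= 0 or rps <= 0:
        raise ValueError("request_size and rps must be positive")
    n_requests = math.ceil(n_ids / request_size)
    # At most `rps` requests per second means consecutive requests are at
    # least 1/rps seconds apart; the first request starts immediately.
    min_seconds = max(n_requests - 1, 0) / rps
    return n_requests, min_seconds


for size in (25, 100):
    n_requests, min_seconds = plan_requests(10_000, request_size=size, rps=5)
    print(f"request_size={size}: {n_requests} requests, >= {min_seconds:.1f} s")
```

Smaller batches mean more requests, but each failed request is cheaper to retry, which is one reason a moderate request_size can be a reasonable trade-off.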
from M2F import clean_cols
cleaned = clean_cols(
df,
col_names=[
"Gene Ontology (molecular function)",
"EC number",
"Domain [FT]",
"Function [CC]",
],
inplace=False,
)
What you get:
- Cells with no parsed values are kept as empty tuples () in cleaning stage.

Why tuple outputs:
import os
from M2F import (
AAChainEmbedder,
FreeTXTEmbedder,
embed_AAsequences,
embed_freetxt_cols,
encode_go,
encode_ec,
)
# Ensure tuple-based cell format expected by embedding/encoding wrappers.
cleaned = cleaned.copy()
cleaned["Sequence"] = cleaned["Sequence"].map(
lambda s: (s,) if isinstance(s, str) and s else s
)
# Sequence embeddings (ESM2)
aa = AAChainEmbedder(model_key="esm2_t6_8M_UR50D", device="cpu")
df1 = embed_AAsequences(cleaned, embedder=aa, batch_size=64, inplace=False)
# Free-text embeddings (OpenAI)
txt = FreeTXTEmbedder(
api_key=os.environ["OPENAI_API_KEY"],
model="SMALL_OPENAI_MODEL",
cache_file_path="embeddings.sqlite",
caching_mode="APPEND",
max_cache_size_kb=200_000,
)
df2 = embed_freetxt_cols(df1, cols=["Function [CC]"], embedder=txt, batch_size=512, inplace=False)
# Structured label encoding
df3, go_vocab = encode_go(df2, col_name="Gene Ontology (molecular function)", coverage_target=0.8)
df4, ec_vocab = encode_ec(df3, col_name="EC number", examples_per_class=30)
Input-shape note for wrappers:
- embed_AAsequences expects "Sequence" cells to be singleton tuples like ("MSEQ...",).
- embed_freetxt_cols expects tuple-of-strings per row.
- encode_go and encode_ec expect tuple-encoded labels per row.

Why staged transformation is useful:
- The returned vocabularies (go_vocab, ec_vocab) are needed for interpretation and consistent inference.

from M2F import save_df, load_df
save_df(df4, "features.zip", metadata={"source": "uniprot_2026_05_11"})
restored = load_df("features.zip")
Persistence format constraints:
- save_df requires .zip extension.
- Requires the Entry accession column.
- Feature cells are expected to be tuple (ragged integer encodings) or np.ndarray (dense embeddings).

DatasetInput

from pathlib import Path
from M2F import DatasetInput
inp = DatasetInput(
path_to_accession_ids_csv_file=Path("data/uniref_index.csv"),
path_to_edge_csv_dir=Path("data/edges"),
X={
"sequence": "Sequence",
"go_f": "go_mf",
},
Y={
"ec": "target_ec",
},
request_size=25,
rps=1.0,
max_retry=20,
num_feature_batches=8,
edge_dst_column="j",
edge_attr_columns=None,
)
Why num_feature_batches matters:

In-memory graph dataset (ProteinGraphInMemoryDataset)

Use when the graph fits in your RAM/GPU workflow and you want a single Data object.
from pathlib import Path
from M2F import ProteinGraphInMemoryDataset
ds = ProteinGraphInMemoryDataset(
root=Path("runs/graph_inmem"),
dataset_input=inp,
pre_transform=None, # DataFrame -> DataFrame
pre_filter=None, # DataFrame -> boolean mask
force_reload=False,
val_set_size=0.1,
test_set_size=0.1,
)
data = ds[0]
print(data.x.shape, data.edge_index.shape, data.y.shape)
Process summary:
- raw/features.csv
- raw/
- build_features_from_DatasetInput
- build_topology_from_DatasetInput
- RandomNodeSplit masks
- processed/data.pt

Key behavior:
- Requires the X/Y fields to still be present after transform/filter.

On-disk graph dataset (ProteinGraphOnDiskDataset)

Use when the graph feature matrix is large and should be streamed from disk.
from pathlib import Path
from M2F import ProteinGraphOnDiskDataset
ondisk = ProteinGraphOnDiskDataset(
root=Path("runs/graph_ondisk"),
dataset_input=inp,
pre_transform=None,
pre_filter=None,
force_reload=False,
val_set_size=0.1,
test_set_size=0.1,
)
Storage layout:
- raw/features_batches/features_<i>.csv
- processed/feature_store/
- processed/edge_index.npy
- processed/id_map.npy
- processed/meta.pt

Two-pass processing logic (important):
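- Feature shards are processed first, writing x/y to zarr.
- Edges are finalized afterwards, once the global id_map is complete.

Why this is necessary: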
train_loader = ondisk.train_loader(num_neighbors=[15, 10], batch_size=1024, shuffle=True)
val_loader = ondisk.val_loader(num_neighbors=[15, 10], batch_size=1024)
test_loader = ondisk.test_loader(num_neighbors=[15, 10], batch_size=1024)
Under the hood:
- FeatureStore.
- GraphStore.
- e_id lookup.

Operational note:
- Call ondisk.close() when done to release store handles.

M2F exposes:
- build_features_from_DatasetInput(...)
- build_topology_from_DatasetInput(...)

Use these if you need custom dataset orchestration.
Important guardrails already implemented:
- Duplicate Entry rows in a feature shard raise ValueError.
- ValueError.
- Inconsistent X or Y row dimensionality raises ValueError.

Non-graph dataset (ProteinDataset)

ProteinDataset is the non-graph companion for dense feed-forward models.
What it provides:
- Samples with x and y only (no edges).

from pathlib import Path
from M2F import DatasetInput, ProteinDataset
inp_ffnn = DatasetInput(
path_to_accession_ids_csv_file=Path("data/uniref_index.csv"),
X={"sequence": "Sequence"},
Y={"ec": "target_ec"},
num_feature_batches=8,
)
dset = ProteinDataset(
root=Path("runs/ffnn_data"),
dataset_input=inp_ffnn,
split="train",
include_targets=True,
force_reload=False,
)
x, y = dset[0]
print(x.shape, y.shape)
Split control:
# Mutate active split on same object
_ = dset.set_split("val", include_targets=True)
# Or create immutable view objects
val_view = dset.view("val", include_targets=True)
pred_view = dset.view("all", include_targets=False)
Dataloaders:
train_loader = dset.train_loader(batch_size=512, shuffle=True)
val_loader = dset.val_loader(batch_size=512)
test_loader = dset.test_loader(batch_size=512)
predict_loader = dset.predict_loader(batch_size=512, split="all")
Why separate FFNN dataset class:
Operational note:
- Call dset.close() when done.

GraphConvNodeClassifier

import torch
from M2F import GraphConvNodeClassifier
model = GraphConvNodeClassifier(
in_dim=128,
edge_dim=4,
msg_dim=64,
state_dim=64,
out_dim=1,
edge_features_used_as="scaling", # or "catting"
dropout_p=0.3,
)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
history = model.fit(
train=train_loader,
val=val_loader,
epochs=50,
early_stopping=True,
tolerance=5,
report_performance_every_kth_epoch=1,
save_model_to="runs/checkpoints_gnn",
)
metrics = model.test(test_loader, threshold=0.5)
print(history["best_val_loss"], metrics)
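The early_stopping and tolerance arguments in the call above suggest patience-style stopping on validation loss. The sketch below shows that general technique under the assumption that tolerance counts consecutive epochs without a new best validation loss; M2F's exact rule may differ, and stopping_epoch is a hypothetical helper.

```python
def stopping_epoch(val_losses: list[float], tolerance: int) -> int:
    """Hypothetical rule: 0-based epoch at which training would stop."""
    best = float("inf")
    epochs_without_improvement = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best:
            # New best validation loss: reset the patience counter.
            best = loss
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= tolerance:
                return epoch
    # Patience never ran out, so training runs through the last epoch.
    return len(val_losses) - 1


losses = [0.70, 0.55, 0.50, 0.52, 0.51, 0.53, 0.49]
print(stopping_epoch(losses, tolerance=3))
print(stopping_epoch(losses, tolerance=5))
```

With a small tolerance the run stops before the late improvement at the final epoch; a larger tolerance lets training reach it, at the cost of extra epochs.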
Implementation details worth knowing:
- Loss: BCEWithLogitsLoss.
- Seed-node handling (batch_size mask logic).
- fit(...) returns best_val_loss, best_model_path, and epoch-wise history.

FFNN

import torch
from M2F import FFNN
model = FFNN(in_dim=128, hidden_dim1=256, hidden_dim2=128, out_dim=1, dropout_p=0.3)
model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
history = model.fit(
train=train_loader,
val=val_loader,
epochs=50,
early_stopping=True,
tolerance=5,
report_performance_every_kth_epoch=1,
save_model_to="runs/checkpoints_ffnn",
)
metrics = model.test(test_loader, threshold=0.5)
print(history["best_val_loss"], metrics)
Implementation details:
- Loss: BCEWithLogitsLoss.
- forward(...) returns logits during training, sigmoid probabilities during eval.

Available helpers (M2F.testing_utils):
- accuracy(logits, y_true, mask, threshold=0.5)
- recall(logits, y_true, mask, threshold=0.5)
- precision(logits, y_true, mask, threshold=0.5)
- f1(logits, y_true, mask, threshold=0.5)

Use case:
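To illustrate what masked, thresholded binary metrics compute, here is a dependency-free sketch. It is not the M2F.testing_utils code: masked_scores is a hypothetical helper working on plain Python lists, and it assumes the threshold applies to sigmoid(logit), which is equivalent to comparing the logit against log(t / (1 - t)).

```python
import math


def masked_scores(logits, y_true, mask, threshold=0.5):
    """Hypothetical helper: accuracy/precision/recall/F1 over masked positions."""
    # sigmoid(z) >= t  <=>  z >= log(t / (1 - t)) for 0 < t < 1.
    cutoff = math.log(threshold / (1.0 - threshold))
    tp = fp = fn = tn = 0
    for logit, target, keep in zip(logits, y_true, mask):
        if not keep:
            continue  # positions outside the mask are ignored entirely
        predicted, actual = logit >= cutoff, target == 1
        if predicted and actual:
            tp += 1
        elif predicted:
            fp += 1
        elif actual:
            fn += 1
        else:
            tn += 1
    total = tp + fp + fn + tn
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    accuracy = (tp + tn) / total if total else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


scores = masked_scores(
    logits=[2.0, -1.0, 0.3, -0.2, 1.5],
    y_true=[1, 0, 0, 1, 1],
    mask=[True, True, True, True, False],
)
print(scores)
```

The mask is what lets the same function score only the training, validation, or test nodes of a single graph.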
build_features_from_DatasetInput explicitly rejects duplicate Entry values within a shard.
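A duplicate check of this kind can be sketched in a few lines. This is only an illustration of the guardrail, assuming the Entry values of one shard are available as a list of strings; reject_duplicate_entries is a hypothetical helper and the error message is made up.

```python
from collections import Counter


def reject_duplicate_entries(entries: list[str]) -> None:
    """Hypothetical helper: raise ValueError if any Entry value repeats."""
    duplicates = sorted(e for e, n in Counter(entries).items() if n > 1)
    if duplicates:
        raise ValueError(f"duplicate Entry values in shard: {duplicates}")


reject_duplicate_entries(["A0A1B2C3D4", "Q9XYZ1"])  # unique values pass silently
```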
Why:
If filtering drops nodes, old IDs must be remapped to dense new IDs.
You cannot finalize edges safely until global id_map is complete.
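The remapping step can be sketched as follows. This is a minimal illustration, not M2F's implementation: remap_edges is a hypothetical helper, new IDs are assumed to be dense and 0-based in ascending order of the old IDs, and edges that touch a dropped node are assumed to be discarded.

```python
def remap_edges(kept_ids, edges):
    """Hypothetical helper: map surviving old IDs to dense new IDs and remap edges."""
    # The map can only be built once every surviving node ID is known.
    id_map = {old: new for new, old in enumerate(sorted(kept_ids))}
    remapped = [
        (id_map[src], id_map[dst])
        for src, dst in edges
        if src in id_map and dst in id_map  # drop edges to filtered-out nodes
    ]
    return id_map, remapped


# Node 3 was removed by filtering, so the edge (2, 3) disappears.
id_map, new_edges = remap_edges(
    kept_ids=[1, 2, 4, 5],
    edges=[(1, 2), (2, 3), (4, 5), (5, 1)],
)
print(id_map)
print(new_edges)
```

Remapping an edge before its destination node has been seen would fail or point at the wrong row, which is why edge finalization has to wait for the complete map.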
Practical consequence:
force_reload Semantics

For ProteinGraphOnDiskDataset and ProteinDataset:
- force_reload=True deletes processed artifacts and raw feature batch folder before rebuild.

Use force_reload=True when:
M2F validates that all rows produce identical flattened dimensions for:
If dimensions vary across rows, processing fails early.
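The idea of a flattened-dimension check can be sketched with plain Python containers. This is an assumption-laden illustration rather than M2F's code: flatten and check_uniform_dim are hypothetical helpers, and rows are modeled as nested lists or tuples of numbers instead of arrays.

```python
def flatten(value):
    """Flatten arbitrarily nested lists/tuples of numbers into one flat list."""
    if isinstance(value, (list, tuple)):
        return [x for item in value for x in flatten(item)]
    return [value]


def check_uniform_dim(rows, name):
    """Hypothetical helper: return the shared flattened width or raise ValueError."""
    dims = {len(flatten(row)) for row in rows}
    if len(dims) > 1:
        raise ValueError(f"{name}: inconsistent flattened dimensions {sorted(dims)}")
    return dims.pop() if dims else 0


# Both rows flatten to three values, so the check passes.
print(check_uniform_dim([[1.0, 2.0, [3.0]], (4.0, 5.0, 6.0)], name="X"))
```

Failing here, before any tensors are written, is cheaper than discovering a ragged feature matrix during training.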
Why:
For FreeTXTEmbedder:
- Use cache_file_path + caching_mode="APPEND" for repeated experiments.
- Keep max_cache_size_kb large enough to reduce DB churn.

Why:
import logging
from pathlib import Path
from M2F import (
configure_logging,
DatasetInput,
ProteinGraphOnDiskDataset,
GraphConvNodeClassifier,
)
configure_logging("logs", file_level=logging.DEBUG, console_level=logging.INFO)
inp = DatasetInput(
path_to_accession_ids_csv_file=Path("data/uniref_index.csv"),
path_to_edge_csv_dir=Path("data/edges"),
X={"sequence": "Sequence", "go_f": "go_mf"},
Y={"ec": "target_ec"},
request_size=25,
rps=1.0,
max_retry=20,
num_feature_batches=8,
)
ds = ProteinGraphOnDiskDataset(
root=Path("runs/gds"),
dataset_input=inp,
force_reload=False,
val_set_size=0.1,
test_set_size=0.1,
)
train_loader = ds.train_loader(num_neighbors=[15, 10], batch_size=1024, shuffle=True)
val_loader = ds.val_loader(num_neighbors=[15, 10], batch_size=1024)
test_loader = ds.test_loader(num_neighbors=[15, 10], batch_size=1024)
x_dim = int(ds.meta["x_dim"])
edge_dim = int(ds.meta["edge_attr_dim"])
y_dim = int(ds.meta["y_dim"])
model = GraphConvNodeClassifier(
in_dim=x_dim,
edge_dim=edge_dim,
msg_dim=128,
state_dim=128,
out_dim=y_dim,
)
history = model.fit(
train=train_loader,
val=val_loader,
epochs=30,
tolerance=5,
report_performance_every_kth_epoch=1,
save_model_to="runs/checkpoints",
)
metrics = model.test(test_loader)
print(history["best_val_loss"], metrics)
ds.close()
Local test run:
python -m unittest discover -s tests -p "test_*.py"
Current CI workflows:
- .github/workflows/test.yml: runs tests on pushes/PRs (Python 3.11 and 3.12).
- .github/workflows/build.yml: runs tests, builds package dists (sdist + wheel), validates with twine, and uploads build artifacts.

Install from built artifact (example):
python -m pip install dist/microbiome2function-0.1.0-py3-none-any.whl
- Validate DatasetInput and preprocessing on a tiny accession subset first.
- pre_transform must return DataFrame; pre_filter must return boolean mask with matching length.
- Keep meta.pt, vocab maps, and model checkpoints per experiment.
- Call close() to release zarr handles after training/inference.

Module overview:
- M2F.logging_utils: logger configuration.
- M2F.mining_utils: accession extraction + UniProt mining.
- M2F.cleaning_utils: regex-based annotation cleaning.
- M2F.embedding_utils: ESM and OpenAI embedding + GO/EC/multihot encoders.
- M2F.feature_engineering_utils: high-level embedding wrappers + zarr zip persistence.
- M2F.pyg_data_interfaces: graph and FFNN dataset interfaces + standalone builders.
- M2F.gnn: graph convolution model and training/eval loops.
- M2F.ffnn: feed-forward model and training/eval loops.
- M2F.testing_utils: metric helpers.
- M2F.util: utility helpers and zarr feature-store backend.