M2F Documentation

This document is the implementation-accurate user guide for microbiome2function (M2F).
It is written against the current code under src/M2F.

1. What M2F Is For

M2F is a practical toolkit for turning protein identifiers and UniProt annotations into ML-ready inputs.

Primary use-cases:

Design goals:

2. Install and Environment

2.1 Python / Packaging

Project metadata (pyproject.toml):

Install from repo root:

python -m venv .venv
source .venv/bin/activate
python -m pip install --upgrade pip
python -m pip install -e .

Why editable install:

2.2 Heavy Dependencies to Plan For

requirements.txt includes large ML packages:

Operational implications:

2.3 Logging Setup (Recommended)

import logging
from M2F import configure_logging

configure_logging(
    logs_dir="logs",
    file_level=logging.DEBUG,
    console_level=logging.INFO,
)
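
For readers who want to see what a call like this typically amounts to, here is a minimal standard-library sketch of a two-handler setup: a file handler at one level and a console handler at another. The function name `setup_logging_sketch`, the logger name `m2f_sketch`, and the file name `run.log` are hypothetical; this is not M2F's implementation of `configure_logging`, only an illustration of the pattern its parameters suggest.

```python
import logging
from pathlib import Path


def setup_logging_sketch(logs_dir, file_level=logging.DEBUG, console_level=logging.INFO):
    """Hypothetical stand-in: one file handler plus one console handler."""
    Path(logs_dir).mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger("m2f_sketch")  # hypothetical logger name
    # The logger itself must pass everything the most verbose handler wants.
    logger.setLevel(min(file_level, console_level))
    logger.handlers.clear()  # avoid duplicate handlers on repeated calls

    fmt = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")

    file_handler = logging.FileHandler(Path(logs_dir) / "run.log")  # hypothetical file name
    file_handler.setLevel(file_level)
    file_handler.setFormatter(fmt)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(console_level)
    console_handler.setFormatter(fmt)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
```

With the levels shown, debug messages reach only the log file while info and above also appear on the console.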

Why:

3. Public API Overview

Top-level import path:

import M2F

Current exported API (M2F.__all__) includes:

4. Data Contracts You Must Respect

M2F works well only if input schemas are strict. This is intentional.

4.1 Accession Index CSV

Expected columns exactly:

Constraints enforced by DatasetInput.validate(...):

Why strict index requirements:

4.2 Edge CSV Files (Graph Datasets Only)

Required only when require_graph=True (graph interfaces).

Defaults:

Rules:

Why one chunk per source node:

4.3 DatasetInput Query/Return Mapping

DatasetInput uses:

Important:

Why mapping instead of plain list:

5. Quick Start: End-to-End Patterns

5.1 Mining Accessions from HUMAnN

from M2F import extract_accessions_from_humann, extract_all_accessions_from_dir

unirefs, uniclusts = extract_accessions_from_humann("sample_gene_families.tsv")
all_unirefs, all_uniclusts = extract_all_accessions_from_dir("humann_outputs/")

Notes:

5.2 Fetch UniProt Fields

from M2F import fetch_uniprotkb_fields

df = fetch_uniprotkb_fields(
    uniref_ids=["A0A1B2C3D4", "Q9XYZ1"],
    fields=["accession", "sequence", "go_f", "ec"],
    request_size=50,
    rps=5,
    max_retry=20,
)
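
To make the roles of `request_size`, `rps`, and `max_retry` concrete, the sketch below shows one common way such parameters interact: IDs are split into batches, calls are spaced at least `1/rps` seconds apart, and a failed batch is retried with a capped backoff. Everything here is an assumption for illustration: `chunked`, `fetch_all`, and the `fetch_batch` callable are hypothetical names, and the retry policy is not taken from M2F's source.

```python
import time


def chunked(ids, request_size):
    """Yield consecutive slices of at most request_size IDs."""
    for start in range(0, len(ids), request_size):
        yield ids[start:start + request_size]


def fetch_all(ids, fetch_batch, request_size=50, rps=5.0, max_retry=20,
              sleep=time.sleep, clock=time.monotonic):
    """Call fetch_batch once per chunk, spacing calls 1/rps seconds apart.

    fetch_batch is a hypothetical callable (list of IDs -> list of rows)
    standing in for whatever HTTP request a real client would make.
    """
    min_interval = 1.0 / rps
    results = []
    last_call = None
    for batch in chunked(ids, request_size):
        for attempt in range(max_retry + 1):
            # Rate limit: wait until min_interval has passed since the last call.
            if last_call is not None:
                wait = min_interval - (clock() - last_call)
                if wait > 0:
                    sleep(wait)
            last_call = clock()
            try:
                results.extend(fetch_batch(batch))
                break
            except ConnectionError:
                if attempt == max_retry:
                    raise  # retries exhausted for this batch
                # Exponential backoff, capped at 30 seconds.
                sleep(min(2 ** attempt * min_interval, 30.0))
    return results
```

Smaller batches and a lower request rate trade throughput for fewer rejected requests, which is the usual motivation for conservative values.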

Field-name note:

Recommended defaults for stability:

Why this matters:

5.3 Clean UniProt Text Columns

from M2F import clean_cols

cleaned = clean_cols(
    df,
    col_names=[
        "Gene Ontology (molecular function)",
        "EC number",
        "Domain [FT]",
        "Function [CC]",
    ],
    inplace=False,
)
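
As a rough illustration of what cleaning to tuples can look like, the sketch below parses two UniProt-style cells: a GO column whose entries carry IDs in square brackets, and a semicolon-separated EC column. The helper names `go_cell_to_tuple` and `ec_cell_to_tuple` are hypothetical, and returning an empty tuple for missing cells is an assumption; the rules `clean_cols` actually applies may differ.

```python
import re

# GO identifiers are "GO:" followed by seven digits.
GO_ID = re.compile(r"GO:\d{7}")


def go_cell_to_tuple(cell):
    """'ATP binding [GO:0005524]; ...' -> ('GO:0005524', ...)."""
    if not isinstance(cell, str) or not cell.strip():
        return ()  # assumption: missing cells become an empty tuple
    return tuple(GO_ID.findall(cell))


def ec_cell_to_tuple(cell):
    """'2.7.11.1; 3.1.-.-' -> ('2.7.11.1', '3.1.-.-')."""
    if not isinstance(cell, str) or not cell.strip():
        return ()
    return tuple(part.strip() for part in cell.split(";") if part.strip())
```

Tuples are hashable and immutable, so cells cleaned this way can be used as dictionary keys or cache keys without defensive copying.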

What you get:

Why tuple outputs:

5.4 Encode and Embed

import os
from M2F import (
    AAChainEmbedder,
    FreeTXTEmbedder,
    embed_AAsequences,
    embed_freetxt_cols,
    encode_go,
    encode_ec,
)

# Ensure tuple-based cell format expected by embedding/encoding wrappers.
cleaned = cleaned.copy()
cleaned["Sequence"] = cleaned["Sequence"].map(
    lambda s: (s,) if isinstance(s, str) and s else s
)

# Sequence embeddings (ESM2)
aa = AAChainEmbedder(model_key="esm2_t6_8M_UR50D", device="cpu")
df1 = embed_AAsequences(cleaned, embedder=aa, batch_size=64, inplace=False)

# Free-text embeddings (OpenAI)
txt = FreeTXTEmbedder(
    api_key=os.environ["OPENAI_API_KEY"],
    model="SMALL_OPENAI_MODEL",
    cache_file_path="embeddings.sqlite",
    caching_mode="APPEND",
    max_cache_size_kb=200_000,
)
df2 = embed_freetxt_cols(df1, cols=["Function [CC]"], embedder=txt, batch_size=512, inplace=False)

# Structured label encoding
df3, go_vocab = encode_go(df2, col_name="Gene Ontology (molecular function)", coverage_target=0.8)
df4, ec_vocab = encode_ec(df3, col_name="EC number", examples_per_class=30)
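
One plausible reading of `coverage_target=0.8` is "keep the most frequent labels until they account for 80% of all label occurrences, then multi-hot encode against that vocabulary". The sketch below implements that reading only; it is an assumption, not a description of `encode_go`, and the names `build_vocab` and `multi_hot` are hypothetical.

```python
from collections import Counter


def build_vocab(rows, coverage_target=0.8):
    """Most frequent labels whose counts reach coverage_target of all occurrences."""
    counts = Counter(label for row in rows for label in row)
    total = sum(counts.values())
    vocab, covered = [], 0
    for label, n in counts.most_common():
        if covered >= coverage_target * total:
            break  # target reached; rarer labels are left out
        vocab.append(label)
        covered += n
    return vocab


def multi_hot(row, vocab):
    """Encode a tuple of labels as a 0/1 vector over vocab; unknown labels are ignored."""
    index = {label: i for i, label in enumerate(vocab)}
    vec = [0] * len(vocab)
    for label in row:
        if label in index:
            vec[index[label]] = 1
    return vec


rows = [("a", "b"), ("a",), ("a", "c"), ("a", "b")]
vocab = build_vocab(rows, coverage_target=0.8)
encoded = [multi_hot(row, vocab) for row in rows]
```

Under this reading, a lower coverage target gives a shorter label vector at the cost of dropping rare labels.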

Input-shape note for wrappers:

Why staged transformation is useful:

5.5 Persist Feature Tables

from M2F import save_df, load_df

save_df(df4, "features.zip", metadata={"source": "uniprot_2026_05_11"})
restored = load_df("features.zip")

Persistence format constraints:

6. Graph Dataset Cookbook (PyG)

6.1 Build DatasetInput

from pathlib import Path
from M2F import DatasetInput

inp = DatasetInput(
    path_to_accession_ids_csv_file=Path("data/uniref_index.csv"),
    path_to_edge_csv_dir=Path("data/edges"),
    X={
        "sequence": "Sequence",
        "go_f": "go_mf",
    },
    Y={
        "ec": "target_ec",
    },
    request_size=25,
    rps=1.0,
    max_retry=20,
    num_feature_batches=8,
    edge_dst_column="j",
    edge_attr_columns=None,
)

Why num_feature_batches matters:

6.2 In-Memory Graph (ProteinGraphInMemoryDataset)

Use this when the whole graph fits in memory (RAM, or GPU memory if you train there) and you want a single Data object.

from pathlib import Path
from M2F import ProteinGraphInMemoryDataset

ds = ProteinGraphInMemoryDataset(
    root=Path("runs/graph_inmem"),
    dataset_input=inp,
    pre_transform=None,  # DataFrame -> DataFrame
    pre_filter=None,     # DataFrame -> boolean mask
    force_reload=False,
    val_set_size=0.1,
    test_set_size=0.1,
)

data = ds[0]
print(data.x.shape, data.edge_index.shape, data.y.shape)

Process summary:

  1. Download UniProt features to raw/features.csv.
  2. Materialize index + edge shards into raw/.
  3. Build node features and labels with build_features_from_DatasetInput.
  4. Build topology with build_topology_from_DatasetInput.
  5. Apply RandomNodeSplit masks.
  6. Save processed graph to processed/data.pt.
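
Step 5 assigns every node to exactly one of the train, validation, or test sets through boolean masks. The sketch below shows the idea with the standard library only; it mirrors the `val_set_size` and `test_set_size` fractions from the constructor but is not PyG's RandomNodeSplit, and the function name `random_node_split` and the `seed` parameter are hypothetical.

```python
import random


def random_node_split(num_nodes, val_set_size=0.1, test_set_size=0.1, seed=0):
    """Return (train_mask, val_mask, test_mask) as lists of booleans."""
    order = list(range(num_nodes))
    random.Random(seed).shuffle(order)  # seeded for reproducibility

    n_val = round(num_nodes * val_set_size)
    n_test = round(num_nodes * test_set_size)
    val_ids = set(order[:n_val])
    test_ids = set(order[n_val:n_val + n_test])

    val_mask = [i in val_ids for i in range(num_nodes)]
    test_mask = [i in test_ids for i in range(num_nodes)]
    # Everything not held out for validation or testing is training data.
    train_mask = [not (v or t) for v, t in zip(val_mask, test_mask)]
    return train_mask, val_mask, test_mask
```

Because the masks are disjoint and cover all nodes, the same graph object can serve all three phases by switching which mask selects the loss targets.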

Key behavior:

6.3 On-Disk Graph (ProteinGraphOnDiskDataset)

Use this when the graph's feature matrix is too large to hold in memory and should be streamed from disk.

from pathlib import Path
from M2F import ProteinGraphOnDiskDataset

ondisk = ProteinGraphOnDiskDataset(
    root=Path("runs/graph_ondisk"),
    dataset_input=inp,
    pre_transform=None,
    pre_filter=None,
    force_reload=False,
    val_set_size=0.1,
    test_set_size=0.1,
)

Storage layout:

Two-pass processing logic (important):

  1. Pass 1 (features): process each feature shard and append x/y to zarr.
  2. Pass 2 (topology): once global reindex map is complete, build edges and edge attrs.
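
The two passes above can be sketched in a few lines. In this illustration plain Python lists stand in for the zarr arrays, and the function name `two_pass_build`, the `keep` predicate, and the tuple-based shard format are all hypothetical; the point is only the ordering constraint between the passes.

```python
def two_pass_build(feature_shards, edges, keep):
    """Build node storage and remapped edges in two passes.

    feature_shards: list of shards, each a list of (old_id, features) pairs.
    edges: list of (src_old_id, dst_old_id) pairs.
    keep: predicate on features deciding whether a node survives filtering.
    """
    # Pass 1: stream shards, append surviving rows, grow the global id map.
    id_map, x_store = {}, []
    for shard in feature_shards:
        for old_id, features in shard:
            if keep(features):
                id_map[old_id] = len(x_store)  # dense new id = row position
                x_store.append(features)

    # Pass 2: every surviving node is now known, so edges can be remapped.
    # Edges touching a dropped node are discarded.
    edge_index = [
        (id_map[src], id_map[dst])
        for src, dst in edges
        if src in id_map and dst in id_map
    ]
    return x_store, edge_index, id_map


shards = [[(10, [1.0]), (11, [0.0])], [(12, [2.0])]]
edges = [(10, 12), (10, 11), (12, 10)]
x, edge_index, id_map = two_pass_build(shards, edges, keep=lambda f: f[0] > 0)
```

Note that the edge (10, 12) starts in the first shard but points into the second: its destination has no new id until the second shard has been processed, which is why topology cannot be written shard by shard alongside the features.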

Why this is necessary:

Loaders

train_loader = ondisk.train_loader(num_neighbors=[15, 10], batch_size=1024, shuffle=True)
val_loader   = ondisk.val_loader(num_neighbors=[15, 10], batch_size=1024)
test_loader  = ondisk.test_loader(num_neighbors=[15, 10], batch_size=1024)
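
The `num_neighbors=[15, 10]` argument follows the usual neighbor-sampling convention: up to 15 neighbors per seed node in the first hop, then up to 10 per newly reached node in the second. The standard-library sketch below illustrates that scheme under the assumption that the loaders behave this way; it is not the code M2F or PyG runs, and `sample_neighborhood` and the adjacency-dict format are hypothetical.

```python
import random


def sample_neighborhood(adj, seeds, num_neighbors, rng):
    """Sample a multi-hop subgraph around seeds.

    adj maps node -> list of neighbor nodes.
    num_neighbors gives the per-node fan-out for each hop.
    Returns (nodes, edges), with edges as (neighbor, node) pairs.
    """
    nodes = list(seeds)
    seen = set(seeds)
    edges = []
    frontier = list(seeds)
    for fanout in num_neighbors:
        next_frontier = []
        for node in frontier:
            neighbors = adj.get(node, [])
            picked = rng.sample(neighbors, min(fanout, len(neighbors)))
            for nb in picked:
                edges.append((nb, node))  # messages flow neighbor -> node
                if nb not in seen:
                    seen.add(nb)
                    nodes.append(nb)
                    next_frontier.append(nb)
        frontier = next_frontier  # only newly reached nodes expand next hop
    return nodes, edges


adj = {0: [1, 2, 3], 1: [4], 2: [], 3: [5, 6], 4: [], 5: [], 6: []}
nodes, edges = sample_neighborhood(adj, seeds=[0], num_neighbors=[2, 1], rng=random.Random(0))
```

The fan-out list bounds the size of each mini-batch subgraph, so memory use stays predictable even when some nodes have very high degree.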

Under the hood:

Operational note:

6.4 Feature and Topology Builders as Standalone Functions

M2F exposes:

Use these if you need custom dataset orchestration.

Important guardrails already implemented:

7. FFNN Dataset Cookbook (ProteinDataset)

ProteinDataset is the non-graph companion for dense feed-forward models.

What it provides:

from pathlib import Path
from M2F import DatasetInput, ProteinDataset

inp_ffnn = DatasetInput(
    path_to_accession_ids_csv_file=Path("data/uniref_index.csv"),
    X={"sequence": "Sequence"},
    Y={"ec": "target_ec"},
    num_feature_batches=8,
)

dset = ProteinDataset(
    root=Path("runs/ffnn_data"),
    dataset_input=inp_ffnn,
    split="train",
    include_targets=True,
    force_reload=False,
)

x, y = dset[0]
print(x.shape, y.shape)

Split control:

# Mutate active split on same object
_ = dset.set_split("val", include_targets=True)

# Or create immutable view objects
val_view = dset.view("val", include_targets=True)
pred_view = dset.view("all", include_targets=False)
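
The difference between the two styles is easiest to see in a toy class. In the sketch below, `set_split` changes the object in place while `view` returns a shallow copy that shares the underlying rows. The class `SplitDatasetSketch` and its attributes are hypothetical and much simpler than ProteinDataset; whether M2F's views are implemented as shallow copies is an assumption.

```python
import copy


class SplitDatasetSketch:
    """Hypothetical minimal dataset with named index splits."""

    def __init__(self, rows, splits, split="train"):
        self._rows = rows      # shared storage, never copied
        self._splits = splits  # e.g. {"train": [0, 1], "val": [2]}
        self._split = split

    def set_split(self, split):
        self._split = split    # changes this object in place
        return self

    def view(self, split):
        other = copy.copy(self)  # shallow copy: shares rows and split table
        other._split = split
        return other

    def __len__(self):
        return len(self._splits[self._split])

    def __getitem__(self, i):
        return self._rows[self._splits[self._split][i]]


ds = SplitDatasetSketch(["a", "b", "c"], {"train": [0, 1], "val": [2]})
val = ds.view("val")  # ds still points at "train"
```

Views are the safer choice when train and validation loaders are alive at the same time, since switching the split on a shared object would silently change what an existing loader reads.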

Dataloaders:

train_loader = dset.train_loader(batch_size=512, shuffle=True)
val_loader = dset.val_loader(batch_size=512)
test_loader = dset.test_loader(batch_size=512)
predict_loader = dset.predict_loader(batch_size=512, split="all")

Why separate FFNN dataset class:

Operational note:

8. Model Training Cookbook

8.1 GNN: GraphConvNodeClassifier

import torch
from M2F import GraphConvNodeClassifier

model = GraphConvNodeClassifier(
    in_dim=128,
    edge_dim=4,
    msg_dim=64,
    state_dim=64,
    out_dim=1,
    edge_features_used_as="scaling",  # or "catting"
    dropout_p=0.3,
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

history = model.fit(
    train=train_loader,
    val=val_loader,
    epochs=50,
    early_stopping=True,
    tolerance=5,
    report_performance_every_kth_epoch=1,
    save_model_to="runs/checkpoints_gnn",
)

metrics = model.test(test_loader, threshold=0.5)
print(history["best_val_loss"], metrics)
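
If `tolerance` plays the usual role of patience, training stops once the validation loss has failed to improve for that many consecutive epochs. The sketch below replays a list of validation losses to show the bookkeeping; the function name `early_stopping_trace` is hypothetical, and the exact stopping rule inside `fit` may differ (for example in how ties are treated).

```python
def early_stopping_trace(val_losses, tolerance=5):
    """Replay per-epoch validation losses through a patience counter.

    Returns (best_val_loss, best_epoch, epochs_run).
    """
    best_loss, best_epoch, bad_epochs = float("inf"), -1, 0
    epochs_run = 0
    for epoch, loss in enumerate(val_losses):
        epochs_run = epoch + 1
        if loss < best_loss:
            # Improvement: remember it and reset the patience counter.
            best_loss, best_epoch, bad_epochs = loss, epoch, 0
        else:
            bad_epochs += 1
            if bad_epochs >= tolerance:
                break  # no improvement for `tolerance` epochs in a row
    return best_loss, best_epoch, epochs_run


losses = [1.0, 0.8, 0.9, 0.85, 0.7, 0.71, 0.72, 0.73]
result = early_stopping_trace(losses, tolerance=3)
```

A small tolerance stops sooner but can miss late improvements: with `tolerance=2` the same loss curve halts after four epochs and never reaches the 0.7 minimum.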

Implementation details worth knowing:

8.2 FFNN: FFNN

import torch
from M2F import FFNN

model = FFNN(in_dim=128, hidden_dim1=256, hidden_dim2=128, out_dim=1, dropout_p=0.3)
model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

history = model.fit(
    train=train_loader,
    val=val_loader,
    epochs=50,
    early_stopping=True,
    tolerance=5,
    report_performance_every_kth_epoch=1,
    save_model_to="runs/checkpoints_ffnn",
)

metrics = model.test(test_loader, threshold=0.5)
print(history["best_val_loss"], metrics)
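
The `threshold=0.5` argument turns predicted probabilities into hard 0/1 decisions before metrics are computed. The sketch below shows that step for a single binary output, using only the standard library. The function `binary_metrics` is hypothetical, and both the `>=` comparison and the choice of metrics are assumptions rather than a description of what `test` returns.

```python
def binary_metrics(probs, targets, threshold=0.5):
    """Threshold probabilities, then compute accuracy, precision, recall, F1."""
    preds = [1 if p >= threshold else 0 for p in probs]
    pairs = list(zip(preds, targets))
    tp = sum(1 for p, t in pairs if p == 1 and t == 1)
    fp = sum(1 for p, t in pairs if p == 1 and t == 0)
    fn = sum(1 for p, t in pairs if p == 0 and t == 1)
    tn = sum(1 for p, t in pairs if p == 0 and t == 0)

    # Guard every ratio against an empty denominator.
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    accuracy = (tp + tn) / len(pairs) if pairs else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


metrics_sketch = binary_metrics([0.9, 0.4, 0.6, 0.2], [1, 1, 0, 0], threshold=0.5)
```

Raising the threshold generally trades recall for precision, so it is worth tuning on the validation split rather than leaving it at 0.5 by default.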

Implementation details:

8.3 Metrics Utilities

Available helpers (M2F.testing_utils):

Use case:

9. Advanced Notes and Common Failure Modes

9.1 Duplicate Accessions in Feature Shards

build_features_from_DatasetInput explicitly rejects duplicate Entry values within a shard.
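
A check of this kind is cheap to reproduce before handing data to M2F. The sketch below raises on the first shard that contains repeated accession values; the function name `assert_unique_entries` and the error message are hypothetical, not copied from M2F.

```python
from collections import Counter


def assert_unique_entries(entries):
    """Raise ValueError if any accession appears more than once in a shard."""
    dupes = sorted(e for e, n in Counter(entries).items() if n > 1)
    if dupes:
        raise ValueError(f"duplicate Entry values in shard: {dupes}")
```

Running it on each shard's Entry column ahead of time surfaces the offending accessions by name instead of failing partway through a long build.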

Why:

9.2 Why Topology Is Built After Features for On-Disk Workflows

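If filtering drops nodes, the surviving nodes' old IDs must be remapped to dense new IDs.
Edges cannot be finalized safely until the global id_map is complete, because an edge may point to a node whose new ID has not been assigned yet.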

Practical consequence:

9.3 force_reload Semantics

For ProteinGraphOnDiskDataset and ProteinDataset:

Use force_reload=True when:

9.4 Consistent Vector Dimensions Are Mandatory

M2F validates that all rows produce identical flattened dimensions for:

If dimensions vary across rows, processing fails early.
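
The same kind of check can be run on your own data before building a dataset. The sketch below counts the scalars in each possibly nested row and raises if rows disagree; `flattened_dim` and `check_consistent_dim` are hypothetical helpers, and M2F's own validation may report errors differently.

```python
def flattened_dim(value):
    """Number of scalars in a possibly nested list or tuple."""
    if isinstance(value, (list, tuple)):
        return sum(flattened_dim(v) for v in value)
    return 1


def check_consistent_dim(rows, name="x"):
    """Return the shared flattened dimension, or raise if rows disagree."""
    dims = {flattened_dim(row) for row in rows}
    if len(dims) > 1:
        raise ValueError(f"{name}: inconsistent flattened dimensions {sorted(dims)}")
    return dims.pop() if dims else 0
```

A fixed-width matrix cannot be allocated if rows disagree on length, so it is better to fail here than after hours of feature download.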

Why:

9.5 OpenAI Embedding Cost and Caching

For FreeTXTEmbedder:

Why:

10. Full Cookbook Example (Graph Pipeline)

import logging
from pathlib import Path

from M2F import (
    configure_logging,
    DatasetInput,
    ProteinGraphOnDiskDataset,
    GraphConvNodeClassifier,
)

configure_logging("logs", file_level=logging.DEBUG, console_level=logging.INFO)

inp = DatasetInput(
    path_to_accession_ids_csv_file=Path("data/uniref_index.csv"),
    path_to_edge_csv_dir=Path("data/edges"),
    X={"sequence": "Sequence", "go_f": "go_mf"},
    Y={"ec": "target_ec"},
    request_size=25,
    rps=1.0,
    max_retry=20,
    num_feature_batches=8,
)

ds = ProteinGraphOnDiskDataset(
    root=Path("runs/gds"),
    dataset_input=inp,
    force_reload=False,
    val_set_size=0.1,
    test_set_size=0.1,
)

train_loader = ds.train_loader(num_neighbors=[15, 10], batch_size=1024, shuffle=True)
val_loader = ds.val_loader(num_neighbors=[15, 10], batch_size=1024)
test_loader = ds.test_loader(num_neighbors=[15, 10], batch_size=1024)

x_dim = int(ds.meta["x_dim"])
edge_dim = int(ds.meta["edge_attr_dim"])
y_dim = int(ds.meta["y_dim"])

model = GraphConvNodeClassifier(
    in_dim=x_dim,
    edge_dim=edge_dim,
    msg_dim=128,
    state_dim=128,
    out_dim=y_dim,
)

history = model.fit(
    train=train_loader,
    val=val_loader,
    epochs=30,
    tolerance=5,
    report_performance_every_kth_epoch=1,
    save_model_to="runs/checkpoints",
)

metrics = model.test(test_loader)
print(history["best_val_loss"], metrics)

ds.close()

11. Testing and CI

Local test run:

python -m unittest discover -s tests -p "test_*.py"

Current CI workflows:

Install from built artifact (example):

python -m pip install dist/microbiome2function-0.1.0-py3-none-any.whl

12. Practical Recommendations

13. Module Index