"""Adapter for the external MemoryOS baseline.""" from __future__ import annotations import importlib import os import shutil import sys import tempfile from pathlib import Path from typing import Any, Callable from eval_framework.datasets.schemas import ( MemoryDeltaRecord, MemorySnapshotRecord, NormalizedTurn, RetrievalItem, RetrievalRecord, ) from eval_framework.memory_adapters.base import MemoryAdapter _BACKEND_ID = "MemoryOS" INTEGRATION_ERROR = ( f"{_BACKEND_ID} backend unavailable." ) class MemoryOSAdapter(MemoryAdapter): """Thin wrapper around MemoryOS's local Python API.""" def __init__( self, *, backend: Any | None = None, backend_factory: Callable[[], Any] | None = None, source_root: str | os.PathLike[str] | None = None, storage_root: str | os.PathLike[str] | None = None, user_id: str = "eval_user", assistant_id: str = "eval_assistant", llm_model: str | None = None, embedding_model_name: str = "all-MiniLM-L6-v2", openai_api_key: str | None = None, openai_base_url: str | None = None, ) -> None: self._source_root = Path(source_root).resolve() if source_root else self._default_source_root() self._storage_root = Path(storage_root).resolve() if storage_root else Path( tempfile.mkdtemp(prefix="memoryos_eval_") ) self._user_id = user_id self._assistant_id = assistant_id self._llm_model = llm_model or os.getenv("OPENAI_MODEL") or "gpt-5.1" self._embedding_model_name = embedding_model_name self._openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY") self._openai_base_url = openai_base_url or os.getenv("OPENAI_BASE_URL") self._backend_factory = backend_factory self._backend: Any | None = None self._integration_error: str | None = None self._session_id = "" self._prev_snapshot_ids: set[str] = set() self._pending_user_turns: list[NormalizedTurn] = [] if backend is not None: self._backend = backend else: try: if self._backend_factory is None: self._backend_factory = self._build_backend_factory() self._backend = self._backend_factory() except Exception as exc: self._integration_error = str(exc) @staticmethod def _default_source_root() -> Path: here = Path(__file__).resolve() # memory_adapters/ -> eval_framework/ -> nips26/ -> baselines/MemoryOS/memoryos-pypi return (here.parents[2] / "baselines" / "MemoryOS" / "memoryos-pypi").resolve() def _build_backend_factory(self) -> Callable[[], Any]: if not self._source_root.is_dir(): raise RuntimeError( f"{_BACKEND_ID}: source root not found at {self._source_root}" ) src = str(self._source_root) if src not in sys.path: sys.path.insert(0, src) mod = importlib.import_module("memoryos") backend_cls = getattr(mod, "Memoryos") def _factory() -> Any: run_root = self._storage_root / "runtime" shutil.rmtree(run_root, ignore_errors=True) run_root.mkdir(parents=True, exist_ok=True) return backend_cls( user_id=self._user_id, openai_api_key=self._openai_api_key or "", openai_base_url=self._openai_base_url, data_storage_path=str(run_root), llm_model=self._llm_model, assistant_id=self._assistant_id, embedding_model_name=self._embedding_model_name, ) return _factory def _runtime_error(self) -> RuntimeError: detail = self._integration_error or INTEGRATION_ERROR return RuntimeError( f"{_BACKEND_ID}: backend unavailable — {detail}" ) def reset(self) -> None: if self._backend_factory is None and self._backend is None: raise self._runtime_error() if self._backend_factory is not None: self._backend = self._backend_factory() self._prev_snapshot_ids = set() self._pending_user_turns = [] self._session_id = "" def ingest_turn(self, turn: NormalizedTurn) -> None: self._require_backend() self._session_id = turn.session_id if turn.role == "assistant": self._store_pair(turn) else: self._pending_user_turns.append(turn) def end_session(self, session_id: str) -> None: self._require_backend() self._session_id = session_id if self._pending_user_turns: synthetic = self._pending_user_turns[-1] self._store_memory( session_id=session_id, user_input=self._joined_user_text(), agent_response="", timestamp=synthetic.timestamp, ) self._pending_user_turns = [] def snapshot_memories(self) -> list[MemorySnapshotRecord]: backend = self._require_backend() rows: list[MemorySnapshotRecord] = [] sid = self._session_id for idx, qa in enumerate(backend.short_term_memory.get_all()): rows.append( MemorySnapshotRecord( memory_id=f"st:{idx}", text=self._format_qa_text(qa), session_id=sid, status="active", source=_BACKEND_ID, raw_backend_id=f"st:{idx}", raw_backend_type="short_term", metadata={"timestamp": qa.get("timestamp")}, ) ) for internal_session_id, session in getattr(backend.mid_term_memory, "sessions", {}).items(): for page_idx, page in enumerate(session.get("details", [])): rows.append( MemorySnapshotRecord( memory_id=f"mt:{internal_session_id}:{page_idx}", text=self._format_qa_text(page), session_id=sid, status="active", source=_BACKEND_ID, raw_backend_id=str(page.get("page_id", f"{internal_session_id}:{page_idx}")), raw_backend_type="mid_term_page", metadata={"memoryos_session_id": internal_session_id}, ) ) user_profile = backend.user_long_term_memory.get_raw_user_profile(backend.user_id) if user_profile and str(user_profile).lower() != "none": rows.append( MemorySnapshotRecord( memory_id="lt:user_profile", text=str(user_profile), session_id=sid, status="active", source=_BACKEND_ID, raw_backend_id="user_profile", raw_backend_type="user_profile", metadata={}, ) ) for idx, item in enumerate(backend.user_long_term_memory.get_user_knowledge()): rows.append( MemorySnapshotRecord( memory_id=f"lt:user:{idx}", text=str(item.get("knowledge", "")), session_id=sid, status="active", source=_BACKEND_ID, raw_backend_id=f"user:{idx}", raw_backend_type="user_knowledge", metadata={"timestamp": item.get("timestamp")}, ) ) assistant_ltm = getattr(backend, "assistant_long_term_memory", None) if assistant_ltm is not None and hasattr(assistant_ltm, "get_assistant_knowledge"): for idx, item in enumerate(assistant_ltm.get_assistant_knowledge()): rows.append( MemorySnapshotRecord( memory_id=f"lt:assistant:{idx}", text=str(item.get("knowledge", "")), session_id=sid, status="active", source=_BACKEND_ID, raw_backend_id=f"assistant:{idx}", raw_backend_type="assistant_knowledge", metadata={"timestamp": item.get("timestamp")}, ) ) return rows def export_memory_delta(self, session_id: str) -> list[MemoryDeltaRecord]: """Export delta by diffing current snapshot against previous snapshot.""" self._require_backend() current_snapshot = self.snapshot_memories() deltas: list[MemoryDeltaRecord] = [] current_ids: set[str] = set() for snap in current_snapshot: current_ids.add(snap.memory_id) if snap.memory_id not in self._prev_snapshot_ids: deltas.append( MemoryDeltaRecord( session_id=session_id, op="add", text=snap.text, linked_previous=(), raw_backend_id=snap.raw_backend_id, metadata={ "baseline": _BACKEND_ID, "backend_type": snap.raw_backend_type, }, ) ) self._prev_snapshot_ids = current_ids return deltas def retrieve(self, query: str, top_k: int) -> RetrievalRecord: backend = self._require_backend() raw = backend.retriever.retrieve_context(query, user_id=backend.user_id) items: list[RetrievalItem] = [] for page in raw.get("retrieved_pages", []): items.append( RetrievalItem( rank=len(items), memory_id=f"page:{len(items)}", text=self._format_qa_text(page), score=1.0 / float(len(items) + 1), raw_backend_id=page.get("page_id"), ) ) for item in raw.get("retrieved_user_knowledge", []): items.append( RetrievalItem( rank=len(items), memory_id=f"user:{len(items)}", text=str(item.get("knowledge", "")), score=1.0 / float(len(items) + 1), raw_backend_id=None, ) ) for item in raw.get("retrieved_assistant_knowledge", []): items.append( RetrievalItem( rank=len(items), memory_id=f"assistant:{len(items)}", text=str(item.get("knowledge", "")), score=1.0 / float(len(items) + 1), raw_backend_id=None, ) ) return RetrievalRecord( query=query, top_k=top_k, items=items[:top_k], raw_trace={"baseline": _BACKEND_ID, "retrieved_at": raw.get("retrieved_at")}, ) def get_capabilities(self) -> dict[str, Any]: available = self._backend is not None or self._backend_factory is not None return { "backend": _BACKEND_ID, "baseline": _BACKEND_ID, "available": available and self._integration_error is None, "integration_status": "integrated" if available and self._integration_error is None else "unavailable", "integration_error": self._integration_error or INTEGRATION_ERROR, "delta_granularity": "ingest_pair_only", "snapshot_mode": "short_mid_long_term", } def _require_backend(self) -> Any: if self._backend is None: raise self._runtime_error() return self._backend def _store_pair(self, assistant_turn: NormalizedTurn) -> None: user_input = self._joined_user_text() self._store_memory( session_id=assistant_turn.session_id, user_input=user_input, agent_response=self._turn_text(assistant_turn), timestamp=assistant_turn.timestamp, ) self._pending_user_turns = [] def _store_memory( self, *, session_id: str, user_input: str, agent_response: str, timestamp: str | None, ) -> None: backend = self._require_backend() backend.add_memory( user_input=user_input, agent_response=agent_response, timestamp=timestamp, meta_data={"session_id": session_id}, ) def _joined_user_text(self) -> str: if not self._pending_user_turns: return "" return "\n".join(self._turn_text(turn) for turn in self._pending_user_turns) @staticmethod def _turn_text(turn: NormalizedTurn) -> str: parts = [turn.text] for att in turn.attachments: parts.append(f"[{att.type}] {att.caption}") return "\n".join(parts) @staticmethod def _format_qa_text(item: dict[str, Any]) -> str: parts = [] user_text = item.get("user_input", "") if user_text: parts.append(f"user: {user_text}") assistant_text = item.get("agent_response", "") if assistant_text: parts.append(f"assistant: {assistant_text}") if not parts: parts.append(str(item)) return "\n".join(parts)