| """Adapter for the external MemoryOS baseline.""" |
|
|
| from __future__ import annotations |
|
|
| import importlib |
| import os |
| import shutil |
| import sys |
| import tempfile |
| from pathlib import Path |
| from typing import Any, Callable |
|
|
| from eval_framework.datasets.schemas import ( |
| MemoryDeltaRecord, |
| MemorySnapshotRecord, |
| NormalizedTurn, |
| RetrievalItem, |
| RetrievalRecord, |
| ) |
| from eval_framework.memory_adapters.base import MemoryAdapter |
|
|
| _BACKEND_ID = "MemoryOS" |
|
|
| INTEGRATION_ERROR = ( |
| f"{_BACKEND_ID} backend unavailable." |
| ) |
|
|
|
|
class MemoryOSAdapter(MemoryAdapter):
    """Thin wrapper around MemoryOS's local Python API.

    The adapter lazily imports the vendored MemoryOS package, builds one
    backend instance per run, and translates MemoryOS's short-/mid-/long-term
    stores into the eval framework's normalized record types.  If the backend
    cannot be constructed, the failure is captured (not raised) and surfaced
    through :meth:`get_capabilities`; memory operations then raise
    ``RuntimeError`` via :meth:`_require_backend`.
    """

    def __init__(
        self,
        *,
        backend: Any | None = None,
        backend_factory: Callable[[], Any] | None = None,
        source_root: str | os.PathLike[str] | None = None,
        storage_root: str | os.PathLike[str] | None = None,
        user_id: str = "eval_user",
        assistant_id: str = "eval_assistant",
        llm_model: str | None = None,
        embedding_model_name: str = "all-MiniLM-L6-v2",
        openai_api_key: str | None = None,
        openai_base_url: str | None = None,
    ) -> None:
        """Initialize the adapter and (best-effort) construct the backend.

        Args:
            backend: Pre-built MemoryOS instance used as-is (mainly for tests).
            backend_factory: Zero-arg callable producing a fresh backend;
                derived from ``source_root`` when omitted.
            source_root: Checkout of the MemoryOS package; defaults to the
                vendored copy under ``baselines/MemoryOS/memoryos-pypi``.
            storage_root: Directory for MemoryOS on-disk state; a fresh
                temporary directory is created when omitted.
            user_id: Identity string passed to MemoryOS for the user side.
            assistant_id: Identity string for the assistant side.
            llm_model: Chat model name; falls back to ``$OPENAI_MODEL``,
                then ``"gpt-5.1"``.
            embedding_model_name: Sentence-embedding model used by MemoryOS.
            openai_api_key: API key; falls back to ``$OPENAI_API_KEY``.
            openai_base_url: API base URL; falls back to ``$OPENAI_BASE_URL``.
        """
        self._source_root = Path(source_root).resolve() if source_root else self._default_source_root()
        self._storage_root = Path(storage_root).resolve() if storage_root else Path(
            tempfile.mkdtemp(prefix="memoryos_eval_")
        )
        self._user_id = user_id
        self._assistant_id = assistant_id
        self._llm_model = llm_model or os.getenv("OPENAI_MODEL") or "gpt-5.1"
        self._embedding_model_name = embedding_model_name
        self._openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
        self._openai_base_url = openai_base_url or os.getenv("OPENAI_BASE_URL")
        self._backend_factory = backend_factory
        # Live backend handle; None until successfully constructed.
        self._backend: Any | None = None
        # First construction failure, kept for get_capabilities() reporting.
        self._integration_error: str | None = None
        # Most recently seen eval session id, stamped onto snapshot rows.
        self._session_id = ""
        # Snapshot memory_ids from the previous export, for delta diffing.
        self._prev_snapshot_ids: set[str] = set()
        # User turns awaiting pairing with the next assistant turn.
        self._pending_user_turns: list[NormalizedTurn] = []

        if backend is not None:
            self._backend = backend
        else:
            # Deliberate best-effort: a broken integration must not abort the
            # whole eval run, so any construction error is recorded instead.
            try:
                if self._backend_factory is None:
                    self._backend_factory = self._build_backend_factory()
                self._backend = self._backend_factory()
            except Exception as exc:
                self._integration_error = str(exc)

    @staticmethod
    def _default_source_root() -> Path:
        """Locate the vendored MemoryOS checkout relative to this file."""
        here = Path(__file__).resolve()
        # <repo>/baselines/MemoryOS/memoryos-pypi, two directories up.
        return (here.parents[2] / "baselines" / "MemoryOS" / "memoryos-pypi").resolve()

    def _build_backend_factory(self) -> Callable[[], Any]:
        """Import the MemoryOS package and return a fresh-backend factory.

        Returns:
            A zero-argument callable that wipes the runtime directory and
            constructs a new ``Memoryos`` instance.

        Raises:
            RuntimeError: If ``source_root`` is not a directory.
        """
        if not self._source_root.is_dir():
            raise RuntimeError(
                f"{_BACKEND_ID}: source root not found at {self._source_root}"
            )
        src = str(self._source_root)
        if src not in sys.path:
            sys.path.insert(0, src)
        mod = importlib.import_module("memoryos")
        # Direct attribute access instead of getattr with a constant (B009);
        # an AttributeError here is caught by __init__'s best-effort handler.
        backend_cls = mod.Memoryos

        def _factory() -> Any:
            # Start each backend from a clean runtime directory so repeated
            # runs never observe each other's persisted state.
            run_root = self._storage_root / "runtime"
            shutil.rmtree(run_root, ignore_errors=True)
            run_root.mkdir(parents=True, exist_ok=True)
            return backend_cls(
                user_id=self._user_id,
                openai_api_key=self._openai_api_key or "",
                openai_base_url=self._openai_base_url,
                data_storage_path=str(run_root),
                llm_model=self._llm_model,
                assistant_id=self._assistant_id,
                embedding_model_name=self._embedding_model_name,
            )

        return _factory

    def _runtime_error(self) -> RuntimeError:
        """Build the error raised when the backend is unusable."""
        detail = self._integration_error or INTEGRATION_ERROR
        return RuntimeError(
            f"{_BACKEND_ID}: backend unavailable — {detail}"
        )

    def reset(self) -> None:
        """Discard all adapter/backend state and build a fresh backend.

        NOTE(review): an injected ``backend`` without a ``backend_factory``
        cannot be rebuilt, so its internal memory survives reset — confirm
        callers that inject a backend also clear it themselves.
        """
        if self._backend_factory is None and self._backend is None:
            raise self._runtime_error()
        if self._backend_factory is not None:
            self._backend = self._backend_factory()
        self._prev_snapshot_ids = set()
        self._pending_user_turns = []
        self._session_id = ""

    def ingest_turn(self, turn: NormalizedTurn) -> None:
        """Buffer user turns; flush a user/assistant pair on assistant turns.

        MemoryOS ingests QA pairs, so user text accumulates until the next
        assistant turn arrives (or the session ends).
        """
        self._require_backend()
        self._session_id = turn.session_id
        if turn.role == "assistant":
            self._store_pair(turn)
        else:
            self._pending_user_turns.append(turn)

    def end_session(self, session_id: str) -> None:
        """Flush any trailing unpaired user turns with an empty response."""
        self._require_backend()
        self._session_id = session_id
        if self._pending_user_turns:
            # Timestamp the synthetic pair with the last buffered user turn.
            last_user_turn = self._pending_user_turns[-1]
            self._store_memory(
                session_id=session_id,
                user_input=self._joined_user_text(),
                agent_response="",
                timestamp=last_user_turn.timestamp,
            )
        self._pending_user_turns = []

    def snapshot_memories(self) -> list[MemorySnapshotRecord]:
        """Dump all live memories across MemoryOS's three storage tiers."""
        backend = self._require_backend()
        rows: list[MemorySnapshotRecord] = []

        # Short-term memory: recent QA pairs, identified positionally.
        for idx, qa in enumerate(backend.short_term_memory.get_all()):
            rows.append(
                self._snapshot_row(
                    memory_id=f"st:{idx}",
                    text=self._format_qa_text(qa),
                    raw_backend_id=f"st:{idx}",
                    raw_backend_type="short_term",
                    metadata={"timestamp": qa.get("timestamp")},
                )
            )

        # Mid-term memory: per-session consolidated pages.
        for internal_session_id, session in getattr(backend.mid_term_memory, "sessions", {}).items():
            for page_idx, page in enumerate(session.get("details", [])):
                rows.append(
                    self._snapshot_row(
                        memory_id=f"mt:{internal_session_id}:{page_idx}",
                        text=self._format_qa_text(page),
                        raw_backend_id=str(page.get("page_id", f"{internal_session_id}:{page_idx}")),
                        raw_backend_type="mid_term_page",
                        metadata={"memoryos_session_id": internal_session_id},
                    )
                )

        # Long-term memory: user profile, then user knowledge items.
        # The "none" check filters MemoryOS's literal "None" placeholder
        # profile string — presumably its empty-profile sentinel; verify.
        user_profile = backend.user_long_term_memory.get_raw_user_profile(backend.user_id)
        if user_profile and str(user_profile).lower() != "none":
            rows.append(
                self._snapshot_row(
                    memory_id="lt:user_profile",
                    text=str(user_profile),
                    raw_backend_id="user_profile",
                    raw_backend_type="user_profile",
                    metadata={},
                )
            )

        for idx, item in enumerate(backend.user_long_term_memory.get_user_knowledge()):
            rows.append(
                self._snapshot_row(
                    memory_id=f"lt:user:{idx}",
                    text=str(item.get("knowledge", "")),
                    raw_backend_id=f"user:{idx}",
                    raw_backend_type="user_knowledge",
                    metadata={"timestamp": item.get("timestamp")},
                )
            )

        # Assistant knowledge is probed defensively: it is absent or shaped
        # differently in some MemoryOS versions.
        assistant_ltm = getattr(backend, "assistant_long_term_memory", None)
        if assistant_ltm is not None and hasattr(assistant_ltm, "get_assistant_knowledge"):
            for idx, item in enumerate(assistant_ltm.get_assistant_knowledge()):
                rows.append(
                    self._snapshot_row(
                        memory_id=f"lt:assistant:{idx}",
                        text=str(item.get("knowledge", "")),
                        raw_backend_id=f"assistant:{idx}",
                        raw_backend_type="assistant_knowledge",
                        metadata={"timestamp": item.get("timestamp")},
                    )
                )
        return rows

    def _snapshot_row(
        self,
        *,
        memory_id: str,
        text: str,
        raw_backend_id: str,
        raw_backend_type: str,
        metadata: dict[str, Any],
    ) -> MemorySnapshotRecord:
        """Build one snapshot record with the fields shared by every tier."""
        return MemorySnapshotRecord(
            memory_id=memory_id,
            text=text,
            session_id=self._session_id,
            status="active",
            source=_BACKEND_ID,
            raw_backend_id=raw_backend_id,
            raw_backend_type=raw_backend_type,
            metadata=metadata,
        )

    def export_memory_delta(self, session_id: str) -> list[MemoryDeltaRecord]:
        """Export delta by diffing current snapshot against previous snapshot.

        Only ``add`` ops are emitted; disappeared ids are not reported.
        NOTE(review): positional memory_ids (``st:0``, ``lt:user:1``, ...)
        can alias across snapshots if the backend drops an earlier entry,
        silently masking a new one — confirm this is acceptable for scoring.
        """
        self._require_backend()
        current_snapshot = self.snapshot_memories()
        deltas: list[MemoryDeltaRecord] = []
        current_ids: set[str] = set()

        for snap in current_snapshot:
            current_ids.add(snap.memory_id)
            if snap.memory_id not in self._prev_snapshot_ids:
                deltas.append(
                    MemoryDeltaRecord(
                        session_id=session_id,
                        op="add",
                        text=snap.text,
                        linked_previous=(),
                        raw_backend_id=snap.raw_backend_id,
                        metadata={
                            "baseline": _BACKEND_ID,
                            "backend_type": snap.raw_backend_type,
                        },
                    )
                )

        self._prev_snapshot_ids = current_ids
        return deltas

    def retrieve(self, query: str, top_k: int) -> RetrievalRecord:
        """Query MemoryOS's retriever and normalize up to ``top_k`` items.

        Pages, user knowledge, and assistant knowledge are concatenated in
        that order; ``top_k`` is applied by truncation (the backend call
        itself is not bounded).
        """
        backend = self._require_backend()
        raw = backend.retriever.retrieve_context(query, user_id=backend.user_id)
        items: list[RetrievalItem] = []

        for page in raw.get("retrieved_pages", []):
            items.append(
                self._ranked_item(len(items), "page", self._format_qa_text(page), page.get("page_id"))
            )
        for item in raw.get("retrieved_user_knowledge", []):
            items.append(
                self._ranked_item(len(items), "user", str(item.get("knowledge", "")), None)
            )
        for item in raw.get("retrieved_assistant_knowledge", []):
            items.append(
                self._ranked_item(len(items), "assistant", str(item.get("knowledge", "")), None)
            )
        return RetrievalRecord(
            query=query,
            top_k=top_k,
            items=items[:top_k],
            raw_trace={"baseline": _BACKEND_ID, "retrieved_at": raw.get("retrieved_at")},
        )

    @staticmethod
    def _ranked_item(rank: int, prefix: str, text: str, raw_backend_id: Any) -> RetrievalItem:
        """Build one retrieval item; score decays harmonically with rank."""
        return RetrievalItem(
            rank=rank,
            memory_id=f"{prefix}:{rank}",
            text=text,
            score=1.0 / float(rank + 1),
            raw_backend_id=raw_backend_id,
        )

    def get_capabilities(self) -> dict[str, Any]:
        """Describe backend availability and integration granularity."""
        has_backend = self._backend is not None or self._backend_factory is not None
        ready = has_backend and self._integration_error is None
        return {
            "backend": _BACKEND_ID,
            "baseline": _BACKEND_ID,
            "available": ready,
            "integration_status": "integrated" if ready else "unavailable",
            "integration_error": self._integration_error or INTEGRATION_ERROR,
            "delta_granularity": "ingest_pair_only",
            "snapshot_mode": "short_mid_long_term",
        }

    def _require_backend(self) -> Any:
        """Return the live backend or raise the captured integration error."""
        if self._backend is None:
            raise self._runtime_error()
        return self._backend

    def _store_pair(self, assistant_turn: NormalizedTurn) -> None:
        """Store buffered user turns paired with this assistant turn."""
        user_input = self._joined_user_text()
        self._store_memory(
            session_id=assistant_turn.session_id,
            user_input=user_input,
            agent_response=self._turn_text(assistant_turn),
            timestamp=assistant_turn.timestamp,
        )
        self._pending_user_turns = []

    def _store_memory(
        self,
        *,
        session_id: str,
        user_input: str,
        agent_response: str,
        timestamp: str | None,
    ) -> None:
        """Write one QA pair into the MemoryOS backend."""
        backend = self._require_backend()
        backend.add_memory(
            user_input=user_input,
            agent_response=agent_response,
            timestamp=timestamp,
            meta_data={"session_id": session_id},
        )

    def _joined_user_text(self) -> str:
        """Concatenate all buffered user turns, newline-separated."""
        if not self._pending_user_turns:
            return ""
        return "\n".join(self._turn_text(turn) for turn in self._pending_user_turns)

    @staticmethod
    def _turn_text(turn: NormalizedTurn) -> str:
        """Render a turn's text plus one bracketed line per attachment."""
        parts = [turn.text]
        for att in turn.attachments:
            parts.append(f"[{att.type}] {att.caption}")
        return "\n".join(parts)

    @staticmethod
    def _format_qa_text(item: dict[str, Any]) -> str:
        """Render a MemoryOS QA dict as "user:/assistant:" lines.

        Falls back to ``str(item)`` when neither field is present, so
        unexpected page shapes still produce some snapshot text.
        """
        parts: list[str] = []
        user_text = item.get("user_input", "")
        if user_text:
            parts.append(f"user: {user_text}")
        assistant_text = item.get("agent_response", "")
        if assistant_text:
            parts.append(f"assistant: {assistant_text}")
        if not parts:
            parts.append(str(item))
        return "\n".join(parts)
|
|