"""Adapter for the external MemoryOS baseline."""
from __future__ import annotations
import importlib
import os
import shutil
import sys
import tempfile
from pathlib import Path
from typing import Any, Callable
from eval_framework.datasets.schemas import (
MemoryDeltaRecord,
MemorySnapshotRecord,
NormalizedTurn,
RetrievalItem,
RetrievalRecord,
)
from eval_framework.memory_adapters.base import MemoryAdapter
_BACKEND_ID = "MemoryOS"
INTEGRATION_ERROR = (
f"{_BACKEND_ID} backend unavailable."
)
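# Minimal usage sketch (illustrative only; assumes the MemoryOS sources are
# vendored under baselines/MemoryOS/memoryos-pypi, an OPENAI_API_KEY is set,
# and `turn` is a NormalizedTurn):
#
#   adapter = MemoryOSAdapter(user_id="u1", assistant_id="a1")
#   adapter.reset()
#   adapter.ingest_turn(turn)
#   adapter.end_session(turn.session_id)
#   record = adapter.retrieve("example query", top_k=5)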
class MemoryOSAdapter(MemoryAdapter):
"""Thin wrapper around MemoryOS's local Python API."""
def __init__(
self,
*,
backend: Any | None = None,
backend_factory: Callable[[], Any] | None = None,
source_root: str | os.PathLike[str] | None = None,
storage_root: str | os.PathLike[str] | None = None,
user_id: str = "eval_user",
assistant_id: str = "eval_assistant",
llm_model: str | None = None,
embedding_model_name: str = "all-MiniLM-L6-v2",
openai_api_key: str | None = None,
openai_base_url: str | None = None,
) -> None:
self._source_root = Path(source_root).resolve() if source_root else self._default_source_root()
self._storage_root = Path(storage_root).resolve() if storage_root else Path(
tempfile.mkdtemp(prefix="memoryos_eval_")
)
self._user_id = user_id
self._assistant_id = assistant_id
self._llm_model = llm_model or os.getenv("OPENAI_MODEL") or "gpt-5.1"
self._embedding_model_name = embedding_model_name
self._openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
self._openai_base_url = openai_base_url or os.getenv("OPENAI_BASE_URL")
self._backend_factory = backend_factory
self._backend: Any | None = None
self._integration_error: str | None = None
self._session_id = ""
self._prev_snapshot_ids: set[str] = set()
self._pending_user_turns: list[NormalizedTurn] = []
if backend is not None:
self._backend = backend
else:
try:
if self._backend_factory is None:
self._backend_factory = self._build_backend_factory()
self._backend = self._backend_factory()
except Exception as exc:
self._integration_error = str(exc)
@staticmethod
def _default_source_root() -> Path:
here = Path(__file__).resolve()
# memory_adapters/ -> eval_framework/ -> nips26/ -> baselines/MemoryOS/memoryos-pypi
return (here.parents[2] / "baselines" / "MemoryOS" / "memoryos-pypi").resolve()
def _build_backend_factory(self) -> Callable[[], Any]:
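        """Put the vendored MemoryOS sources on sys.path and return a factory for fresh backends."""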
if not self._source_root.is_dir():
raise RuntimeError(
f"{_BACKEND_ID}: source root not found at {self._source_root}"
)
src = str(self._source_root)
if src not in sys.path:
sys.path.insert(0, src)
mod = importlib.import_module("memoryos")
        backend_cls = mod.Memoryos
def _factory() -> Any:
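            # Start every backend from a clean runtime directory so repeated runs do not share state.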
run_root = self._storage_root / "runtime"
shutil.rmtree(run_root, ignore_errors=True)
run_root.mkdir(parents=True, exist_ok=True)
return backend_cls(
user_id=self._user_id,
openai_api_key=self._openai_api_key or "",
openai_base_url=self._openai_base_url,
data_storage_path=str(run_root),
llm_model=self._llm_model,
assistant_id=self._assistant_id,
embedding_model_name=self._embedding_model_name,
)
return _factory
def _runtime_error(self) -> RuntimeError:
detail = self._integration_error or INTEGRATION_ERROR
return RuntimeError(
f"{_BACKEND_ID}: backend unavailable — {detail}"
)
def reset(self) -> None:
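        """Recreate the backend from the factory (when available) and clear per-run tracking state."""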
if self._backend_factory is None and self._backend is None:
raise self._runtime_error()
if self._backend_factory is not None:
self._backend = self._backend_factory()
self._prev_snapshot_ids = set()
self._pending_user_turns = []
self._session_id = ""
def ingest_turn(self, turn: NormalizedTurn) -> None:
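        """Buffer user turns; store a (user, assistant) pair once the assistant replies."""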
self._require_backend()
self._session_id = turn.session_id
if turn.role == "assistant":
self._store_pair(turn)
else:
self._pending_user_turns.append(turn)
def end_session(self, session_id: str) -> None:
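        """Flush any unanswered user turns as a pair with an empty assistant response."""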
self._require_backend()
self._session_id = session_id
if self._pending_user_turns:
            last_user_turn = self._pending_user_turns[-1]
            self._store_memory(
                session_id=session_id,
                user_input=self._joined_user_text(),
                agent_response="",
                timestamp=last_user_turn.timestamp,
)
self._pending_user_turns = []
def snapshot_memories(self) -> list[MemorySnapshotRecord]:
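        """Flatten MemoryOS short-, mid-, and long-term stores into snapshot records."""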
backend = self._require_backend()
rows: list[MemorySnapshotRecord] = []
sid = self._session_id
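        # Short-term memory: recent (user, assistant) QA entries.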
for idx, qa in enumerate(backend.short_term_memory.get_all()):
rows.append(
MemorySnapshotRecord(
memory_id=f"st:{idx}",
text=self._format_qa_text(qa),
session_id=sid,
status="active",
source=_BACKEND_ID,
raw_backend_id=f"st:{idx}",
raw_backend_type="short_term",
metadata={"timestamp": qa.get("timestamp")},
)
)
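        # Mid-term memory: per-session pages of QA details.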
for internal_session_id, session in getattr(backend.mid_term_memory, "sessions", {}).items():
for page_idx, page in enumerate(session.get("details", [])):
rows.append(
MemorySnapshotRecord(
memory_id=f"mt:{internal_session_id}:{page_idx}",
text=self._format_qa_text(page),
session_id=sid,
status="active",
source=_BACKEND_ID,
raw_backend_id=str(page.get("page_id", f"{internal_session_id}:{page_idx}")),
raw_backend_type="mid_term_page",
metadata={"memoryos_session_id": internal_session_id},
)
)
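        # Long-term memory: user profile plus user/assistant knowledge entries.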
user_profile = backend.user_long_term_memory.get_raw_user_profile(backend.user_id)
if user_profile and str(user_profile).lower() != "none":
rows.append(
MemorySnapshotRecord(
memory_id="lt:user_profile",
text=str(user_profile),
session_id=sid,
status="active",
source=_BACKEND_ID,
raw_backend_id="user_profile",
raw_backend_type="user_profile",
metadata={},
)
)
for idx, item in enumerate(backend.user_long_term_memory.get_user_knowledge()):
rows.append(
MemorySnapshotRecord(
memory_id=f"lt:user:{idx}",
text=str(item.get("knowledge", "")),
session_id=sid,
status="active",
source=_BACKEND_ID,
raw_backend_id=f"user:{idx}",
raw_backend_type="user_knowledge",
metadata={"timestamp": item.get("timestamp")},
)
)
assistant_ltm = getattr(backend, "assistant_long_term_memory", None)
if assistant_ltm is not None and hasattr(assistant_ltm, "get_assistant_knowledge"):
for idx, item in enumerate(assistant_ltm.get_assistant_knowledge()):
rows.append(
MemorySnapshotRecord(
memory_id=f"lt:assistant:{idx}",
text=str(item.get("knowledge", "")),
session_id=sid,
status="active",
source=_BACKEND_ID,
raw_backend_id=f"assistant:{idx}",
raw_backend_type="assistant_knowledge",
metadata={"timestamp": item.get("timestamp")},
)
)
return rows
def export_memory_delta(self, session_id: str) -> list[MemoryDeltaRecord]:
"""Export delta by diffing current snapshot against previous snapshot."""
self._require_backend()
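        # Only additions are emitted; memories that disappear between snapshots are not reported as deletions.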
current_snapshot = self.snapshot_memories()
deltas: list[MemoryDeltaRecord] = []
current_ids: set[str] = set()
for snap in current_snapshot:
current_ids.add(snap.memory_id)
if snap.memory_id not in self._prev_snapshot_ids:
deltas.append(
MemoryDeltaRecord(
session_id=session_id,
op="add",
text=snap.text,
linked_previous=(),
raw_backend_id=snap.raw_backend_id,
metadata={
"baseline": _BACKEND_ID,
"backend_type": snap.raw_backend_type,
},
)
)
self._prev_snapshot_ids = current_ids
return deltas
def retrieve(self, query: str, top_k: int) -> RetrievalRecord:
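        """Query MemoryOS and normalize retrieved pages and knowledge into RetrievalItems."""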
backend = self._require_backend()
raw = backend.retriever.retrieve_context(query, user_id=backend.user_id)
items: list[RetrievalItem] = []
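        # Ranks follow insertion order; scores are synthesized as 1 / (rank + 1) rather than taken from the backend.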
for page in raw.get("retrieved_pages", []):
items.append(
RetrievalItem(
rank=len(items),
memory_id=f"page:{len(items)}",
text=self._format_qa_text(page),
score=1.0 / float(len(items) + 1),
raw_backend_id=page.get("page_id"),
)
)
for item in raw.get("retrieved_user_knowledge", []):
items.append(
RetrievalItem(
rank=len(items),
memory_id=f"user:{len(items)}",
text=str(item.get("knowledge", "")),
score=1.0 / float(len(items) + 1),
raw_backend_id=None,
)
)
for item in raw.get("retrieved_assistant_knowledge", []):
items.append(
RetrievalItem(
rank=len(items),
memory_id=f"assistant:{len(items)}",
text=str(item.get("knowledge", "")),
score=1.0 / float(len(items) + 1),
raw_backend_id=None,
)
)
return RetrievalRecord(
query=query,
top_k=top_k,
items=items[:top_k],
raw_trace={"baseline": _BACKEND_ID, "retrieved_at": raw.get("retrieved_at")},
)
def get_capabilities(self) -> dict[str, Any]:
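        """Report backend availability and the delta/snapshot granularity this adapter supports."""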
available = self._backend is not None or self._backend_factory is not None
return {
"backend": _BACKEND_ID,
"baseline": _BACKEND_ID,
"available": available and self._integration_error is None,
"integration_status": "integrated" if available and self._integration_error is None else "unavailable",
"integration_error": self._integration_error or INTEGRATION_ERROR,
"delta_granularity": "ingest_pair_only",
"snapshot_mode": "short_mid_long_term",
}
def _require_backend(self) -> Any:
if self._backend is None:
raise self._runtime_error()
return self._backend
def _store_pair(self, assistant_turn: NormalizedTurn) -> None:
user_input = self._joined_user_text()
self._store_memory(
session_id=assistant_turn.session_id,
user_input=user_input,
agent_response=self._turn_text(assistant_turn),
timestamp=assistant_turn.timestamp,
)
self._pending_user_turns = []
def _store_memory(
self,
*,
session_id: str,
user_input: str,
agent_response: str,
timestamp: str | None,
) -> None:
backend = self._require_backend()
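        # One user/assistant exchange per call; the originating session id travels in meta_data.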
backend.add_memory(
user_input=user_input,
agent_response=agent_response,
timestamp=timestamp,
meta_data={"session_id": session_id},
)
def _joined_user_text(self) -> str:
if not self._pending_user_turns:
return ""
return "\n".join(self._turn_text(turn) for turn in self._pending_user_turns)
@staticmethod
def _turn_text(turn: NormalizedTurn) -> str:
parts = [turn.text]
for att in turn.attachments:
parts.append(f"[{att.type}] {att.caption}")
return "\n".join(parts)
@staticmethod
def _format_qa_text(item: dict[str, Any]) -> str:
parts = []
user_text = item.get("user_input", "")
if user_text:
parts.append(f"user: {user_text}")
assistant_text = item.get("agent_response", "")
if assistant_text:
parts.append(f"assistant: {assistant_text}")
if not parts:
parts.append(str(item))
return "\n".join(parts)