Your LangGraph agent works perfectly in development. Then it reaches production and you discover the problem every agent developer eventually hits: when the process restarts, your agent remembers nothing.
In-memory state is fine for demos and local testing. For production agents — especially those handling multi-step workflows that can span hours, serve concurrent users, or need to resume after infrastructure failures — you need persistent state. This guide walks through adding Aerospike Database 8 as a durable memory store for your LangGraph agent.
What You’ll Build
By the end of this guide, your LangGraph agent will:
- Persist all graph state to Aerospike, surviving process restarts
- Resume from the last checkpoint after a failure without losing context
- Support concurrent sessions without state conflicts
- Maintain cross-session memory for returning users
Prerequisites
Before starting, you’ll need:
- Python 3.10+
- A working LangGraph agent (this guide assumes you have a basic graph defined)
- Aerospike Database 8 (community edition works; download from aerospike.com)
- aerospike Python client (version 10.0+)
- langgraph 0.2.x or later
Step 1: Install and Configure Aerospike
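If you’re running Aerospike locally for development, the quickest path is Docker. The stock image’s default configuration defines a single namespace named test, so either pass namespace="test" to the saver in Step 2 or start the container with a configuration file that defines the langgraph namespace: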
docker pull aerospike/aerospike-server:8.0
docker run -d --name aerospike \
    -p 3000:3000 -p 3001:3001 -p 3002:3002 \
    aerospike/aerospike-server:8.0
For production, follow Aerospike’s deployment guide for your infrastructure. The community edition supports everything needed for agent memory workloads.
Install the Python client:
pip install aerospike==10.0.0
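Before wiring the client into LangGraph, it can help to confirm that the service port published by the Docker command is actually reachable. The following is a minimal sketch using only the standard library. The function name `port_is_open` and the timeout are illustrative choices, not part of Aerospike's tooling, and a successful TCP connect only shows that something is listening, not that the cluster is healthy.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # 3000 is the client service port published by the docker run command.
    print("Aerospike reachable:", port_is_open("127.0.0.1", 3000))
```

If this prints False, fix the container or port mapping before debugging the Python client.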
Step 2: Create the Aerospike Memory Store
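LangGraph’s checkpointing system is built on the BaseCheckpointSaver interface. We’ll implement a custom saver that writes to Aerospike. The methods below show the basic get/put/list shape; the exact abstract methods differ between langgraph releases (0.2.x, for example, drives checkpointers through get_tuple, put_writes, and a put that also receives new_versions), so check the base class in the version you install and adapt the signatures to match.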
import aerospike
import json
import pickle
from typing import Any, Iterator, Optional

from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointMetadata


class AerospikeCheckpointSaver(BaseCheckpointSaver):
    """Aerospike-backed checkpoint saver for LangGraph agents."""

    def __init__(
        self,
        hosts: list[tuple[str, int]],
        namespace: str = "langgraph",
        set_name: str = "checkpoints"
    ):
        super().__init__()  # let the base class set up its own state
        self.client = aerospike.client({"hosts": hosts}).connect()
        self.namespace = namespace
        self.set_name = set_name

    def _make_key(self, thread_id: str, checkpoint_id: str) -> tuple:
        """Build the Aerospike key tuple (namespace, set, user key)."""
        record_key = f"{thread_id}:{checkpoint_id}"
        return (self.namespace, self.set_name, record_key)

    def get(self, config: RunnableConfig) -> Optional[Checkpoint]:
        """Retrieve a specific checkpoint, or the latest one for a thread."""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = config["configurable"].get("checkpoint_id")
        if checkpoint_id:
            key = self._make_key(thread_id, checkpoint_id)
            try:
                # client.get returns a (key, meta, bins) tuple
                _, _, record = self.client.get(key)
                return pickle.loads(record["checkpoint"])
            except aerospike.exception.RecordNotFound:
                return None
        # No checkpoint_id given: find the latest checkpoint by scanning
        return self._get_latest(thread_id)

    def _get_latest(self, thread_id: str) -> Optional[Checkpoint]:
        """Retrieve the most recent checkpoint for a thread."""
        # Full-set scan filtered client-side: simple, but the cost grows
        # with the total number of checkpoints in the set.
        scan = self.client.scan(self.namespace, self.set_name)
        scan.select("checkpoint", "thread_id", "created_at")
        results = []

        def collect(record):
            # record is a (key, meta, bins) tuple
            if record[2].get("thread_id") == thread_id:
                results.append(record)

        scan.foreach(collect)
        if not results:
            return None
        # Sort by creation time, return the latest
        latest = max(results, key=lambda r: r[2].get("created_at", ""))
        return pickle.loads(latest[2]["checkpoint"])

    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata
    ) -> RunnableConfig:
        """Save a checkpoint to Aerospike."""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = checkpoint["id"]
        key = self._make_key(thread_id, checkpoint_id)
        bins = {
            "checkpoint": pickle.dumps(checkpoint),
            "metadata": json.dumps(metadata),
            "thread_id": thread_id,
            "checkpoint_id": checkpoint_id,
            # "ts" is a timestamp string; default to "" so sort keys stay comparable
            "created_at": checkpoint.get("ts", "")
        }
        # TTL is passed in the record metadata. 0 defers to the namespace's
        # default-ttl; use aerospike.TTL_NEVER_EXPIRE to force no expiration.
        meta = {"ttl": 0}
        self.client.put(key, bins, meta=meta)
        return {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_id": checkpoint_id
            }
        }

    def list(
        self,
        config: Optional[RunnableConfig],
        *,
        filter: Optional[dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None
    ) -> Iterator[Checkpoint]:
        """List checkpoints for a thread, newest first.

        The filter and before arguments are accepted but not applied here.
        """
        thread_id = config["configurable"]["thread_id"] if config else None
        scan = self.client.scan(self.namespace, self.set_name)
        scan.select("checkpoint", "thread_id", "created_at")
        results = []

        def collect(record):
            if thread_id is None or record[2].get("thread_id") == thread_id:
                results.append(record)

        scan.foreach(collect)
        results.sort(key=lambda r: r[2].get("created_at", ""), reverse=True)
        if limit:
            results = results[:limit]
        for record in results:
            yield pickle.loads(record[2]["checkpoint"])

    def close(self):
        """Clean up the Aerospike connection."""
        self.client.close()
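Both `_get_latest` and `list` above scan the entire set and filter in Python, which gets slower as checkpoints accumulate. One alternative, sketched here as an assumption rather than as something the saver above does, is to keep a small pointer record per thread that names its newest checkpoint, so the latest state can be fetched with two key lookups. `DictClient` is a hypothetical in-memory stand-in that mimics only the `put(key, bins)` / `get(key)` shape of the Aerospike client so the logic can run without a server; `save_checkpoint`, `load_latest`, and the `:latest` key suffix are likewise illustrative names.

```python
import pickle
from typing import Any, Optional

NAMESPACE, SET_NAME = "langgraph", "checkpoints"


class DictClient:
    """Hypothetical in-memory stand-in for the Aerospike client (put/get only)."""

    def __init__(self) -> None:
        self._records: dict = {}

    def put(self, key: tuple, bins: dict) -> None:
        self._records[key] = dict(bins)

    def get(self, key: tuple) -> tuple:
        # Mirror the (key, meta, bins) triple returned by the real client.
        if key not in self._records:
            raise KeyError(key)  # the real client raises RecordNotFound instead
        return key, {}, self._records[key]


def save_checkpoint(client: Any, thread_id: str, checkpoint: dict) -> None:
    """Write the checkpoint, then repoint the thread's 'latest' record at it."""
    checkpoint_id = checkpoint["id"]
    data_key = (NAMESPACE, SET_NAME, f"{thread_id}:{checkpoint_id}")
    pointer_key = (NAMESPACE, SET_NAME, f"{thread_id}:latest")
    client.put(data_key, {"checkpoint": pickle.dumps(checkpoint)})
    client.put(pointer_key, {"checkpoint_id": checkpoint_id})


def load_latest(client: Any, thread_id: str) -> Optional[dict]:
    """Follow the pointer record to the newest checkpoint, or return None."""
    pointer_key = (NAMESPACE, SET_NAME, f"{thread_id}:latest")
    try:
        _, _, pointer = client.get(pointer_key)
        data_key = (NAMESPACE, SET_NAME, f"{thread_id}:{pointer['checkpoint_id']}")
        _, _, record = client.get(data_key)
    except KeyError:
        return None
    return pickle.loads(record["checkpoint"])
```

Against a real cluster, the `except KeyError` would become `except aerospike.exception.RecordNotFound`. The two writes are not atomic, but because the data record is written first, a crash between them leaves the pointer on the previous checkpoint, which is still a valid state to resume from.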
Step 3: Connect It to Your LangGraph Agent
Here’s how to integrate the Aerospike saver with an existing LangGraph graph:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

# Your existing state definition
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]  # new messages are appended, not replaced
    memory: dict
    session_id: str

# Initialize the Aerospike checkpoint saver
aerospike_memory = AerospikeCheckpointSaver(
    hosts=[("127.0.0.1", 3000)],  # Your Aerospike hosts
    namespace="langgraph",
    set_name="agent_checkpoints"
)

# Build your graph as normal
def build_graph(saver):
    workflow = StateGraph(AgentState)
    # Add your nodes (your_agent_function and your_tools_function are
    # placeholders for your own node callables)
    workflow.add_node("agent", your_agent_function)
    workflow.add_node("tools", your_tools_function)
    # Add your edges (should_continue is your own routing function)
    workflow.set_entry_point("agent")
    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {"continue": "tools", "end": END}
    )
    workflow.add_edge("tools", "agent")
    # Compile with the Aerospike checkpoint saver
    return workflow.compile(checkpointer=saver)

graph = build_graph(aerospike_memory)
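The graph above refers to `your_agent_function`, `your_tools_function`, and `should_continue` without defining them. As one hedged illustration of the routing function, the sketch below returns the `"continue"` / `"end"` labels used in `add_conditional_edges`. It assumes messages are plain dicts and that a pending tool request shows up as a non-empty `tool_calls` list on the last message; both are assumptions about your agent's message shape, not something LangGraph mandates.

```python
from typing import Any


def should_continue(state: dict[str, Any]) -> str:
    """Route to the tools node while the last message requests a tool call."""
    messages = state.get("messages", [])
    if not messages:
        return "end"
    last = messages[-1]
    # Assumed convention: a non-empty "tool_calls" list means work remains.
    if isinstance(last, dict) and last.get("tool_calls"):
        return "continue"
    return "end"
```

If your nodes produce message objects rather than dicts, replace the dict lookup with the equivalent attribute access.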
Step 4: Run Your Agent with Persistent State
With the Aerospike saver connected, every execution automatically checkpoints to Aerospike:
# Thread ID is the key for persistent memory
# Same thread_id = same memory across sessions and restarts
thread_config = {
    "configurable": {
        "thread_id": "user-123-session"
    }
}

# First run
result = graph.invoke(
    {"messages": [{"role": "user", "content": "Remember my name is Alex."}]},
    config=thread_config
)
# Process terminates here; all state is saved to Aerospike

# Later, in a new process or after a restart
result = graph.invoke(
    {"messages": [{"role": "user", "content": "What's my name?"}]},
    config=thread_config  # Same thread_id retrieves previous state
)
# Agent correctly recalls "Alex" from Aerospike
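The second call sees the first conversation because `messages` is declared as `Annotated[list, operator.add]`: the restored checkpoint supplies the existing list, and each new update is concatenated onto it instead of replacing it. The snippet below imitates that merge in plain Python to show the effect. `apply_update` and `REDUCERS` are hypothetical names written for this illustration, and this is not how LangGraph implements its channels internally.

```python
import operator
from typing import Any, Callable

# Reducers keyed by state field; fields without one are simply overwritten.
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {"messages": operator.add}


def apply_update(state: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
    """Merge an update into saved state, using a reducer where one is declared."""
    merged = dict(state)
    for field, value in update.items():
        if field in REDUCERS and field in merged:
            merged[field] = REDUCERS[field](merged[field], value)
        else:
            merged[field] = value
    return merged


# State as it might look when restored from the checkpoint of the first run.
restored = {"messages": [{"role": "user", "content": "Remember my name is Alex."}]}
update = {"messages": [{"role": "user", "content": "What's my name?"}]}
state = apply_update(restored, update)
print(len(state["messages"]))  # 2
```

Fields without a reducer, such as `memory` in `AgentState`, are overwritten by the latest update rather than merged.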
Step 5: Handle Cross-Session Memory
For agents that should remember information across conversations — not just within a session — you’ll want a stable thread_id pattern per user:
def get_user_thread_id(user_id: str) -> str:
    """Generate a stable thread ID for a user."""
    return f"user:{user_id}:main"

def get_session_thread_id(user_id: str, session_id: str) -> str:
    """Generate a session-specific thread ID."""
    return f"user:{user_id}:session:{session_id}"
Using user:{user_id}:main as the thread ID gives your agent persistent memory that survives across sessions. Using a session-specific ID gives you per-conversation memory that’s isolated from other conversations.
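Because these IDs end up inside Aerospike record keys, it can be useful to build and parse them in one place. The sketch below restates the two helpers and adds a hypothetical `parse_thread_id` inverse. It assumes user and session IDs never contain a colon, which the helpers above do not enforce.

```python
from typing import Optional


def get_user_thread_id(user_id: str) -> str:
    """Generate a stable thread ID for a user."""
    return f"user:{user_id}:main"


def get_session_thread_id(user_id: str, session_id: str) -> str:
    """Generate a session-specific thread ID."""
    return f"user:{user_id}:session:{session_id}"


def parse_thread_id(thread_id: str) -> tuple[str, Optional[str]]:
    """Return (user_id, session_id); session_id is None for the main thread."""
    parts = thread_id.split(":")
    if len(parts) == 3 and parts[0] == "user" and parts[2] == "main":
        return parts[1], None
    if len(parts) == 4 and parts[0] == "user" and parts[2] == "session":
        return parts[1], parts[3]
    raise ValueError(f"unrecognized thread id: {thread_id!r}")
```

A parser like this makes it easier to find or delete every checkpoint belonging to one user, for example when handling a data-removal request.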
Step 6: Configure Aerospike for Production
For production deployments, tune these Aerospike settings:
# aerospike.conf
# Parameter names follow the server 7.x/8.x configuration schema, where
# service-threads and indexes-memory-budget take the place of the older
# transaction-queue and memory-size settings.
service {
    service-threads 8               # Threads servicing client transactions
}
namespace langgraph {
    replication-factor 2            # Replicate across 2 nodes
    indexes-memory-budget 4G        # RAM budget for indexes
    default-ttl 0                   # Never expire checkpoints
    storage-engine device {         # Flash storage for data
        file /data/aerospike.dat
        filesize 100G
    }
}
The default-ttl 0 setting is important for agent memory — you generally don’t want checkpoints expiring automatically. If you need expiration (for privacy or storage reasons), set default-ttl to the number of seconds you want checkpoints to persist.
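Since TTLs are expressed in seconds, a small conversion helper avoids arithmetic slips when choosing a retention period. The helper name `ttl_seconds` and the 30-day figure are only examples.

```python
from datetime import timedelta


def ttl_seconds(days: int = 0, hours: int = 0) -> int:
    """Convert a retention period into the whole seconds a TTL setting expects."""
    total = int(timedelta(days=days, hours=hours).total_seconds())
    if total <= 0:
        raise ValueError("retention period must be positive")
    return total


print(ttl_seconds(days=30))  # 2592000
```

The result can go in default-ttl in aerospike.conf, or per record as the `ttl` value in the `meta` dictionary passed to `client.put`. On recent server versions, expiration also depends on the namespace supervisor being enabled (the nsup-period setting), so check that setting if writes with a TTL are rejected.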
Testing Your Implementation
Run a quick test to verify persistence is working:
# First terminal — run your agent
python your_agent.py --thread-id test-001 --message "Remember: the answer is 42."
# Kill the process (Ctrl+C or kill the container)
# Wait a moment
# Start a new process
python your_agent.py --thread-id test-001 --message "What answer should you remember?"
# The agent should respond with 42, retrieved from Aerospike
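The commands above assume a `your_agent.py` script that accepts `--thread-id` and `--message`. A minimal sketch of such an entry point follows. `parse_args`, `build_inputs`, and the injected `run` callable are hypothetical; in a real script, `run` would call the compiled graph's `invoke` method.

```python
import argparse
from typing import Callable, Optional, Sequence


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the two flags used by the persistence test."""
    parser = argparse.ArgumentParser(description="Run one agent turn.")
    parser.add_argument("--thread-id", required=True, help="checkpoint thread to resume")
    parser.add_argument("--message", required=True, help="user message for this turn")
    return parser.parse_args(argv)


def build_inputs(args: argparse.Namespace) -> tuple[dict, dict]:
    """Translate CLI arguments into the (inputs, config) pair used in Step 4."""
    inputs = {"messages": [{"role": "user", "content": args.message}]}
    config = {"configurable": {"thread_id": args.thread_id}}
    return inputs, config


def main(run: Callable[[dict, dict], dict], argv: Optional[Sequence[str]] = None) -> dict:
    """Run a single turn; `run` stands in for graph.invoke(inputs, config=config)."""
    inputs, config = build_inputs(parse_args(argv))
    return run(inputs, config)
```

Passing `run` in as an argument keeps the argument handling testable without a live Aerospike server or model.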
Troubleshooting
Connection refused: Check that Aerospike is running on port 3000 and that your hosts config matches. Running asinfo -v status against the node should return ok when the daemon is healthy.
RecordNotFound on get: Normal for a new thread_id with no prior checkpoints. Your agent should handle None returns from get() as a fresh state.
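Slow reads: If you’re seeing higher-than-expected latency, check that your namespace is using SSD storage (not rotating disk) and that the server has enough RAM to hold the namespace’s primary index. Also note that fetching the latest checkpoint without a checkpoint_id scans the whole set, so latency on that path grows with the number of stored checkpoints.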
Pickling errors: Some LangGraph state types don’t serialize cleanly with pickle. Use JSON serialization for state made of standard Python types (dicts, lists, strings, numbers), and reserve pickle for objects that JSON can’t represent, such as model output objects.
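One way to apply that advice is to try JSON first and fall back to pickle only when needed, tagging each payload so the reader knows how to decode it. This is a sketch under assumptions: the one-byte `J`/`P` prefix and the function names are invented for illustration, and pickle should only be used on data your own application wrote, since unpickling untrusted bytes can execute arbitrary code.

```python
import json
import pickle
from typing import Any


def dumps(value: Any) -> bytes:
    """Encode as JSON when possible, otherwise fall back to pickle."""
    try:
        return b"J" + json.dumps(value).encode("utf-8")
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types and
        # ValueError for circular references.
        return b"P" + pickle.dumps(value)


def loads(payload: bytes) -> Any:
    """Decode a payload produced by dumps(), dispatching on its tag byte."""
    tag, body = payload[:1], payload[1:]
    if tag == b"J":
        return json.loads(body.decode("utf-8"))
    if tag == b"P":
        return pickle.loads(body)
    raise ValueError(f"unknown payload tag: {tag!r}")
```

Keep in mind that a JSON round trip is not always lossless: tuples come back as lists and non-string dictionary keys come back as strings, so state that depends on those distinctions should take the pickle path or a richer serializer.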
Aerospike Database 8 with LangGraph integration is available now. This guide uses community edition, which is free for most workloads. Commercial features (multi-datacenter replication, enhanced security) are available in Aerospike Enterprise.