Your LangGraph agent works perfectly in development. Then it reaches production and you discover the problem every agent developer eventually hits: when the process restarts, your agent remembers nothing.

In-memory state is fine for demos and local testing. Production agents need persistent state, especially those that run multi-step workflows spanning hours, serve concurrent users, or must resume after infrastructure failures. This guide walks through adding Aerospike Database 8 as a durable memory store for your LangGraph agent.

What You’ll Build

By the end of this guide, your LangGraph agent will:

  • Persist all graph state to Aerospike, surviving process restarts
  • Resume from the last checkpoint after a failure without losing context
  • Support concurrent sessions without state conflicts
  • Maintain cross-session memory for returning users

Prerequisites

Before starting, you’ll need:

  • Python 3.10+
  • A working LangGraph agent (this guide assumes you have a basic graph defined)
  • Aerospike Database 8 (community edition works; download from aerospike.com)
  • aerospike Python client (version 10.0+)
  • langgraph 0.2.x or later

Step 1: Install and Configure Aerospike

If you’re running Aerospike locally for development, the quickest path is Docker:

docker pull aerospike/aerospike-server:8.0
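# The image's default namespace is "test"; NAMESPACE creates the one this guide uses
docker run -d --name aerospike \
  -e "NAMESPACE=langgraph" \
  -p 3000:3000 -p 3001:3001 -p 3002:3002 \
  aerospike/aerospike-server:8.0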

For production, follow Aerospike’s deployment guide for your infrastructure. The community edition supports everything needed for agent memory workloads.

Install the Python client:

pip install "aerospike>=10.0"
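
To confirm that the installed client meets the 10.0+ prerequisite, a small check like the following can help. This is a minimal sketch: parse_version and client_version_ok are hypothetical helper names, and the comparison only looks at the leading numeric components of the version string.

```python
import re
from importlib import metadata


def parse_version(text: str) -> tuple[int, ...]:
    """Return up to three leading numeric components, e.g. '10.0.0rc1' -> (10, 0, 0)."""
    return tuple(int(part) for part in re.findall(r"\d+", text)[:3])


def client_version_ok(minimum: str = "10.0") -> bool:
    """True if the aerospike package is installed and at least `minimum`."""
    try:
        installed = metadata.version("aerospike")
    except metadata.PackageNotFoundError:
        # The client is not installed in this environment.
        return False
    return parse_version(installed) >= parse_version(minimum)


ok = client_version_ok()
```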

Step 2: Create the Aerospike Memory Store

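LangGraph’s checkpointing system is built on the BaseCheckpointSaver interface. We’ll implement a custom saver that writes to Aerospike. The methods below follow the simple get/put/list shape. The interface has changed between LangGraph releases (newer versions expect get_tuple and put_writes, plus an extra new_versions argument on put), so check the base class in your installed version and adapt the signatures to match.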

import aerospike
import json
import pickle
from typing import Any, Iterator, Optional, Sequence, Tuple
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointMetadata

class AerospikeCheckpointSaver(BaseCheckpointSaver):
    """Aerospike-backed checkpoint saver for LangGraph agents."""
    
    def __init__(
        self, 
        hosts: list[tuple[str, int]],
        namespace: str = "langgraph",
        set_name: str = "checkpoints"
    ):
        super().__init__()  # Initialize the base saver before adding our own state
        self.client = aerospike.client({"hosts": hosts}).connect()
        self.namespace = namespace
        self.set_name = set_name
        
    def _make_key(self, thread_id: str, checkpoint_id: str) -> tuple:
        """Build the Aerospike key tuple."""
        record_key = f"{thread_id}:{checkpoint_id}"
        return (self.namespace, self.set_name, record_key)
    
    def get(self, config: RunnableConfig) -> Optional[Checkpoint]:
        """Retrieve the latest checkpoint for a thread."""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = config["configurable"].get("checkpoint_id")
        
        if checkpoint_id:
            key = self._make_key(thread_id, checkpoint_id)
            try:
                _, _, record = self.client.get(key)
                return pickle.loads(record["checkpoint"])
            except aerospike.exception.RecordNotFound:
                return None
        
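        # No checkpoint_id given: find the newest checkpoint for this thread.
        # Note: _get_latest scans the whole set, so its cost grows with the
        # total number of stored checkpoints, not just this thread's.
        return self._get_latest(thread_id)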
    
    def _get_latest(self, thread_id: str) -> Optional[Checkpoint]:
        """Retrieve the most recent checkpoint for a thread."""
        scan = self.client.scan(self.namespace, self.set_name)
        scan.select("checkpoint", "thread_id", "created_at")
        
        results = []
        def collect(record):
            if record[2].get("thread_id") == thread_id:
                results.append(record)
        
        scan.foreach(collect)
        
        if not results:
            return None
            
        # Pick the newest by creation time. Timestamps are ISO 8601 strings,
        # so the default must also be a string to keep comparisons valid.
        latest = max(results, key=lambda r: r[2].get("created_at", ""))
        return pickle.loads(latest[2]["checkpoint"])
    
    def put(
        self, 
        config: RunnableConfig, 
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata
    ) -> RunnableConfig:
        """Save a checkpoint to Aerospike."""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = checkpoint["id"]
        
        key = self._make_key(thread_id, checkpoint_id)
        bins = {
            "checkpoint": pickle.dumps(checkpoint),
            "metadata": json.dumps(metadata),
            "thread_id": thread_id,
            "checkpoint_id": checkpoint_id,
            "created_at": checkpoint.get("ts", "")  # ISO 8601 string; "" if absent
        }
        
        # TTL is set in the record metadata. TTL_NEVER_EXPIRE disables expiration;
        # a TTL of 0 would instead apply the namespace's default-ttl.
        meta = {"ttl": aerospike.TTL_NEVER_EXPIRE}
        self.client.put(key, bins, meta=meta)
        
        return {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_id": checkpoint_id
            }
        }
    
    def list(
        self,
        config: Optional[RunnableConfig],
        *,
        filter: Optional[dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None
    ) -> Iterator[Checkpoint]:
        """List checkpoints for a thread."""
        thread_id = config["configurable"]["thread_id"] if config else None
        scan = self.client.scan(self.namespace, self.set_name)
        scan.select("checkpoint", "thread_id", "created_at")
        
        results = []
        def collect(record):
            if thread_id is None or record[2].get("thread_id") == thread_id:
                results.append(record)
        
        scan.foreach(collect)
        # Newest first; use a string default to match the ISO 8601 timestamps
        results.sort(key=lambda r: r[2].get("created_at", ""), reverse=True)
        
        if limit is not None:
            results = results[:limit]
            
        for record in results:
            yield pickle.loads(record[2]["checkpoint"])
    
    def close(self):
        """Clean up the Aerospike connection."""
        self.client.close()
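
Because _get_latest scans the entire set on every lookup, reads slow down as checkpoints accumulate. One alternative, sketched below under stated assumptions, is to keep a small per-thread pointer record that names the newest checkpoint, so the latest state can be found with two key lookups. FakeClient is a hypothetical in-memory stand-in used only so the example runs without a server; the real client raises aerospike.exception.RecordNotFound rather than KeyError, and the two writes shown here are not atomic.

```python
from typing import Any, Optional

NAMESPACE, SET_NAME = "langgraph", "checkpoints"


class FakeClient:
    """Hypothetical in-memory stand-in for the Aerospike client (illustration only)."""

    def __init__(self) -> None:
        self._records: dict[tuple, dict[str, Any]] = {}

    def put(self, key: tuple, bins: dict[str, Any]) -> None:
        self._records[key] = dict(bins)

    def get(self, key: tuple) -> tuple:
        # Mirrors the (key, meta, bins) shape used by the saver above.
        if key not in self._records:
            raise KeyError(key)
        return key, {}, self._records[key]


def save_checkpoint(client: Any, thread_id: str, checkpoint_id: str, payload: bytes) -> None:
    """Write the checkpoint, then point the thread's 'latest' record at it."""
    client.put((NAMESPACE, SET_NAME, f"{thread_id}:{checkpoint_id}"), {"checkpoint": payload})
    client.put((NAMESPACE, SET_NAME, f"{thread_id}:latest"), {"checkpoint_id": checkpoint_id})


def load_latest(client: Any, thread_id: str) -> Optional[bytes]:
    """Follow the pointer record to the newest checkpoint; None for a new thread."""
    try:
        _, _, pointer = client.get((NAMESPACE, SET_NAME, f"{thread_id}:latest"))
        latest_key = (NAMESPACE, SET_NAME, f"{thread_id}:{pointer['checkpoint_id']}")
        _, _, record = client.get(latest_key)
    except KeyError:
        return None
    return record["checkpoint"]


client = FakeClient()
save_checkpoint(client, "user-123-session", "cp-1", b"first")
save_checkpoint(client, "user-123-session", "cp-2", b"second")
latest = load_latest(client, "user-123-session")
```

The trade-off is one extra write per checkpoint in exchange for constant-time reads of the latest state.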

Step 3: Connect It to Your LangGraph Agent

Here’s how to integrate the Aerospike saver with an existing LangGraph graph:

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

# Your existing state definition
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    memory: dict
    session_id: str

# Initialize the Aerospike checkpoint saver
aerospike_memory = AerospikeCheckpointSaver(
    hosts=[("127.0.0.1", 3000)],  # Your Aerospike hosts
    namespace="langgraph",
    set_name="agent_checkpoints"
)

# Build your graph as normal
def build_graph(saver):
    workflow = StateGraph(AgentState)
    
    # Add your nodes
    workflow.add_node("agent", your_agent_function)
    workflow.add_node("tools", your_tools_function)
    
    # Add your edges
    workflow.set_entry_point("agent")
    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {"continue": "tools", "end": END}
    )
    workflow.add_edge("tools", "agent")
    
    # Compile with the Aerospike checkpoint saver
    return workflow.compile(checkpointer=saver)

graph = build_graph(aerospike_memory)
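
The graph above refers to a should_continue routing function without defining it. A minimal sketch is shown here, assuming the last message signals pending tool use through a tool_calls field (on a dict or as an attribute). Your own agent may signal this differently.

```python
from typing import Any


def should_continue(state: dict[str, Any]) -> str:
    """Route to the tools node if the last message requests tools, else finish."""
    messages = state.get("messages", [])
    if not messages:
        return "end"
    last = messages[-1]
    # Support both dict-style messages and message objects (assumed shapes).
    if isinstance(last, dict):
        tool_calls = last.get("tool_calls")
    else:
        tool_calls = getattr(last, "tool_calls", None)
    return "continue" if tool_calls else "end"
```

The return values match the keys of the mapping passed to add_conditional_edges: "continue" routes to the tools node and "end" finishes the run.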

Step 4: Run Your Agent with Persistent State

With the Aerospike saver connected, every execution automatically checkpoints to Aerospike:

# Thread ID is the key for persistent memory
# Same thread_id = same memory across sessions and restarts
thread_config = {
    "configurable": {
        "thread_id": "user-123-session"
    }
}

# First run
result = graph.invoke(
    {"messages": [{"role": "user", "content": "Remember my name is Alex."}]},
    config=thread_config
)

# Process terminates here — all state saved to Aerospike

# Later, in a new process or after a restart
result = graph.invoke(
    {"messages": [{"role": "user", "content": "What's my name?"}]},
    config=thread_config  # Same thread_id retrieves previous state
)
# Agent correctly recalls "Alex" from Aerospike
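
The second call sees the earlier conversation because the messages field is declared with operator.add as its reducer: the new input is appended to the list restored from the checkpoint rather than replacing it. The snippet below illustrates that merge in isolation. The assistant reply is invented for illustration, and the restore itself is handled by LangGraph, not by this code.

```python
import operator

# State as it might look after being restored from the last checkpoint.
restored_messages = [
    {"role": "user", "content": "Remember my name is Alex."},
    {"role": "assistant", "content": "Got it, Alex."},  # hypothetical reply
]

# Input passed to the second invoke() call.
new_input = [{"role": "user", "content": "What's my name?"}]

# The reducer concatenates the restored list with the new input.
merged = operator.add(restored_messages, new_input)
print(len(merged))  # 3
```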

Step 5: Handle Cross-Session Memory

For agents that should remember information across conversations — not just within a session — you’ll want a stable thread_id pattern per user:

def get_user_thread_id(user_id: str) -> str:
    """Generate a stable thread ID for a user."""
    return f"user:{user_id}:main"

def get_session_thread_id(user_id: str, session_id: str) -> str:
    """Generate a session-specific thread ID."""
    return f"user:{user_id}:session:{session_id}"

Using user:{user_id}:main as the thread ID gives your agent persistent memory that survives across sessions. Using a session-specific ID gives you per-conversation memory that’s isolated from other conversations.
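
As a usage sketch, the two helpers can feed a single function that builds the config passed to graph.invoke. The name make_thread_config is hypothetical; the helpers are repeated so the example runs on its own.

```python
from typing import Optional


def get_user_thread_id(user_id: str) -> str:
    """Generate a stable thread ID for a user."""
    return f"user:{user_id}:main"


def get_session_thread_id(user_id: str, session_id: str) -> str:
    """Generate a session-specific thread ID."""
    return f"user:{user_id}:session:{session_id}"


def make_thread_config(user_id: str, session_id: Optional[str] = None) -> dict:
    """Build a LangGraph config: per-session if session_id is given, else per-user."""
    if session_id:
        thread_id = get_session_thread_id(user_id, session_id)
    else:
        thread_id = get_user_thread_id(user_id)
    return {"configurable": {"thread_id": thread_id}}


long_term = make_thread_config("42")
one_chat = make_thread_config("42", session_id="2024-06-01")
```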

Step 6: Configure Aerospike for Production

For production deployments, tune these Aerospike settings:

# aerospike.conf
# Note: work-threads, transaction-queues, transaction-threads-per-queue,
# memory-size, and write-block-size were removed in earlier server releases
# and are not valid in Database 8.
service {
    service-threads 8             # Threads that process client transactions
}

namespace langgraph {
    replication-factor 2          # Keep 2 copies of each record in the cluster
    default-ttl 0                 # Never expire checkpoints
    storage-engine device {       # Flash storage for data
        file /data/aerospike.dat
        filesize 100G
    }
}

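The default-ttl 0 setting is important for agent memory: you generally don’t want checkpoints expiring automatically. If you need expiration (for privacy or storage reasons), set default-ttl to the number of seconds you want checkpoints to persist, and set nsup-period to a nonzero value so the server actually removes expired records. A TTL written explicitly by the client on a record overrides this namespace default.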
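
Since default-ttl is expressed in seconds, it can help to derive the value from a retention period rather than hand-computing it. A minimal sketch follows; ttl_seconds is a hypothetical helper and the 30-day retention is only an example.

```python
from datetime import timedelta


def ttl_seconds(retention: timedelta) -> int:
    """Convert a retention period to whole seconds for default-ttl."""
    seconds = int(retention.total_seconds())
    if seconds <= 0:
        raise ValueError("retention must be positive; use 0 in the config to never expire")
    return seconds


config_line = f"default-ttl {ttl_seconds(timedelta(days=30))}"
print(config_line)  # default-ttl 2592000
```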

Testing Your Implementation

Run a quick test to verify persistence is working:

# First terminal — run your agent
python your_agent.py --thread-id test-001 --message "Remember: the answer is 42."

# Kill the process (Ctrl+C or kill the container)
# Wait a moment

# Start a new process
python your_agent.py --thread-id test-001 --message "What answer should you remember?"

# The agent should respond with 42, retrieved from Aerospike
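
The commands above assume a your_agent.py script that accepts --thread-id and --message. One possible shape for its argument handling is sketched below. parse_args and build_invocation are hypothetical names, and the final graph.invoke call is left as a comment because it depends on the graph from Step 3.

```python
import argparse
from typing import Optional, Sequence


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the two flags used in the persistence test."""
    parser = argparse.ArgumentParser(description="Run the agent against a persistent thread.")
    parser.add_argument("--thread-id", required=True, help="Thread whose state is resumed")
    parser.add_argument("--message", required=True, help="User message to send")
    return parser.parse_args(argv)


def build_invocation(args: argparse.Namespace) -> tuple[dict, dict]:
    """Turn parsed flags into the (inputs, config) pair for graph.invoke."""
    inputs = {"messages": [{"role": "user", "content": args.message}]}
    config = {"configurable": {"thread_id": args.thread_id}}
    return inputs, config


# Example with an explicit argument list; a real script would call parse_args().
args = parse_args(["--thread-id", "test-001", "--message", "Remember: the answer is 42."])
inputs, config = build_invocation(args)
# result = graph.invoke(inputs, config=config)  # graph built as in Step 3
```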

Troubleshooting

Connection refused: Check that Aerospike is running on port 3000 and that your hosts config matches. asinfo -v status will confirm the daemon is healthy.
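
If asinfo is not available where the agent runs, a plain TCP check from Python can at least confirm the port is reachable. This sketch uses only the standard library; can_connect is a hypothetical helper, and a successful connection shows that something is listening, not that the Aerospike daemon is healthy.

```python
import socket


def can_connect(host: str, port: int = 3000, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False
```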

RecordNotFound on get: Normal for a new thread_id with no prior checkpoints. Your agent should handle None returns from get() as a fresh state.

Slow reads: If you’re seeing higher-than-expected latency, check that your namespace is using SSD storage (not rotating disk) and that each node has enough RAM to hold the index. Also check whether reads are going through the full-set scan in _get_latest, which gets slower as the number of stored checkpoints grows.

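Pickling errors: Some LangGraph state contains objects that pickle cannot serialize, such as open connections, locks, or lambdas. Prefer JSON for state made of standard Python types, and reserve pickle for objects that JSON cannot represent (like model output objects). Only unpickle data your own application wrote, because pickle.loads can execute arbitrary code.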
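
One way to apply this advice is to try JSON first and fall back to pickle, storing a format tag next to the payload. This is a minimal sketch with hypothetical function names. Note that a JSON round trip is lossy for some types: tuples come back as lists and non-string dict keys become strings.

```python
import json
import pickle
from typing import Any


def serialize(value: Any) -> tuple[str, bytes]:
    """Return (format_tag, payload), preferring JSON when the value allows it."""
    try:
        return "json", json.dumps(value).encode("utf-8")
    except (TypeError, ValueError):
        # json cannot represent this value (for example a set or custom object).
        return "pickle", pickle.dumps(value)


def deserialize(fmt: str, payload: bytes) -> Any:
    """Reverse serialize() using the stored format tag."""
    if fmt == "json":
        return json.loads(payload.decode("utf-8"))
    if fmt == "pickle":
        return pickle.loads(payload)  # only for data this application wrote
    raise ValueError(f"unknown format tag: {fmt}")


fmt, payload = serialize({"messages": ["hello"], "step": 3})
restored = deserialize(fmt, payload)
```

With the saver above, the tag could be stored in its own bin alongside the checkpoint bin.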


Aerospike Database 8 with LangGraph integration is available now. This guide uses community edition, which is free for most workloads. Commercial features (multi-datacenter replication, enhanced security) are available in Aerospike Enterprise.