
What is Agent Memory? Example using LangGraph and Redis

This notebook demonstrates how to manage short-term and long-term agent memory using LangGraph and Redis. We'll explore:

  1. Short-term memory management using LangGraph's checkpointer
  2. Long-term memory storage and retrieval using RedisVL
  3. Managing long-term memory manually vs. exposing tool access (AKA function-calling)
  4. Managing conversation history size with summarization
  5. Memory consolidation

What we'll build#

We're going to build two versions of a travel agent: one that manages long-term memory manually, and one that does so through tools the LLM calls.

Here are two diagrams showing the components used in both agents:

Setup#

Packages

pip install -q langchain-openai langgraph-checkpoint langgraph-checkpoint-redis "langchain-community>=0.2.11" tavily-python langchain-redis pydantic ulid

Required API keys#

You'll need an OpenAI API key with billing enabled for this lesson. If you want to use the web search tool, you'll also need a Tavily API key; at the time of this writing, Tavily API keys come with free credits.

# NBVAL_SKIP
import getpass
import os


def _set_env(key: str):
    if key not in os.environ:
        os.environ[key] = getpass.getpass(f"{key}:")


_set_env("OPENAI_API_KEY")

# Uncomment this if you have a Tavily API key and want to
# use the web search tool.
# _set_env("TAVILY_API_KEY")

Run Redis#

For Colab#

Convert the following cell to a code cell to run it in Colab.

# Exit if this is not running in Colab
if [ -z "$COLAB_RELEASE_TAG" ]; then
  exit 0
fi

curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list
sudo apt-get update > /dev/null 2>&1
sudo apt-get install redis-stack-server > /dev/null 2>&1
redis-stack-server --daemonize yes

For alternative environments#

There are many ways to get the necessary redis-stack instance running:

  1. In the cloud: deploy a free instance of Redis in the cloud. Or, if you have your own Redis Software deployment running, that works too!
  2. Per OS: see the docs

With Docker: docker run -d --name redis-stack-server -p 6379:6379 redis/redis-stack-server:latest

Test connection to Redis#

import os

from redis import Redis

# Use the environment variable if set, otherwise default to localhost
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")

redis_client = Redis.from_url(REDIS_URL)
redis_client.ping()

Short-term vs. long-term memory#

The agent uses short-term memory and long-term memory. The implementations of short-term and long-term memory differ, as does how the agent uses them. Let's dig into the details. We'll return to code soon.

Short-term memory

For short-term memory, the agent tracks conversation history in Redis. Because this is a LangGraph agent, we use the RedisSaver class to do so. RedisSaver is what LangGraph refers to as a checkpointer. You can read more about checkpointers in the LangGraph documentation. In short, they store state for each node in the graph, which for this agent includes conversation history.

Here's a diagram showing how the agent uses Redis for short-term memory. Each node in the graph (Retrieve Users, Respond, Summarize Conversation) persists its "state" to Redis. The state object contains the agent's message conversation history for the current thread.

If Redis persistence is on, then Redis will persist short-term memory to disk. This means if you quit the agent and return with the same thread ID and user ID, you'll resume the same conversation.
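To make that concrete, here's a minimal sketch of resuming a thread. It assumes the graph we compile later in this notebook (with RedisSaver as its checkpointer) and uses a hypothetical thread ID; invoking the graph again with the same thread_id picks up the checkpointed history.

from langchain_core.messages import HumanMessage
from langchain_core.runnables.config import RunnableConfig

# Hypothetical IDs for illustration; `graph` is the checkpointer-backed graph
# we compile later in this notebook.
config = RunnableConfig(configurable={"thread_id": "trip_to_japan", "user_id": "demo_user"})

# First session: every step's state is checkpointed to Redis under this thread ID.
graph.invoke({"messages": [HumanMessage(content="I prefer window seats.")]}, config=config)

# Later (even after a restart, if Redis persists to disk), invoking with the
# same thread_id loads the checkpointed history, so the conversation resumes
# instead of starting over.
graph.invoke({"messages": [HumanMessage(content="Book me a flight to Tokyo.")]}, config=config)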

Conversation histories can grow long and pollute an LLM's context window. To manage this, after every "turn" of a conversation, the agent summarizes messages when the conversation grows past a configurable threshold. Checkpointers do not do this by default, so we've created a node in the graph for summarization.

NOTE: We'll see example code for the summarization node later in this notebook.

Long-term memory

Aside from conversation history, the agent stores long-term memories in a search index in Redis, using RedisVL. Here's a diagram showing the components involved:

The agent tracks two types of long-term memories:

  • Episodic: User-specific experiences and preferences
  • Semantic: General knowledge about travel destinations and requirements

NOTE: If you're familiar with the CoALA paper, the terms "episodic" and "semantic" here map to the same concepts in the paper. CoALA discusses a third type of memory, procedural. In our example, we consider the logic encoded in the agent's Python codebase to be its procedural memory.

Representing long-term memory in Python

We use a couple of Pydantic models to represent long-term memories, both before and after they're stored in Redis:

from datetime import datetime
from enum import Enum
from typing import List, Optional

from pydantic import BaseModel, Field
import ulid


class MemoryType(str, Enum):
    """
    The type of a long-term memory.

    EPISODIC: User specific experiences and preferences

    SEMANTIC: General knowledge on top of the user's preferences and LLM's
    training data.
    """

    EPISODIC = "episodic"
    SEMANTIC = "semantic"


class Memory(BaseModel):
    """Represents a single long-term memory."""

    content: str
    memory_type: MemoryType
    metadata: str


class Memories(BaseModel):
    """
    A list of memories extracted from a conversation by an LLM.

    NOTE: OpenAI's structured output requires us to wrap the list in an object.
    """

    memories: List[Memory]


class StoredMemory(Memory):
    """A stored long-term memory"""

    id: str  # The Redis key
    memory_id: ulid.ULID = Field(default_factory=lambda: ulid.ULID())
    created_at: datetime = Field(default_factory=datetime.now)
    user_id: Optional[str] = None
    thread_id: Optional[str] = None
    memory_type: Optional[MemoryType] = None


class MemoryStrategy(str, Enum):
    """
    Supported strategies for managing long-term memory.

    This notebook supports two strategies for working with long-term memory:

    TOOLS: The LLM decides when to store and retrieve long-term memories, using
    tools (AKA, function-calling) to do so.

    MANUAL: The agent manually retrieves long-term memories relevant to the
    current conversation before sending every message and analyzes every
    response to extract memories to store.

    NOTE: In both cases, the agent runs a background thread to consolidate
    memories, and a workflow step to summarize conversations after the history
    grows past a threshold.
    """

    TOOLS = "tools"
    MANUAL = "manual"


# By default, we'll use the manual strategy
memory_strategy = MemoryStrategy.MANUAL

We'll return to these models soon to see them in action.
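To get a feel for these models before we wire them into Redis, here's a small illustrative sketch; the contents and metadata are made up for this example.

# Illustrative only: one episodic and one semantic memory using the models above.
episodic = Memory(
    content="User prefers window seats on long-haul flights",
    memory_type=MemoryType.EPISODIC,
    metadata='{"category": "flight_preferences"}',
)
semantic = Memory(
    content="Japan's cherry blossom season typically peaks in early April",
    memory_type=MemoryType.SEMANTIC,
    metadata='{"destination": "Japan"}',
)
print(episodic.memory_type, "->", episodic.content)
print(semantic.memory_type, "->", semantic.content)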

Short-term memory storage and retrieval#

The RedisSaver class handles the basics of short-term memory storage for us, so we don't need to do anything here.

Long-term memory storage and retrieval#

We use RedisVL to store and retrieve long-term memories with vector embeddings. This allows for semantic search of past experiences and knowledge.

Let's set up a new search index to store and query memories:

from redisvl.index import SearchIndex
from redisvl.schema.schema import IndexSchema

# Define schema for long-term memory index
memory_schema = IndexSchema.from_dict({
    "index": {
        "name": "agent_memories",
        "prefix": "memory:",
        "key_separator": ":",
        "storage_type": "json",
    },
    "fields": [
        {"name": "content", "type": "text"},
        {"name": "memory_type", "type": "tag"},
        {"name": "metadata", "type": "text"},
        {"name": "created_at", "type": "text"},
        {"name": "user_id", "type": "tag"},
        {"name": "memory_id", "type": "tag"},
        {
            "name": "embedding",
            "type": "vector",
            "attrs": {
                "algorithm": "flat",
                "dims": 1536,  # OpenAI embedding dimension
                "distance_metric": "cosine",
                "datatype": "float32",
            },
        },
    ],
})

# Create search index
try:
    long_term_memory_index = SearchIndex(
        schema=memory_schema, redis_client=redis_client, overwrite=True
    )
    long_term_memory_index.create()
    print("Long-term memory index ready")
except Exception as e:
    print(f"Error creating index: {e}")

Storage and retrieval functions

Now that we have a search index in Redis, we can write functions that use RedisVL to store and retrieve memories.

First, we'll write a utility function to check if a memory similar to a given memory already exists in the index. Later, we can use this to avoid storing duplicate memories.

Checking for similar memories

import logging

from redisvl.query import VectorRangeQuery
from redisvl.query.filter import Tag
from redisvl.utils.vectorize.text.openai import OpenAITextVectorizer

logger = logging.getLogger(__name__)

# If we have any memories that aren't associated with a user, we'll use this ID.
SYSTEM_USER_ID = "system"

openai_embed = OpenAITextVectorizer(model="text-embedding-ada-002")

# Change this to MemoryStrategy.TOOLS to use function-calling to store and
# retrieve memories.
memory_strategy = MemoryStrategy.MANUAL

def similar_memory_exists(
    content: str,
    memory_type: MemoryType,
    user_id: str = SYSTEM_USER_ID,
    thread_id: Optional[str] = None,
    distance_threshold: float = 0.1,
) -> bool:
    """Check if a similar long-term memory already exists in Redis."""
    query_embedding = openai_embed.embed(content)
    filters = (Tag("user_id") == user_id) & (Tag("memory_type") == memory_type)
    if thread_id:
        filters = filters & (Tag("thread_id") == thread_id)

    # Search for similar memories
    vector_query = VectorRangeQuery(
        vector=query_embedding,
        num_results=1,
        vector_field_name="embedding",
        filter_expression=filters,
        distance_threshold=distance_threshold,
        return_fields=["id"],
    )
    results = long_term_memory_index.query(vector_query)
    logger.debug(f"Similar memory search results: {results}")

    if results:
        logger.debug(
            f"{len(results)} similar {'memory' if len(results) == 1 else 'memories'} found. First: "
            f"{results[0]['id']}. Skipping storage."
        )
        return True

    return False

Storing and retrieving long-term memories

We'll use the similar_memory_exists() function when we store memories:

from datetime import datetime
from typing import List, Optional, Union

import ulid


def store_memory(
    content: str,
    memory_type: MemoryType,
    user_id: str = SYSTEM_USER_ID,
    thread_id: Optional[str] = None,
    metadata: Optional[str] = None,
):
    """Store a long-term memory in Redis, avoiding duplicates."""
    if metadata is None:
        metadata = "{}"

    logger.info(f"Preparing to store memory: {content}")

    if similar_memory_exists(content, memory_type, user_id, thread_id):
        logger.info("Similar memory found, skipping storage")
        return

    embedding = openai_embed.embed(content)

    memory_data = {
        "user_id": user_id or SYSTEM_USER_ID,
        "content": content,
        "memory_type": memory_type.value,
        "metadata": metadata,
        "created_at": datetime.now().isoformat(),
        "embedding": embedding,
        "memory_id": str(ulid.ULID()),
        "thread_id": thread_id,
    }

    try:
        long_term_memory_index.load([memory_data])
    except Exception as e:
        logger.error(f"Error storing memory: {e}")
        return

    logger.info(f"Stored {memory_type} memory: {content}")

And now that we're storing memories, we can retrieve them:

def retrieve_memories(
    query: str,
    memory_type: Union[Optional[MemoryType], List[MemoryType]] = None,
    user_id: str = SYSTEM_USER_ID,
    thread_id: Optional[str] = None,
    distance_threshold: float = 0.1,
    limit: int = 5,
) -> List[StoredMemory]:
    """Retrieve relevant memories from Redis"""
    # Create vector query
    logger.debug(f"Retrieving memories for query: {query}")
    vector_query = VectorRangeQuery(
        vector=openai_embed.embed(query),
        return_fields=[
            "content",
            "memory_type",
            "metadata",
            "created_at",
            "memory_id",
            "thread_id",
            "user_id",
        ],
        num_results=limit,
        vector_field_name="embedding",
        dialect=2,
        distance_threshold=distance_threshold,
    )

    base_filters = [f"@user_id:{{{user_id or SYSTEM_USER_ID}}}"]

    if memory_type:
        if isinstance(memory_type, list):
            base_filters.append(f"@memory_type:{{{'|'.join(memory_type)}}}")
        else:
            base_filters.append(f"@memory_type:{{{memory_type.value}}}")

    if thread_id:
        base_filters.append(f"@thread_id:{{{thread_id}}}")

    vector_query.set_filter(" ".join(base_filters))

    # Execute search
    results = long_term_memory_index.query(vector_query)

    # Parse results
    memories = []
    for doc in results:
        try:
            memory = StoredMemory(
                id=doc["id"],
                memory_id=doc["memory_id"],
                user_id=doc["user_id"],
                thread_id=doc.get("thread_id", None),
                memory_type=MemoryType(doc["memory_type"]),
                content=doc["content"],
                created_at=doc["created_at"],
                metadata=doc["metadata"],
            )
            memories.append(memory)
        except Exception as e:
            logger.error(f"Error parsing memory: {e}")
            continue
    return memories
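Here's a quick usage sketch of the two functions above. The content and user ID are made up for illustration, and each call sends an embedding request to OpenAI.

# Illustrative usage of store_memory() and retrieve_memories().
store_memory(
    content="User is vegetarian and prefers boutique hotels",
    memory_type=MemoryType.EPISODIC,
    user_id="demo_user",
)

# Semantic search finds the memory even though the query wording differs.
for memory in retrieve_memories(
    query="What food restrictions does this user have?",
    memory_type=MemoryType.EPISODIC,
    user_id="demo_user",
    distance_threshold=0.3,
):
    print(memory.content)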

Managing long-term memory manually vs. calling tools#

While making LLM queries, agents can store and retrieve relevant long-term memories in one of two ways (there are others, but these are the two we'll discuss):

  1. Expose memory retrieval and storage as "tools" that the LLM can decide to call contextually.
  2. Manually augment prompts with relevant memories, and manually extract and store relevant memories.

Both approaches have tradeoffs.

Tool-calling leaves the decision to store a memory or find relevant memories up to the LLM. This can add latency to requests. It will generally result in fewer calls to Redis but will also sometimes miss out on retrieving potentially relevant context and/or extracting relevant memories from a conversation.

Manual memory management will result in more calls to Redis but will produce fewer round-trip LLM requests, reducing latency. Manually extracting memories will generally extract more memories than tool calls, which will store more data in Redis and should result in more context added to LLM requests. More context means more contextual awareness but also higher token spend.

You can test both approaches with this agent by changing the memory_strategy variable.

Managing memory manually#

With the manual memory management strategy, we're going to extract memories after every interaction between the user and the agent. We're then going to retrieve those memories during future interactions before we send the query.

Extracting memories

We'll call this extract_memories function manually after each interaction:

from langchain_core.messages import HumanMessage
from langchain_core.runnables.config import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.graph.message import MessagesState


class RuntimeState(MessagesState):
    """Agent state (just messages for now)"""

    pass


memory_llm = ChatOpenAI(model="gpt-4o", temperature=0.3).with_structured_output(
    Memories
)


def extract_memories(
    last_processed_message_id: Optional[str],
    state: RuntimeState,
    config: RunnableConfig,
) -> Optional[str]:
    """Extract and store memories in long-term memory"""
    logger.debug(f"Last message ID is: {last_processed_message_id}")

    if len(state["messages"]) < 3:
        # Need at least a user message and agent response
        logger.debug("Not enough messages to extract memories")
        return last_processed_message_id

    user_id = config.get("configurable", {}).get("user_id", None)
    if not user_id:
        logger.warning("No user ID found in config when extracting memories")
        return last_processed_message_id

    # Get the messages
    messages = state["messages"]

    # Find the newest message ID (or None if no IDs)
    newest_message_id = None
    for msg in reversed(messages):
        if hasattr(msg, "id") and msg.id:
            newest_message_id = msg.id
            break

    logger.debug(f"Newest message ID is: {newest_message_id}")

    # If we've already processed up to this message ID, skip
    if (
        last_processed_message_id
        and newest_message_id
        and last_processed_message_id == newest_message_id
    ):
        logger.debug(f"Already processed messages up to ID {newest_message_id}")
        return last_processed_message_id

    # Find the index of the message with last_processed_message_id
    start_index = 0
    if last_processed_message_id:
        for i, msg in enumerate(messages):
            if hasattr(msg, "id") and msg.id == last_processed_message_id:
                start_index = i + 1  # Start processing from the next message
                break

    # Check if there are messages to process
    if start_index >= len(messages):
        logger.debug("No new messages to process since last processed message")
        return newest_message_id

    # Get only the messages after the last processed message
    messages_to_process = messages[start_index:]

    # If there are not enough messages to process, include some context
    if len(messages_to_process) < 3 and start_index > 0:
        # Include up to 3 messages before the start_index for context
        context_start = max(0, start_index - 3)
        messages_to_process = messages[context_start:]

    # Format messages for the memory agent
    message_history = "\n".join(
        [
            f"{'User' if isinstance(msg, HumanMessage) else 'Assistant'}: {msg.content}"
            for msg in messages_to_process
        ]
    )

    prompt = f"""
    You are a long-memory manager. Your job is to analyze this message history
    and extract information that might be useful in future conversations.

    Extract two types of memories:
    1. EPISODIC: Personal experiences and preferences specific to this user
       Example: "User prefers window seats" or "User had a bad experience in Paris"

    2. SEMANTIC: General facts and knowledge about travel that could be useful
       Example: "The best time to visit Japan is during cherry blossom season in April"

    For each memory, provide:
    - Type: The memory type (EPISODIC/SEMANTIC)
    - Content: The actual information to store
    - Metadata: Relevant tags and context (as JSON)

    IMPORTANT RULES:
    1. Only extract information that would be genuinely useful for future interactions.
    2. Do not extract procedural knowledge - that is handled by the system's built-in tools and prompts.
    3. You are a large language model, not a human - do not extract facts that you already know.

    Message history:
    {message_history}

    Extracted memories:
    """

    memories_to_store: Memories = memory_llm.invoke([HumanMessage(content=prompt)])  # type: ignore

    # Store each extracted memory
    for memory_data in memories_to_store.memories:
        store_memory(
            content=memory_data.content,
            memory_type=memory_data.memory_type,
            user_id=user_id,
            metadata=memory_data.metadata,
        )

    # Return data with the newest processed message ID
    return newest_message_id

We'll use this function in a background thread. We'll start the thread in manual memory mode but not in tool mode, and we'll run it as a worker that pulls message histories from a Queue to process:

import time
from queue import Queue

DEFAULT_MEMORY_WORKER_INTERVAL = 5 * 60  # 5 minutes
DEFAULT_MEMORY_WORKER_BACKOFF_INTERVAL = 10 * 60  # 10 minutes


def memory_worker(
    memory_queue: Queue,
    user_id: str,
    interval: int = DEFAULT_MEMORY_WORKER_INTERVAL,
    backoff_interval: int = DEFAULT_MEMORY_WORKER_BACKOFF_INTERVAL,
):
    """Worker function that processes long-term memory extraction requests"""
    key = f"memory_worker:{user_id}:last_processed_message_id"

    last_processed_message_id = redis_client.get(key)
    logger.debug(f"Last processed message ID: {last_processed_message_id}")
    last_processed_message_id = (
        str(last_processed_message_id) if last_processed_message_id else None
    )

    while True:
        try:
            # Get the next state and config from the queue (blocks until an item is available)
            state, config = memory_queue.get()

            # Extract long-term memories from the conversation history
            last_processed_message_id = extract_memories(
                last_processed_message_id, state, config
            )
            logger.debug(
                f"Memory worker extracted memories. Last processed message ID: {last_processed_message_id}"
            )

            if last_processed_message_id:
                logger.debug(
                    f"Setting last processed message ID: {last_processed_message_id}"
                )
                redis_client.set(key, last_processed_message_id)

            # Mark the task as done
            memory_queue.task_done()
            logger.debug("Memory extraction completed for queue item")

            # Wait before processing next item
            time.sleep(interval)
        except Exception as e:
            # Wait before processing next item after an error
            logger.exception(f"Error in memory worker thread: {e}")
            time.sleep(backoff_interval)


# NOTE: We'll actually start the worker thread later, in the main loop.

Augmenting queries with relevant memories#

For every user interaction with the agent, we'll query for relevant memories and add them to the LLM prompt with retrieve_relevant_memories().

NOTE: We only run this node in the "manual" memory management strategy. If using "tools," the LLM will decide when to retrieve memories.

def retrieve_relevant_memories(
    state: RuntimeState, config: RunnableConfig
) -> RuntimeState:
    """Retrieve relevant memories based on the current conversation."""
    if not state["messages"]:
        logger.debug("No messages in state")
        return state

    latest_message = state["messages"][-1]
    if not isinstance(latest_message, HumanMessage):
        logger.debug("Latest message is not a HumanMessage: ", latest_message)
        return state

    user_id = config.get("configurable", {}).get("user_id", SYSTEM_USER_ID)

    query = str(latest_message.content)
    relevant_memories = retrieve_memories(
        query=query,
        memory_type=[MemoryType.EPISODIC, MemoryType.SEMANTIC],
        limit=5,
        user_id=user_id,
        distance_threshold=0.3,
    )

    logger.debug(f"All relevant memories: {relevant_memories}")

    # We'll augment the latest human message with the relevant memories.
    if relevant_memories:
        memory_context = "\n\n### Relevant memories from previous conversations:\n"

        # Group by memory type
        memory_types = {
            MemoryType.EPISODIC: "User Preferences & History",
            MemoryType.SEMANTIC: "Travel Knowledge",
        }

        for mem_type, type_label in memory_types.items():
            memories_of_type = [
                m for m in relevant_memories if m.memory_type == mem_type
            ]
            if memories_of_type:
                memory_context += f"\n**{type_label}**:\n"
                for mem in memories_of_type:
                    memory_context += f"- {mem.content}\n"

        augmented_message = HumanMessage(content=f"{query}\n{memory_context}")
        state["messages"][-1] = augmented_message

        logger.debug(f"Augmented message: {augmented_message.content}")

    return state.copy()

This is the first function we've seen that represents a node in the LangGraph graph we'll build. As a graph node, it receives a state object containing the graph's runtime state, which is where conversation history lives. Its config parameter carries data like the user and thread IDs.

This will be the starting node in the graph we'll assemble later. When a user invokes the graph with a message, the first thing we'll do (when using the "manual" memory strategy) is augment that message with potentially related memories.

Defining tools#

Now that we have our storage functions defined, we can create tools. We'll need these to set up our agent in a moment. These tools will only be used when the agent is operating in "tools" memory management mode.

from langchain_core.tools import tool
from typing import Dict, Optional


@tool
def store_memory_tool(
    content: str,
    memory_type: MemoryType,
    metadata: Optional[Dict[str, str]] = None,
    config: Optional[RunnableConfig] = None,
) -> str:
    """
    Store a long-term memory in the system.

    Use this tool to save important information about user preferences,
    experiences, or general knowledge that might be useful in future
    interactions.
    """
    config = config or RunnableConfig()
    user_id = config.get("user_id", SYSTEM_USER_ID)
    thread_id = config.get("thread_id")

    try:
        # Store in long-term memory
        store_memory(
            content=content,
            memory_type=memory_type,
            user_id=user_id,
            thread_id=thread_id,
            metadata=str(metadata) if metadata else None,
        )

        return f"Successfully stored {memory_type} memory: {content}"
    except Exception as e:
        return f"Error storing memory: {str(e)}"


@tool
def retrieve_memories_tool(
    query: str,
    memory_type: List[MemoryType],
    limit: int = 5,
    config: Optional[RunnableConfig] = None,
) -> str:
    """
    Retrieve long-term memories relevant to the query.

    Use this tool to access previously stored information about user
    preferences, experiences, or general knowledge.
    """
    config = config or RunnableConfig()
    user_id = config.get("user_id", SYSTEM_USER_ID)

    try:
        # Get long-term memories
        stored_memories = retrieve_memories(
            query=query,
            memory_type=memory_type,
            user_id=user_id,
            limit=limit,
            distance_threshold=0.3,
        )

        # Format the response
        response = []

        if stored_memories:
            response.append("Long-term memories:")
            for memory in stored_memories:
                response.append(f"- [{memory.memory_type}] {memory.content}")

        return "\n".join(response) if response else "No relevant memories found."

    except Exception as e:
        return f"Error retrieving memories: {str(e)}"

Creating the agent#

Because we're using different LLM objects configured for different purposes and a prebuilt ReAct agent, we need a node that invokes the agent and returns the response. But before we can invoke the agent, we need to set it up. This will involve defining the tools the agent will need.

import json
from typing import Dict, List, Optional, Tuple, Union

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.callbacks.manager import CallbackManagerForToolRun
from langchain_core.messages import AIMessage, AIMessageChunk, SystemMessage
from langgraph.prebuilt.chat_agent_executor import create_react_agent
from langgraph.checkpoint.redis import RedisSaver


class CachingTavilySearchResults(TavilySearchResults):
    """
    An interface to Tavily search that caches results in Redis.

    Caching the results of the web search allows us to avoid rate limiting,
    improve latency, and reduce costs.
    """

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> Tuple[Union[List[Dict[str, str]], str], Dict]:
        """Use the tool."""
        cache_key = f"tavily_search:{query}"
        cached_result: Optional[str] = redis_client.get(cache_key)  # type: ignore
        if cached_result:
            return json.loads(cached_result), {}
        else:
            result, raw_results = super()._run(query, run_manager)
            redis_client.set(cache_key, json.dumps(result), ex=60 * 60)
            return result, raw_results


# Create a checkpoint saver for short-term memory. This keeps track of the
# conversation history for each thread. Later, we'll continually summarize the
# conversation history to keep the context window manageable, while we also
# extract long-term memories from the conversation history to store in the
# long-term memory index.
redis_saver = RedisSaver(redis_client=redis_client)
redis_saver.setup()

# Configure an LLM for the agent with a more creative temperature.
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)

# Uncomment these lines if you have a Tavily API key and want to use the web
# search tool. The agent is much more useful with this tool.
# web_search_tool = CachingTavilySearchResults(max_results=2)
# base_tools = [web_search_tool]
base_tools = []

if memory_strategy == MemoryStrategy.TOOLS:
    tools = base_tools + [store_memory_tool, retrieve_memories_tool]
elif memory_strategy == MemoryStrategy.MANUAL:
    tools = base_tools


travel_agent = create_react_agent(
    model=llm,
    tools=tools,
    checkpointer=redis_saver,  # Short-term memory: the conversation history
    prompt=SystemMessage(
        content="""
        You are a travel assistant helping users plan their trips. You remember user preferences
        and provide personalized recommendations based on past interactions.

        You have access to the following types of memory:
        1. Short-term memory: The current conversation thread
        2. Long-term memory:
           - Episodic: User preferences and past trip experiences (e.g., "User prefers window seats")
           - Semantic: General knowledge about travel destinations and requirements

        Your procedural knowledge (how to search, book flights, etc.) is built into your tools and prompts.

        Always be helpful, personal, and context-aware in your responses.
        """
    ),
)

Responding to the user#

Now we can write our node that invokes the agent and responds to the user:

def respond_to_user(state: RuntimeState, config: RunnableConfig) -> RuntimeState:
    """Invoke the travel agent to generate a response."""
    human_messages = [m for m in state["messages"] if isinstance(m, HumanMessage)]
    if not human_messages:
        logger.warning("No HumanMessage found in state")
        return state

    try:
        for result in travel_agent.stream(
            {"messages": state["messages"]}, config=config, stream_mode="messages"
        ):
            result_messages = result.get("messages", [])

            ai_messages = [
                m
                for m in result_messages
                if isinstance(m, AIMessage) or isinstance(m, AIMessageChunk)
            ]
            if ai_messages:
                agent_response = ai_messages[-1]
                # Append only the agent's response to the original state
                state["messages"].append(agent_response)
    except Exception as e:
        logger.error(f"Error invoking travel agent: {e}")
        agent_response = AIMessage(
            content="I'm sorry, I encountered an error processing your request."
        )
        state["messages"].append(agent_response)

    return state

Summarizing conversation history#

We've been focusing on long-term memory, but let's bounce back to short-term memory for a moment. With RedisSaver, LangGraph will manage our message history automatically. Still, the message history will continue to grow indefinitely, until it overwhelms the LLM's token context window.

To solve this problem, we'll add a node to the graph that summarizes the conversation if it's grown past a threshold.

from langchain_core.messages import RemoveMessage

# An LLM configured for summarization.
summarizer = ChatOpenAI(model="gpt-4o", temperature=0.3)

# The number of messages after which we'll summarize the conversation.
MESSAGE_SUMMARIZATION_THRESHOLD = 10


def summarize_conversation(
    state: RuntimeState, config: RunnableConfig
) -> Optional[RuntimeState]:
    """
    Summarize a list of messages into a concise summary to reduce context length
    while preserving important information.
    """
    messages = state["messages"]
    current_message_count = len(messages)
    if current_message_count < MESSAGE_SUMMARIZATION_THRESHOLD:
        logger.debug(f"Not summarizing conversation: {current_message_count}")
        return state

    system_prompt = """
    You are a conversation summarizer. Create a concise summary of the previous
    conversation between a user and a travel assistant.

    The summary should:
    1. Highlight key topics, preferences, and decisions
    2. Include any specific trip details (destinations, dates, preferences)
    3. Note any outstanding questions or topics that need follow-up
    4. Be concise but informative

    Format your summary as a brief narrative paragraph.
    """

    message_content = "\n".join(
        [
            f"{'User' if isinstance(msg, HumanMessage) else 'Assistant'}: {msg.content}"
            for msg in messages
        ]
    )

    # Invoke the summarizer
    summary_messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(
            content=f"Please summarize this conversation:\n\n{message_content}"
        ),
    ]

    summary_response = summarizer.invoke(summary_messages)

    logger.info(f"Summarized {len(messages)} messages into a conversation summary")

    summary_message = SystemMessage(
        content=f"""
        Summary of the conversation so far:

        {summary_response.content}

        Please continue the conversation based on this summary and the recent messages.
        """
    )
    remove_messages = [
        RemoveMessage(id=msg.id) for msg in messages if msg.id is not None
    ]

    state["messages"] = [  # type: ignore
        *remove_messages,
        summary_message,
        state["messages"][-1],
    ]

    return state.copy()

Assembling the graph#

It's time to assemble our graph.

from langgraph.graph import StateGraph, END, START

workflow = StateGraph(RuntimeState)

workflow.add_node("respond", respond_to_user)
workflow.add_node("summarize_conversation", summarize_conversation)

if memory_strategy == MemoryStrategy.MANUAL:
    # In manual memory mode, we'll retrieve relevant memories before
    # responding to the user, and then augment the user's message with the
    # relevant memories.
    workflow.add_node("retrieve_memories", retrieve_relevant_memories)
    workflow.add_edge(START, "retrieve_memories")
    workflow.add_edge("retrieve_memories", "respond")
else:
    # In tool-calling mode, we'll respond to the user and let the LLM
    # decide when to retrieve and store memories, using tool calls.
    workflow.add_edge(START, "respond")

# Regardless of memory strategy, we'll summarize the conversation after
# responding to the user, to keep the context window manageable.
workflow.add_edge("respond", "summarize_conversation")
workflow.add_edge("summarize_conversation", END)

# Finally, compile the graph.
graph = workflow.compile(checkpointer=redis_saver)
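As an optional sanity check (assuming a recent LangGraph version), you can render the compiled graph to confirm the edges match the memory strategy you chose:

# Optional: print a Mermaid diagram of the compiled graph to verify its wiring.
print(graph.get_graph().draw_mermaid())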

Consolidating memories in a background thread#

We're almost ready to create the main loop that runs our graph. First, though, let's create a worker that consolidates similar memories on a regular schedule, using semantic search. We'll run the worker in a background thread later, in the main loop.

from redisvl.query import FilterQuery


def consolidate_memories(user_id: str, batch_size: int = 10):
    """
    Periodically merge similar long-term memories for a user.
    """
    logger.info(f"Starting memory consolidation for user {user_id}")

    # For each memory type, consolidate separately
    for memory_type in MemoryType:
        all_memories = []

        # Get all memories of this type for the user
        of_type_for_user = (Tag("user_id") == user_id) & (
            Tag("memory_type") == memory_type
        )
        filter_query = FilterQuery(filter_expression=of_type_for_user)

        for batch in long_term_memory_index.paginate(filter_query, page_size=batch_size):
            all_memories.extend(batch)

        if not all_memories:
            continue

        # Group similar memories
        processed_ids = set()
        for memory in all_memories:
            if memory["id"] in processed_ids:
                continue

            memory_embedding = memory["embedding"]
            vector_query = VectorRangeQuery(
                vector=memory_embedding,
                num_results=10,
                vector_field_name="embedding",
                filter_expression=of_type_for_user
                & (Tag("memory_id") != memory["memory_id"]),
                distance_threshold=0.1,
                return_fields=[
                    "content",
                    "metadata",
                ],
            )
            similar_memories = long_term_memory_index.query(vector_query)

            # If we found similar memories, consolidate them
            if similar_memories:
                combined_content = memory["content"]
                combined_metadata = memory["metadata"]

                if combined_metadata:
                    try:
                        combined_metadata = json.loads(combined_metadata)
                    except Exception as e:
                        logger.error(f"Error parsing metadata: {e}")
                        combined_metadata = {}

                for similar in similar_memories:
                    # Merge the content of similar memories
                    combined_content += f" {similar['content']}"

                    if similar["metadata"]:
                        try:
                            similar_metadata = json.loads(similar["metadata"])
                        except Exception as e:
                            logger.error(f"Error parsing metadata: {e}")
                            similar_metadata = {}

                        combined_metadata = {**combined_metadata, **similar_metadata}

                # Create a consolidated memory
                new_metadata = {
                    "consolidated": True,
                    "source_count": len(similar_memories) + 1,
                    **combined_metadata,
                }
                consolidated_memory = {
                    "content": summarize_memories(combined_content, memory_type),
                    "memory_type": memory_type.value,
                    "metadata": json.dumps(new_metadata),
                    "user_id": user_id,
                }

                # Delete the old memories
                delete_memory(memory["id"])
                for similar in similar_memories:
                    delete_memory(similar["id"])

                # Store the new consolidated memory
                store_memory(
                    content=consolidated_memory["content"],
                    memory_type=memory_type,
                    user_id=user_id,
                    metadata=consolidated_memory["metadata"],
                )

                logger.info(
                    f"Consolidated {len(similar_memories) + 1} memories into one"
                )


def delete_memory(memory_id: str):
    """Delete a memory from Redis"""
    try:
        result = long_term_memory_index.drop_keys([memory_id])
    except Exception as e:
        logger.error(f"Deleting memory {memory_id} failed: {e}")
        return
    if result == 0:
        logger.debug(f"Deleting memory {memory_id} failed: memory not found")
    else:
        logger.info(f"Deleted memory {memory_id}")


def summarize_memories(combined_content: str, memory_type: MemoryType) -> str:
    """Use the LLM to create a concise summary of similar memories"""
    try:
        system_prompt = f"""
        You are a memory consolidation assistant. Your task is to create a single,
        concise memory from these similar memory fragments. The new memory should
        be a {memory_type.value} memory.

        Combine the information without repetition while preserving all important details.
        """

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(
                content=f"Consolidate these similar memories into one:\n\n{combined_content}"
            ),
        ]

        response = summarizer.invoke(messages)
        return str(response.content)
    except Exception as e:
        logger.error(f"Error summarizing memories: {e}")
        # Fall back to just using the combined content
        return combined_content


def memory_consolidation_worker(user_id: str):
    """
    Worker that periodically consolidates memories for the active user.

    NOTE: In production, this would probably use a background task framework,
    such as rq or Celery, and run on a schedule.
    """
    while True:
        try:
            consolidate_memories(user_id)
            # Run every 10 minutes
            time.sleep(10 * 60)
        except Exception as e:
            logger.exception(f"Error in memory consolidation worker: {e}")
            # If there's an error, wait an hour and try again
            time.sleep(60 * 60)

The main loop#

Now we can put everything together and run the main loop.

Running this cell will prompt you for a user ID and a thread ID (your OpenAI and, optionally, Tavily API keys were set earlier in the notebook). You'll then enter a loop in which you can type queries and see the agent's responses printed below the cell.

import threading


def main(thread_id: str = "book_flight", user_id: str = "demo_user"):
    """Main interaction loop for the travel agent"""
    print("Welcome to the Travel Assistant! (Type 'exit' to quit)")

    config = RunnableConfig(configurable={"thread_id": thread_id, "user_id": user_id})
    state = RuntimeState(messages=[])

    # If we're using the manual memory strategy, we need to create a queue for
    # memory processing and start a worker thread. After every 'round' of a
    # conversation, the main loop will add the current state and config to the
    # queue for memory processing.
    if memory_strategy == MemoryStrategy.MANUAL:
        # Create a queue for memory processing
        memory_queue = Queue()

        # Start a worker thread that will process memory extraction tasks
        memory_thread = threading.Thread(
            target=memory_worker, args=(memory_queue, user_id), daemon=True
        )
        memory_thread.start()

    # We always run consolidation in the background, regardless of memory strategy.
    consolidation_thread = threading.Thread(
        target=memory_consolidation_worker, args=(user_id,), daemon=True
    )
    consolidation_thread.start()

    while True:
        user_input = input("\nYou (type 'quit' to quit): ")

        if not user_input:
            continue

        if user_input.lower() in ["exit", "quit"]:
            print("Thank you for using the Travel Assistant. Goodbye!")
            break

        state["messages"].append(HumanMessage(content=user_input))

        try:
            # Process user input through the graph
            for result in graph.stream(state, config=config, stream_mode="values"):
                state = RuntimeState(**result)

            logger.debug(f"# of messages after run: {len(state['messages'])}")

            # Find the most recent AI message, so we can print the response
            ai_messages = [m for m in state["messages"] if isinstance(m, AIMessage)]
            if ai_messages:
                message = ai_messages[-1].content
            else:
                logger.error("No AI messages after run")
                message = "I'm sorry, I couldn't process your request properly."
                # Add the error message to the state
                state["messages"].append(AIMessage(content=message))

            print(f"\nAssistant: {message}")

            # Add the current state to the memory processing queue
            if memory_strategy == MemoryStrategy.MANUAL:
                memory_queue.put((state.copy(), config))

        except Exception as e:
            logger.exception(f"Error processing request: {e}")
            error_message = "I'm sorry, I encountered an error processing your request."
            print(f"\nAssistant: {error_message}")
            # Add the error message to the state
            state["messages"].append(AIMessage(content=error_message))


try:
    user_id = input("Enter a user ID: ") or "demo_user"
    thread_id = input("Enter a thread ID: ") or "demo_thread"
except Exception:
    # If we're running in CI, we don't have a terminal to input from, so just exit
    exit()
else:
    main(thread_id, user_id)

That's a wrap. Let’s start building#

Want to make your own agent? Try the LangGraph Quickstart. Then add our Redis checkpointer to give your agent fast, persistent memory.
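As a minimal sketch of that combination, here's a quickstart-style, single-node graph with RedisSaver as its checkpointer. The node, model choice, and thread ID are illustrative assumptions rather than part of the tutorial above; it assumes langgraph-checkpoint-redis is installed and Redis is running locally.

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.redis import RedisSaver
from langgraph.graph import StateGraph, MessagesState, START, END
from redis import Redis

llm = ChatOpenAI(model="gpt-4o-mini")


def chatbot(state: MessagesState):
    # Single node: call the LLM on the running message history.
    return {"messages": [llm.invoke(state["messages"])]}


builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

# Short-term memory: checkpoint each turn to Redis so threads survive restarts.
checkpointer = RedisSaver(redis_client=Redis.from_url("redis://localhost:6379"))
checkpointer.setup()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "quickstart_demo"}}
graph.invoke({"messages": [HumanMessage(content="Hi! I'm planning a trip.")]}, config)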

Using Redis to manage memory for your AI Agent lets you build a flexible and scalable system that can store and retrieve memories fast. Check out the resources below to start building with Redis today, or connect with our team to chat about AI Agents.

  • Redis AI resources: GitHub repo with code samples and notebooks to help you build AI apps.
  • Redis AI docs: Quickstarts and tutorials to get you up and running fast.
  • Redis Cloud: The easiest way to deploy Redis—try it free on AWS, Azure, or GCP.