"""
Hot memory implementation using DuckDB for in-memory storage.
"""
import logging
from typing import Dict, Any, Optional, List, Union
import duckdb
import json
from datetime import datetime
import uuid
import numpy as np
logger = logging.getLogger(__name__)
[docs]class HotMemory:
"""Hot memory layer using DuckDB for fast in-memory storage."""
[docs] def __init__(self):
"""Initialize hot memory with in-memory DuckDB."""
self.logger = logging.getLogger(__name__)
# Lazy import to avoid circular dependency
from memories.core.memory_manager import MemoryManager
self.memory_manager = MemoryManager()
# Initialize in-memory DuckDB connection
self.con = self._init_duckdb()
self._init_tables(self.con)
self.logger.info("Initialized hot memory with in-memory DuckDB")
def _init_duckdb(self) -> duckdb.DuckDBPyConnection:
"""Initialize in-memory DuckDB connection.
Returns:
DuckDB connection
"""
try:
# Get DuckDB configuration from memory manager if available
hot_config = self.memory_manager.config.get('memory', {}).get('hot', {})
duckdb_config = hot_config.get('duckdb', {})
# Set memory limit and threads
memory_limit = duckdb_config.get('memory_limit', '2GB')
threads = duckdb_config.get('threads', 2)
# Create in-memory connection
con = duckdb.connect(database=':memory:', read_only=False)
# Set configuration
con.execute(f"SET memory_limit='{memory_limit}'")
con.execute(f"SET threads={threads}")
return con
except Exception as e:
self.logger.error(f"Error initializing DuckDB for hot storage: {e}")
raise
def _init_tables(self, con: duckdb.DuckDBPyConnection) -> None:
"""Initialize database tables.
Args:
con: DuckDB connection to initialize tables in
"""
try:
# Create tables if they don't exist
con.execute("""
CREATE TABLE IF NOT EXISTS hot_data (
id VARCHAR PRIMARY KEY,
data JSON,
metadata JSON,
tags JSON,
stored_at TIMESTAMP
)
""")
con.execute("""
CREATE TABLE IF NOT EXISTS hot_tags (
tag VARCHAR,
data_id VARCHAR,
PRIMARY KEY (tag, data_id),
FOREIGN KEY (data_id) REFERENCES hot_data(id)
)
""")
self.logger.info("Initialized hot memory tables")
except Exception as e:
self.logger.error(f"Error initializing tables for hot storage: {e}")
raise
[docs] async def store(
self,
data: Any,
metadata: Optional[Dict[str, Any]] = None,
tags: Optional[List[str]] = None
) -> bool:
"""Store data in hot memory with metadata and tags.
Args:
data: Data to store
metadata: Optional metadata about the data
tags: Optional tags for categorizing the data
Returns:
bool: True if storage was successful, False otherwise
"""
try:
# Generate unique ID
data_id = str(uuid.uuid4())
# Prepare data for storage
metadata = metadata or {}
tags_list = tags or []
# Convert data to JSON if needed
if isinstance(data, (dict, list)):
data_json = json.dumps(data)
elif isinstance(data, np.ndarray):
data_json = json.dumps(data.tolist())
else:
data_json = json.dumps(str(data))
# Store in hot_data table
self.con.execute(
"""
INSERT INTO hot_data (id, data, metadata, tags, stored_at)
VALUES (?, ?, ?, ?, ?)
""",
[
data_id,
data_json,
json.dumps(metadata),
json.dumps(tags_list),
datetime.now()
]
)
# Store tags in hot_tags table
if tags_list:
for tag in tags_list:
self.con.execute(
"""
INSERT INTO hot_tags (tag, data_id)
VALUES (?, ?)
""",
[tag, data_id]
)
return True
except Exception as e:
self.logger.error(f"Failed to store in hot memory: {e}")
return False
[docs] async def retrieve(
self,
query: Dict[str, Any],
tags: Optional[List[str]] = None
) -> Optional[Dict[str, Any]]:
"""Retrieve data from hot memory.
Args:
query: Query parameters (can contain 'id' to retrieve specific data)
tags: Optional tags to filter by
Returns:
Retrieved data or None if not found
"""
try:
if tags:
# Get data by tags
tag_placeholders = ', '.join(['?'] * len(tags))
result = self.con.execute(
f"""
SELECT hd.id, hd.data, hd.metadata, hd.tags, hd.stored_at
FROM hot_data hd
JOIN hot_tags ht ON hd.id = ht.data_id
WHERE ht.tag IN ({tag_placeholders})
GROUP BY hd.id, hd.data, hd.metadata, hd.tags, hd.stored_at
""",
tags
).fetchall()
if not result:
return None
# Convert to list of dictionaries
results = []
for row in result:
results.append({
'id': row[0],
'data': json.loads(row[1]),
'metadata': json.loads(row[2]),
'tags': json.loads(row[3]),
'stored_at': row[4].isoformat() if row[4] else None
})
return results
elif 'id' in query:
# Get data by ID
result = self.con.execute(
"""
SELECT id, data, metadata, tags, stored_at
FROM hot_data
WHERE id = ?
""",
[query['id']]
).fetchone()
if not result:
return None
return {
'id': result[0],
'data': json.loads(result[1]),
'metadata': json.loads(result[2]),
'tags': json.loads(result[3]),
'stored_at': result[4].isoformat() if result[4] else None
}
else:
# Get most recent data
result = self.con.execute(
"""
SELECT id, data, metadata, tags, stored_at
FROM hot_data
ORDER BY stored_at DESC
LIMIT 1
"""
).fetchone()
if not result:
return None
return {
'id': result[0],
'data': json.loads(result[1]),
'metadata': json.loads(result[2]),
'tags': json.loads(result[3]),
'stored_at': result[4].isoformat() if result[4] else None
}
except Exception as e:
self.logger.error(f"Failed to retrieve from hot memory: {e}")
return None
[docs] def clear(self) -> None:
"""Clear hot memory."""
try:
self.con.execute("DELETE FROM hot_tags")
self.con.execute("DELETE FROM hot_data")
self.logger.info("Cleared hot memory")
except Exception as e:
self.logger.error(f"Failed to clear hot memory: {e}")
[docs] def cleanup(self) -> None:
"""Clean up resources."""
try:
if hasattr(self, 'con') and self.con:
self.con.close()
self.con = None
self.logger.info("Cleaned up hot memory resources")
except Exception as e:
self.logger.error(f"Failed to cleanup hot memory: {e}")
[docs] def __del__(self):
"""Destructor to ensure cleanup is performed."""
self.cleanup()
[docs] async def get_schema(self, data_id: str) -> Optional[Dict[str, Any]]:
"""Get schema information for stored data.
Args:
data_id: ID of the data to get schema for
Returns:
Dictionary containing schema information or None if not found
"""
try:
# Get data from DuckDB
result = self.con.execute(
"""
SELECT data
FROM hot_data
WHERE id = ?
""",
[data_id]
).fetchone()
if not result:
return None
# Parse stored data
data_value = json.loads(result[0])
if isinstance(data_value, dict):
schema = {
'fields': list(data_value.keys()),
'types': {k: type(v).__name__ for k, v in data_value.items()},
'type': 'dict',
'source': 'hot_memory'
}
elif isinstance(data_value, list):
if data_value:
if all(isinstance(x, dict) for x in data_value):
# List of dictionaries - combine all keys
all_keys = set().union(*(d.keys() for d in data_value if isinstance(d, dict)))
schema = {
'fields': list(all_keys),
'types': {k: type(next((d[k] for d in data_value if isinstance(d, dict) and k in d), None)).__name__
for k in all_keys},
'type': 'list_of_dicts',
'source': 'hot_memory'
}
else:
schema = {
'type': 'list',
'element_type': type(data_value[0]).__name__,
'length': len(data_value),
'source': 'hot_memory'
}
else:
schema = {
'type': 'list',
'length': 0,
'source': 'hot_memory'
}
else:
schema = {
'type': type(data_value).__name__,
'source': 'hot_memory'
}
return schema
except Exception as e:
self.logger.error(f"Failed to get schema for {data_id}: {e}")
return None