Source code for memories.core.hot

"""
Hot memory implementation using DuckDB for in-memory storage.
"""

import logging
from typing import Dict, Any, Optional, List, Union
import duckdb
import json
from datetime import datetime
import uuid
import numpy as np

logger = logging.getLogger(__name__)

[docs]class HotMemory: """Hot memory layer using DuckDB for fast in-memory storage."""
[docs] def __init__(self): """Initialize hot memory with in-memory DuckDB.""" self.logger = logging.getLogger(__name__) # Lazy import to avoid circular dependency from memories.core.memory_manager import MemoryManager self.memory_manager = MemoryManager() # Initialize in-memory DuckDB connection self.con = self._init_duckdb() self._init_tables(self.con) self.logger.info("Initialized hot memory with in-memory DuckDB")
def _init_duckdb(self) -> duckdb.DuckDBPyConnection: """Initialize in-memory DuckDB connection. Returns: DuckDB connection """ try: # Get DuckDB configuration from memory manager if available hot_config = self.memory_manager.config.get('memory', {}).get('hot', {}) duckdb_config = hot_config.get('duckdb', {}) # Set memory limit and threads memory_limit = duckdb_config.get('memory_limit', '2GB') threads = duckdb_config.get('threads', 2) # Create in-memory connection con = duckdb.connect(database=':memory:', read_only=False) # Set configuration con.execute(f"SET memory_limit='{memory_limit}'") con.execute(f"SET threads={threads}") return con except Exception as e: self.logger.error(f"Error initializing DuckDB for hot storage: {e}") raise def _init_tables(self, con: duckdb.DuckDBPyConnection) -> None: """Initialize database tables. Args: con: DuckDB connection to initialize tables in """ try: # Create tables if they don't exist con.execute(""" CREATE TABLE IF NOT EXISTS hot_data ( id VARCHAR PRIMARY KEY, data JSON, metadata JSON, tags JSON, stored_at TIMESTAMP ) """) con.execute(""" CREATE TABLE IF NOT EXISTS hot_tags ( tag VARCHAR, data_id VARCHAR, PRIMARY KEY (tag, data_id), FOREIGN KEY (data_id) REFERENCES hot_data(id) ) """) self.logger.info("Initialized hot memory tables") except Exception as e: self.logger.error(f"Error initializing tables for hot storage: {e}") raise
[docs] async def store( self, data: Any, metadata: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None ) -> bool: """Store data in hot memory with metadata and tags. Args: data: Data to store metadata: Optional metadata about the data tags: Optional tags for categorizing the data Returns: bool: True if storage was successful, False otherwise """ try: # Generate unique ID data_id = str(uuid.uuid4()) # Prepare data for storage metadata = metadata or {} tags_list = tags or [] # Convert data to JSON if needed if isinstance(data, (dict, list)): data_json = json.dumps(data) elif isinstance(data, np.ndarray): data_json = json.dumps(data.tolist()) else: data_json = json.dumps(str(data)) # Store in hot_data table self.con.execute( """ INSERT INTO hot_data (id, data, metadata, tags, stored_at) VALUES (?, ?, ?, ?, ?) """, [ data_id, data_json, json.dumps(metadata), json.dumps(tags_list), datetime.now() ] ) # Store tags in hot_tags table if tags_list: for tag in tags_list: self.con.execute( """ INSERT INTO hot_tags (tag, data_id) VALUES (?, ?) """, [tag, data_id] ) return True except Exception as e: self.logger.error(f"Failed to store in hot memory: {e}") return False
[docs] async def retrieve( self, query: Dict[str, Any], tags: Optional[List[str]] = None ) -> Optional[Dict[str, Any]]: """Retrieve data from hot memory. Args: query: Query parameters (can contain 'id' to retrieve specific data) tags: Optional tags to filter by Returns: Retrieved data or None if not found """ try: if tags: # Get data by tags tag_placeholders = ', '.join(['?'] * len(tags)) result = self.con.execute( f""" SELECT hd.id, hd.data, hd.metadata, hd.tags, hd.stored_at FROM hot_data hd JOIN hot_tags ht ON hd.id = ht.data_id WHERE ht.tag IN ({tag_placeholders}) GROUP BY hd.id, hd.data, hd.metadata, hd.tags, hd.stored_at """, tags ).fetchall() if not result: return None # Convert to list of dictionaries results = [] for row in result: results.append({ 'id': row[0], 'data': json.loads(row[1]), 'metadata': json.loads(row[2]), 'tags': json.loads(row[3]), 'stored_at': row[4].isoformat() if row[4] else None }) return results elif 'id' in query: # Get data by ID result = self.con.execute( """ SELECT id, data, metadata, tags, stored_at FROM hot_data WHERE id = ? """, [query['id']] ).fetchone() if not result: return None return { 'id': result[0], 'data': json.loads(result[1]), 'metadata': json.loads(result[2]), 'tags': json.loads(result[3]), 'stored_at': result[4].isoformat() if result[4] else None } else: # Get most recent data result = self.con.execute( """ SELECT id, data, metadata, tags, stored_at FROM hot_data ORDER BY stored_at DESC LIMIT 1 """ ).fetchone() if not result: return None return { 'id': result[0], 'data': json.loads(result[1]), 'metadata': json.loads(result[2]), 'tags': json.loads(result[3]), 'stored_at': result[4].isoformat() if result[4] else None } except Exception as e: self.logger.error(f"Failed to retrieve from hot memory: {e}") return None
[docs] def clear(self) -> None: """Clear hot memory.""" try: self.con.execute("DELETE FROM hot_tags") self.con.execute("DELETE FROM hot_data") self.logger.info("Cleared hot memory") except Exception as e: self.logger.error(f"Failed to clear hot memory: {e}")
[docs] def cleanup(self) -> None: """Clean up resources.""" try: if hasattr(self, 'con') and self.con: self.con.close() self.con = None self.logger.info("Cleaned up hot memory resources") except Exception as e: self.logger.error(f"Failed to cleanup hot memory: {e}")
[docs] def __del__(self): """Destructor to ensure cleanup is performed.""" self.cleanup()
[docs] async def get_schema(self, data_id: str) -> Optional[Dict[str, Any]]: """Get schema information for stored data. Args: data_id: ID of the data to get schema for Returns: Dictionary containing schema information or None if not found """ try: # Get data from DuckDB result = self.con.execute( """ SELECT data FROM hot_data WHERE id = ? """, [data_id] ).fetchone() if not result: return None # Parse stored data data_value = json.loads(result[0]) if isinstance(data_value, dict): schema = { 'fields': list(data_value.keys()), 'types': {k: type(v).__name__ for k, v in data_value.items()}, 'type': 'dict', 'source': 'hot_memory' } elif isinstance(data_value, list): if data_value: if all(isinstance(x, dict) for x in data_value): # List of dictionaries - combine all keys all_keys = set().union(*(d.keys() for d in data_value if isinstance(d, dict))) schema = { 'fields': list(all_keys), 'types': {k: type(next((d[k] for d in data_value if isinstance(d, dict) and k in d), None)).__name__ for k in all_keys}, 'type': 'list_of_dicts', 'source': 'hot_memory' } else: schema = { 'type': 'list', 'element_type': type(data_value[0]).__name__, 'length': len(data_value), 'source': 'hot_memory' } else: schema = { 'type': 'list', 'length': 0, 'source': 'hot_memory' } else: schema = { 'type': type(data_value).__name__, 'source': 'hot_memory' } return schema except Exception as e: self.logger.error(f"Failed to get schema for {data_id}: {e}") return None