Source code for memories.core.warm

"""
Warm memory implementation using DuckDB for intermediate data storage.
"""

import logging
from typing import Dict, Any, Optional, List, Union
from pathlib import Path
import json
from datetime import datetime
import duckdb
import uuid
import numpy as np
import os

# Remove direct import to avoid circular dependency
# from memories.core.memory_manager import MemoryManager

logger = logging.getLogger(__name__)

[docs]class WarmMemory: """Warm memory layer using DuckDB for storage."""
[docs] def __init__(self, storage_path: str = None): """Initialize warm memory. Args: storage_path: Optional path to store DuckDB files """ self.logger = logging.getLogger(__name__) # Lazy import to avoid circular dependency from memories.core.memory_manager import MemoryManager self.memory_manager = MemoryManager() # Set up storage path if storage_path: self.storage_path = Path(storage_path) else: # Use default path from memory manager self.storage_path = Path(self.memory_manager.get_warm_path()) self.storage_path.mkdir(parents=True, exist_ok=True) self.logger.info(f"Warm memory storage path: {self.storage_path}") # Initialize DuckDB connection self.con = self._init_duckdb() self._init_tables(self.con)
def _init_duckdb(self, db_file: Optional[str] = None) -> duckdb.DuckDBPyConnection: """Initialize DuckDB connection. Args: db_file: Optional path to a DuckDB file. If None, creates an in-memory database. Returns: DuckDB connection """ try: # Get DuckDB configuration from memory manager warm_config = self.memory_manager.config['memory'].get('warm', {}) duckdb_config = warm_config.get('duckdb', {}) # Set memory limit and threads memory_limit = duckdb_config.get('memory_limit', '8GB') threads = duckdb_config.get('threads', 4) # Create connection if db_file: con = duckdb.connect(database=db_file, read_only=False) else: con = duckdb.connect(database=':memory:', read_only=False) # Set configuration con.execute(f"SET memory_limit='{memory_limit}'") con.execute(f"SET threads={threads}") return con except Exception as e: self.logger.error(f"Error initializing DuckDB for warm storage: {e}") raise def _init_tables(self, con: duckdb.DuckDBPyConnection) -> None: """Initialize database tables. Args: con: DuckDB connection to initialize tables in """ try: # Create tables if they don't exist con.execute(""" CREATE TABLE IF NOT EXISTS warm_data ( id VARCHAR PRIMARY KEY, data JSON, metadata JSON, tags JSON, stored_at TIMESTAMP ) """) con.execute(""" CREATE TABLE IF NOT EXISTS warm_tags ( tag VARCHAR, data_id VARCHAR, PRIMARY KEY (tag, data_id), FOREIGN KEY (data_id) REFERENCES warm_data(id) ) """) self.logger.info("Initialized warm memory tables") except Exception as e: self.logger.error(f"Error initializing tables for warm storage: {e}") raise
[docs] async def store( self, data: Any, metadata: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, db_name: Optional[str] = None, table_name: Optional[str] = None ) -> Dict[str, Any]: """Store data in warm memory with metadata and tags. Args: data: Data to store metadata: Optional metadata about the data tags: Optional tags for categorizing the data db_name: Optional name of the database file to store in (without .duckdb extension) table_name: Optional name for the table to create. If None, a name will be generated. Returns: Dict containing success status and table information: - success: True if storage was successful, False otherwise - data_id: The unique ID of the stored data - table_name: The name of the table where data is stored """ try: # Get connection con = self.get_connection(db_name) # Generate unique ID data_id = str(uuid.uuid4()) # Generate table name if not provided if not table_name: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') table_name = f"warm_data_{timestamp}_{data_id[:8]}" # Sanitize table name (remove special characters) table_name = ''.join(c if c.isalnum() or c == '_' else '_' for c in table_name) # Convert data to JSON if isinstance(data, (np.ndarray, np.generic)): data_json = json.dumps(data.tolist()) elif hasattr(data, 'to_dict'): # Handle pandas DataFrame or Series data_json = json.dumps(data.to_dict()) else: data_json = json.dumps(data) # Convert metadata to JSON metadata_json = json.dumps(metadata or {}) # Convert tags to JSON tags_json = json.dumps(tags or []) # Create a new table for this data entry con.execute(f""" CREATE TABLE IF NOT EXISTS {table_name} ( id VARCHAR PRIMARY KEY, data JSON, metadata JSON, tags JSON, stored_at TIMESTAMP ) """) # Store data in the new table con.execute(f""" INSERT INTO {table_name} (id, data, metadata, tags, stored_at) VALUES (?, ?, ?, ?, ?) """, [data_id, data_json, metadata_json, tags_json, datetime.now()]) # Also store in the main warm_data table for backward compatibility con.execute(""" INSERT INTO warm_data (id, data, metadata, tags, stored_at) VALUES (?, ?, ?, ?, ?) """, [data_id, data_json, metadata_json, tags_json, datetime.now()]) # Store tags for indexing if tags: for tag in tags: con.execute(""" INSERT INTO warm_tags (tag, data_id) VALUES (?, ?) """, [tag, data_id]) return { "success": True, "data_id": data_id, "table_name": table_name } except Exception as e: self.logger.error(f"Error storing in warm storage: {e}") return { "success": False, "data_id": None, "table_name": None }
[docs] async def retrieve( self, query: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, db_name: Optional[str] = None, table_name: Optional[str] = None ) -> Union[Dict[str, Any], List[Dict[str, Any]], None]: """Retrieve data from warm memory. Args: query: Optional query parameters tags: Optional tags to filter by db_name: Optional name of the database file to retrieve from (without .duckdb extension) table_name: Optional name of the specific table to query Returns: Retrieved data or None if not found """ try: # Get connection con = self.get_connection(db_name) results = [] # If table_name is provided, query that specific table if table_name: # Check if table exists table_exists = con.execute(f""" SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}' """).fetchone() if not table_exists: self.logger.warning(f"Table {table_name} does not exist") return None # Query the specific table query_result = con.execute(f""" SELECT * FROM {table_name} ORDER BY stored_at DESC """).fetchall() elif tags: # Get data by tags placeholders = ', '.join(['?'] * len(tags)) query_result = con.execute(f""" SELECT DISTINCT d.* FROM warm_data d JOIN warm_tags t ON d.id = t.data_id WHERE t.tag IN ({placeholders}) ORDER BY d.stored_at DESC """, tags).fetchall() elif query: # Build query conditions conditions = [] params = [] # Handle data query if 'data' in query: for key, value in query['data'].items(): # Use the appropriate JSON extraction function based on value type if isinstance(value, str): conditions.append(f"json_extract_string(data, '$.{key}') = ?") elif isinstance(value, (int, float, bool)): conditions.append(f"json_extract(data, '$.{key}') = ?") else: # For complex types, convert to JSON and compare conditions.append(f"json_extract(data, '$.{key}') = ?") params.append(value) # Handle metadata query if 'metadata' in query: for key, value in query['metadata'].items(): # Use the appropriate JSON extraction function based on value type if isinstance(value, str): conditions.append(f"json_extract_string(metadata, '$.{key}') = ?") elif isinstance(value, (int, float, bool)): conditions.append(f"json_extract(metadata, '$.{key}') = ?") else: # For complex types, convert to JSON and compare conditions.append(f"json_extract(metadata, '$.{key}') = ?") params.append(value) # Build WHERE clause where_clause = " AND ".join(conditions) if conditions else "1=1" # Execute query query_result = con.execute(f""" SELECT * FROM warm_data WHERE {where_clause} ORDER BY stored_at DESC """, params).fetchall() else: # Get all data query_result = con.execute(""" SELECT * FROM warm_data ORDER BY stored_at DESC """).fetchall() # Process results for row in query_result: data_json = row[1] # data column metadata_json = row[2] # metadata column tags_json = row[3] # tags column stored_at = row[4] # stored_at column # Parse JSON data = json.loads(data_json) metadata = json.loads(metadata_json) tags = json.loads(tags_json) # Add to results results.append({ "data": data, "metadata": metadata, "tags": tags, "stored_at": stored_at.isoformat() if stored_at else None }) return results[0] if len(results) == 1 else results if results else None except Exception as e: self.logger.error(f"Error retrieving from warm storage: {e}") return None
[docs] def clear(self) -> None: """Clear all data from warm memory.""" try: # Delete all data from tables self.con.execute("DELETE FROM warm_tags") self.con.execute("DELETE FROM warm_data") self.logger.info("Cleared warm memory") except Exception as e: self.logger.error(f"Failed to clear warm memory: {e}")
[docs] def cleanup(self) -> None: """Clean up resources.""" try: # Don't close the connection if it's from the memory manager if not hasattr(self.memory_manager, 'con') or self.memory_manager.con != self.con: if hasattr(self, 'con') and self.con: self.con.close() self.con = None self.logger.info("Cleaned up warm memory resources") except Exception as e: self.logger.error(f"Failed to cleanup warm memory: {e}")
[docs] def __del__(self): """Destructor to ensure cleanup is performed.""" self.cleanup()
[docs] async def get_schema(self, data_id: str) -> Optional[Dict[str, Any]]: """Get schema information for stored data. Args: data_id: ID of the data to get schema for Returns: Dictionary containing schema information or None if not found """ try: # Get data by ID result = self.con.execute(""" SELECT data, metadata FROM warm_data WHERE id = ? """, [data_id]).fetchone() if not result: return None data_json, metadata_json = result # Parse JSON data_value = json.loads(data_json) metadata = json.loads(metadata_json) # Generate schema if isinstance(data_value, dict): schema = { 'fields': list(data_value.keys()), 'types': {k: type(v).__name__ for k, v in data_value.items()}, 'type': 'dict', 'source': 'duckdb' } elif isinstance(data_value, list): if data_value: if all(isinstance(x, dict) for x in data_value): # List of dictionaries - combine all keys all_keys = set().union(*(d.keys() for d in data_value if isinstance(d, dict))) schema = { 'fields': list(all_keys), 'types': {k: type(next(d[k] for d in data_value if k in d)).__name__ for k in all_keys}, 'type': 'list_of_dicts', 'source': 'duckdb' } else: schema = { 'type': 'list', 'element_type': type(data_value[0]).__name__, 'length': len(data_value), 'source': 'duckdb' } else: schema = { 'type': 'list', 'length': 0, 'source': 'duckdb' } else: schema = { 'type': type(data_value).__name__, 'source': 'duckdb' } # Add metadata if available if metadata: schema['metadata'] = metadata return schema except Exception as e: self.logger.error(f"Failed to get schema for {data_id}: {e}") return None
[docs] def get_connection(self, db_name: Optional[str] = None) -> duckdb.DuckDBPyConnection: """Get a connection to a specific database file. Args: db_name: Name of the database file (without .duckdb extension). If None, returns the default connection. Returns: DuckDB connection """ if not db_name: return self.con # Check if connection already exists if db_name in self.connections: return self.connections[db_name] # Create new connection db_file = str(self.storage_path / f"{db_name}.duckdb") con = self._init_duckdb(db_file) # Initialize tables self._init_tables(con) # Store connection self.connections[db_name] = con return con
[docs] async def import_from_parquet( self, parquet_file: str, metadata: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, db_name: Optional[str] = None, table_name: Optional[str] = None ) -> Dict[str, Any]: """Import data from a parquet file into warm memory. Args: parquet_file: Path to the parquet file metadata: Optional metadata about the data tags: Optional tags for categorizing the data db_name: Optional name of the database file to store in (without .duckdb extension) table_name: Optional name for the table to create. If None, a name will be generated. Returns: Dict containing success status and table information: - success: True if import was successful, False otherwise - data_id: The unique ID of the stored data - table_name: The name of the table where data is stored """ try: # Get connection con = self.get_connection(db_name) # Generate unique ID data_id = str(uuid.uuid4()) # Generate table name if not provided if not table_name: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') file_basename = os.path.basename(parquet_file).split('.')[0] table_name = f"parquet_{file_basename}_{timestamp}_{data_id[:8]}" # Sanitize table name (remove special characters) table_name = ''.join(c if c.isalnum() or c == '_' else '_' for c in table_name) # Create a new table from the parquet file con.execute(f""" CREATE TABLE {table_name} AS SELECT * FROM read_parquet('{parquet_file}') """) # Get the data as JSON for storage in warm_data data_result = con.execute(f"SELECT * FROM {table_name}").fetchall() column_names = [desc[0] for desc in con.description] # Convert to list of dictionaries data_list = [] for row in data_result: data_list.append(dict(zip(column_names, row))) # Convert to JSON data_json = json.dumps(data_list) # Convert metadata to JSON metadata_json = json.dumps(metadata or {}) # Convert tags to JSON tags_json = json.dumps(tags or []) # Store metadata in warm_data for backward compatibility con.execute(""" INSERT INTO warm_data (id, data, metadata, tags, stored_at) VALUES (?, ?, ?, ?, ?) """, [data_id, data_json, metadata_json, tags_json, datetime.now()]) # Store tags for indexing if tags: for tag in tags: con.execute(""" INSERT INTO warm_tags (tag, data_id) VALUES (?, ?) """, [tag, data_id]) self.logger.info(f"Imported parquet file {parquet_file} to table {table_name}") return { "success": True, "data_id": data_id, "table_name": table_name } except Exception as e: self.logger.error(f"Error importing parquet file: {e}") return { "success": False, "data_id": None, "table_name": None }
[docs] async def import_from_duckdb( self, source_db_file: str, tables: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, db_name: Optional[str] = None ) -> Dict[str, Any]: """Import tables from another DuckDB database using direct table copying. Args: source_db_file: Path to the source DuckDB database file tables: Optional list of table names to import. If None, imports all tables. metadata: Optional metadata about the data tags: Optional tags for categorizing the data db_name: Optional name of the database file to store in (without .duckdb extension) Returns: Dict containing success status and table information: - success: True if import was successful, False otherwise - imported_tables: List of imported table names - data_ids: Dict mapping table names to data IDs """ try: # Get connection to target database target_con = self.get_connection(db_name) # Attach the source database with an alias target_con.execute(f"ATTACH '{source_db_file}' AS source_db") # Get list of tables in source database source_tables = target_con.execute(""" SELECT name FROM source_db.sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' """).fetchall() source_table_names = [table[0] for table in source_tables] # Filter tables if specified if tables: tables_to_import = [t for t in tables if t in source_table_names] if not tables_to_import: self.logger.warning(f"None of the specified tables found in source database") # Detach the source database target_con.execute("DETACH source_db") return { "success": False, "imported_tables": [], "data_ids": {} } else: tables_to_import = source_table_names # Import each table imported_tables = [] data_ids = {} for table_name in tables_to_import: try: # Generate unique ID data_id = str(uuid.uuid4()) # Create table in target database directly from source target_con.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM source_db.{table_name}") # Get row count row_count = target_con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0] # Create minimal metadata for the warm_data table meta_dict = metadata.copy() if metadata else {} meta_dict["source_db"] = source_db_file meta_dict["source_table"] = table_name meta_dict["row_count"] = row_count meta_dict["imported_at"] = datetime.now().isoformat() # Convert to JSON - we store empty data since the actual data is in the table data_json = "[]" # Convert metadata to JSON metadata_json = json.dumps(meta_dict) # Convert tags to JSON tags_json = json.dumps(tags or []) # Store metadata in warm_data for backward compatibility target_con.execute(""" INSERT INTO warm_data (id, data, metadata, tags, stored_at) VALUES (?, ?, ?, ?, ?) """, [data_id, data_json, metadata_json, tags_json, datetime.now()]) # Store tags for indexing if tags: for tag in tags: target_con.execute(""" INSERT INTO warm_tags (tag, data_id) VALUES (?, ?) """, [tag, data_id]) # Add special tags for imported tables target_con.execute(""" INSERT INTO warm_tags (tag, data_id) VALUES (?, ?) """, ["duckdb_import", data_id]) target_con.execute(""" INSERT INTO warm_tags (tag, data_id) VALUES (?, ?) """, [f"table:{table_name}", data_id]) imported_tables.append(table_name) data_ids[table_name] = data_id except Exception as e: self.logger.error(f"Error importing table {table_name}: {e}") # Detach the source database target_con.execute("DETACH source_db") if imported_tables: self.logger.info(f"Imported {len(imported_tables)} tables from {source_db_file}") return { "success": True, "imported_tables": imported_tables, "data_ids": data_ids } except Exception as e: self.logger.error(f"Error importing from DuckDB: {e}") # Make sure to detach in case of error try: target_con.execute("DETACH source_db") except: pass return { "success": False, "imported_tables": [], "data_ids": {} }
[docs] async def import_from_csv( self, csv_file: str, metadata: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, db_name: Optional[str] = None, table_name: Optional[str] = None ) -> Dict[str, Any]: """Import data from a CSV file into warm memory. Args: csv_file: Path to the CSV file metadata: Optional metadata about the data tags: Optional tags for categorizing the data db_name: Optional name of the database file to store in (without .duckdb extension) table_name: Optional name for the table to create. If None, a name will be generated. Returns: Dict containing success status and table information: - success: True if import was successful, False otherwise - data_id: The unique ID of the stored data - table_name: The name of the table where data is stored """ try: # Get connection con = self.get_connection(db_name) # Generate unique ID data_id = str(uuid.uuid4()) # Generate table name if not provided if not table_name: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') file_basename = os.path.basename(csv_file).split('.')[0] table_name = f"csv_{file_basename}_{timestamp}_{data_id[:8]}" # Sanitize table name (remove special characters) table_name = ''.join(c if c.isalnum() or c == '_' else '_' for c in table_name) # Create a new table from the CSV file con.execute(f""" CREATE TABLE {table_name} AS SELECT * FROM read_csv_auto('{csv_file}') """) # Get the data as JSON for storage in warm_data data_result = con.execute(f"SELECT * FROM {table_name}").fetchall() column_names = [desc[0] for desc in con.description] # Convert to list of dictionaries data_list = [] for row in data_result: data_list.append(dict(zip(column_names, row))) # Convert to JSON data_json = json.dumps(data_list) # Convert metadata to JSON metadata_json = json.dumps(metadata or {}) # Convert tags to JSON tags_json = json.dumps(tags or []) # Store metadata in warm_data for backward compatibility con.execute(""" INSERT INTO warm_data (id, data, metadata, tags, stored_at) VALUES (?, ?, ?, ?, ?) """, [data_id, data_json, metadata_json, tags_json, datetime.now()]) # Store tags for indexing if tags: for tag in tags: con.execute(""" INSERT INTO warm_tags (tag, data_id) VALUES (?, ?) """, [tag, data_id]) self.logger.info(f"Imported CSV file {csv_file} to table {table_name}") return { "success": True, "data_id": data_id, "table_name": table_name } except Exception as e: self.logger.error(f"Error importing CSV file: {e}") return { "success": False, "data_id": None, "table_name": None }