import duckdb
import geopandas as gpd
from pathlib import Path
import logging
from typing import Optional, List, Dict, Any, Tuple, Union
import pyarrow as pa
import pyarrow.parquet as pq
from shapely.geometry import shape
import json
import uuid
import yaml
import os
import sys
from dotenv import load_dotenv
import logging
import pkg_resources
import numpy as np
import pandas as pd
from datetime import datetime
import subprocess
import gzip
import shutil
# Optional GPU acceleration: each RAPIDS package is probed independently and
# a warning is logged for every one that cannot be imported.
HAS_CUDF = False
HAS_CUSPATIAL = False

try:
    import cudf
except ImportError:
    logging.warning("cudf not available. GPU acceleration for dataframes will be disabled.")
else:
    HAS_CUDF = True

try:
    import cuspatial
except ImportError:
    logging.warning("cuspatial not available. GPU acceleration for spatial operations will be disabled.")
else:
    HAS_CUSPATIAL = True

# GPU support is only reported when both packages imported successfully.
HAS_GPU_SUPPORT = HAS_CUDF and HAS_CUSPATIAL
if HAS_GPU_SUPPORT:
    logging.info("GPU support enabled with cudf and cuspatial.")
# Load environment variables
load_dotenv()
import os
import sys
from dotenv import load_dotenv
import logging
#print(f"Using project root: {project_root}")
class Config:
    """Load the YAML database configuration and resolve project-relative paths.

    Relative paths found in the configuration are resolved against the project
    root, which comes from the ``PROJECT_ROOT`` environment variable or, when
    that is unset, from the location of this file.
    """

    def __init__(self, config_path: str = 'config/db_config.yml'):
        """Initialize configuration by loading the YAML file.

        Args:
            config_path: Path to the YAML file; a relative path is resolved
                against the project root.

        Raises:
            FileNotFoundError: If the resolved config file does not exist.
        """
        # Store project root
        self.project_root = self._get_project_root()
        print(f"[Config] Project root: {self.project_root}")

        # Make config_path absolute if it's not already
        if not os.path.isabs(config_path):
            config_path = os.path.join(self.project_root, config_path)

        # Load the configuration
        if not os.path.exists(config_path):
            raise FileNotFoundError(f"Config file not found at: {config_path}")
        self.config = self._load_config(config_path)

        # A missing or null ``storage`` section is treated as an empty one so
        # the default path below can be filled in.
        storage = self.config.get('storage') or {}
        self.config['storage'] = storage
        if 'path' not in storage:
            storage['path'] = os.path.join(self.project_root, 'data')
            os.makedirs(storage['path'], exist_ok=True)
            print(f"[Config] Using default storage path: {storage['path']}")

    def _get_project_root(self) -> str:
        """Return the project root directory.

        Uses the ``PROJECT_ROOT`` environment variable when it is set and
        non-empty; otherwise walks three directory levels up from the
        directory containing this file.
        """
        project_root = os.getenv("PROJECT_ROOT")
        if not project_root:
            current_dir = os.path.dirname(os.path.abspath(__file__))
            project_root = os.path.dirname(os.path.dirname(os.path.dirname(current_dir)))
            print(f"[Config] Determined project root: {project_root}")
        return project_root

    def _load_config(self, config_path: str) -> dict:
        """Load configuration from a YAML file.

        An empty YAML document parses to ``None``; it is returned as an empty
        dict so callers can always treat the result as a mapping.
        """
        print(f"[Config] Loading config from: {config_path}")
        with open(config_path, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f)
        return {} if loaded is None else loaded

    @property
    def database_path(self) -> str:
        """Full database path built from ``database.path`` and ``database.name``."""
        db_path = os.path.join(
            self.config['database']['path'],
            self.config['database']['name'],
        )
        if not os.path.isabs(db_path):
            db_path = os.path.join(self.project_root, db_path)
        return db_path

    @property
    def raw_data_path(self) -> Path:
        """Raw data directory taken from ``data.raw_path``."""
        data_path = self.config['data']['raw_path']
        if not os.path.isabs(data_path):
            data_path = os.path.join(self.project_root, data_path)
        return Path(data_path)

    @property
    def log_path(self) -> str:
        """Log file path under the project root; the log directory is created on access."""
        log_path = os.path.join(self.project_root, 'logs/database.log')
        # Ensure log directory exists
        os.makedirs(os.path.dirname(log_path), exist_ok=True)
        return log_path

    def _discover_modalities(self):
        """Discover modalities and their tables from the raw-data folder structure.

        Every sub-directory of ``raw_data_path`` that contains at least one
        ``*.parquet`` file becomes a modality whose tables are the file stems.
        The result is stored on ``self.modality_tables`` and under
        ``self.config['modalities']``. A missing raw-data directory yields an
        empty mapping instead of an error. ``__init__`` does not call this
        method.
        """
        self.modality_tables = {}
        raw_path = self.raw_data_path

        if raw_path.is_dir():
            # Sorted so the result does not depend on filesystem listing order.
            for modality_path in sorted(raw_path.iterdir()):
                if not modality_path.is_dir():
                    continue
                parquet_files = sorted(f.stem for f in modality_path.glob('*.parquet'))
                if parquet_files:
                    self.modality_tables[modality_path.name] = parquet_files

        self.config['modalities'] = self.modality_tables

    def get_modality_path(self, modality: str) -> Path:
        """Return the directory of a specific modality under the raw-data path."""
        return self.raw_data_path / modality
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
[docs]class ColdMemory:
"""Cold memory layer using DuckDB for persistent storage."""
def __init__(self):
    """Create the cold memory layer.

    Sets up the logger, loads ``Config``, binds the shared memory catalog,
    initialises the database schema and makes sure the raw-data directory
    exists.
    """
    self.logger = logging.getLogger(__name__)
    self.config = Config()

    # Imported lazily to avoid a circular dependency at module import time.
    from memories.core.memory_catalog import memory_catalog as catalog
    self.memory_catalog = catalog

    # Database location first, then the schema.
    self.db_path = self.config.database_path
    self._initialize_schema()

    # Raw-data directory, created on demand.
    raw_dir = self.config.raw_data_path
    self.raw_data_path = raw_dir
    raw_dir.mkdir(parents=True, exist_ok=True)
def _initialize_schema(self):
    """Open the DuckDB database and create the ``cold_data`` table if needed.

    The connection is opened on ``self.db_path`` so stored rows survive the
    process; calling ``duckdb.connect()`` without a path would create an
    in-memory database that is discarded on exit. When no ``db_path`` has
    been set, an in-memory database is still used as a fallback.

    Raises:
        Exception: Any error from opening the database or creating the
            table is logged and re-raised.
    """
    try:
        db_path = getattr(self, 'db_path', None)
        if db_path:
            # DuckDB creates the file but not its parent directory.
            parent_dir = os.path.dirname(str(db_path))
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            self.con = duckdb.connect(str(db_path))
        else:
            self.con = duckdb.connect()

        # Create data table if it doesn't exist
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS cold_data (
                id VARCHAR PRIMARY KEY,
                data JSON
            )
        """)
        self.logger.info("Initialized cold storage schema")
    except Exception as e:
        self.logger.error(f"Failed to initialize database schema: {e}")
        raise
async def register_external_file(self, file_path: str) -> None:
    """Register an external file in the cold storage metadata.

    Args:
        file_path: Path of the file to register.

    Raises:
        FileNotFoundError: If the path does not exist.
        Exception: Any other failure is logged and re-raised.
    """
    try:
        target = Path(file_path)
        if not target.exists():
            raise FileNotFoundError(f"File not found: {target}")

        # Size comes from the file's stat record, type from its suffix.
        stats = target.stat()
        location = str(target)

        await self.memory_catalog.register_data(
            tier="cold",
            location=location,
            size=stats.st_size,
            data_type=target.suffix.lstrip('.'),
            metadata={
                "is_external": True,
                "file_path": location
            }
        )
        self.logger.info(f"Registered external file: {target}")
    except Exception as e:
        self.logger.error(f"Error registering external file: {e}")
        raise
async def store(
    self,
    data: Any,
    metadata: Optional[Dict[str, Any]] = None,
    tags: Optional[List[str]] = None
) -> bool:
    """Store data in cold storage.

    The data is registered in the memory catalog and then inserted into the
    ``cold_data`` table as JSON under the ID the catalog returned.

    Args:
        data: Data to store (DataFrame or dictionary). A dictionary whose
            values are all scalars is stored as a single-row frame.
        metadata: Optional metadata about the data
        tags: Optional tags for categorizing the data

    Returns:
        bool: True if storage was successful, False otherwise
    """
    try:
        # Convert data to DataFrame if needed
        if isinstance(data, pd.DataFrame):
            df = data
        elif isinstance(data, dict):
            if data and all(pd.api.types.is_scalar(value) for value in data.values()):
                # DataFrame.from_dict rejects all-scalar dicts (no index);
                # treat such a record as one row.
                df = pd.DataFrame([data])
            else:
                df = pd.DataFrame.from_dict(data)
        else:
            self.logger.error("Data must be a dictionary or DataFrame for cold storage")
            return False

        # Generate unique ID
        data_id = await self.memory_catalog.register_data(
            tier="cold",
            location=f"cold_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            # memory_usage().sum() yields a numpy integer; hand the catalog a
            # plain int so it can be serialised without special handling.
            size=int(df.memory_usage(deep=True).sum()),
            data_type="dataframe",
            tags=tags,
            metadata=metadata
        )

        # Store data in DuckDB
        self.con.execute(
            "INSERT INTO cold_data (id, data) VALUES (?, ?)",
            [data_id, df.to_json()]
        )
        return True
    except Exception as e:
        self.logger.error(f"Error storing in cold storage: {e}")
        return False
async def retrieve(self, query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Retrieve data from cold storage.

    Args:
        query: Mapping whose ``data_id`` entry identifies the data.

    Returns:
        A dict with ``data`` (the stored frame) and ``metadata`` (the
        catalog's ``additional_meta``, decoded when it is a JSON string), or
        None when the ID is unknown, no row exists, or an error occurs.
    """
    # Local import: the literal-string form of pd.read_json is deprecated,
    # so the stored JSON text is wrapped in a file-like object.
    from io import StringIO

    try:
        # Get data info from catalog
        data_info = await self.memory_catalog.get_data_info(query.get('data_id'))
        if not data_info:
            return None

        # Get data from cold storage
        row = self.con.execute("""
            SELECT data FROM cold_data
            WHERE id = ?
            LIMIT 1
        """, [data_info['data_id']]).fetchone()
        if not row:
            return None

        # store() passes metadata=None by default, so the catalog value is
        # not guaranteed to be a JSON string; only decode when it is text.
        raw_meta = data_info['additional_meta']
        if isinstance(raw_meta, (str, bytes, bytearray)):
            metadata = json.loads(raw_meta)
        else:
            metadata = raw_meta

        return {
            "data": pd.read_json(StringIO(row[0])),
            "metadata": metadata
        }
    except Exception as e:
        self.logger.error(f"Failed to retrieve data: {e}")
        return None
async def clear(self) -> None:
    """Clear all data from cold storage.

    Deletes every row of ``cold_data`` and then unlinks from disk each file
    that the catalog lists in the cold tier with ``is_external`` set in its
    metadata. Catalog entries whose metadata is missing or malformed are
    skipped with a warning so they cannot stop the removal of the remaining
    files. Errors are logged, never raised.
    """
    try:
        # Get all cold tier data from catalog
        cold_data = await self.memory_catalog.get_tier_data("cold")

        # Clear data table
        self.con.execute("DELETE FROM cold_data")

        # Remove files if they exist
        for item in cold_data or []:
            try:
                raw_meta = item['additional_meta']
                if isinstance(raw_meta, (str, bytes, bytearray)):
                    meta = json.loads(raw_meta)
                else:
                    meta = raw_meta
                if not isinstance(meta, dict) or not meta.get('is_external', False):
                    continue
                file_path = Path(item['location'])
                if file_path.exists():
                    file_path.unlink()
            except Exception as e:
                # The table is already wiped at this point; keep going so
                # one bad entry does not leave the other files behind.
                self.logger.warning(f"Skipping cold storage entry during clear: {e}")

        self.logger.info("Cleared all cold storage data")
    except Exception as e:
        self.logger.error(f"Failed to clear cold storage: {e}")
async def unregister_file(self, file_id: str) -> bool:
    """Unregister a specific file from cold storage.

    Deletes the matching ``cold_data`` row and, when the catalog metadata
    marks the entry as external, unlinks the file at its recorded location.

    Args:
        file_id: ID of the file to unregister

    Returns:
        bool: True if successful, False if the ID is unknown to the catalog
        or an error occurs
    """
    try:
        # Get file info from catalog
        file_info = await self.memory_catalog.get_data_info(file_id)
        if not file_info:
            return False

        # Entries stored without metadata may carry None instead of a JSON
        # string; treat anything that is not a mapping as "not external".
        raw_meta = file_info['additional_meta']
        if isinstance(raw_meta, (str, bytes, bytearray)):
            meta = json.loads(raw_meta)
        else:
            meta = raw_meta
        is_external = isinstance(meta, dict) and meta.get('is_external', False)

        # Remove data if exists
        self.con.execute("DELETE FROM cold_data WHERE id = ?", [file_id])

        # Remove file if it's external
        if is_external:
            file_path = Path(file_info['location'])
            if file_path.exists():
                file_path.unlink()

        self.logger.info(f"Successfully unregistered file: {file_id}")
        return True
    except Exception as e:
        self.logger.error(f"Failed to unregister file {file_id}: {e}")
        return False
async def list_registered_files(self) -> List[Dict]:
    """List all registered external files and their metadata.

    Returns:
        One dict per cold-tier catalog entry whose decoded metadata has
        ``is_external`` set; the metadata keys are merged in last. An empty
        list is returned when anything fails.
    """
    try:
        entries = await self.memory_catalog.get_tier_data("cold")

        # Pair every catalog entry with its decoded metadata.
        decoded = (
            (entry, json.loads(entry['additional_meta']))
            for entry in entries
        )

        return [
            {
                'id': entry['data_id'],
                'timestamp': entry['created_at'],
                'size': entry['size'],
                'file_path': info.get('file_path'),
                'data_type': entry['data_type'],
                **info
            }
            for entry, info in decoded
            if info.get('is_external', False)
        ]
    except Exception as e:
        self.logger.error(f"Failed to list registered files: {e}")
        return []
def cleanup(self) -> None:
    """Close the database connection, if one is open.

    Safe to call more than once and on a partially initialised instance:
    the connection handle is dropped after the first attempt, so a later
    call (for example from ``__del__``) is a no-op. Errors while closing
    are logged, not raised.
    """
    con = getattr(self, 'con', None)
    if not con:
        return
    try:
        con.close()
        self.logger.info("Closed DuckDB connection")
    except Exception as e:
        self.logger.error(f"Error during cleanup: {e}")
    finally:
        # Forget the handle so the connection is never closed twice.
        self.con = None
[docs] def __del__(self):
"""Ensure cleanup is called when object is destroyed."""
self.cleanup()
async def get_all_schemas(self):
    """Extract the column schema of every external file in the cold tier.

    For each cold-tier catalog entry whose metadata has ``is_external`` set,
    the file at its location is described through DuckDB's ``parquet_scan``.
    Files that cannot be described are logged and skipped.

    Returns:
        A list of dicts with ``file_path``, ``columns``, ``dtypes`` and
        ``type`` (always ``'schema'``); an empty list when the catalog
        lookup fails.
    """
    try:
        # Get all cold tier data from catalog
        cold_data = await self.memory_catalog.get_tier_data("cold")

        # Extract schema for each file
        schemas = []
        for item in cold_data or []:
            # Entries stored without metadata may carry None instead of a
            # JSON string; those are simply not external files.
            raw_meta = item['additional_meta']
            if isinstance(raw_meta, (str, bytes, bytearray)):
                meta = json.loads(raw_meta)
            else:
                meta = raw_meta
            if not isinstance(meta, dict) or not meta.get('is_external', False):
                continue

            file_path = item['location']
            try:
                # The path is embedded in a SQL string literal, so single
                # quotes must be doubled to keep the statement well-formed.
                quoted_path = str(file_path).replace("'", "''")
                schema_query = f"""
                    DESCRIBE SELECT * FROM parquet_scan('{quoted_path}')
                """
                schema_df = self.con.execute(schema_query).fetchdf()

                schemas.append({
                    'file_path': file_path,
                    'columns': list(schema_df['column_name']),
                    'dtypes': dict(zip(schema_df['column_name'], schema_df['column_type'])),
                    'type': 'schema'
                })
                self.logger.debug(f"Extracted schema from {file_path}")
            except Exception as e:
                self.logger.error(f"Error extracting schema from {file_path}: {e}")
                continue

        self.logger.info(f"Extracted schemas from {len(schemas)} files")
        return schemas
    except Exception as e:
        self.logger.error(f"Error getting file paths from cold storage: {e}")
        return []
async def get_schema(self, data_id: str) -> Optional[Dict[str, Any]]:
    """Get schema information for stored data.

    Args:
        data_id: ID of the data to get schema for

    Returns:
        Dictionary containing:
            - columns: List of column names
            - dtypes: Dictionary mapping column names to their data types
            - type: Always ``'dataframe'``
            - source: Always ``'duckdb'``
        Returns None if data not found or schema cannot be determined
    """
    # Local import: the literal-string form of pd.read_json is deprecated,
    # so the stored JSON text is wrapped in a file-like object.
    from io import StringIO

    try:
        # Get data from cold storage
        row = self.con.execute("""
            SELECT data FROM cold_data
            WHERE id = ?
            LIMIT 1
        """, [data_id]).fetchone()
        if not row:
            return None

        # Convert JSON to DataFrame
        df = pd.read_json(StringIO(row[0]))

        return {
            'columns': list(df.columns),
            'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
            'type': 'dataframe',
            'source': 'duckdb'
        }
    except Exception as e:
        self.logger.error(f"Failed to get schema for {data_id}: {e}")
        return None