Advanced Features
Overview
memories-dev provides several advanced features for power users and complex use cases.
Custom Memory Stores
Creating Custom Stores
from memories.core.memory import MemoryStore
from memories.core.base import BaseStore
class CustomStore(BaseStore):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.custom_config = kwargs.get('custom_config', {})
def custom_operation(self):
# Implement custom operation
pass
Advanced Querying
Spatial Queries
# Query by location and radius
results = memory_store.query_memories(
location=(37.7749, -122.4194),
radius_km=10,
query_type="spatial"
)
Temporal Queries
# Query by time range
results = memory_store.query_memories(
time_range=("2024-01-01", "2024-02-01"),
temporal_resolution="1h"
)
Semantic Queries
# Query by semantic similarity
results = memory_store.query_memories(
query="urban development near parks",
semantic_threshold=0.85
)
Performance Optimization
Caching Strategies
# Configure caching
memory_store.configure_cache(
cache_size_gb=2,
cache_policy="lru",
ttl_seconds=3600
)
Batch Operations
# Batch process memories
with memory_store.batch_context():
for data in large_dataset:
memory_store.process_memory(data)
Distributed Processing
# Configure distributed processing
memory_store.enable_distributed(
num_workers=4,
scheduler="dynamic"
)
Security Features
Encryption
# Enable encryption
memory_store.enable_encryption(
key_type="aes-256",
key_rotation_days=30
)
Access Control
# Configure access control
memory_store.set_access_control(
read_roles=["analyst", "viewer"],
write_roles=["admin"]
)
Best Practices
Performance Tuning - Profile memory operations - Optimize query patterns - Monitor resource usage
Security - Implement proper authentication - Use encryption when needed - Regular security audits
Scalability - Design for horizontal scaling - Implement proper sharding - Use appropriate caching
GPU Acceleration
memories-dev supports GPU acceleration for model inference and data processing:
from memories.models.load_model import LoadModel
# Initialize model with GPU support
model = LoadModel(
use_gpu=True,
model_provider="deepseek-ai",
deployment_type="local",
model_name="deepseek-coder-small"
)
# For multi-GPU systems, specify a device
model = LoadModel(
use_gpu=True,
device="cuda:1", # Use the second GPU
model_provider="deepseek-ai",
deployment_type="local",
model_name="deepseek-coder-small"
)
The system automatically handles GPU memory management and cleanup:
# Generate text
response = model.get_response("Write a function to calculate factorial")
# Clean up GPU resources when done
model.cleanup()
Deployment Options
Standalone Deployment
For single-instance deployments:
from memories.deployments.standalone import StandaloneDeployment
# Configure standalone deployment
deployment = StandaloneDeployment(
provider="gcp", # "aws", "azure", or "gcp"
config={
"machine_type": "n2-standard-4",
"region": "us-west1",
"zone": "us-west1-a"
}
)
# Deploy the system
deployment.deploy()
Consensus Deployment
For high-reliability distributed deployments:
from memories.deployments.consensus import ConsensusDeployment
# Configure consensus deployment
deployment = ConsensusDeployment(
provider="aws",
config={
"algorithm": "raft",
"min_nodes": 3,
"max_nodes": 5,
"quorum_size": 2
},
node_specs=[
{"id": "node1", "instance_type": "t3.medium", "zone": "us-west-2a"},
{"id": "node2", "instance_type": "t3.medium", "zone": "us-west-2b"},
{"id": "node3", "instance_type": "t3.medium", "zone": "us-west-2c"}
]
)
# Deploy the system
deployment.deploy()
Swarmed Deployment
For scalable, container-based deployments:
from memories.deployments.swarmed import SwarmedDeployment
# Configure swarmed deployment
deployment = SwarmedDeployment(
provider="azure",
config={
"min_nodes": 3,
"max_nodes": 10,
"manager_nodes": 3,
"worker_nodes": 5
}
)
# Deploy the system
deployment.deploy()
API Connectors
memories-dev supports multiple API providers for model inference:
from memories.models.load_model import LoadModel
# OpenAI
openai_model = LoadModel(
model_provider="openai",
deployment_type="api",
model_name="gpt-4",
api_key="your-openai-key" # Or set OPENAI_API_KEY environment variable
)
# Anthropic
anthropic_model = LoadModel(
model_provider="anthropic",
deployment_type="api",
model_name="claude-3-opus",
api_key="your-anthropic-key" # Or set ANTHROPIC_API_KEY environment variable
)
# Deepseek
deepseek_model = LoadModel(
model_provider="deepseek",
deployment_type="api",
model_name="deepseek-chat",
api_key="your-deepseek-key" # Or set DEEPSEEK_API_KEY environment variable
)
Concurrent Data Processing
memories-dev supports concurrent data processing for improved performance:
import asyncio
from memories.data_acquisition.sources.sentinel_api import SentinelAPI
async def process_multiple_regions():
api = SentinelAPI(data_dir="./sentinel_data")
await api.initialize()
# Define multiple regions
regions = [
{
'xmin': -122.4018, 'ymin': 37.7914,
'xmax': -122.3928, 'ymax': 37.7994
},
{
'xmin': -118.2437, 'ymin': 34.0522,
'xmax': -118.2337, 'ymax': 34.0622
}
]
# Process concurrently
tasks = [
api.download_data(
bbox=region,
start_date=start_date,
end_date=end_date,
bands=["B04", "B08"]
)
for region in regions
]
# Wait for all tasks to complete
results = await asyncio.gather(*tasks)
return results
# Run the concurrent processing
results = asyncio.run(process_multiple_regions())