Advanced Features

Overview

memories-dev provides several advanced features for power users and complex use cases.

Custom Memory Stores

Creating Custom Stores

from memories.core.memory import MemoryStore
from memories.core.base import BaseStore

class CustomStore(BaseStore):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.custom_config = kwargs.get('custom_config', {})

    def custom_operation(self):
        # Implement custom operation
        pass

Advanced Querying

Spatial Queries

# Query by location and radius
results = memory_store.query_memories(
    location=(37.7749, -122.4194),
    radius_km=10,
    query_type="spatial"
)

Temporal Queries

# Query by time range
results = memory_store.query_memories(
    time_range=("2024-01-01", "2024-02-01"),
    temporal_resolution="1h"
)

Semantic Queries

# Query by semantic similarity
results = memory_store.query_memories(
    query="urban development near parks",
    semantic_threshold=0.85
)

Performance Optimization

Caching Strategies

# Configure caching
memory_store.configure_cache(
    cache_size_gb=2,
    cache_policy="lru",
    ttl_seconds=3600
)

Batch Operations

# Batch process memories
with memory_store.batch_context():
    for data in large_dataset:
        memory_store.process_memory(data)

Distributed Processing

# Configure distributed processing
memory_store.enable_distributed(
    num_workers=4,
    scheduler="dynamic"
)

Security Features

Encryption

# Enable encryption
memory_store.enable_encryption(
    key_type="aes-256",
    key_rotation_days=30
)

Access Control

# Configure access control
memory_store.set_access_control(
    read_roles=["analyst", "viewer"],
    write_roles=["admin"]
)

Best Practices

  1. Performance Tuning - Profile memory operations - Optimize query patterns - Monitor resource usage

  2. Security - Implement proper authentication - Use encryption when needed - Regular security audits

  3. Scalability - Design for horizontal scaling - Implement proper sharding - Use appropriate caching

GPU Acceleration

memories-dev supports GPU acceleration for model inference and data processing:

from memories.models.load_model import LoadModel

# Initialize model with GPU support
model = LoadModel(
    use_gpu=True,
    model_provider="deepseek-ai",
    deployment_type="local",
    model_name="deepseek-coder-small"
)

# For multi-GPU systems, specify a device
model = LoadModel(
    use_gpu=True,
    device="cuda:1",  # Use the second GPU
    model_provider="deepseek-ai",
    deployment_type="local",
    model_name="deepseek-coder-small"
)

The system automatically handles GPU memory management and cleanup:

# Generate text
response = model.get_response("Write a function to calculate factorial")

# Clean up GPU resources when done
model.cleanup()

Deployment Options

Standalone Deployment

For single-instance deployments:

from memories.deployments.standalone import StandaloneDeployment

# Configure standalone deployment
deployment = StandaloneDeployment(
    provider="gcp",  # "aws", "azure", or "gcp"
    config={
        "machine_type": "n2-standard-4",
        "region": "us-west1",
        "zone": "us-west1-a"
    }
)

# Deploy the system
deployment.deploy()

Consensus Deployment

For high-reliability distributed deployments:

from memories.deployments.consensus import ConsensusDeployment

# Configure consensus deployment
deployment = ConsensusDeployment(
    provider="aws",
    config={
        "algorithm": "raft",
        "min_nodes": 3,
        "max_nodes": 5,
        "quorum_size": 2
    },
    node_specs=[
        {"id": "node1", "instance_type": "t3.medium", "zone": "us-west-2a"},
        {"id": "node2", "instance_type": "t3.medium", "zone": "us-west-2b"},
        {"id": "node3", "instance_type": "t3.medium", "zone": "us-west-2c"}
    ]
)

# Deploy the system
deployment.deploy()

Swarmed Deployment

For scalable, container-based deployments:

from memories.deployments.swarmed import SwarmedDeployment

# Configure swarmed deployment
deployment = SwarmedDeployment(
    provider="azure",
    config={
        "min_nodes": 3,
        "max_nodes": 10,
        "manager_nodes": 3,
        "worker_nodes": 5
    }
)

# Deploy the system
deployment.deploy()

API Connectors

memories-dev supports multiple API providers for model inference:

from memories.models.load_model import LoadModel

# OpenAI
openai_model = LoadModel(
    model_provider="openai",
    deployment_type="api",
    model_name="gpt-4",
    api_key="your-openai-key"  # Or set OPENAI_API_KEY environment variable
)

# Anthropic
anthropic_model = LoadModel(
    model_provider="anthropic",
    deployment_type="api",
    model_name="claude-3-opus",
    api_key="your-anthropic-key"  # Or set ANTHROPIC_API_KEY environment variable
)

# Deepseek
deepseek_model = LoadModel(
    model_provider="deepseek",
    deployment_type="api",
    model_name="deepseek-chat",
    api_key="your-deepseek-key"  # Or set DEEPSEEK_API_KEY environment variable
)

Concurrent Data Processing

memories-dev supports concurrent data processing for improved performance:

import asyncio
from memories.data_acquisition.sources.sentinel_api import SentinelAPI

async def process_multiple_regions():
    api = SentinelAPI(data_dir="./sentinel_data")
    await api.initialize()

    # Define multiple regions
    regions = [
        {
            'xmin': -122.4018, 'ymin': 37.7914,
            'xmax': -122.3928, 'ymax': 37.7994
        },
        {
            'xmin': -118.2437, 'ymin': 34.0522,
            'xmax': -118.2337, 'ymax': 34.0622
        }
    ]

    # Process concurrently
    tasks = [
        api.download_data(
            bbox=region,
            start_date=start_date,
            end_date=end_date,
            bands=["B04", "B08"]
        )
        for region in regions
    ]

    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks)
    return results

# Run the concurrent processing
results = asyncio.run(process_multiple_regions())