39.5. Data Utilities
The Data Utilities module provides tools for data processing, querying, and manipulation within the memories.dev framework. These utilities are designed for large-scale data operations, with attention to query performance and memory use.
39.5.1. Key Features
High-Performance Querying:
- Efficient processing of large-scale parquet datasets
- Optimized query execution
- Parallel processing support
- Memory-aware operations
Data Management:
- Automatic schema inference
- Type-safe operations
- Data validation
- Error handling
Resource Optimization:
- Memory usage control
- Parallel execution
- Caching strategies
- Performance monitoring
39.5.2. Data Acquisition
39.5.2.1. Data Manager
39.5.2.2. Sentinel API
39.5.2.3. Landsat API
39.5.2.4. OpenStreetMap API
39.5.2.5. Overture API
39.5.3. Data Processing
39.5.3.1. Image Processing
39.5.3.2. Vector Processing
39.5.3.3. Data Fusion
39.5.4. Caching System
39.5.5. DuckDB Query Utilities
39.5.6. query_multiple_parquet
39.5.6.1. Parameters
parquet_files (List[str]):
- List of parquet file paths or glob patterns
- Supports both absolute and relative paths
- Accepts wildcards for pattern matching
query (str):
- SQL query to execute against the parquet files
- Supports standard SQL syntax
- Allows complex aggregations and joins
parallel (bool, optional):
- Enable parallel execution
- Defaults to True
- Recommended for large datasets
memory_limit (str, optional):
- Memory limit for query execution
- Defaults to "75%"
- Format: percentage or bytes (e.g., "75%", "8GB")
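The two memory_limit formats listed above (a percentage or a byte size) can be illustrated with a small parser. This is only a sketch: parse_memory_limit and its total_bytes argument are hypothetical names, not part of the documented API, and the unit multipliers (decimal for KB/MB/GB, binary for KiB/MiB/GiB) are an assumption about how such strings might be interpreted.

```python
import re

# Unit multipliers are an assumption of this sketch:
# decimal for KB/MB/GB/TB, binary for KiB/MiB/GiB/TiB.
_UNITS = {
    "B": 1,
    "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
    "KIB": 2**10, "MIB": 2**20, "GIB": 2**30, "TIB": 2**40,
}


def parse_memory_limit(limit: str, total_bytes: int) -> int:
    """Convert a limit such as "75%" or "8GB" into a byte count.

    Hypothetical helper: total_bytes is the amount of memory that a
    percentage refers to (for example, the machine's physical RAM).
    """
    text = limit.strip().upper()
    match = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(%|B|[KMGT]I?B)", text)
    if match is None:
        raise ValueError(f"Unrecognized memory limit: {limit!r}")
    value, unit = float(match.group(1)), match.group(2)
    if unit == "%":
        if not 0 < value <= 100:
            raise ValueError("Percentage must be in the range (0, 100]")
        return int(total_bytes * value / 100)
    return int(value * _UNITS[unit])
```

Validating the string before a query starts gives a clear ValueError for a typo such as "8 gigs" instead of a failure partway through execution.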
39.5.6.2. Returns
pandas.DataFrame:
- Query results as a DataFrame
- Column types preserved from source
- Index automatically generated
- NaN values handled appropriately
39.5.6.3. Raises
FileNotFoundError:
- No parquet files found at specified paths
- Invalid file patterns
- Permission issues
QueryExecutionError:
- Invalid SQL syntax
- Unsupported operations
- Runtime errors
MemoryError:
- Memory limit exceeded
- System resources exhausted
- Large result sets
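To make the FileNotFoundError case concrete, the sketch below shows one way a list of paths and glob patterns could be expanded before a query runs. It is not the library's implementation: expand_parquet_patterns is a hypothetical helper built only on the standard library's glob module.

```python
import glob
import os
from typing import List


def expand_parquet_patterns(patterns: List[str]) -> List[str]:
    """Expand paths or glob patterns into a sorted, de-duplicated file list.

    Hypothetical helper: raises FileNotFoundError when nothing matches,
    mirroring the first error condition described above.
    """
    matched = set()
    for pattern in patterns:
        # glob.glob returns an empty list when a pattern matches nothing.
        for path in glob.glob(pattern):
            if os.path.isfile(path):
                matched.add(os.path.abspath(path))
    if not matched:
        raise FileNotFoundError(f"No parquet files matched: {patterns}")
    return sorted(matched)
```

Running a check like this up front lets a caller distinguish a mistyped pattern from a genuine query error.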
39.5.7. Example Usage
39.5.7.1. Basic Queries
from memories.utils import query_multiple_parquet

# Simple time-based query
recent_data = query_multiple_parquet(
    parquet_files=["data/2025-02-*.parquet"],
    query="""
        SELECT
            timestamp,
            location,
            measurements
        FROM parquet_files
        WHERE timestamp >= '2025-02-01'
        ORDER BY timestamp DESC
        LIMIT 1000
    """
)

# Spatial query with aggregation
location_stats = query_multiple_parquet(
    parquet_files=["data/locations/*.parquet"],
    query="""
        SELECT
            location,
            COUNT(*) as event_count,
            AVG(temperature) as avg_temp
        FROM parquet_files
        GROUP BY location
        HAVING event_count > 100
        ORDER BY avg_temp DESC
    """
)
39.5.7.2. Advanced Operations
# Complex time-series analysis
results = query_multiple_parquet(
    parquet_files=[
        "data/environmental/*.parquet",
        "data/sensors/*.parquet"
    ],
    query="""
        WITH hourly_stats AS (
            SELECT
                date_trunc('hour', timestamp) as hour,
                location,
                avg(temperature) as avg_temp,
                max(temperature) as max_temp,
                min(temperature) as min_temp,
                count(*) as readings,
                stddev(temperature) as temp_stddev
            FROM parquet_files
            WHERE
                timestamp >= '2025-02-01' AND
                timestamp < '2025-03-01' AND
                temperature BETWEEN -50 AND 50
            GROUP BY
                date_trunc('hour', timestamp),
                location
        )
        SELECT
            hour,
            location,
            avg_temp,
            max_temp,
            min_temp,
            readings,
            temp_stddev,
            CASE
                WHEN temp_stddev > 5 THEN 'High Variance'
                WHEN temp_stddev > 2 THEN 'Moderate Variance'
                ELSE 'Stable'
            END as stability
        FROM hourly_stats
        WHERE readings >= 10
        ORDER BY hour DESC, location
    """,
    parallel=True,
    memory_limit='50%'
)
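The CASE expression in the query above labels each hourly group by its temperature spread. The same thresholds can be sketched in plain Python to check the logic on a handful of readings. classify_stability is a hypothetical helper, and the sketch assumes the query runs on DuckDB, where stddev computes the sample standard deviation (the same quantity as Python's statistics.stdev).

```python
import statistics
from typing import Sequence


def classify_stability(temperatures: Sequence[float]) -> str:
    """Label a group of readings with the thresholds used in the CASE expression."""
    # statistics.stdev is the sample standard deviation; it needs at least 2 values.
    temp_stddev = statistics.stdev(temperatures)
    if temp_stddev > 5:
        return "High Variance"
    if temp_stddev > 2:
        return "Moderate Variance"
    return "Stable"


print(classify_stability([20.0, 20.5, 21.0]))  # standard deviation 0.5
print(classify_stability([10.0, 14.0, 18.0]))  # standard deviation 4.0
print(classify_stability([0.0, 10.0, 20.0]))   # standard deviation 10.0
```

Because the comparisons are strict, a group whose standard deviation is exactly 2 is labeled 'Stable' and one at exactly 5 is labeled 'Moderate Variance'.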
39.5.8. Performance Optimization
Data Organization
- Partition files by date/time
- Use consistent naming patterns
- Maintain optimal file sizes
- Implement proper compression
Query Optimization
- Use appropriate filters
- Leverage indexes effectively
- Optimize join operations
- Minimize data movement
Resource Management
- Monitor memory usage
- Use chunked processing
- Implement proper error handling
- Clean up resources
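Date-based partitioning and consistent naming pay off when a query only needs part of the data. As an illustration, the sketch below builds one glob pattern per calendar month so that only the relevant files are listed in parquet_files. The helper name monthly_patterns and the "<root>/YYYY-MM-*.parquet" layout are assumptions modeled on the file names used in the earlier examples.

```python
from datetime import date
from typing import List


def monthly_patterns(start: date, end: date, root: str = "data") -> List[str]:
    """Return one glob pattern per calendar month from start to end, inclusive.

    Assumes files are named "<root>/YYYY-MM-DD....parquet" (a hypothetical layout).
    """
    if start > end:
        raise ValueError("start must not be after end")
    patterns = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        patterns.append(f"{root}/{year:04d}-{month:02d}-*.parquet")
        # Advance to the next month, rolling over the year after December.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return patterns


print(monthly_patterns(date(2024, 12, 15), date(2025, 2, 1)))
```

The returned list can be passed as parquet_files, so files outside the requested months are never opened.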
39.5.9. Troubleshooting
39.5.9.1. Common Issues
Performance Problems
- Reduce result set size
- Optimize query patterns
- Adjust memory limits
- Use appropriate indexes
Memory Issues
- Implement chunking
- Reduce parallel operations
- Clear unused resources
- Monitor memory usage
Data Quality
- Validate input data
- Handle missing values
- Check data types
- Verify results
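One way to apply the chunking advice above is to split the file list into batches and query each batch separately. The sketch below is generic and makes no claim about the library's internals: batched and run_in_batches are hypothetical helpers, and run_query stands for any callable that takes a list of files, for example a small wrapper around query_multiple_parquet with a fixed SQL string.

```python
from typing import Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched(files: Sequence[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of files with at most batch_size entries each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(files), batch_size):
        yield list(files[start:start + batch_size])


def run_in_batches(
    files: Sequence[str],
    batch_size: int,
    run_query: Callable[[List[str]], T],
) -> List[T]:
    """Call run_query once per batch and collect the per-batch results."""
    return [run_query(batch) for batch in batched(files, batch_size)]
```

Per-batch results still have to be combined with care: row-level results can be concatenated, but aggregates such as averages need to be recomputed from sums and counts rather than averaged again.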
39.5.10. See Also
memory_store - Core memory storage interface
data_processing - Data processing utilities
query_optimization - Query optimization guide
performance_tuning - Performance tuning tips
39.5.11. Example Usage
from memories.data_acquisition.data_manager import DataManager
import asyncio

# Initialize data manager
data_manager = DataManager(cache_dir="./data_cache")

# Define area of interest
bbox = {
    'xmin': -122.4018,
    'ymin': 37.7914,
    'xmax': -122.3928,
    'ymax': 37.7994
}

# Define async function to get data
async def get_data():
    # Get satellite data
    satellite_data = await data_manager.get_satellite_data(
        bbox_coords=bbox,
        start_date="2023-01-01",
        end_date="2023-02-01"
    )

    # Get vector data
    vector_data = await data_manager.get_vector_data(
        bbox=bbox,
        layers=["buildings", "roads"]
    )

    # Prepare training data
    training_data = await data_manager.prepare_training_data(
        bbox=bbox,
        start_date="2023-01-01",
        end_date="2023-02-01",
        satellite_collections=["sentinel-2-l2a"],
        vector_layers=["buildings", "roads"],
        cloud_cover=10.0
    )

    return satellite_data, vector_data, training_data

# Run the async function
satellite_data, vector_data, training_data = asyncio.run(get_data())
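Since every call in the example above depends on the bbox dictionary, it can help to check it before issuing requests. The sketch below is a hypothetical helper, not part of DataManager. It assumes the coordinates are longitude/latitude in degrees, which the example values suggest, and it does not handle boxes that cross the antimeridian.

```python
from typing import Dict


def validate_bbox(bbox: Dict[str, float]) -> None:
    """Raise ValueError if bbox is not a well-formed lon/lat bounding box."""
    missing = {"xmin", "ymin", "xmax", "ymax"} - bbox.keys()
    if missing:
        raise ValueError(f"bbox is missing keys: {sorted(missing)}")
    # Longitude must increase from xmin to xmax and stay within [-180, 180].
    if not -180 <= bbox["xmin"] < bbox["xmax"] <= 180:
        raise ValueError("expected -180 <= xmin < xmax <= 180")
    # Latitude must increase from ymin to ymax and stay within [-90, 90].
    if not -90 <= bbox["ymin"] < bbox["ymax"] <= 90:
        raise ValueError("expected -90 <= ymin < ymax <= 90")


validate_bbox({"xmin": -122.4018, "ymin": 37.7914, "xmax": -122.3928, "ymax": 37.7994})
```

A check like this catches swapped corners or mixed-up latitude and longitude locally, before any remote data request is made.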