37.1. Data Processing
37.1.1. Introduction to Earth Data Processing
The Earth Memory framework processes, transforms, and analyzes Earth observation data. This guide covers the data processing pipeline, the built-in processors, and how to build custom processing workflows for your Earth Memory applications.
37.1.2. Data Processing Pipeline
The data processing pipeline in Earth Memory consists of seven stages:
1. Data Acquisition: Retrieving raw data from various sources
2. Preprocessing: Cleaning, normalizing, and preparing data for analysis
3. Feature Extraction: Identifying and extracting relevant features from the data
4. Transformation: Converting data between different formats and representations
5. Analysis: Applying algorithms to extract insights from the data
6. Memory Formation: Creating structured memories from processed data
7. Memory Integration: Connecting new memories with existing knowledge
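The staged structure above can be pictured as a chain of functions, each taking the output of the previous one. The following is a minimal pure-Python sketch of that idea, not the Earth Memory API: the function names, the band names `red` and `nir`, and the scale factor of 10000 are all assumptions made for illustration.

```python
from typing import Callable, Dict, List

Bands = Dict[str, List[float]]
Stage = Callable[[Bands], Bands]


def preprocess(data: Bands) -> Bands:
    """Scale raw integer values to reflectance (assumed scale factor: 10000)."""
    return {band: [v / 10000.0 for v in values] for band, values in data.items()}


def extract_features(data: Bands) -> Bands:
    """Add an illustrative band-difference feature without mutating the input."""
    out = dict(data)
    out["nir_minus_red"] = [n - r for n, r in zip(data["nir"], data["red"])]
    return out


def run_stages(data: Bands, stages: List[Stage]) -> Bands:
    """Apply each stage in order, feeding its output to the next stage."""
    for stage in stages:
        data = stage(data)
    return data


raw = {"red": [1000, 2000], "nir": [5000, 4000]}
staged_result = run_stages(raw, [preprocess, extract_features])
print(sorted(staged_result))
```

Keeping every stage a function with the same input and output shape is what makes stages reorderable and individually testable.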
37.1.3. Basic Data Processing Workflow
Here's a simple example of a data processing workflow:
from memories.earth import Observatory
from memories.earth.processors import (
    CloudMasking,
    NormalizedDifferenceIndex,
    Resampling,
    TemporalAggregation,
)
from memories.earth.pipelines import ProcessingPipeline

# Create your observatory with data source
observatory = Observatory(name="vegetation-observatory")
# ... add your data sources ...

# Create a processing pipeline
pipeline = ProcessingPipeline(
    name="ndvi-pipeline",
    description="Calculates NDVI from satellite imagery"
)

# Add processing steps to the pipeline
pipeline.add_step(
    CloudMasking(
        method="sentinel2_scl",
        # Sentinel-2 SCL classes: 3 = cloud shadow, 8/9 = cloud (medium/high
        # probability), 10 = thin cirrus
        mask_values=[3, 8, 9, 10],
        fill_value=None  # use NaN for masked pixels
    )
)
pipeline.add_step(
    NormalizedDifferenceIndex(
        name="ndvi",
        band1="B08",  # NIR band
        band2="B04",  # Red band
        description="Normalized Difference Vegetation Index"
    )
)
pipeline.add_step(
    Resampling(
        target_resolution="30m",
        method="bilinear"
    )
)
pipeline.add_step(
    TemporalAggregation(
        period="monthly",
        function="mean",
        min_valid_observations=3
    )
)

# Register the pipeline with the observatory
observatory.register_pipeline(pipeline)

# Run the pipeline for a specific area and time range
result = observatory.run_pipeline(
    pipeline_name="ndvi-pipeline",
    area_of_interest={"type": "Polygon", "coordinates": [...]},
    time_range=("2023-01-01", "2023-12-31")
)

# Access the processed data
ndvi_timeseries = result.get_data()

# Save the results
result.save("ndvi_monthly_2023.tif")
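To make the first two steps concrete, here is a minimal pure-Python sketch of what a scene-classification cloud mask followed by a normalized difference computes per pixel. It is independent of the Earth Memory API: the helper names `mask_clouds` and `normalized_difference` are hypothetical, and the handling of a zero denominator (returning NaN) is an assumption, not documented framework behavior.

```python
import math


def mask_clouds(band, scl, mask_values=(3, 8, 9, 10)):
    """Replace pixels whose scene-classification code is in mask_values with NaN."""
    return [math.nan if code in mask_values else value
            for value, code in zip(band, scl)]


def normalized_difference(band1, band2):
    """Compute (band1 - band2) / (band1 + band2) per pixel.

    NaN inputs propagate to NaN; a zero denominator also yields NaN (assumption).
    """
    out = []
    for a, b in zip(band1, band2):
        total = a + b
        if math.isnan(total) or total == 0:
            out.append(math.nan)
        else:
            out.append((a - b) / total)
    return out


nir = [0.5, 0.4, 0.3]   # B08 reflectance
red = [0.1, 0.2, 0.3]   # B04 reflectance
scl = [4, 9, 4]         # 4 = vegetation, 9 = cloud (high probability)

ndvi = normalized_difference(mask_clouds(nir, scl), mask_clouds(red, scl))
print(ndvi)
```

Masking before computing the index means cloudy pixels never contribute a misleading NDVI value to later aggregation steps.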
37.1.4. Available Processors
Earth Memory includes a wide range of built-in processors for common data processing tasks:
37.1.4.1. Image Processing

| Processor | Description |
|---|---|
| `CloudMasking` | Detects and masks clouds in satellite imagery |
| | Increases spatial resolution of multispectral imagery |
| | Corrects for atmospheric effects in optical imagery |
| | Performs arithmetic operations on image bands |
| | Aligns multiple images to a common coordinate system |
| | Combines multiple images into a single seamless image |
37.1.4.2. Indices and Transformations

| Processor | Description |
|---|---|
| `NormalizedDifferenceIndex` | Calculates normalized difference indices (NDVI, NDWI, etc.) |
| | Performs Tasseled Cap transformation (brightness, greenness, wetness) |
| | Reduces dimensionality of multispectral data |
| | Decomposes pixel values into endmember fractions |
| | Extracts texture features from imagery |
| | Corrects for topographic effects on reflectance |
37.1.4.3. Spatial Analysis

| Processor | Description |
|---|---|
| `Resampling` | Changes the spatial resolution of data |
| | Converts data between different coordinate systems |
| | Applies spatial filters (e.g., Gaussian, median) |
| | Segments imagery into objects for analysis |
| | Calculates statistics for regions of interest |
| | Extracts landform and terrain features |
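As an illustration of the "statistics for regions of interest" entry, the sketch below computes a per-zone mean over a flattened raster. It is a generic zonal-statistics example in plain Python, not the framework's processor; the function name `zonal_mean` and the zone labels are hypothetical.

```python
from collections import defaultdict


def zonal_mean(values, zones):
    """Return the mean of `values` for each zone label.

    `values` and `zones` are flat sequences of equal length, where zones[i]
    is the region label of pixel i.
    """
    groups = defaultdict(list)
    for value, zone in zip(values, zones):
        groups[zone].append(value)
    return {zone: sum(vals) / len(vals) for zone, vals in groups.items()}


ndvi_pixels = [0.2, 0.4, 0.6, 0.8]
zone_labels = ["field_a", "field_a", "field_b", "field_b"]
zone_stats = zonal_mean(ndvi_pixels, zone_labels)
print(zone_stats)
```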
37.1.4.4. Temporal Analysis

| Processor | Description |
|---|---|
| `TemporalAggregation` | Aggregates data over time periods (daily, monthly, etc.) |
| | Analyzes temporal patterns and trends |
| | Identifies changes between time periods |
| | Separates seasonal, trend, and residual components |
| | Fills missing values in time series |
| | Reduces noise in time series data |
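The `period`, `function`, and `min_valid_observations` parameters used with `TemporalAggregation` earlier suggest the following behavior, sketched here in plain Python under stated assumptions: NaN values are treated as invalid, and a month with fewer valid observations than the threshold yields NaN. The function name `monthly_mean` is hypothetical and the framework's actual semantics may differ.

```python
import math
from collections import defaultdict
from datetime import date


def monthly_mean(observations, min_valid_observations=3):
    """Aggregate (date, value) pairs to one mean per (year, month).

    NaN values are skipped. Months with too few valid values map to NaN.
    """
    buckets = defaultdict(list)
    for day, value in observations:
        valid = buckets[(day.year, day.month)]  # creates the month if missing
        if not math.isnan(value):
            valid.append(value)
    return {
        key: (sum(vals) / len(vals)
              if len(vals) >= min_valid_observations else math.nan)
        for key, vals in sorted(buckets.items())
    }


observations = [
    (date(2023, 1, 5), 0.2),
    (date(2023, 1, 15), 0.4),
    (date(2023, 1, 25), 0.6),
    (date(2023, 1, 30), math.nan),  # cloud-masked acquisition
    (date(2023, 2, 4), 0.5),
    (date(2023, 2, 14), 0.7),
]
monthly = monthly_mean(observations)
print(monthly)
```

January has three valid values and gets a mean; February has only two and is reported as NaN instead of a mean built on too little data.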
37.1.4.5. Machine Learning

| Processor | Description |
|---|---|
| | Groups similar data points together |
| | Models relationships between variables |
| | Assigns categories to data |
| | Identifies unusual patterns in data |
| | Extracts meaningful features from raw data |
| | Applies neural networks to Earth observation data |
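For a sense of what "identifies unusual patterns in data" can mean in its simplest form, the sketch below flags values whose z-score exceeds a threshold. This is one basic technique chosen for illustration; it is not claimed to be the method the built-in processor uses, and the function name and threshold are assumptions.

```python
from statistics import mean, pstdev


def zscore_anomalies(values, threshold=2.0):
    """Return indices of values more than `threshold` standard deviations from the mean."""
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        return []  # all values identical: nothing stands out
    return [i for i, v in enumerate(values) if abs(v - mu) / sigma > threshold]


ndvi_series = [0.5] * 9 + [0.9]
anomalous_indices = zscore_anomalies(ndvi_series)
print(anomalous_indices)
```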
37.1.5. Creating Custom Processors
You can create custom processors for specialized tasks:
from memories.earth.processors import BaseProcessor
import numpy as np


class BurnAreaIndex(BaseProcessor):
    """Calculate the Burn Area Index (BAI) from satellite imagery."""

    def __init__(self, name="bai", description=None):
        super().__init__(name=name, description=description)
        self.requires_bands = ["B04", "B08"]  # RED and NIR bands

    def process(self, data):
        """
        Calculate BAI = 1 / ((0.1 - RED)^2 + (0.06 - NIR)^2)
        """
        red = data["B04"]
        nir = data["B08"]

        # Calculate BAI
        bai = 1.0 / ((0.1 - red)**2 + (0.06 - nir)**2)

        # Add to output
        data[self.name] = bai
        return data


# Use the custom processor in a pipeline
pipeline.add_step(
    BurnAreaIndex(
        name="bai",
        description="Burn Area Index for fire detection"
    )
)
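The BAI formula in the docstring above divides by a quantity that is zero when a pixel sits exactly at RED = 0.1 and NIR = 0.06. The standalone sketch below evaluates the same formula on scalar values and adds a guard for that case. Returning infinity there is an assumption made for illustration; a custom processor might prefer NaN or a capped value.

```python
import math


def burn_area_index(red, nir, eps=1e-12):
    """BAI = 1 / ((0.1 - red)^2 + (0.06 - nir)^2), with a zero-denominator guard."""
    denom = (0.1 - red) ** 2 + (0.06 - nir) ** 2
    if denom <= eps:
        return math.inf  # assumption: treat the convergence point as "maximally burned"
    return 1.0 / denom


burned = burn_area_index(red=0.12, nir=0.08)      # close to the reference point
vegetated = burn_area_index(red=0.05, nir=0.50)   # high NIR, far from it
print(burned, vegetated)
```

Pixels whose reflectance lies near the reference point score high, which is why recently burned surfaces stand out against healthy vegetation.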
37.1.6. Processor Configuration
Processors can be configured through either the Python API or YAML configuration files:
# processors.yml
pipelines:
  - name: ndvi-pipeline
    description: Calculates NDVI from satellite imagery
    steps:
      - type: CloudMasking
        params:
          method: sentinel2_scl
          mask_values: [3, 8, 9, 10]
          fill_value: null
      - type: NormalizedDifferenceIndex
        params:
          name: ndvi
          band1: B08
          band2: B04
          description: Normalized Difference Vegetation Index
      - type: Resampling
        params:
          target_resolution: 30m
          method: bilinear
      - type: TemporalAggregation
        params:
          period: monthly
          function: mean
          min_valid_observations: 3
Load the configuration file in your code:
# Load processing pipelines from configuration
observatory.load_pipelines_config("processors.yml")
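Before handing a configuration to the framework, it can help to check that every step names a known processor type. The sketch below does this on a Python dict that mirrors the structure of `processors.yml` (so it needs no YAML parser). The `Step` class, `build_steps` function, and the set of known types are hypothetical helpers, not part of Earth Memory.

```python
from dataclasses import dataclass, field


@dataclass
class Step:
    type: str
    params: dict = field(default_factory=dict)


# Processor types that appear in this guide's examples.
KNOWN_STEP_TYPES = {
    "CloudMasking",
    "NormalizedDifferenceIndex",
    "Resampling",
    "TemporalAggregation",
}


def build_steps(pipeline_config):
    """Validate step types and return them as Step records, in order."""
    steps = []
    for index, entry in enumerate(pipeline_config["steps"]):
        if entry["type"] not in KNOWN_STEP_TYPES:
            raise ValueError(f"step {index}: unknown processor type {entry['type']!r}")
        steps.append(Step(entry["type"], entry.get("params", {})))
    return steps


pipeline_config = {
    "name": "ndvi-pipeline",
    "steps": [
        {"type": "CloudMasking",
         "params": {"method": "sentinel2_scl", "mask_values": [3, 8, 9, 10]}},
        {"type": "NormalizedDifferenceIndex",
         "params": {"name": "ndvi", "band1": "B08", "band2": "B04"}},
    ],
}
config_steps = build_steps(pipeline_config)
print([step.type for step in config_steps])
```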
37.1.7. Distributed Processing
For large-scale processing, Earth Memory supports distributed execution:
from memories.earth.execution import DistributedExecutor

# Create a distributed executor
executor = DistributedExecutor(
    backend="dask",  # or "ray", "spark", etc.
    n_workers=4,
    memory_per_worker="4GB"
)

# Run the pipeline with the distributed executor
result = observatory.run_pipeline(
    pipeline_name="ndvi-pipeline",
    area_of_interest={"type": "Polygon", "coordinates": [...]},
    time_range=("2023-01-01", "2023-12-31"),
    executor=executor
)
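Distributed raster processing commonly works by splitting the area into tiles and processing them independently. The sketch below shows that pattern with the standard library's `ThreadPoolExecutor` standing in for a real backend such as Dask. The tiling scheme, function names, and the placeholder per-tile work are assumptions for illustration and say nothing about how `DistributedExecutor` is implemented.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_tiles(width, height, tile_size):
    """Return (x0, y0, x1, y1) windows that cover a width x height raster."""
    tiles = []
    for y0 in range(0, height, tile_size):
        for x0 in range(0, width, tile_size):
            tiles.append((x0, y0,
                          min(x0 + tile_size, width),
                          min(y0 + tile_size, height)))
    return tiles


def process_tile(tile):
    """Placeholder for per-tile work: here it only counts the tile's pixels."""
    x0, y0, x1, y1 = tile
    return (x1 - x0) * (y1 - y0)


tiles = split_into_tiles(width=250, height=100, tile_size=100)
with ThreadPoolExecutor(max_workers=4) as pool:
    pixel_counts = list(pool.map(process_tile, tiles))  # results keep tile order

print(len(tiles), sum(pixel_counts))
```

Edge tiles are clipped to the raster bounds, so the tiles cover every pixel exactly once.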
37.1.8. Memory Formation from Processed Data
After processing, you can create Earth Memories from the results:
from memories.earth import MemoryCodex, MemoryCreator

# Create memories from processing results
memory_creator = MemoryCreator()

# Create a memory from the NDVI data
vegetation_memory = memory_creator.create_memory(
    name="vegetation-dynamics-2023",
    description="Vegetation dynamics over the year 2023",
    data=result.get_data(),
    type="warm",  # Memory tier
    metadata={
        "resolution": "30m",
        "temporal_coverage": "2023-01-01/2023-12-31",
        "region": "Amazon Basin",
        "processing_pipeline": "ndvi-pipeline"
    },
    tags=["vegetation", "ndvi", "amazon", "2023"]
)

# Store the memory in the memory codex
codex = MemoryCodex()
codex.add_memory(vegetation_memory)
37.1.9. Advanced Processing Patterns
Chain multiple pipelines together for complex workflows:
# First pipeline: Preprocess satellite imagery
preprocess_pipeline = ProcessingPipeline(
    name="preprocess-pipeline",
    description="Preprocesses satellite imagery"
)
# ... add preprocessing steps ...

# Second pipeline: Calculate indices
indices_pipeline = ProcessingPipeline(
    name="indices-pipeline",
    description="Calculates various indices"
)
# ... add index calculation steps ...

# Third pipeline: Perform change detection
change_pipeline = ProcessingPipeline(
    name="change-pipeline",
    description="Detects changes over time"
)
# ... add change detection steps ...

# Chain the pipelines
observatory.register_pipeline(preprocess_pipeline)
observatory.register_pipeline(indices_pipeline, depends_on="preprocess-pipeline")
observatory.register_pipeline(change_pipeline, depends_on="indices-pipeline")

# Run the complete workflow
result = observatory.run_workflow(
    starting_pipeline="preprocess-pipeline",
    area_of_interest={"type": "Polygon", "coordinates": [...]},
    time_range=("2022-01-01", "2023-12-31")
)
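The `depends_on` relationships above form a dependency graph, and a workflow runner has to execute pipelines in an order that respects it. The sketch below derives such an order with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). It illustrates the ordering idea only; how `run_workflow` resolves dependencies internally is not documented here.

```python
from graphlib import TopologicalSorter

# Map each pipeline to the set of pipelines it depends on,
# mirroring the depends_on arguments used when registering them.
depends_on = {
    "preprocess-pipeline": set(),
    "indices-pipeline": {"preprocess-pipeline"},
    "change-pipeline": {"indices-pipeline"},
}

# static_order() yields every node after all of its dependencies.
execution_order = list(TopologicalSorter(depends_on).static_order())
print(execution_order)
```

If the dependencies contain a cycle, `static_order()` raises `graphlib.CycleError`, which is a convenient early check for misconfigured workflows.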
37.1.10. Monitoring and Debugging
Monitor processing jobs and debug issues:
# Get status of running jobs
jobs = observatory.get_jobs()
for job in jobs:
    print(f"Job ID: {job.id}, Status: {job.status}, Progress: {job.progress}%")

# Get detailed logs from a job
logs = observatory.get_job_logs(job_id="12345")

# Debug a specific step in a pipeline
debug_result = observatory.debug_pipeline_step(
    pipeline_name="ndvi-pipeline",
    step_index=1,  # The step to debug (0-based index)
    sample_data=sample_input,  # Sample input data for testing
    verbose=True
)
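A useful way to think about step-level debugging is to run a small sample through the steps one at a time and keep a snapshot after each. The sketch below does that for two toy steps in plain Python. The step functions and `run_with_trace` are hypothetical and unrelated to the implementation of `debug_pipeline_step`.

```python
def scale(data):
    """Toy step: convert integer values to reflectance (assumed factor 10000)."""
    return [v / 10000.0 for v in data]


def clip_to_unit(data):
    """Toy step: clamp values to the range [0, 1]."""
    return [min(max(v, 0.0), 1.0) for v in data]


def run_with_trace(steps, data):
    """Run steps in order and record (index, step name, output copy) after each."""
    trace = []
    for index, step in enumerate(steps):
        data = step(data)
        trace.append((index, step.__name__, list(data)))
    return data, trace


sample_output, trace = run_with_trace([scale, clip_to_unit], [5000, 12000, -300])
for index, name, snapshot in trace:
    print(f"step {index} ({name}): {snapshot}")
```

Comparing consecutive snapshots shows which step first produces an unexpected value.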
37.1.11. Next Steps
After learning about data processing:
- Explore memory types in Memory Types
- Learn about integrating AI capabilities in AI Integration with memories-dev
- Set up memory storage options in Memory Storage