"""
High-performance Earth Observation processing library.
This library provides Rust-accelerated functions for common EO/geospatial
computations that can be used within XArray/Dask workflows to bypass Python's GIL.
NOTE: All public spectral and temporal functions accept any numeric NumPy dtype
(int, uint, float32, float64, etc.). Inputs are automatically coerced to float64
in the Rust layer for consistent and stable computation.
"""
from ._core import (
chebyshev_distance as _chebyshev_distance,
composite_mean as _composite_mean,
composite_std as _composite_std,
delta_nbr as _delta_nbr,
delta_ndvi as _delta_ndvi,
enhanced_vegetation_index as _enhanced_vegetation_index,
euclidean_distance as _euclidean_distance,
gci as _gci,
manhattan_distance as _manhattan_distance,
mask_in_range as _mask_in_range,
mask_invalid as _mask_invalid,
mask_out_range as _mask_out_range,
mask_scl as _mask_scl,
mask_with_scl as _mask_with_scl,
mask_vals as _mask_vals,
median as _median,
minkowski_distance as _minkowski_distance,
moving_average_temporal as _moving_average_temporal,
moving_average_temporal_stride as _moving_average_temporal_stride,
nbr as _nbr,
nbr2 as _nbr2,
ndmi as _ndmi,
ndvi as _ndvi,
ndwi as _ndwi,
normalized_difference as _normalized_difference,
pixelwise_transform as _pixelwise_transform,
replace_nans as _replace_nans,
savi as _savi,
linear_regression as _linear_regression,
temporal_sum as _temporal_sum,
temporal_composite as _temporal_composite,
zonal_stats as _zonal_stats,
ZoneStats as _ZoneStats,
binary_dilation as _binary_dilation,
binary_erosion as _binary_erosion,
binary_opening as _binary_opening,
binary_closing as _binary_closing,
bfast_monitor as _bfast_monitor,
complex_classification as _complex_classification,
random_forest_predict as _random_forest_predict,
random_forest_train as _random_forest_train,
haralick_features as _haralick_features,
)
import logging
import structlog
import numpy as np
import xarray as xr
from functools import partial
# Configure structlog for structured, extensible logging
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer(),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# Set up a standard logger
log = structlog.get_logger()
# Add a handler to print logs to stdout
handler = logging.StreamHandler()
# Use a simple formatter for the handler
formatter = logging.Formatter("%(message)s")
handler.setFormatter(formatter)
log.addHandler(handler)
# Set the log level to INFO
log.setLevel(logging.INFO)
__version__ = "0.19.0"
__all__ = [
"chebyshev_distance",
"composite",
"delta_nbr",
"delta_ndvi",
"enhanced_vegetation_index",
"euclidean_distance",
"evi",
"gci",
"log",
"manhattan_distance",
"mask_in_range",
"mask_invalid",
"mask_out_range",
"mask_scl",
"mask_with_scl",
"mask_vals",
"median",
"minkowski_distance",
"moving_average_temporal",
"moving_average_temporal_stride",
"nbr",
"nbr2",
"ndmi",
"ndvi",
"ndwi",
"normalized_difference",
"pixelwise_transform",
"replace_nans",
"savi",
"temporal_mean",
"temporal_std",
"temporal_sum",
"temporal_composite",
"trend_analysis",
"zonal_stats",
"ZoneStats",
"binary_dilation",
"binary_erosion",
"binary_opening",
"binary_closing",
"bfast_monitor",
"complex_classification",
"haralick_features",
"random_forest_predict",
"random_forest_train",
]
def random_forest_train(
features,
labels,
n_estimators=100,
min_samples_split=2,
max_depth=None,
max_features=None,
):
"""
Train a random forest model.
"""
if max_features is None:
max_features = int(np.sqrt(features.shape[1]))
return _random_forest_train(
features, labels, n_estimators, min_samples_split, max_depth, max_features
)
def random_forest_predict(model_json, features):
"""
Predict using a random forest model.
"""
return _random_forest_predict(model_json, features)
def bfast_monitor(
stack,
dates,
history_start_date,
monitor_start_date,
order=1,
h=0.25,
alpha=0.05,
):
"""
BFAST Monitor workflow for change detection.
"""
return _bfast_monitor(
stack,
dates,
history_start_date,
monitor_start_date,
order,
h,
alpha,
)
def complex_classification(blue, green, red, nir, swir1, swir2, temp):
"""
Scaffold for a complex, multi-band, short-circuiting classification workflow.
"""
return _complex_classification(blue, green, red, nir, swir1, swir2, temp)
def _apply_haralick(data_block, window_size, levels, boundary, dtype):
"""Helper to apply Haralick features and handle dask chunk boundaries."""
# If the original block is smaller than the window, no features can be calculated.
if data_block.shape[0] < window_size or data_block.shape[1] < window_size:
empty_shape = (data_block.shape[0], data_block.shape[1])
return (
np.full(empty_shape, np.nan, dtype=dtype),
np.full(empty_shape, np.nan, dtype=dtype),
np.full(empty_shape, np.nan, dtype=dtype),
np.full(empty_shape, np.nan, dtype=dtype),
)
# Pad the block to handle boundaries correctly
padded_block = np.pad(data_block, pad_width=boundary, mode="reflect")
# Calculate features on the padded block
contrast, dissimilarity, homogeneity, entropy = _haralick_features(
padded_block, window_size, levels
)
# Un-pad the results to match the original chunk's dimensions
return (
contrast[boundary:-boundary, boundary:-boundary],
dissimilarity[boundary:-boundary, boundary:-boundary],
homogeneity[boundary:-boundary, boundary:-boundary],
entropy[boundary:-boundary, boundary:-boundary],
)
def haralick_features(
data: xr.DataArray,
window_size: int = 3,
levels: int = 8,
features: list = None,
) -> xr.DataArray:
"""
Calculate Haralick texture features over a sliding window.
This function is designed to work with Dask-backed xarray DataArrays,
allowing for parallel, out-of-memory computation.
:param data: Input 2D xarray.DataArray. Values should be integers,
ideally quantized to the specified number of levels.
:param window_size: The size of the square window for GLCM calculation.
:param levels: Number of gray levels to use for the GLCM. The input data
should be quantized to this range [0, levels-1].
:param features: List of feature names to compute. Defaults to all four:
['contrast', 'dissimilarity', 'homogeneity', 'entropy'].
:return: An xarray.DataArray with a new 'feature' dimension containing
the calculated texture metrics.
"""
if features is None:
features = ["contrast", "dissimilarity", "homogeneity", "entropy"]
if data.ndim != 2:
raise ValueError("Input data must be a 2D xarray.DataArray.")
# Quantize data to the specified number of levels
if data.max() > levels - 1:
log.warning(
"Data contains values greater than `levels`-1. "
"Quantizing data to the range [0, levels-1]."
)
data = (data / data.max() * (levels - 1)).astype(np.uint8)
else:
data = data.astype(np.uint8)
# Dask requires specifying the output template.
template = xr.DataArray(
np.empty((len(features), data.shape[0], data.shape[1]), dtype=np.float64),
dims=("feature",) + data.dims,
coords={"feature": features},
)
# Calculate boundary overlap for dask chunks
boundary = window_size // 2
# Use a partial function to pass static arguments to map_blocks
apply_func = partial(
_apply_haralick,
window_size=window_size,
levels=levels,
boundary=boundary,
dtype=template.dtype,
)
# `map_blocks` applies the function to each Dask chunk.
# We must specify the output chunks, which we can derive from the input.
# We get a tuple of arrays from our rust function, one for each feature
contrast, dissimilarity, homogeneity, entropy = xr.apply_ufunc(
apply_func,
data,
input_core_dims=[("y", "x")],
dask="parallelized",
output_dtypes=[template.dtype] * 4,
output_core_dims=(("y", "x"), ("y", "x"), ("y", "x"), ("y", "x")),
dask_gufunc_kwargs=dict(allow_rechunk=True),
)
# Combine the results into a single DataArray
feature_map = {
"contrast": contrast,
"dissimilarity": dissimilarity,
"homogeneity": homogeneity,
"entropy": entropy,
}
result = xr.concat([feature_map[f] for f in features], dim="feature")
result["feature"] = features
return result
[docs]
def normalized_difference(a, b):
"""
Compute normalized difference (a - b) / (a + b) using the Rust core.
Supports 1D or 2D numpy float arrays; dimensional dispatch occurs in Rust.
"""
return _normalized_difference(a, b)
[docs]
def ndvi(nir, red):
"""
Compute NDVI = (NIR - Red) / (NIR + Red) via Rust core (1D or 2D).
"""
return _ndvi(nir, red)
def linear_regression(y):
"""
Perform simple linear regression on a 1D array.
Returns (slope, intercept, residuals).
"""
return _linear_regression(y)
ZoneStats = _ZoneStats
def zonal_stats(values: np.ndarray, zones: np.ndarray) -> dict[int, ZoneStats]:
"""
Calculate zonal statistics.
Args:
values: Input value array (any numeric dtype, coerced to float64).
zones: Input zone label array (must be broadcastable to values, coerced to int64).
Returns:
Dictionary mapping zone ID (int) to ZoneStats object.
"""
return _zonal_stats(values, zones)
def binary_dilation(input: np.ndarray, kernel_size: int = 3) -> np.ndarray:
"""
Perform binary dilation on a 2D boolean/int array.
Args:
input: 2D input array (treated as boolean: >0 is True).
kernel_size: Size of the square structuring element (default 3).
Returns:
Dilated 2D array (uint8: 0 or 1).
"""
return _binary_dilation(input, kernel_size)
def binary_erosion(input: np.ndarray, kernel_size: int = 3) -> np.ndarray:
"""
Perform binary erosion on a 2D boolean/int array.
Args:
input: 2D input array (treated as boolean: >0 is True).
kernel_size: Size of the square structuring element (default 3).
Returns:
Eroded 2D array (uint8: 0 or 1).
"""
return _binary_erosion(input, kernel_size)
def binary_opening(input: np.ndarray, kernel_size: int = 3) -> np.ndarray:
"""
Perform binary opening (erosion followed by dilation).
Args:
input: 2D input array (treated as boolean: >0 is True).
kernel_size: Size of the square structuring element (default 3).
Returns:
Opened 2D array (uint8: 0 or 1).
"""
return _binary_opening(input, kernel_size)
def binary_closing(input: np.ndarray, kernel_size: int = 3) -> np.ndarray:
"""
Perform binary closing (dilation followed by erosion).
Args:
input: 2D input array (treated as boolean: >0 is True).
kernel_size: Size of the square structuring element (default 3).
Returns:
Closed 2D array (uint8: 0 or 1).
"""
return _binary_closing(input, kernel_size)
[docs]
def ndwi(green, nir):
"""
Compute NDWI = (Green - NIR) / (Green + NIR) via Rust core (1D or 2D).
"""
return _ndwi(green, nir)
[docs]
def savi(nir, red, L=0.5, **kwargs):
"""
Compute Soil Adjusted Vegetation Index (SAVI).
SAVI = (NIR - Red) / (NIR + Red + L) * (1 + L)
Parameters
----------
nir : numpy.ndarray
Near-infrared band.
red : numpy.ndarray
Red band.
L : float, optional
Soil brightness correction factor (default 0.5). Typical range 0–1.
Larger L reduces soil background influence.
**kwargs :
May contain 'l' to specify the soil adjustment factor instead of 'L'.
Returns
-------
numpy.ndarray
SAVI values with same shape as inputs.
Notes
-----
You can call as:
savi(nir, red) # uses L=0.5
savi(nir, red, L=0.25) # custom L
savi(nir, red, l=0.25) # alternative keyword
If both L and l are provided, 'l' takes precedence.
"""
l_val = kwargs.get("l", L)
return _savi(nir, red, l_val)
[docs]
def ndmi(nir, swir1):
"""
Normalized Difference Moisture Index (NDMI)
NDMI = (NIR - SWIR1) / (NIR + SWIR1)
Parameters
----------
nir : numpy.ndarray
Near-infrared band.
swir1 : numpy.ndarray
Short-wave infrared 1 band.
Returns
-------
numpy.ndarray
NDMI values (-1 .. 1).
"""
return _ndmi(nir, swir1)
[docs]
def nbr2(swir1, swir2):
"""
Normalized Burn Ratio 2 (NBR2)
NBR2 = (SWIR1 - SWIR2) / (SWIR1 + SWIR2)
Parameters
----------
swir1 : numpy.ndarray
Short-wave infrared 1 band.
swir2 : numpy.ndarray
Short-wave infrared 2 band.
Returns
-------
numpy.ndarray
NBR2 values (-1 .. 1).
"""
return _nbr2(swir1, swir2)
[docs]
def gci(nir, green):
"""
Green Chlorophyll Index (GCI)
GCI = (NIR / Green) - 1
Parameters
----------
nir : numpy.ndarray
Near-infrared band (any numeric dtype; auto-coerced to float64).
green : numpy.ndarray
Green band (any numeric dtype).
Returns
-------
numpy.ndarray
GCI values (unbounded; typical vegetation > 0).
Notes
-----
Division-by-near-zero guarded; returns 0 where Green is ~0.
"""
return _gci(nir, green)
[docs]
def delta_ndvi(pre_nir, pre_red, post_nir, post_red):
"""
Change in NDVI (pre - post).
Parameters
----------
pre_nir, pre_red : numpy.ndarray
Pre-event near-infrared and red bands.
post_nir, post_red : numpy.ndarray
Post-event near-infrared and red bands.
Returns
-------
numpy.ndarray
ΔNDVI array (same shape as inputs), positive values often indicate vegetation loss.
Notes
-----
Inputs may be any numeric dtype; values are coerced to float64 internally.
"""
return _delta_ndvi(pre_nir, pre_red, post_nir, post_red)
[docs]
def delta_nbr(pre_nir, pre_swir2, post_nir, post_swir2):
"""
Change in NBR (pre - post) for burn severity analysis.
Parameters
----------
pre_nir, pre_swir2 : numpy.ndarray
Pre-event NIR and SWIR2 bands.
post_nir, post_swir2 : numpy.ndarray
Post-event NIR and SWIR2 bands.
Returns
-------
numpy.ndarray
ΔNBR array (same shape as inputs). Larger positive values generally indicate higher burn severity.
Notes
-----
Inputs may be any numeric dtype; internal coercion to float64.
"""
return _delta_nbr(pre_nir, pre_swir2, post_nir, post_swir2)
[docs]
def nbr(nir, swir2):
"""
Compute Normalized Burn Ratio (NBR).
NBR = (NIR - SWIR2) / (NIR + SWIR2)
Parameters
----------
nir : numpy.ndarray
Near-infrared band.
swir2 : numpy.ndarray
Short-wave infrared (SWIR2) band.
Returns
-------
numpy.ndarray
NBR values with same shape as inputs.
"""
return _nbr(nir, swir2)
[docs]
def enhanced_vegetation_index(nir, red, blue):
"""
Compute EVI = 2.5 * (NIR - Red) / (NIR + 6*Red - 7.5*Blue + 1) via Rust core (1D or 2D).
"""
return _enhanced_vegetation_index(nir, red, blue)
# Alias
evi = enhanced_vegetation_index
[docs]
def composite(arr, method="median", **kwargs):
"""
Composite convenience wrapper over temporal aggregation functions.
The composite is computed along the leading time axis for arrays with shape:
- 1D: (time,)
- 2D: (time, bands|features)
- 3D: (time, y, x)
- 4D: (time, band, y, x)
Parameters
----------
arr : numpy.ndarray
Input array (1D–4D). Any numeric dtype accepted; coerced to float64 internally.
method : str, optional
Name of compositing method, one of {"median", "mean", "std"}.
**kwargs
Passed through to the underlying method. This includes `skip_na` (bool)
to control NaN handling (default True).
Returns
-------
numpy.ndarray
Composite with time axis removed. Output dimensionality:
- 1D input -> scalar (float64)
- 2D input -> (bands,)
- 3D input -> (y, x)
- 4D input -> (band, y, x)
Raises
------
ValueError
If method is not recognized.
"""
if method == "median":
return median(arr, **kwargs)
elif method == "mean":
return _composite_mean(arr, **kwargs)
elif method == "std":
return _composite_std(arr, **kwargs)
else:
raise ValueError(f"Unknown composite method: {method}")
[docs]
def temporal_mean(arr, skip_na=True):
"""
Compute the mean along the leading time axis of a 1D–4D time‑first array.
Parameters
----------
arr : numpy.ndarray
Time‑first array (1D–4D). Shapes:
(T,), (T, F), (T, Y, X), (T, B, Y, X).
skip_na : bool, default True
If True, NaNs are excluded per pixel/band; all‑NaN series produce NaN.
If False, any NaN in a series propagates NaN to the output position.
Returns
-------
numpy.ndarray
Mean with time axis removed; float64 dtype. Scalar for 1D input.
"""
return composite(arr, method="mean", skip_na=skip_na)
[docs]
def temporal_std(arr, skip_na=True):
"""
Compute the sample standard deviation (ddof=1) along the leading time axis
of a 1D–4D time‑first array.
Parameters
----------
arr : numpy.ndarray
Time‑first array (1D–4D). Shapes:
(T,), (T, F), (T, Y, X), (T, B, Y, X).
skip_na : bool, default True
If True, NaNs are excluded before variance; fewer than 2 valid values
yield NaN. If False, any NaN in a series propagates NaN.
Returns
-------
numpy.ndarray
Standard deviation with time axis removed; float64 dtype. Scalar for 1D input.
"""
return composite(arr, method="std", skip_na=skip_na)
[docs]
def euclidean_distance(points_a, points_b):
"""
Compute pairwise Euclidean distances between two point sets.
Parameters
----------
points_a : numpy.ndarray (N, D)
points_b : numpy.ndarray (M, D)
Returns
-------
numpy.ndarray (N, M)
Distance matrix where element (i, j) is distance between
points_a[i] and points_b[j].
"""
return _euclidean_distance(points_a, points_b)
[docs]
def manhattan_distance(points_a, points_b):
"""
Compute pairwise Manhattan (L1) distances between two point sets.
See `euclidean_distance` for shape conventions.
"""
return _manhattan_distance(points_a, points_b)
[docs]
def chebyshev_distance(points_a, points_b):
"""
Compute pairwise Chebyshev (L∞) distances between two point sets.
"""
return _chebyshev_distance(points_a, points_b)
[docs]
def minkowski_distance(points_a, points_b, p):
"""
Compute pairwise Minkowski distances (order `p`) between two point sets.
Parameters
----------
points_a : numpy.ndarray (N, D)
First point set.
points_b : numpy.ndarray (M, D)
Second point set.
p : float
Norm order (must be >= 1). p=1 → Manhattan, p=2 → Euclidean,
large p → approximates Chebyshev (L∞).
Returns
-------
numpy.ndarray (N, M)
Distance matrix.
Raises
------
ValueError
If p < 1.0 (propagated from the Rust implementation).
"""
return _minkowski_distance(points_a, points_b, p)
[docs]
def mask_vals(arr, values=None, fill_value=None, nan_to=None):
"""
Mask specified values (exact equality) and optionally replace NaNs.
Parameters
----------
arr : numpy.ndarray (1D–4D)
Input array; any numeric dtype accepted (coerced to float64 internally).
values : sequence, optional
Iterable of numeric codes to mask. If None, no value masking is performed.
fill_value : float, optional
Value to write for masked codes. Defaults to NaN when None.
nan_to : float, optional
If provided, all NaNs (original or created by masking) are replaced with this value
after masking.
Returns
-------
numpy.ndarray (float64)
Masked array preserving original shape.
Notes
-----
This is a thin pass-through to the Rust implementation; see README “Masking Utilities”.
"""
return _mask_vals(arr, values=values, fill_value=fill_value, nan_to=nan_to)
[docs]
def replace_nans(arr, value):
"""
Replace all NaNs in `arr` with `value`.
Parameters
----------
arr : numpy.ndarray
Input array (1D–4D supported).
value : float
Replacement for every NaN.
Returns
-------
numpy.ndarray
Array with NaNs replaced.
"""
return _replace_nans(arr, value)
[docs]
def mask_out_range(arr, min_val=None, max_val=None, fill_value=None):
"""
Mask values outside a specified numeric range [min_val, max_val].
Parameters
----------
arr : numpy.ndarray
Input array (1D–4D).
min_val : float, optional
Minimum valid value (inclusive).
max_val : float, optional
Maximum valid value (inclusive).
fill_value : float, optional
Value for masked positions (default NaN).
Returns
-------
numpy.ndarray
Masked array.
"""
return _mask_out_range(arr, min=min_val, max=max_val, fill_value=fill_value)
[docs]
def mask_invalid(arr, invalid_values, fill_value=None):
"""
Mask a list of common invalid sentinel values.
Parameters
----------
arr : numpy.ndarray
Input array.
invalid_values : sequence
List of numeric codes to mask.
fill_value : float, optional
Value for masked positions (default NaN).
Returns
-------
numpy.ndarray
Masked array.
"""
return _mask_invalid(arr, invalid_values=invalid_values, fill_value=fill_value)
[docs]
def mask_in_range(arr, min_val=None, max_val=None, fill_value=None):
"""
Mask values inside a specified numeric range [min_val, max_val].
Parameters
----------
arr : numpy.ndarray
Input array (1D–4D).
min_val : float, optional
Minimum value of range to mask (inclusive).
max_val : float, optional
Maximum value of range to mask (inclusive).
fill_value : float, optional
Value for masked positions (default NaN).
Returns
-------
numpy.ndarray
Masked array.
"""
return _mask_in_range(arr, min=min_val, max=max_val, fill_value=fill_value)
[docs]
def mask_scl(scl, keep_codes=None, fill_value=None):
"""
Mask a Sentinel-2 Scene Classification Layer (SCL) array.
By default, keeps vegetation, water, bare soil, and snow.
Parameters
----------
scl : numpy.ndarray
SCL array.
keep_codes : sequence, optional
List of SCL codes to keep. Defaults to [4, 5, 6, 7, 11].
fill_value : float, optional
Value for masked positions (default NaN).
Returns
-------
numpy.ndarray
Masked SCL array.
"""
return _mask_scl(scl, keep_codes=keep_codes, fill_value=fill_value)
[docs]
def mask_with_scl(data, scl, mask_codes=None, fill_value=None):
"""
Apply SCL-based masking to a data array.
This function masks pixels in the data array based on the Scene Classification
Layer (SCL) values. Unlike ``mask_scl`` which only returns a masked SCL array,
this function applies the mask to actual data (e.g., spectral bands).
Supported array shapes:
- 2D data (y, x) with 2D SCL (y, x)
- 3D data (time, y, x) with 3D SCL (time, y, x)
- 4D data (time, band, y, x) with 3D SCL (time, y, x) - SCL broadcast across bands
Parameters
----------
data : numpy.ndarray
The data array to mask (2D, 3D, or 4D).
scl : numpy.ndarray
The SCL array (2D or 3D). For 4D data, SCL should be 3D (time, y, x).
mask_codes : sequence of float, optional
SCL codes to mask (set to fill_value). Defaults to clouds/shadows/etc:
[0, 1, 2, 3, 8, 9, 10] (no data, saturated, dark, shadow, cloud med/high, cirrus).
fill_value : float, optional
Value to assign to masked pixels. Defaults to NaN.
Returns
-------
numpy.ndarray
Data array with masked pixels replaced by fill_value.
Examples
--------
>>> import numpy as np
>>> from eo_processor import mask_with_scl
>>> # 3D example: (time=2, y=3, x=3)
>>> data = np.ones((2, 3, 3), dtype=np.float64)
>>> scl = np.array([[[4, 4, 9], [4, 8, 4], [4, 4, 4]],
... [[4, 4, 4], [3, 4, 4], [4, 4, 10]]], dtype=np.float64)
>>> result = mask_with_scl(data, scl)
>>> # Pixels with SCL codes 9, 8, 3, 10 are now NaN
>>> np.isnan(result[0, 0, 2]) # SCL=9 (cloud high)
True
"""
return _mask_with_scl(data, scl, mask_codes=mask_codes, fill_value=fill_value)
[docs]
def moving_average_temporal(arr, window, skip_na=True, mode="same"):
"""
Sliding window mean along leading time axis of a 1D–4D time-first array.
Parameters
----------
arr : numpy.ndarray
Time-first array (T,...).
window : int
Window size (>=1).
skip_na : bool, default True
Exclude NaNs from window mean; if all NaN -> NaN.
mode : {"same","valid"}, default "same"
"same": output length equals T (edge windows shrink).
"valid": only full windows; output length = T - window + 1.
Returns
-------
numpy.ndarray
"""
return _moving_average_temporal(arr, window, skip_na=skip_na, mode=mode)
[docs]
def moving_average_temporal_stride(arr, window, stride, skip_na=True, mode="same"):
"""
Stride-based sliding window mean along leading time axis.
Computes moving_average_temporal then samples every `stride` steps
along the time axis to reduce temporal resolution.
Parameters
----------
arr : numpy.ndarray
Time-first array (T,...).
window : int
Window size (>=1).
stride : int
Sampling interval along output time axis (>=1).
skip_na : bool, default True
Exclude NaNs from window mean; if all NaN -> NaN.
mode : {"same","valid"}, default "same"
"same": output length equals T (variable-size edges).
"valid": only full windows; base length = T - window + 1.
Returns
-------
numpy.ndarray
Downsampled moving average with time dimension approximately
ceil(base_length / stride).
"""
return _moving_average_temporal_stride(
arr, window, stride, skip_na=skip_na, mode=mode
)
[docs]
def temporal_sum(arr, skip_na=True):
"""
Compute the sum along the leading time axis of a 1D–4D time‑first array.
Parameters
----------
arr : numpy.ndarray
Time‑first array (1D–4D).
skip_na : bool, default True
If True, NaNs are excluded.
Returns
-------
numpy.ndarray
Sum with time axis removed; float64 dtype. Scalar for 1D input.
"""
return _temporal_sum(arr, skip_na=skip_na)
[docs]
def temporal_composite(arr, weights, skip_na=True):
"""
Compute a temporal composite of a 4D array using a weighted median.
Parameters
----------
arr : numpy.ndarray
4D array with shape (time, bands, y, x).
weights : numpy.ndarray
1D array of weights with the same length as the time dimension of arr.
skip_na : bool, default True
If True, NaNs are excluded.
Returns
-------
numpy.ndarray
Composited 3D array with shape (bands, y, x).
"""
return _temporal_composite(arr, weights, skip_na=skip_na)