Skip to content

Extract Statistical Information

Insight into the earthstat.analysis_aggregation module, which facilitates advanced data aggregation methods for geospatial analysis, enhancing efficiency and accuracy.

Data Aggregation Techniques

Aggregating Data with Aggregate Process Function

conAggregate(predictor_dir, shapefile_path, output_csv_path, mask_path=None, use_mask=False, invalid_values=None, calculation_mode='overall_mean', predictor_name='Value', all_touched=False)

Aggregates raster values to polygons in a shapefile, optionally using a crop mask for weighted calculations.

Parameters:

Name Type Description Default
predictor_dir str

Directory containing raster datasets.

required
shapefile_path str

Path to the shapefile with polygons for aggregation.

required
output_csv_path str

Path where the aggregated output CSV will be saved.

required
crop_mask_path str

Path to the crop mask raster, required if use_crop_mask is True.

required
use_crop_mask bool

Whether to use the crop mask for weighted aggregation.

required
predictor_name str

Column name for the aggregated values in the output CSV.

'Value'

Exceptions:

Type Description
ValueError

If use_crop_mask is True but crop_mask_path is not provided.

Aggregates values from each raster within the specified directory to each polygon in the shapefile, writing the results to a CSV file. If a crop mask is used, values are aggregated using weights from the mask; otherwise, simple averaging is applied.

Source code in earthstat/analysis_aggregation/aggregate_process.py
def conAggregate(

        predictor_dir,
        shapefile_path,
        output_csv_path,
        mask_path=None,
        use_mask=False,
        invalid_values=None,
        calculation_mode="overall_mean",
        predictor_name="Value",
        all_touched=False
):
    """
    Aggregates raster values to polygons in a shapefile, optionally using a crop mask for weighted calculations.

    Args:
        predictor_dir (str): Directory containing raster datasets.
        shapefile_path (str): Path to the shapefile with polygons for aggregation.
        output_csv_path (str): Path where the aggregated output CSV will be saved.
        crop_mask_path (str, optional): Path to the crop mask raster, required if use_crop_mask is True.
        use_crop_mask (bool): Whether to use the crop mask for weighted aggregation.
        predictor_name (str): Column name for the aggregated values in the output CSV.

    Raises:
        ValueError: If use_crop_mask is True but crop_mask_path is not provided.

    Aggregates values from each raster within the specified directory to each polygon in the shapefile,
    writing the results to a CSV file. If a crop mask is used, values are aggregated using weights from
    the mask; otherwise, simple averaging is applied.
    """
    predictor_paths = loadTiff(predictor_dir)
    data_list = []

    shape_file = gpd.read_file(shapefile_path)

    if use_mask and not mask_path:
        raise ValueError("Mask path must be provided if use_mask is True.")

    for raster_path in tqdm(predictor_paths, desc="Processing rasters", unit="raster"):

        # Directly call the processing function for each raster
        data = process_and_aggregate_raster(

            raster_path,
            shape_file,
            invalid_values,
            use_mask,
            mask_path,
            calculation_mode,
            predictor_name,
            all_touched
        )

        data_list.extend(data)

    df = pd.DataFrame(data_list)
    df[predictor_name] = df[predictor_name].round(3)
    df.to_csv(output_csv_path, index=False)

process_and_aggregate_raster(raster_path, shape_file, invalid_values=None, use_mask=False, mask_path=None, calculation_mode='overall_mean', predictor_name='Value', all_touched=False)

Processes a single raster for aggregation into shapefile geometries.

Parameters:

Name Type Description Default
raster_path str

Path to the raster file.

required
shape_file GeoDataFrame

Loaded shapefile for geometries.

required
invalid_values list

Values to consider as invalid in raster.

None
use_mask bool

If True, uses an additional mask for calculations.

False
mask_path str

Path to the mask file, required if use_mask is True.

None
calculation_mode str

Mode of calculation ('overall_mean', 'weighted_mean', or 'filtered_mean').

'overall_mean'
predictor_name str

Column name for the output data.

'Value'
all_touched bool

Consider all pixels that touch geometry for masking.

False

Returns:

Type Description
list

Aggregated data for each geometry in the shapefile.

Source code in earthstat/analysis_aggregation/aggregate_process.py
def process_and_aggregate_raster(

    raster_path,
    shape_file,
    invalid_values=None,
    use_mask=False,
    mask_path=None,
    calculation_mode="overall_mean",
    predictor_name="Value",
    all_touched=False
):
    """
    Processes a single raster for aggregation into shapefile geometries.

    Args:
        raster_path (str): Path to the raster file.
        shape_file (GeoDataFrame): Loaded shapefile for geometries.
        invalid_values (list, optional): Values to consider as invalid in raster.
        use_mask (bool): If True, uses an additional mask for calculations.
        mask_path (str, optional): Path to the mask file, required if use_mask is True.
        calculation_mode (str): Mode of calculation ('overall_mean', 'weighted_mean', or 'filtered_mean').
        predictor_name (str): Column name for the output data.
        all_touched (bool): Consider all pixels that touch geometry for masking.

    Returns:
        list: Aggregated data for each geometry in the shapefile.
    """

    file_name = os.path.basename(raster_path)
    date_str = extractDateFromFilename(file_name)
    aggregated_data = []

    with rasterio.open(raster_path) as src:
        no_data_value = src.nodata
        geoms = [mapping(shape) for shape in shape_file.geometry]

        mask_no_data_value = None
        mask_src = None

        if use_mask and mask_path:
            mask_src = rasterio.open(mask_path)
            mask_no_data_value = mask_src.nodata

        for index, geom in enumerate(geoms):
            geom_mask, geom_transform = mask(
                src, [geom], crop=True, all_touched=all_touched)
            geom_mask = geom_mask.astype('float32')
            geom_mask[geom_mask == no_data_value] = np.nan

            if invalid_values:
                for invalid_value in invalid_values:
                    geom_mask[geom_mask == invalid_value] = np.nan

            if use_mask and mask_path and mask_src:
                crop_mask, _ = mask(
                    mask_src, [geom], crop=True, all_touched=all_touched)

                if calculation_mode == "weighted_mean":
                    valid_mask = (crop_mask[0] != mask_no_data_value)
                    valid_data = geom_mask[0][valid_mask]
                    valid_weights = crop_mask[0][valid_mask]
                    mean_value = np.nansum(valid_data * valid_weights) / np.nansum(
                        valid_weights) if np.nansum(valid_weights) > 0 else np.nan

                elif calculation_mode == "filtered_mean":
                    valid_mask = (crop_mask[0] != mask_no_data_value)
                    masked_data = geom_mask[0][valid_mask]
                    mean_value = np.nanmean(masked_data) if np.nansum(
                        masked_data) > 0 else np.nan

            elif calculation_mode == "overall_mean" or not use_mask:
                mean_value = np.nanmean(geom_mask)

            new_row = {col: shape_file.iloc[index][col]
                       for col in shape_file.columns if col != 'geometry'}
            new_row.update({'date': date_str, predictor_name: mean_value})
            aggregated_data.append(new_row)

        if mask_src:
            mask_src.close()

    return aggregated_data

Efficient Aggregation via Parallel Clip Aggregate Function

parallelAggregate(predictor_dir, shapefile_path, output_csv_path, mask_path=None, use_mask=False, invalid_values=None, calculation_mode='overall_mean', predictor_name='Value', all_touched=False, max_workers=None)

Aggregates raster data from a directory in parallel into shapefile geometries, optionally using a mask.

Parameters:

Name Type Description Default
predictor_dir str

Directory containing raster datasets.

required
shapefile_path str

Path to the shapefile.

required
output_csv_path str

Path to save the aggregated CSV.

required
mask_path str

Path to the mask file, required if use_mask is True.

None
use_mask bool

Use a mask for the aggregation process.

False
invalid_values list

List of values to treat as invalid in the raster data.

None
calculation_mode str

Determines how values are aggregated ('overall_mean', 'weighted_mean', or 'filtered_mean').

'overall_mean'
predictor_name str

Name for the output predictor column.

'Value'
all_touched bool

Include all pixels touching geometry in the aggregation.

False

Exceptions:

Type Description
ValueError

If use_mask is True and mask_path is not provided.

Returns a CSV with aggregated data per shapefile geometry. Utilizes multiprocessing for efficiency.

Source code in earthstat/analysis_aggregation/parallel_clip_aggregate.py
def parallelAggregate(
    predictor_dir,
    shapefile_path,
    output_csv_path,
    mask_path=None,
    use_mask=False,
    invalid_values=None,
    calculation_mode="overall_mean",
    predictor_name="Value",
    all_touched=False,
    max_workers=None
):
    """
    Aggregates raster data from a directory in parallel into shapefile geometries, optionally using a mask.

    Args:
        predictor_dir (str): Directory containing raster datasets.
        shapefile_path (str): Path to the shapefile.
        output_csv_path (str): Path to save the aggregated CSV.
        mask_path (str, optional): Path to the mask file, required if use_mask is True.
        use_mask (bool): Use a mask for the aggregation process.
        invalid_values (list, optional): List of values to treat as invalid in the raster data.
        calculation_mode (str): Determines how values are aggregated ('overall_mean', 'weighted_mean', or 'filtered_mean').
        predictor_name (str): Name for the output predictor column.
        all_touched (bool): Include all pixels touching geometry in the aggregation.

    Raises:
        ValueError: If use_mask is True and mask_path is not provided.

    Returns a CSV with aggregated data per shapefile geometry. Utilizes multiprocessing for efficiency.
    """
    if not max_workers:
        max_workers = os.cpu_count() - 1 if os.cpu_count() > 1 else 1

    predictor_paths = loadTiff(predictor_dir)
    data_list = []

    shape_file = gpd.read_file(shapefile_path)

    if use_mask and not mask_path:
        raise ValueError("Mask path must be provided if use_mask is True.")

    # Prepare arguments for starmap
    task_args = [
        (
            raster_path,
            shape_file,
            invalid_values,
            use_mask,
            mask_path,
            calculation_mode,
            predictor_name,
            all_touched
        ) for raster_path in predictor_paths
    ]

    # with multiprocessing.Pool(processes=max_workers) as pool:
    with Pool(processes=max_workers) as pool:
        results = []
        # Wrap pool.imap or pool.imap_unordered for a real-time tqdm progress bar
        # results = list(tqdm(pool.starmap(process_and_aggregate_raster, task_args), total=len(
        #     task_args), desc="Processing rasters", unit="raster"))

        # for result in results:
        #     data_list.extend(result)
        for result in tqdm(pool.imap(process_wrapper, task_args, chunksize=1), total=len(task_args), desc="Processing rasters", unit="raster"):
            results.append(result)
            data_list.extend(result)

    df = pd.DataFrame(data_list)
    df[predictor_name] = df[predictor_name].round(3)
    df.to_csv(output_csv_path, index=False)

process_and_aggregate_raster(raster_path, shape_file, invalid_values=None, use_mask=False, mask_path=None, calculation_mode='overall_mean', predictor_name='Value', all_touched=False)

Processes a single raster for aggregation into shapefile geometries.

Parameters:

Name Type Description Default
raster_path str

Path to the raster file.

required
shape_file GeoDataFrame

Loaded shapefile for geometries.

required
invalid_values list

Values to consider as invalid in raster.

None
use_mask bool

If True, uses an additional mask for calculations.

False
mask_path str

Path to the mask file, required if use_mask is True.

None
calculation_mode str

Mode of calculation ('overall_mean', 'weighted_mean', or 'filtered_mean').

'overall_mean'
predictor_name str

Column name for the output data.

'Value'
all_touched bool

Consider all pixels that touch geometry for masking.

False

Returns:

Type Description
list

Aggregated data for each geometry in the shapefile.

Source code in earthstat/analysis_aggregation/parallel_clip_aggregate.py
def process_and_aggregate_raster(

        raster_path,
        shape_file,
        invalid_values=None,
        use_mask=False,
        mask_path=None,
        calculation_mode="overall_mean",
        predictor_name="Value",
        all_touched=False
):
    """
    Processes a single raster for aggregation into shapefile geometries.

    Args:
        raster_path (str): Path to the raster file.
        shape_file (GeoDataFrame): Loaded shapefile for geometries.
        invalid_values (list, optional): Values to consider as invalid in raster.
        use_mask (bool): If True, uses an additional mask for calculations.
        mask_path (str, optional): Path to the mask file, required if use_mask is True.
        calculation_mode (str): Mode of calculation ('overall_mean', 'weighted_mean', or 'filtered_mean').
        predictor_name (str): Column name for the output data.
        all_touched (bool): Consider all pixels that touch geometry for masking.

    Returns:
        list: Aggregated data for each geometry in the shapefile.
    """
    file_name = os.path.basename(raster_path)
    date_str = extractDateFromFilename(file_name)
    aggregated_data = []

    with rasterio.open(raster_path) as src:
        no_data_value = src.nodata
        geoms = [mapping(shape) for shape in shape_file.geometry]

        mask_no_data_value = None
        mask_src = None

        if use_mask and mask_path:
            mask_src = rasterio.open(mask_path)
            mask_no_data_value = mask_src.nodata

        for index, geom in enumerate(geoms):
            geom_mask, geom_transform = mask(
                src, [geom], crop=True, all_touched=all_touched)
            geom_mask = geom_mask.astype('float32')
            geom_mask[geom_mask == no_data_value] = np.nan

            if invalid_values:
                for invalid_value in invalid_values:
                    geom_mask[geom_mask == invalid_value] = np.nan

            if use_mask and mask_path and mask_src:
                crop_mask, _ = mask(
                    mask_src, [geom], crop=True, all_touched=all_touched)

                if calculation_mode == "weighted_mean":
                    valid_mask = (crop_mask[0] != mask_no_data_value)
                    valid_data = geom_mask[0][valid_mask]
                    valid_weights = crop_mask[0][valid_mask]
                    mean_value = np.nansum(valid_data * valid_weights) / np.nansum(
                        valid_weights) if np.nansum(valid_weights) > 0 else np.nan

                elif calculation_mode == "filtered_mean":
                    valid_mask = (crop_mask[0] != mask_no_data_value)
                    masked_data = geom_mask[0][valid_mask]
                    mean_value = np.nanmean(masked_data) if np.nansum(
                        masked_data) > 0 else np.nan

            elif calculation_mode == "overall_mean" or not use_mask:
                mean_value = np.nanmean(geom_mask)

            new_row = {col: shape_file.iloc[index][col]
                       for col in shape_file.columns if col != 'geometry'}
            new_row.update({'date': date_str, predictor_name: mean_value})
            aggregated_data.append(new_row)

        if mask_src:
            mask_src.close()

    return aggregated_data