Source code for pywatemsedem.geo.factory

import inspect
from functools import wraps
from pathlib import Path

import geopandas as gpd
import numpy as np
import pyogrio
import rasterio
from rasterio import RasterioIOError

from .rasterproperties import RasterProperties
from .rasters import RasterFile, RasterMemory, TemporalRaster
from .utils import (
    define_extent_from_vct,
    generate_vct_mask_from_raster_mask,
    load_raster,
    vct_to_rst_value,
    write_arr_as_rst,
)
from .valid import PywatemsedemInputError, valid_exists
from .vectors import VectorFile, VectorMemory


[docs] def valid_mask_factory(func): """Check valid mask inputted when using raster or vectofactory""" @wraps(func) def wrapper(self, *args, **kwargs): """Wrapper fun""" if self.mask is None: msg = ( f"First create a mask with " f"{Factory.create_mask.__name__}-function" ) raise PywatemsedemInputError(msg) return func(self, *args, **kwargs) return wrapper
[docs] class Factory: """Factory class Used to enable functions to generate vectors and rasters. Notes ----- By default a rasterproperties instance is made in the initialisation See :func:`pywatemsedem.geo.factory.create_mask`-function. This can be toggled of by setting :const:`pywatemsedem.geo.factory.Factory.create_rasterproperties` to False """ def __init__(self, resolution, epsg_code, nodata, resmap, bounds=None): """See class docs Parameters --------- resolution:int Model spatial resolution epsg_code: int EPSG code should be a numeric value, see https://epsg.io/. nodata: float See :class:`pywatemsedem.geo.rasterproperties.RasterProperties` resmap: pathlib.Path | str Folder path to write factory files to bounds: list, default None Raster boundaries which you wish for model. See :class:`pywatemsedem.geo.rasterproperties.RasterProperties` """ self._resolution = resolution self._epsg_code = epsg_code self._nodata = nodata self._bounds = bounds self._rp = None self._mask = None self._bounds = None self.resmap = Path(resmap) / "factory" if not self.resmap.exists(): self.resmap.mkdir(exist_ok=True) self.vectorfile_mask = self.resmap / "mask.shp" self.rasterfile_mask = self.resmap / "mask.tif" self.create_rasterproperties = True @property def rp(self): """RasterProperties. See :class:`pywatemsedem.geo.rasterproperties.RasterProperties`""" return self._rp @rp.setter def rp(self, rasterproperties): """RasterProperties. See :class:`pywatemsedem.geo.rasterproperties.RasterProperties`""" self._rp = rasterproperties @property def mask(self): """AbstractRaster mask""" return self._mask @property def vct_mask(self): """AbstractVector mask, See :class:`pywatemsedem.geo.vectors.AbstractVector`""" return self._vct_mask @mask.setter def mask(self, mask): """Set mask with an numpy array Parameters ---------- mask : str or pathlib.Path Either a valid raster of vector file """ self.create_mask(mask)
[docs] def create_mask(self, mask): """Create mask based on a mask template (raster or vector) Parameters ---- mask: pathlib.Path | str File path to mask vector or raster file Notes ----- If :const:`pywatemsedem.geo.factory.Factory.create_rasterproperties` is set to False, one needs to self-define a RasterProperties instance. """ valid_exists(mask, None) create_mask_vector = False create_mask_raster = False if not self.create_rasterproperties: if self.rp is None: msg = ( f"Define a 'RasterProperties'-instance for the " f"'{self.__class__.__name__}'-instance " f"({self.__class__.__name__}.rp = RasterProperties(...)) or set " f"'{self.__class__.__name__}.create_rasterproperties' to True." ) raise IOError(msg) try: rasterio.open(mask) create_mask_vector = True except RasterioIOError: try: pyogrio.read_info(mask) except pyogrio.errors.DataSourceError: msg = f"Input mask '{mask}' should be raster or vector polygon file." raise IOError(msg) else: create_mask_raster = True if create_mask_raster: if self.create_rasterproperties: self.rp = define_extent_from_vct( mask, self._resolution, self._nodata, self._epsg_code, self._bounds, ) self._vct_mask = VectorFile(mask) if mask != self.vectorfile_mask: self._vct_mask._geodata.to_file(self.vectorfile_mask) vct_to_rst_value( mask, self.rasterfile_mask, self.rp.gdal_profile, dtype="integer", gdal=False, ) arr, profile = load_raster(self.rasterfile_mask) arr = arr.astype("int16") # correct no data value if necessary if profile["nodata"] != self.rp.nodata: arr[arr == profile["nodata"]] = self.rp.nodata write_arr_as_rst( arr, self.rasterfile_mask, np.int16, self.rp.rasterio_profile ) if create_mask_vector: arr, rp = load_raster(mask) if self.create_rasterproperties: self.rp = RasterProperties.from_rasterio(rp, epsg=self._epsg_code) generate_vct_mask_from_raster_mask( mask, self.vectorfile_mask, self._resolution ) self._vct_mask = VectorFile(self.vectorfile_mask) self._vct_mask._geodata = self._vct_mask._geodata.set_crs(self.rp.epsg) self._mask = RasterMemory(arr, self.rp) self._mask.arr_bin = np.where( self._mask.arr == self.rp.nodata, 0, self._mask.arr ) return True
[docs] @valid_mask_factory def raster_factory( self, raster_input, flag_clip=True, flag_mask=True, allow_nodata_array=False ): """Raster factory to load rasters in memory Parameters ---------- raster_input: str, pathlib.Path or numpy.ndarray Input raster file or numpy array flag_clip: bool, default True Clip raster (True) flag_mask: bool, default True Mask raster (True) allow_nodata_array: default False Allow the returned array to only contain nodata-values, see :func:`pywatemsedem.geo.rasters.AbstractRaster.mask`. Returns ------- raster: pywatemsedem.geo.rasters.AbstractRaster See :class:`pywatemsedem.geo.rasters.AbstractRaster` """ arr_mask = self.mask.arr_bin if flag_mask else None if isinstance(raster_input, str): raster_input = Path(raster_input) if isinstance(raster_input, Path): try: rasterio.open(raster_input) except IOError: msg = ( f"Input raster file '{raster_input}' should be a valid raster " f"file (e.g. IDRISI raster, geotiff, SAGA-GRID, ..)" ) raise IOError(msg) rp = self.rp if flag_clip else None raster = RasterFile( raster_input, rp, arr_mask, allow_nodata_array=allow_nodata_array ) elif isinstance(raster_input, np.ndarray): if raster_input.ndim == 2: raster = RasterMemory( raster_input, self.rp, arr_mask, allow_nodata_array=allow_nodata_array, ) else: raster = TemporalRaster(raster_input, self.rp, arr_mask) else: m = inspect.currentframe() calframe = inspect.getouterframes(m, 2) [cal.function for cal in calframe] msg = ( f"Input '{raster_input}' should be a numpy array or raster file, " f"current type is '{type(raster_input)}'" ) raise IOError(msg) return raster
[docs] @valid_mask_factory def vector_factory( self, vector_input, geometry_type, allow_empty=False, flag_clip=True ): """Vector factory to load vectors in memory Parameters ---------- vector_input: str, pathlib.Path or geopandas.GeoDataFrame Input vector file or geopandas dataframe mask: bool, default True Mask vector (True), nodata value will be that one of `pywatemsedem.geo.factory.Factory.rp`. allow_empty: bool, default False Allow vector to be empty, see :class:`pywatemsedem.geo.vectors.AbstractVector` flag_clip: bool, default True Clip vector to mask Returns ------- vector: pywatemsedem.geo.rasters.AbstractVector See :class:`pywatemsedem.geo.vectors.AbstractVector` """ if isinstance(vector_input, str): vector_input = Path(vector_input) if isinstance(vector_input, Path): try: pyogrio.read_info(vector_input) except pyogrio.errors.DataSourceError: msg = ( f"Input vector file '{vector_input}' should be a valid " f"vector file (e.g. ESRI shape file)." ) raise IOError(msg) if flag_clip: clip_mask = self.vct_mask.file_path else: clip_mask = None vector = VectorFile( vector_input, geometry_type, vct_clip=clip_mask, allow_empty=allow_empty, ) elif isinstance(vector_input, gpd.GeoDataFrame): if flag_clip: clip_mask = self.vct_mask.geodata else: clip_mask = None vector = VectorMemory( vector_input, geometry_type, clip_mask=clip_mask, allow_empty=allow_empty, ) else: msg = ( f"Input '{vector_input}' should be a geopandas GeoDataFrame or vector" " file." ) raise IOError(msg) return vector