import inspect
from functools import wraps
from pathlib import Path
import geopandas as gpd
import numpy as np
import pyogrio
import rasterio
from rasterio import RasterioIOError
from .rasterproperties import RasterProperties
from .rasters import RasterFile, RasterMemory, TemporalRaster
from .utils import (
define_extent_from_vct,
generate_vct_mask_from_raster_mask,
load_raster,
vct_to_rst_value,
write_arr_as_rst,
)
from .valid import PywatemsedemInputError, valid_exists
from .vectors import VectorFile, VectorMemory
[docs]
def valid_mask_factory(func):
"""Check valid mask inputted when using raster or vectofactory"""
@wraps(func)
def wrapper(self, *args, **kwargs):
"""Wrapper fun"""
if self.mask is None:
msg = (
f"First create a mask with " f"{Factory.create_mask.__name__}-function"
)
raise PywatemsedemInputError(msg)
return func(self, *args, **kwargs)
return wrapper
[docs]
class Factory:
"""Factory class
Used to enable functions to generate vectors and rasters.
Notes
-----
By default a rasterproperties instance is made in the initialisation
See :func:`pywatemsedem.geo.factory.create_mask`-function. This can be toggled of by
setting :const:`pywatemsedem.geo.factory.Factory.create_rasterproperties` to False
"""
def __init__(self, resolution, epsg_code, nodata, resmap, bounds=None):
"""See class docs
Parameters
---------
resolution:int
Model spatial resolution
epsg_code: int
EPSG code should be a numeric value, see https://epsg.io/.
nodata: float
See :class:`pywatemsedem.geo.rasterproperties.RasterProperties`
resmap: pathlib.Path | str
Folder path to write factory files to
bounds: list, default None
Raster boundaries which you wish for model.
See :class:`pywatemsedem.geo.rasterproperties.RasterProperties`
"""
self._resolution = resolution
self._epsg_code = epsg_code
self._nodata = nodata
self._bounds = bounds
self._rp = None
self._mask = None
self._bounds = None
self.resmap = Path(resmap) / "factory"
if not self.resmap.exists():
self.resmap.mkdir(exist_ok=True)
self.vectorfile_mask = self.resmap / "mask.shp"
self.rasterfile_mask = self.resmap / "mask.tif"
self.create_rasterproperties = True
@property
def rp(self):
"""RasterProperties. See
:class:`pywatemsedem.geo.rasterproperties.RasterProperties`"""
return self._rp
@rp.setter
def rp(self, rasterproperties):
"""RasterProperties. See
:class:`pywatemsedem.geo.rasterproperties.RasterProperties`"""
self._rp = rasterproperties
@property
def mask(self):
"""AbstractRaster mask"""
return self._mask
@property
def vct_mask(self):
"""AbstractVector mask, See :class:`pywatemsedem.geo.vectors.AbstractVector`"""
return self._vct_mask
@mask.setter
def mask(self, mask):
"""Set mask with an numpy array
Parameters
----------
mask : str or pathlib.Path
Either a valid raster of vector file
"""
self.create_mask(mask)
[docs]
def create_mask(self, mask):
"""Create mask based on a mask template (raster or vector)
Parameters
----
mask: pathlib.Path | str
File path to mask vector or raster file
Notes
-----
If :const:`pywatemsedem.geo.factory.Factory.create_rasterproperties` is set
to False, one needs to self-define a RasterProperties instance.
"""
valid_exists(mask, None)
create_mask_vector = False
create_mask_raster = False
if not self.create_rasterproperties:
if self.rp is None:
msg = (
f"Define a 'RasterProperties'-instance for the "
f"'{self.__class__.__name__}'-instance "
f"({self.__class__.__name__}.rp = RasterProperties(...)) or set "
f"'{self.__class__.__name__}.create_rasterproperties' to True."
)
raise IOError(msg)
try:
rasterio.open(mask)
create_mask_vector = True
except RasterioIOError:
try:
pyogrio.read_info(mask)
except pyogrio.errors.DataSourceError:
msg = f"Input mask '{mask}' should be raster or vector polygon file."
raise IOError(msg)
else:
create_mask_raster = True
if create_mask_raster:
if self.create_rasterproperties:
self.rp = define_extent_from_vct(
mask,
self._resolution,
self._nodata,
self._epsg_code,
self._bounds,
)
self._vct_mask = VectorFile(mask)
if mask != self.vectorfile_mask:
self._vct_mask._geodata.to_file(self.vectorfile_mask)
vct_to_rst_value(
mask,
self.rasterfile_mask,
self.rp.gdal_profile,
dtype="integer",
gdal=False,
)
arr, profile = load_raster(self.rasterfile_mask)
arr = arr.astype("int16")
# correct no data value if necessary
if profile["nodata"] != self.rp.nodata:
arr[arr == profile["nodata"]] = self.rp.nodata
write_arr_as_rst(
arr, self.rasterfile_mask, np.int16, self.rp.rasterio_profile
)
if create_mask_vector:
arr, rp = load_raster(mask)
if self.create_rasterproperties:
self.rp = RasterProperties.from_rasterio(rp, epsg=self._epsg_code)
generate_vct_mask_from_raster_mask(
mask, self.vectorfile_mask, self._resolution
)
self._vct_mask = VectorFile(self.vectorfile_mask)
self._vct_mask._geodata = self._vct_mask._geodata.set_crs(self.rp.epsg)
self._mask = RasterMemory(arr, self.rp)
self._mask.arr_bin = np.where(
self._mask.arr == self.rp.nodata, 0, self._mask.arr
)
return True
[docs]
@valid_mask_factory
def raster_factory(
self, raster_input, flag_clip=True, flag_mask=True, allow_nodata_array=False
):
"""Raster factory to load rasters in memory
Parameters
----------
raster_input: str, pathlib.Path or numpy.ndarray
Input raster file or numpy array
flag_clip: bool, default True
Clip raster (True)
flag_mask: bool, default True
Mask raster (True)
allow_nodata_array: default False
Allow the returned array to only contain nodata-values,
see :func:`pywatemsedem.geo.rasters.AbstractRaster.mask`.
Returns
-------
raster: pywatemsedem.geo.rasters.AbstractRaster
See :class:`pywatemsedem.geo.rasters.AbstractRaster`
"""
arr_mask = self.mask.arr_bin if flag_mask else None
if isinstance(raster_input, str):
raster_input = Path(raster_input)
if isinstance(raster_input, Path):
try:
rasterio.open(raster_input)
except IOError:
msg = (
f"Input raster file '{raster_input}' should be a valid raster "
f"file (e.g. IDRISI raster, geotiff, SAGA-GRID, ..)"
)
raise IOError(msg)
rp = self.rp if flag_clip else None
raster = RasterFile(
raster_input, rp, arr_mask, allow_nodata_array=allow_nodata_array
)
elif isinstance(raster_input, np.ndarray):
if raster_input.ndim == 2:
raster = RasterMemory(
raster_input,
self.rp,
arr_mask,
allow_nodata_array=allow_nodata_array,
)
else:
raster = TemporalRaster(raster_input, self.rp, arr_mask)
else:
m = inspect.currentframe()
calframe = inspect.getouterframes(m, 2)
[cal.function for cal in calframe]
msg = (
f"Input '{raster_input}' should be a numpy array or raster file, "
f"current type is '{type(raster_input)}'"
)
raise IOError(msg)
return raster
[docs]
@valid_mask_factory
def vector_factory(
self, vector_input, geometry_type, allow_empty=False, flag_clip=True
):
"""Vector factory to load vectors in memory
Parameters
----------
vector_input: str, pathlib.Path or geopandas.GeoDataFrame
Input vector file or geopandas dataframe
mask: bool, default True
Mask vector (True), nodata value will be that one of
`pywatemsedem.geo.factory.Factory.rp`.
allow_empty: bool, default False
Allow vector to be empty, see
:class:`pywatemsedem.geo.vectors.AbstractVector`
flag_clip: bool, default True
Clip vector to mask
Returns
-------
vector: pywatemsedem.geo.rasters.AbstractVector
See :class:`pywatemsedem.geo.vectors.AbstractVector`
"""
if isinstance(vector_input, str):
vector_input = Path(vector_input)
if isinstance(vector_input, Path):
try:
pyogrio.read_info(vector_input)
except pyogrio.errors.DataSourceError:
msg = (
f"Input vector file '{vector_input}' should be a valid "
f"vector file (e.g. ESRI shape file)."
)
raise IOError(msg)
if flag_clip:
clip_mask = self.vct_mask.file_path
else:
clip_mask = None
vector = VectorFile(
vector_input,
geometry_type,
vct_clip=clip_mask,
allow_empty=allow_empty,
)
elif isinstance(vector_input, gpd.GeoDataFrame):
if flag_clip:
clip_mask = self.vct_mask.geodata
else:
clip_mask = None
vector = VectorMemory(
vector_input,
geometry_type,
clip_mask=clip_mask,
allow_empty=allow_empty,
)
else:
msg = (
f"Input '{vector_input}' should be a geopandas GeoDataFrame or vector"
" file."
)
raise IOError(msg)
return vector