"""Source code for rfactor.process."""

import re
from pathlib import Path

import numpy as np
import pandas as pd
from tqdm import tqdm


class RainfallFilesIOMsg(str):
    """Error-message string whose repr prints the raw text without quotes.

    Used to wrap messages raised as ``IOError`` so multi-line messages stay
    readable in tracebacks.
    """

    def __repr__(self):
        # Suppress the surrounding quotes of the default str repr.
        return str(self)
def _days_since_start_year(series):
    """Translate a datetime series to fractional days since the start of the year.

    Parameters
    ----------
    series : pandas.Series
        Series with Datetime values. All datetime values should be of the
        same year.

    Returns
    -------
    days_since_start : pandas.Series
        Days since the start of the year as a float value.

    Notes
    -----
    Support function to provide integration with the original Matlab
    implementation. Output differs from the pandas datetime attribute
    ``dayofyear`` as it includes the time of the day as a decimal value.
    """
    years = series.dt.year.unique()
    if len(years) != 1:
        raise Exception("Input data should all be in the same year.")
    start_of_year = pd.Timestamp(f"{years[0]}-01-01")
    elapsed_seconds = (series - start_of_year).dt.total_seconds()
    # seconds -> minutes -> days (1440 minutes per day)
    return elapsed_seconds / 60.0 / 1440.0


def _extract_metadata_from_file_path(file_path):
    """Derive station and year from a file name shaped ``STATION_NAME_YYYY.txt``.

    ``STATION_NAME`` is the measurement station (may itself contain
    underscores) and ``YYYY`` the year of the measurement.

    Parameters
    ----------
    file_path : pathlib.Path
        File path of the file to extract station/year from.

    Returns
    -------
    station : str
    year : str
    """
    if not re.fullmatch(".*_[0-9]{4}$", file_path.stem):
        raise ValueError(
            "Input file_path_format should " "match with 'STATION_NAME_YYYY.txt'"
        )
    *station_parts, year = file_path.stem.split("_")
    return "_".join(station_parts), year


def _check_path(file_path):
    """Provide user feedback on file_path type."""
    if isinstance(file_path, Path):
        return
    if isinstance(file_path, str):
        raise TypeError(
            f"`file_path` should be a `pathlib.Path` object, use "
            f"`Path({file_path})` to convert string file_path to valid `Path`."
        )
    raise TypeError("`file_path` should be a pathlib.Path object")
def load_rain_file(file_path, load_fun, **kwargs):
    """Load a rainfall data file with a given load function and validate it.

    Parameters
    ----------
    file_path : pathlib.Path
        File path with rainfall data. The file must follow the input data
        format defined by ``load_fun``.
    load_fun : Callable
        Function loading a single rainfall file. Its output must be a
        pandas.DataFrame complying with:

        - *datetime* (datetime64[ns]): timestamp, timezone naive
        - *station* (object): name of the station, formatted as a string.
        - *rain_mm* (float): rainfall depth in mm
    kwargs:
        Keyword arguments passed through to ``load_fun``.

    Returns
    -------
    rain : pandas.DataFrame
        DataFrame with rainfall time series. Contains at least the following
        columns:

        - *rain_mm* (float): Rain in mm
        - *datetime* (pandas.Timestamp): Time stamp
        - *station* (str): station name
        - *year* (int): year of the measurement
        - *tag* (str): tag identifier, formatted as ``STATION_YEAR``

    Raises
    ------
    IOError
        When the output of ``load_fun`` is not a DataFrame or misses a
        required column or dtype.
    """
    rain = load_fun(file_path, **kwargs)
    if not isinstance(rain, pd.DataFrame):
        # Fixed word order of the original message
        # ("Load function must '<name>' return ...").
        msg = f"Load function '{load_fun.__name__}' must return pandas.DataFrame"
        raise IOError(RainfallFilesIOMsg(msg))
    if not {"datetime", "station", "rain_mm"}.issubset(rain.columns):
        msg = (
            f"Load function '{load_fun.__name__}' must return columns 'datetime', "
            f"'station' and 'rain_mm'."
        )
        raise IOError(RainfallFilesIOMsg(msg))
    if not pd.api.types.is_datetime64_ns_dtype(rain["datetime"]):
        msg = (
            f"Load function '{load_fun.__name__}' must return datetime64[ns] type for "
            f"column 'datetime'."
        )
        raise IOError(RainfallFilesIOMsg(msg))
    if not pd.api.types.is_object_dtype(rain["station"]):
        msg = (
            f"Load function '{load_fun.__name__}' must return object (str) type for "
            f"column 'station'."
        )
        raise IOError(RainfallFilesIOMsg(msg))
    if not pd.api.types.is_float_dtype(rain["rain_mm"]):
        msg = (
            f"Load function '{load_fun.__name__}' must return float for column "
            f"'rain_mm'."
        )
        raise IOError(RainfallFilesIOMsg(msg))
    # Derive the year and the STATION_YEAR tag used downstream.
    rain["year"] = rain["datetime"].dt.year
    rain["tag"] = rain["station"].astype(str) + "_" + rain["year"].astype(str)
    return rain
def load_rain_file_flanders(file_path, interpolate=False):
    """Load a tab-delimited rainfall text file for a single station/year.

    The input files are tab-delimited text files (extension: ``.txt``) that
    hold a rainfall timeseries. The data are split per monitoring station and
    the file name (``STATION_NAME_YYYY.txt``) identifies the station. The
    file must contain two headerless columns:

    - *datetime*: ``%d-%m-%Y %H:%M:%S``-format
    - *value*: rainfall depth in mm as text, with ``---`` (or an empty
      string) marking a missing value.

    Parameters
    ----------
    file_path : pathlib.Path
        File path of the rainfall data file.
    interpolate : bool
        Linearly interpolate NaN values yes/no.

    Returns
    -------
    rain : pandas.DataFrame
        DataFrame with non-zero rainfall time series. Contains the columns:

        - *datetime* (pandas.Timestamp): Time stamp.
        - *station* (str): station identifier.
        - *rain_mm* (float): Rain in mm.

    Notes
    -----
    1. ``---`` and empty strings in the value column are converted to NaN
       and, unless interpolated, removed; zero-rainfall records are removed
       as well.
    2. NOTE(review): the original documentation example showed
       comma-separated values while the file is read tab-delimited — confirm
       the expected delimiter against real input files.
    3. Current function is not maintained in unit test until further notice.
    """
    df = pd.read_csv(file_path, sep="\t", header=None, names=["datetime", "rain_mm"])
    df["datetime"] = pd.to_datetime(df["datetime"])
    station, _ = _extract_metadata_from_file_path(file_path)
    df["station"] = station

    # Missing-value markers -> NaN, then cast to float *before* the numeric
    # comparison: comparing an object-dtype column with 0 raises a TypeError
    # on modern pandas whenever '---' markers were present.
    df.loc[df["rain_mm"].isin(["---", ""]), "rain_mm"] = np.nan
    df["rain_mm"] = df["rain_mm"].astype(np.float64)
    df.loc[df["rain_mm"] < 0, "rain_mm"] = np.nan

    if interpolate:
        df["rain_mm"] = df["rain_mm"].interpolate(method="linear")

    # Keep only non-zero, non-missing rainfall records.
    df = df[(df["rain_mm"] != 0) & df["rain_mm"].notna()]
    return df[["datetime", "station", "rain_mm"]]
def compute_diagnostics(rain):
    """Compute diagnostics for input rainfall.

    This function computes coverage (per year, station) and a no-rain flag
    for each month (per year, station).

    Parameters
    ----------
    rain : pandas.DataFrame
        DataFrame with rainfall time series. Contains at least the following
        columns:

        - *rain_mm* (float): Rain in mm
        - *datetime* (pandas.Timestamp): Time stamp
        - *station* (str): station name

    Returns
    -------
    diagnostics : pandas.DataFrame
        Diagnostics per station and year, computed on the non-zero rainfall
        timeseries:

        - *station* (str): station identifier.
        - *year* (int): year.
        - *coverage* (float): coverage of the non-zero timeseries, see Notes.

        Added with one column per month (labels 1 to 12):

        - *month* (int): 1: no rain observed in that month, 0: rain observed.

    Notes
    -----
    The coverage is computed as:

    .. math::

        C = 100*[1-\\frac{\\text{number of NULL-data}}
        {\\text{length of non-zero timeseries}}]
    """
    # Coverage: share of non-NaN records per (year, station).
    diagnostics = rain.groupby([rain["datetime"].dt.year, "station"]).aggregate(
        {"rain_mm": lambda x: 1 - np.sum(np.isnan(x)) / len(x)}
    )
    diagnostics = diagnostics.rename(columns={"rain_mm": "coverage"})

    # Monthly totals per (year, month, station); a zero total flags 'no rain'.
    # String "sum" instead of np.sum: passing numpy callables to aggregate is
    # deprecated since pandas 2.1.
    df = rain.groupby(
        [rain["datetime"].dt.year, rain["datetime"].dt.month, "station"]
    ).aggregate({"rain_mm": "sum"})
    df.index.names = ["datetime", "month", "station"]
    df["norain"] = (df["rain_mm"] == 0).astype(int)
    df = df.pivot_table(
        columns=["month"], index=["station", "datetime"], values=["norain"]
    )
    df = df["norain"].reset_index()

    # Months absent from the data are reported as 'no rain observed'.
    for month in range(1, 13):
        if month not in df.columns:
            df[month] = 1

    # Couple coverage with the monthly flags (merge on the index levels).
    diagnostics = diagnostics.merge(df, how="left", on=["station", "datetime"])
    diagnostics = diagnostics.rename(columns={"datetime": "year"})
    return diagnostics
def load_rain_file_matlab_legacy(file_path):
    """Load (legacy Matlab) file format of rainfall data of a **single station/year**.

    The input files are defined by text files (extension: ``.txt``) that hold
    non-zero rainfall timeseries. The data are split per station and per year
    with a specific datafile tag (file name format: ``SOURCE_STATION_YEAR.txt``).
    The data should not contain headers, with the first column defined as
    'minutes since the start of the year' and the second as the rainfall depth
    during the t last minutes (t is the temporal resolution of the timeseries).

    Parameters
    ----------
    file_path : pathlib.Path
        File path with rainfall data according to the defined format, see
        the example below.

    Returns
    -------
    rain : pandas.DataFrame
        DataFrame with rainfall time series. Contains the following columns:

        - *datetime* (pandas.Timestamp): Time stamp
        - *station* (str): station name
        - *rain_mm* (float): Rain in mm

    Example
    -------
    1. Example of a rainfall file:

    ::

        9390 1.00
        9470 0.20
        9480 0.50
        10770 0.10
        ... ...
    """
    _check_path(file_path)
    if file_path.is_dir():
        raise ValueError(
            "`file_path` need to be the path " "to a file instead of a directory"
        )
    station, year = _extract_metadata_from_file_path(file_path)
    rain = pd.read_csv(
        file_path, delimiter=" ", header=None, names=["minutes_since", "rain_mm"]
    )
    if np.sum(rain["minutes_since"].isnull()) > 0:
        msg = (
            "Timestamp (i.e. minutes from start of year) column contains "
            "NaN-values. Input should be a (space-delimited) text file with the "
            "first column being the timestamp from the start of the year (minutes),"
            " and second the rainfall depth (in mm, non-zero series): \n \n9390 "
            "1.00\n9470 0.20\n9480 0.50\n... ..."
        )
        raise IOError(RainfallFilesIOMsg(msg))
    # Minutes since the start of the year -> absolute timestamp.
    rain = rain.assign(
        datetime=pd.Timestamp(f"{year}-01-01")
        + pd.to_timedelta(pd.to_numeric(rain["minutes_since"]), unit="min")
    )
    rain = rain.assign(station=station)
    # (A stray no-op `rain` expression statement was removed here.)
    return rain[["datetime", "station", "rain_mm"]]
def load_rain_folder(folder_path, load_fun, **kwargs):
    """Load all files of rainfall data in a folder.

    Parameters
    ----------
    folder_path : pathlib.Path
        Folder path with rainfall data, see also
        :func:`rfactor.process.load_rain_file`. The folder must contain
        '.txt' files.
    load_fun : Callable
        Function loading a single rainfall file. Its output must comply
        with:

        - *datetime* (datetime64[ns]): timestamp, timezone naive
        - *station* (object): name of station, formatted as a string.
        - *rain_mm* (float): rainfall depth in mm
    kwargs:
        Keyword arguments passed through to ``load_fun``.

    Returns
    -------
    rain : pandas.DataFrame
        See definition in :func:`rfactor.process.load_rain_file`.

    Raises
    ------
    FileNotFoundError
        When the folder does not exist or contains no '.txt' files.
    ValueError
        When ``folder_path`` points to a file instead of a directory.
    """
    _check_path(folder_path)
    if not folder_path.exists():
        # Fixed grammar of the user-facing message ("does not exists").
        msg = f"Input folder '{folder_path}' does not exist."
        raise FileNotFoundError(msg)
    if folder_path.is_file():
        raise ValueError(
            "`folder_path` need to be the path " "to a directory instead of a file"
        )
    files = list(folder_path.glob("*.txt"))
    if len(files) == 0:
        msg = f"Input folder '{folder_path}' does not contain any 'txt'-files."
        raise FileNotFoundError(msg)
    lst_df = [
        load_rain_file(file_path, load_fun, **kwargs)
        for file_path in tqdm(files, desc="Processing input files")
    ]
    all_rain = pd.concat(lst_df)
    all_rain = all_rain.sort_values(["station", "datetime"])
    return all_rain.reset_index(drop=True)
def write_erosivity_data(df, folder_path):
    """Write output erosivity per station and year to text files in a folder.

    Data are split up per year and station (file name ``STATION_YEAR.csv``)
    and written without headers. The (headerless) columns of the written
    files represent:

    - *days_since* (float): Days since the start of the year.
    - *erosivity_cum* (float): Cumulative erosivity over events.
    - *all_event_rain_cum* (float): Cumulative rain over events.

    NOTE(review): files are written with a ``.csv`` extension while the
    legacy loader globs ``*.txt`` — confirm which extension downstream
    tooling expects.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame with rfactor/erosivity time series. Can contain multiple
        columns, but should have at least the following:

        - *datetime* (pandas.Timestamp): Time stamp
        - *station* (str): Station identifier
        - *erosivity_cum* (float): Cumulative erosivity over events
        - *all_event_rain_cum* (float): Cumulative rain over events
    folder_path : pathlib.Path
        Folder path to save the data to; created when it does not exist.
    """
    _check_path(folder_path)
    folder_path.mkdir(exist_ok=True, parents=True)

    # Fixed-precision text format per output column.
    formats = {
        "days_since": "{:.3f}",
        "erosivity_cum": "{:.2f}",
        "all_event_rain_cum": "{:.1f}",
    }
    out_columns = ["days_since", "erosivity_cum", "all_event_rain_cum"]

    grouped = df.groupby(["station", df["datetime"].dt.year])
    for (station, year), df_group in grouped:
        df_group = df_group.assign(
            days_since=_days_since_start_year(df_group["datetime"])
        )
        for column, fformat in formats.items():
            df_group[column] = df_group[column].map(fformat.format)
        df_group[out_columns].to_csv(
            folder_path / f"{station}_{year}.csv", header=None, index=None, sep=" "
        )
def get_rfactor_station_year(erosivity, stations=None, years=None):
    """Get the R-factor at the end of every year for each station.

    Parameters
    ----------
    erosivity : pandas.DataFrame
        See :func:`rfactor.rfactor.compute_erosivity`. Must contain the
        columns *station*, *year* and *erosivity_cum*.
    stations : list, optional
        List of stations to extract R for (default: all stations present).
    years : list, optional
        List of years to extract R for (default: all years present).

    Returns
    -------
    erosivity : pandas.DataFrame
        With columns:

        - *year* (int): year
        - *station* (str): station
        - *erosivity_cum* (float): cumulative erosivity at the end of *year*
          and at *station*.

    Raises
    ------
    KeyError
        When a requested station or year is not part of the data set.
    """
    if stations is not None:
        unexisting_stations = set(stations).difference(
            set(erosivity["station"].unique())
        )
        if unexisting_stations:
            raise KeyError(
                f"Station name(s): {unexisting_stations} not part of data set."
            )
        erosivity = erosivity.loc[erosivity["station"].isin(stations)]
    if years is not None:
        unexisting_years = set(years).difference(set(erosivity["year"].unique()))
        if unexisting_years:
            raise KeyError(f"Year(s): {unexisting_years} not part of data set.")
        erosivity = erosivity.loc[erosivity["year"].isin(years)]

    # The last cumulative value of each (year, station) group is the yearly
    # R-factor. Fixes the original ``aggregate("erosivity_cum")`` call:
    # 'erosivity_cum' is a column name, not a valid aggregation function,
    # so that call raised instead of selecting the column.
    erosivity = erosivity.groupby(["year", "station"])["erosivity_cum"].last()
    erosivity = erosivity.reset_index().sort_values(["station", "year"])
    erosivity.index = range(len(erosivity))
    return erosivity
def compute_rainfall_statistics(df_rainfall, df_station_metadata=None):
    """Compute general statistics for rainfall timeseries.

    Statistics (number of records, min, max, median and years of data) are
    computed for each measurement station.

    Parameters
    ----------
    df_rainfall : pandas.DataFrame
        See :func:`rfactor.process.load_rain_file`.
    df_station_metadata : pandas.DataFrame, optional
        Dataframe holding station metadata, with the columns:

        - *station* (str): Name or code of the measurement station
        - *x* (float): X-coordinate of measurement station.
        - *y* (float): Y-coordinate of measurement station.

    Returns
    -------
    df_statistics : pandas.DataFrame
        Per-station statistics (``x`` and ``y`` only present when
        ``df_station_metadata`` is provided):

        - *station* (str): Name or code of the measurement station.
        - *year* (list): List of the years for which data is available for
          the station.
        - *records* (int): Total number of records for the station.
        - *min* (float): Minimal measured value for the station.
        - *median* (float): Median measured value for the station.
        - *max* (float): Maximal measured value for the station.
    """
    df_rainfall = df_rainfall.sort_values(by="year")
    # String aggregations instead of np.min/np.max/np.median: numpy callables
    # in ``aggregate`` are deprecated since pandas 2.1 and produced
    # version-dependent flattened names ('amin'/'min'), which required a
    # double rename mapping.
    df_statistics = (
        df_rainfall[["year", "station", "rain_mm"]]
        .groupby("station")
        .aggregate(
            {
                "year": lambda x: sorted(set(x)),
                "rain_mm": ["min", "max", "median", "count"],
            }
        )
    ).reset_index()
    df_statistics.columns = df_statistics.columns.map("".join)
    rename_cols = {
        "year<lambda>": "year",
        "rain_mmmin": "min",
        "rain_mmmax": "max",
        "rain_mmmedian": "median",
        "rain_mmcount": "records",
    }
    df_statistics = df_statistics.rename(columns=rename_cols)

    if df_station_metadata is not None:
        df_statistics = df_statistics.merge(
            df_station_metadata, on="station", how="left"
        )
        columns = ["year", "station", "x", "y", "records", "min", "median", "max"]
    else:
        # Keep the station identifier; the original dropped it here, leaving
        # the per-station rows anonymous.
        columns = ["station", "year", "records", "min", "median", "max"]
    return df_statistics[columns]