import numpy as np
import pandas as pd
# import for backward compatibility
from rfactor.rain import ( # noqa
_check_path,
load_rain_file,
load_rain_file_flanders,
load_rain_file_matlab_legacy,
load_rain_folder,
)
def _days_since_start_year(series):
"""Translate datetime series to days since start of the year
Parameters
----------
series : pandas.Series
Series with Datetime values. All datetime values should be of the same year.
Returns
-------
days_since_start : pandas.Series
Days since the start of the year as a float value.
Notes
-----
Support function to provide integration with original Matlab implementation. Output
is different from Pandas datetime attribute `dayofyear` as it includes time of the
day as decimal value.
"""
current_year = series.dt.year.unique()
if not len(current_year) == 1:
raise Exception("Input data should all be in the same year.")
days_since_start = (
(series - pd.Timestamp(f"{current_year[0]}-01-01")).dt.total_seconds()
/ 60.0
/ 1440.0
)
return days_since_start
[docs]
def compute_diagnostics(rain):
"""Compute diagnostics for input rainfall.
This function computes coverage (per year, station) and missing rainfall for each
month (per year, station).
Parameters
----------
rain: pandas.DataFrame
DataFrame with rainfall time series. Contains at least the following columns:
- *rain_mm* (float): Rain in mm
- *datetime* (pandas.Timestamp): Time stamp
- *station* (str): station name
- *year* (int): year of the measurement
- *tag* (str): tag identifier, formatted as ``STATION_YEAR``
Returns
-------
diagnostics: pandas.DataFrame
Diagnostics per station, year with coverage and identifier for no-rain per
month. Computed based on non-zero rainfall timeseries.
- *station* (str): station identifier.
- *year* (int): year.
- *coverage* (float): percentage coverage non-zero timeseries (see Notes).
Added with per month (id's 1 to 12):
- *months* (int): 1: no rain observed in month, 0: rain observed.
Notes
-----
The coverage is computed as:
.. math::
C = 100*[1-\\frac{\\text{number of NULL-data}}
{\\text{length of non-zero timeseries}}]
"""
# compute coverage
diagnostics = rain.groupby([rain["datetime"].dt.year, "station"]).aggregate(
{"rain_mm": lambda x: 1 - np.sum(np.isnan(x)) / len(x)}
)
diagnostics = diagnostics.rename(columns={"rain_mm": "coverage"})
# no-rain for months
df = rain.groupby(
[rain["datetime"].dt.year, rain["datetime"].dt.month, "station"]
).aggregate({"rain_mm": np.sum})
df.index.names = ["datetime", "month", "station"]
df["norain"] = 0
df.loc[df["rain_mm"] == 0, "norain"] = 1
df = df.pivot_table(
columns=["month"], index=["station", "datetime"], values=["norain"]
)
df = df["norain"].reset_index()
# check if months are in df reported
for month in range(1, 13, 1):
if month not in df.columns:
df[month] = 1
# couple
diagnostics = diagnostics.merge(df, how="left", on=["station", "datetime"])
diagnostics = diagnostics.rename(columns={"datetime": "year"})
return diagnostics
[docs]
def write_erosivity_data(df, folder_path):
"""Write output erosivity to (legacy Matlab format) in folder.
Written data are split-up for each year and station
(file name format: ``SOURCE_STATION_YEAR.txt``) and does not contain any headers.
The columns (no header!) in the written text files represent the following:
- *days_since* (float): Days since the start of the year.
- *erosivity_cum* (float): Cumulative erosivity over events.
- *all_event_rain_cum* (float): Cumulative rain over events.
Parameters
----------
df : pandas.DataFrame
DataFrame with rfactor/erosivity time series. Can contain multiple columns,
but should have at least the following:
- *datetime* (pandas.Timestamp): Time stamp
- *station* (str): Station identifier
- *erosivity_cum* (float): Cumulative erosivity over events
- *all_event_rain_cum* (float): Cumulative rain over events
folder_path : pathlib.Path
Folder path to save data according to legacy Matlab format,
see :func:`rfactor.process.load_rain_file`.
"""
_check_path(folder_path)
folder_path.mkdir(exist_ok=True, parents=True)
for (station, year), df_group in df.groupby(["station", df["datetime"].dt.year]):
df_group = df_group.assign(
days_since=_days_since_start_year(df_group["datetime"])
)
formats = {
"days_since": "{:.3f}",
"erosivity_cum": "{:.2f}",
"all_event_rain_cum": "{:.1f}",
}
for column, fformat in formats.items():
df_group[column] = df_group[column].map(lambda x: fformat.format(x))
df_group[["days_since", "erosivity_cum", "all_event_rain_cum"]].to_csv(
folder_path / f"{station}_{year}.csv", header=None, index=None, sep=" "
)
[docs]
def get_rfactor_station_year(erosivity, stations=None, years=None):
"""Get R-factor at end of every year for each station from cumulative erosivity.
Parameters
----------
erosivity: pandas.DataFrame
See :func:`rfactor.rfactor.compute_erosivity`
stations: list
List of stations to extract R for.
years: list
List of years to extract R for.
Returns
-------
erosivity: pandas.DataFrame
Updated with:
- *year* (int): year
- *station* (str): station
- *erosivity_cum* (float): cumulative erosivity at
end of *year* and at *station*.
"""
if stations is not None:
unexisting_stations = set(stations).difference(
set(erosivity["station"].unique())
)
if unexisting_stations:
raise KeyError(
f"Station name(s): {unexisting_stations} not part of data set."
)
erosivity = erosivity.loc[erosivity["station"].isin(stations)]
if years is not None:
unexisting_years = set(years).difference(set(erosivity["year"].unique()))
if unexisting_years:
raise KeyError(f"Year(s): {unexisting_years} not part of data set.")
erosivity = erosivity.loc[erosivity["year"].isin(years)]
erosivity = erosivity.groupby(["year", "station"]).aggregate("erosivity_cum").last()
erosivity = erosivity.reset_index().sort_values(["station", "year"])
erosivity.index = range(len(erosivity))
return erosivity
[docs]
def compute_rainfall_statistics(df_rainfall, df_station_metadata=None):
"""Compute general statistics for rainfall timeseries.
Statistics (number of records, min, max, median and years data) are
computed for each measurement station
Parameters
----------
df_rainfall: pandas.DataFrame
See :func:`rfactor.process.load_rain_file`
df_station_metadata: pandas.DataFrame
Dataframe holding station metadata. This dataframe has one mandatory
column:
- *station* (str): Name or code of the measurement station
- *x* (float): X-coordinate of measurement station.
- *y* (float): Y-coordinate of measurement station.
Returns
-------
df_statistics: pandas.DataFrame
Apart from the ``station``, ``x``, ``y`` when ``df_station_metadata`` is
provided, the following columns are returned:
- *year* (list): List of the years fror which data is available for the station.
- *records* (int): Total number of records for the station.
- *min* (float): Minimal measured value for the station.
- *median* (float): Median measured value for the station.
- *max* (float): Maximal measured value for the station.
"""
df_rainfall = df_rainfall.sort_values(by="year")
df_statistics = (
df_rainfall[["year", "station", "rain_mm"]]
.groupby("station")
.aggregate(
{
"year": lambda x: sorted(set(x)),
"rain_mm": ["min", "max", "median", "count"],
}
)
).reset_index()
df_statistics.columns = df_statistics.columns.map("".join)
rename_cols = {
"year<lambda>": "year",
"rain_mmamin": "min",
"rain_mmamax": "max",
"rain_mmmin": "min",
"rain_mmmax": "max",
"rain_mmmedian": "median",
"rain_mmcount": "records",
}
df_statistics = df_statistics.rename(columns=rename_cols)
if df_station_metadata is not None:
df_statistics = df_statistics.merge(
df_station_metadata, on="station", how="left"
)
df_statistics = df_statistics[
[
"year",
"station",
"x",
"y",
"records",
"min",
"median",
"max",
]
]
else:
df_statistics = df_statistics[["year", "records", "min", "median", "max"]]
return df_statistics