from __future__ import division
import logging
import rasterio
import os
import copy
from collections import OrderedDict
from pathlib import Path
import pandas as pd
import numpy as np
import numpy.ma as ma
from niche_vlaanderen.vegetation import Vegetation
from niche_vlaanderen.spatial_context import SpatialContext
from niche_vlaanderen.codetables import (validate_tables_flooding,
check_codes_used,
package_resource)
_allowed_input = {
"depth": "uint8",
}
class FloodingException(Exception):
""""""
[docs]
class Flooding(object):
"""Predict the vegetation response to (frequent) flooding
A Flooding object can be used to predict the response of vegetation to
(frequent) flooding.
The code tables used can be overwritten when initializing the object.
Optionally also a name can be given which is used in plots and output
files.
"""
nodata = -99 # Keep int8 with nodata -99 to be backward compatible for 'potential'
dtype = "int8"
def __init__(
self,
depth=None,
duration=None,
frequency=None,
lnk_potential=None,
potential=None,
name="",
):
"""Create a Flooding class
Parameters
----------
depth : str, Optional
Path to the depth system table to overwrite the default.
duration : str, Optional
Path to the duration system table to overwrite the default.
frequency : str, Optional
Path to the frequency system table to overwrite the default.
lnk_potential : str, Optional
Path to the lnk_potential system table to overwrite the default.
potential : str, Optional
Path to the potential system table to overwrite the default.
name : str, Optional
Name of the model used for plotting and output files.
"""
self._ct = dict()
self._veg = dict()
self._context = None
self.options = dict()
self._log = logging.getLogger("niche_vlaanderen")
for i in ["depth", "duration", "frequency", "lnk_potential", "potential"]:
if locals()[i] is None:
ct = package_resource(["system_tables", "flooding"],
f"{i}.csv")
else:
ct = locals()[i]
self._ct[i] = pd.read_csv(ct)
self.name = name
inner = all(v is None for v in self.__init__.__code__.co_varnames[1:])
validate_tables_flooding(inner=inner, **self._ct)
# Set to true when the model is a combined niche - flooding model
self._combined = False
@property
def vegetation_calculated(self):
return len(self._veg) > 0
@property
def table(self):
"""Dataframe containing the potential area (ha) per vegetation type"""
if not self.vegetation_calculated:
raise FloodingException(
"Error: You must run niche prior to requesting the " "result table"
)
td = list()
labels = dict(self._ct["potential"].set_index("potential")["description"])
if not self._combined:
del labels[-1]
labels[self.nodata] = "no data"
for i in self._veg:
vi = pd.Series(self._veg[i].flatten())
rec = vi.value_counts() * self._context.cell_area / 10000
for a in rec.index:
td.append((i, a, labels[a], rec[a]))
df = pd.DataFrame(
td, columns=["vegetation", "presence_code", "presence", "area_ha"]
)
return df
def _calculate(self, depth, frequency, duration, period):
"""
Low level calculation of a flooding object.
Uses a numpy array for depth rather than a grid file (in calculate)
"""
orig_shape = depth.shape
depth = depth.flatten()
nodata = depth == 255 # depth is uint8
check_codes_used("depth", depth, self._ct["depth"]["depth"])
check_codes_used("frequency", frequency, self._ct["frequency"]["frequency"])
check_codes_used("duration", duration, self._ct["duration"]["duration"])
check_codes_used("period", period, ["summer", "winter"])
for veg_code, subtable_veg in self._ct["lnk_potential"].groupby("veg_code"):
subtable_veg = subtable_veg.reset_index()
# by default we give code 4 (no information/flooding)
# https://github.com/inbo/niche_vlaanderen/issues/87
vegi = np.full(depth.shape, 4, dtype=self.dtype)
vegi[nodata] = self.nodata
groupby_cols = ["period", "frequency", "duration"]
for index, subtable in subtable_veg.groupby(groupby_cols):
if (period, frequency, duration) == index:
subtable.reset_index()
for row in subtable.itertuples():
veg = vegi
veg[row.depth == depth] = row.potential
vegi = np.maximum(veg, vegi)
self._veg[veg_code] = vegi.reshape(orig_shape).astype(self.dtype)
[docs]
def read_depth_to_grid(self, file_name):
"""Read depth file using rasterio to numpy array and set extent
Parameters
----------
file_name : string | Pathlib.Path
Path to the file to be read
"""
with rasterio.open(file_name, "r") as dst:
band = dst.read(1, masked=True)
self._context = SpatialContext(dst)
# Convert to depth data type and no-data value
band = band.filled(fill_value=255).astype(_allowed_input["depth"])
return band
[docs]
def calculate(self, depth_file_path, frequency, period, duration):
"""Calculate a floodplain object
Parameters
----------
depth_file_path: pahtlib.Path | str (file_path)
The filename containing a classified grid with inundation dephts.
The classes used must correspond to the codes in the
depth.csv_ code table.
frequency: str (frequency code)
The frequency with which flooding occurs, eg T2, T10, T25 or T50. Valid
values are given in the frequency.csv_ code table.
period: str, winter | summer
Period in which the flooding occurs. Must be either "summer" or
"winter".
duration: int (duration code)
Period with which the flooding occurs (1, 2...). Check the first column
of the duration.csv_ file for the available codes.
"""
depth = self.read_depth_to_grid(depth_file_path)
self._calculate(depth, frequency, duration, period)
self.options = {"frequency": frequency, "duration": duration, "period": period}
def plot(self, key, ax=None):
try:
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import Normalize
except (ImportError, RuntimeError): # pragma: no cover
msg = "Could not import matplotlib\n"
msg += "matplotlib required for plotting functions"
raise ImportError(msg)
if key not in self._veg.keys():
msg = "vegetation type {} not modeled".format(key)
raise FloodingException(msg)
if ax is None:
fig, ax = plt.subplots()
((a, b), (c, d)) = self._context.extent
mpl_extent = (a, c, d, b)
veg = ma.masked_equal(self._veg[key], self.nodata)
im = plt.imshow(veg, extent=mpl_extent, norm=Normalize(-1, 4))
options = self.options.copy()
options["duration"] = (
"< 14 days" if self.options["duration"] == 1 else "> 14 days"
)
ax.set_title("{} ({})".format(key, options))
labels = (
self._ct["potential"]
.set_index("potential")["description"]
.to_dict(into=OrderedDict)
)
colors = [im.cmap(i / (len(labels) - 1)) for (i, value) in enumerate(labels)]
patches = [
mpatches.Patch(color=colors[i], label=labels[code])
for (i, code) in enumerate(labels)
if code > -1 or self._combined
]
plt.legend(handles=patches, bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.0)
return ax
[docs]
def write(self, folder, overwrite_files=False):
"""Writes the floodplain grids to grid files.
The differences are coded using the values specified in the
potential.csv table.
Parameters
----------
folder: pathlib.Path
Path to which the output files will be written.
overwrite_files: bool, default False
Overwrite files when saving.
Note writing will fail if any of the files to be written already
exists.
"""
if len(self._veg) == 0:
raise FloodingException(
"A valid run must be done before writing the output."
)
Path(folder).mkdir(parents=True, exist_ok=True)
params = dict(
driver="GTiff",
height=self._context.height,
width=self._context.width,
crs=self._context.crs,
transform=self._context.transform,
count=1,
dtype=self.dtype,
nodata=self.nodata,
compress="DEFLATE",
)
self._files_written = dict()
name = ""
if self.name != "":
name = self.name + "-"
files = {
"summary": folder + "/" + name + "summary.csv",
"log": "{}/{}log.txt".format(folder, name),
}
for vi in self._veg:
filename = "{}/{}F{:02d}-{}-P{}-{}.tif".format(
folder,
name,
vi,
self.options["frequency"],
self.options["duration"],
self.options["period"],
)
files[vi] = filename
for key in files:
if os.path.exists(files[key]):
if overwrite_files:
self._log.warning(
"Warning: file {} already exists".format(files[key])
)
else:
raise FloodingException("File {} already exists".format(files[key]))
self.table.to_csv(files["summary"], index=False)
for vi in self._veg:
path = files[vi]
with rasterio.open(path, "w", **params) as dst:
dst.write(self._veg[vi], 1)
self._files_written[filename] = os.path.normpath(path)
[docs]
def combine(self, niche_result):
"""Combines a Flooding model with a Niche model
Both models must be run prior to combining them. The niche model will
act as a "mask" making areas where Niche predicts a vegetation type
to be not present are set to "not combinable".
Parameters
----------
niche_result: Niche
Niche model that must be run prior to running combine.
Returns
-------
combined: Flooding
"""
# check niche model has been run
if not niche_result.vegetation_calculated:
raise FloodingException(
"Niche model must be run prior to running this module."
)
if not self.vegetation_calculated:
raise FloodingException(
"Floodplain model must be run prior to running this module."
)
if self._context != niche_result._context:
raise FloodingException(
"Niche model has a different spatial context:\n"
+ str(self._context)
+ str(niche_result._context)
)
new = copy.copy(self)
new._veg = self._veg.copy()
for vi in new._veg:
nodata = ((niche_result._vegetation[vi] == Vegetation.nodata) |
(new._veg[vi] == self.nodata))
new._veg[vi] = niche_result._vegetation[vi] * new._veg[vi]
new._veg[vi][niche_result._vegetation[vi] == 0] = -1
new._veg[vi][nodata] = self.nodata
new._combined = True
return new