Source code for niche_vlaanderen.flooding

from __future__ import division
import logging
import rasterio
import os
import copy
from collections import OrderedDict

import pandas as pd
import numpy as np
import numpy.ma as ma

from niche_vlaanderen.spatial_context import SpatialContext
from niche_vlaanderen.codetables import (validate_tables_flooding,
                                         check_codes_used,
                                         package_resource)


class FloodingException(Exception):
    """"""


[docs] class Flooding(object): """ Predict the vegetation response to (frequent) flooding A Flooding object can be used to predict the response of vegetation to (frequent) flooding. The code tables used can be overwritten when initializing the object. Optionally also a name can be given which is used in plots and output files. """ def __init__( self, depths=None, duration=None, frequency=None, lnk_potential=None, potential=None, name="", ): self._ct = dict() self._veg = dict() self._log = logging.getLogger("niche_vlaanderen") for i in ["depths", "duration", "frequency", "lnk_potential", "potential"]: if locals()[i] is None: ct = package_resource(["system_tables", "flooding"], f"{i}.csv") else: ct = locals()[i] self._ct[i] = pd.read_csv(ct) self.name = name inner = all(v is None for v in self.__init__.__code__.co_varnames[1:]) validate_tables_flooding(inner=inner, **self._ct) # Set to true when the model is a combined niche - flooding model self._combined = False @property def vegetation_calculated(self): return len(self._veg) > 0 @property def table(self): """Dataframe containing the potential area (ha) per vegetation type""" if not self.vegetation_calculated: raise FloodingException( "Error: You must run niche prior to requesting the " "result table" ) td = list() labels = dict(self._ct["potential"].set_index("code")["description"]) if not self._combined: del labels[-1] labels[-99] = "no data" for i in self._veg: vi = pd.Series(self._veg[i].flatten()) rec = vi.value_counts() * self._context.cell_area / 10000 for a in rec.index: td.append((i, a, labels[a], rec[a])) df = pd.DataFrame( td, columns=["vegetation", "presence_code", "presence", "area_ha"] ) return df def _calculate(self, depth, frequency, duration, period): """ Low level calculation of a flooding object. Uses a numpy array for depth rather than a grid file (in calculate) """ orig_shape = depth.shape depth = depth.flatten() nodata = depth == -99 check_codes_used("depth", depth, self._ct["depths"]["code"]) check_codes_used("frequency", frequency, self._ct["frequency"]["code"]) check_codes_used("duration", duration, self._ct["duration"]["code"]) check_codes_used("period", period, ["summer", "winter"]) for veg_code, subtable_veg in self._ct["lnk_potential"].groupby("veg_code"): subtable_veg = subtable_veg.reset_index() # by default we give code 4 (no information/flooding) # https://github.com/inbo/niche_vlaanderen/issues/87 self._veg[veg_code] = np.full(depth.shape, 4, dtype="int16") self._veg[veg_code][nodata] = -99 groupby_cols = ["period", "frequency", "duration"] for index, subtable in subtable_veg.groupby(groupby_cols): if (period, frequency, duration) == index: subtable.reset_index() for row in subtable.itertuples(): veg = self._veg[veg_code] veg[row.depth == depth] = row.potential self._veg[veg_code] = np.maximum(veg, self._veg[veg_code]) self._veg[veg_code] = self._veg[veg_code].reshape(orig_shape)
[docs] def calculate(self, depth_file, frequency, period, duration): """Calculate a floodplain object Parameters ========== depth_file: filename The filename containing a classified grid with inundation dephts. The classes used must correspond to the codes in the depths.csv_ code table. frequency: code The frequency with which flooding occurs, eg T2, T50. Valid values are given in the frequency.csv_ code table. period: winter|summer period in which the flooding occurs. Must be either "summer" or "winter" duration: code Period with which the flooding occurs, from duration.csv_ """ with rasterio.open(depth_file, "r") as dst: depth = dst.read(1) self._context = SpatialContext(dst) if depth.dtype.kind == "u": depth = depth.astype(int) depth[depth == dst.nodatavals[0]] = -99 self._calculate(depth, frequency, duration, period) self.options = {"frequency": frequency, "duration": duration, "period": period}
def plot(self, key, ax=None): try: import matplotlib.pyplot as plt import matplotlib.patches as mpatches from matplotlib.colors import Normalize except (ImportError, RuntimeError): # pragma: no cover msg = "Could not import matplotlib\n" msg += "matplotlib required for plotting functions" raise ImportError(msg) if key not in self._veg.keys(): msg = "vegetation type {} not modeled".format(key) raise FloodingException(msg) if ax is None: fig, ax = plt.subplots() ((a, b), (c, d)) = self._context.extent mpl_extent = (a, c, d, b) veg = ma.masked_equal(self._veg[key], -99) im = plt.imshow(veg, extent=mpl_extent, norm=Normalize(-1, 4)) options = self.options.copy() options["duration"] = ( "< 14 days" if self.options["duration"] == 1 else "> 14 days" ) ax.set_title("{} ({})".format(key, options)) labels = ( self._ct["potential"] .set_index("code")["description"] .to_dict(into=OrderedDict) ) colors = [im.cmap(i / (len(labels) - 1)) for (i, value) in enumerate(labels)] patches = [ mpatches.Patch(color=colors[i], label=labels[code]) for (i, code) in enumerate(labels) if code > -1 or self._combined ] plt.legend(handles=patches, bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.0) return ax
[docs] def write(self, folder, overwrite_files=False): """Writes the floodplain grids to grid files. The differences are coded using the values specified in the potential.csv table. Parameters ========== folder: path Path to which the output files will be written. overwrite_files: bool Overwrite files when saving. Note writing will fail if any of the files to be written already exists. """ if len(self._veg) == 0: raise FloodingException( "A valid run must be done before writing the output." ) if not os.path.exists(folder): os.makedirs(folder) params = dict( driver="GTiff", height=self._context.height, width=self._context.width, crs=self._context.crs, transform=self._context.transform, count=1, dtype="int16", nodata=-99, compress="DEFLATE", ) self._files_written = dict() name = "" if self.name != "": name = self.name + "-" files = { "summary": folder + "/" + name + "summary.csv", "log": "{}/{}log.txt".format(folder, name), } for vi in self._veg: filename = "{}/{}F{:02d}-{}-P{}-{}.tif".format( folder, name, vi, self.options["frequency"], self.options["duration"], self.options["period"], ) files[vi] = filename for key in files: if os.path.exists(files[key]): if overwrite_files: self._log.warning( "Warning: file {} already exists".format(files[key]) ) else: raise FloodingException("File {} already exists".format(files[key])) self.table.to_csv(files["summary"], index=False) for vi in self._veg: path = files[vi] with rasterio.open(path, "w", **params) as dst: dst.write(self._veg[vi], 1) self._files_written[filename] = os.path.normpath(path)
[docs] def combine(self, niche_result): """Combines a Flooding model with a Niche model Both models must be run prior to combining them. The niche model will act as a "mask" making areas where Niche predicts a vegetation type to be not present are set to "not combinable". Parameters ========== niche_result: Niche Niche model that must be run prior to running combine. Returns ======= combined: Flooding """ # check niche model has been run if not niche_result.vegetation_calculated: raise FloodingException( "Niche model must be run prior to running this module." ) if not self.vegetation_calculated: raise FloodingException( "Floodplain model must be run prior to running this module." ) if self._context != niche_result._context: raise FloodingException( "Niche model has a different spatial context:\n" + str(self._context) + str(niche_result._context) ) new = copy.copy(self) new._veg = self._veg.copy() for vi in new._veg: nodata = (niche_result._vegetation[vi] == 255) | (new._veg[vi] == -99) new._veg[vi] = niche_result._vegetation[vi] * new._veg[vi] new._veg[vi][niche_result._vegetation[vi] == 0] = -1 new._veg[vi][nodata] = -99 new._combined = True return new