import warnings
import inspect
import json
from packaging.version import parse
from urllib.request import urlopen, URLError
import logging
import os.path
import numbers
import yaml
import datetime
import sys
import numpy as np
import numpy.ma as ma
import pandas as pd
import rasterio
import rasterstats
from tqdm import tqdm
from niche_vlaanderen.vegetation import Vegetation, VegSuitable
from niche_vlaanderen.acidity import Acidity
from niche_vlaanderen.nutrient_level import NutrientLevel
from niche_vlaanderen.spatial_context import SpatialContext
from niche_vlaanderen.version import __version__
from niche_vlaanderen.flooding import Flooding
from niche_vlaanderen.exception import NicheException
from niche_vlaanderen.codetables import package_resource
_allowed_input = {
"soil_code",
"mlw",
"msw",
"mhw",
"seepage",
"inundation_acidity",
"inundation_nutrient",
"nitrogen_atmospheric",
"nitrogen_animal",
"nitrogen_fertilizer",
"management",
"minerality",
"rainwater",
"inundation_vegetation",
"management_vegetation",
"acidity",
"nutrient_level",
}
_minimal_input = {"mlw", "mhw", "soil_code"}
_input_nutrient_level = {
"msw",
"soil_code",
"nitrogen_atmospheric",
"nitrogen_fertilizer",
"nitrogen_animal",
"management",
"inundation_nutrient",
}
_input_acidity = {
"soil_code",
"mlw",
"minerality",
"seepage",
"rainwater",
"inundation_acidity",
}
_abiotic_keys = {"nutrient_level", "acidity"}
_code_tables = {
"ct_acidity",
"ct_soil_mlw_class",
"ct_soil_codes",
"lnk_acidity",
"ct_seepage",
"ct_vegetation",
"ct_management",
"ct_nutrient_level",
"ct_mineralisation",
}
_code_tables_fp = {"duration", "frequency", "lnk_potential", "potential"}
logging.basicConfig()
logger = logging.getLogger(__name__)
[docs]
class Niche(object):
"""Creates a new Niche object
A niche object can be used to predict vegetation types according to the
NICHE Vlaanderen model.
The default codetables are used unless other tables are supplied to the
constructor.
Parameters:
ct_* lnk_*: path
Optionally, paths to codetables can be provided. These will override
the standard codetables used by Niche.
"""
def __init__(
self,
ct_acidity=None,
ct_soil_mlw_class=None,
ct_soil_codes=None,
lnk_acidity=None,
ct_seepage=None,
ct_vegetation=None,
ct_management=None,
ct_nutrient_level=None,
ct_mineralisation=None,
):
self._inputfiles = dict()
self._inputvalues = dict()
self._inputarray = dict()
self._abiotic = dict()
self._code_tables = dict()
self._vegetation = dict()
self._vegetation_detail = dict()
self._deviation = dict()
self._options = dict()
self._options["name"] = ""
self._options["strict_checks"] = True
self._files_written = dict()
self._log = logging.getLogger("niche_vlaanderen")
self._context = None
self.occurrence = None
self._latest_version = self._check_latest_version()
for k in _code_tables:
ct = locals()[k]
if ct is not None:
self._set_ct(k, ct)
@property
def name(self):
return self._options["name"]
@name.setter
def name(self, name):
self._options["name"] = str(name)
def __repr__(self):
s = "# Niche Vlaanderen version: {}\n".format(__version__)
s += self._latest_version + "\n"
s += "# Run at: {}\n\n".format(datetime.datetime.now())
# Also add some versions of the major packages we use - easy for
# debugging
s += "package_versions:\n"
s += " pandas: {}\n".format(pd.__version__)
s += " numpy: {}\n".format(np.__version__)
s += " rasterio: {}\n".format(rasterio.__version__)
s += " gdal: {}\n".format(rasterio.__gdal_version__)
s += " python: '{}'\n".format(sys.version)
s += "\n"
s += "model_options:\n"
options = yaml.dump(self._options, default_flow_style=False)
s += indent(options, " ")
if len(self._code_tables) > 0:
s += "\n\n"
s += "code_tables:\n"
s += indent(yaml.dump(self._code_tables, default_flow_style=False), " ")
if self._context is not None:
s += "\nmodel_properties:\n"
s += " model_extent: " + str(self._context.extent) + "\n"
s += "\n"
s += "input_layers:\n"
input = yaml.dump(self._inputfiles, default_flow_style=False)
input += (
yaml.dump(self._inputvalues, default_flow_style=False)
if len(self._inputvalues) > 0
else ""
)
s += indent(input, " ")
if self.vegetation_calculated:
s += "# Model run completed"
else:
s += "# No model run completed."
if len(self._files_written) > 0:
s += "\nfiles_written:\n"
s += indent(yaml.dump(self._files_written, default_flow_style=False), " ")
return s
def _check_latest_version(self):
url = "https://pypi.python.org/pypi/niche_vlaanderen/json"
try:
response = urlopen(url, timeout=5)
json_response = json.loads(response.read().decode("utf-8"))
releases = json_response["releases"]
versions = sorted(releases, key=parse, reverse=True)
# remove alpha, beta, rc versions
dev = ["rc", "a", "b"]
indev = lambda a: any(devstring in a for devstring in dev)
versions = [v for v in versions if not indev(v)]
last = versions[0]
if last == __version__:
s = "# Using latest niche_vlaanderen %s" % __version__
else:
s = "# Newer niche_vlaanderen %s available" % last # pragma: no cover
except URLError: # pragma: no cover
s = "# Error determinining last upstream version"
return s
def _set_ct(self, key, value):
if key not in _code_tables and key not in _code_tables_fp:
raise NicheException("Unrecognized codetable %s" % key)
if not os.path.isfile(value):
raise NicheException("Cannot find file %s" % key)
self._code_tables[key] = value
[docs]
def read_config_file(self, config, overwrite_ct=False):
"""Sets the input based on an input file, without running it
Configures a model based on a config file
Parameters:
overwrite_ct: bool (False)
Allows the user to override the default codetables (after
the class has been initialized).
"""
with open(config, "r") as stream:
config_loaded = yaml.safe_load(stream)
if overwrite_ct and "code_tables" in config_loaded.keys():
for k in config_loaded["code_tables"].keys():
value = config_loaded["code_tables"][k]
value = os.path.join(os.path.dirname(config), value)
self._set_ct(k, value)
# parse input_layers
for k in config_loaded["input_layers"].keys():
# adjust path to be relative to the yaml file
value = config_loaded["input_layers"][k]
if not isinstance(value, numbers.Number):
value = os.path.join(os.path.dirname(config), value)
self.set_input(k, value)
if "model_options" in config_loaded.keys():
if "name" in config_loaded["model_options"].keys():
self.name = config_loaded["model_options"]["name"]
if "strict_checks" in config_loaded["model_options"].keys():
self._options["strict_checks"] = config_loaded["model_options"][
"strict_checks"
]
if "overwrite_files" in config_loaded["model_options"].keys():
self._options["overwrite_files"] = config_loaded["model_options"][
"overwrite_files"
]
if "output_dir" in config_loaded["model_options"].keys():
self._options["output_dir"] = config_loaded["model_options"][
"output_dir"
]
if "flooding" in config_loaded.keys():
self._options["flooding"] = []
for scen_nr in config_loaded["flooding"]:
scen = {
k: scen_nr[k]
for k in ["name", "depth", "period", "frequency", "duration"]
}
self._options["flooding"].append(scen)
[docs]
def run_config_file(self, config, overwrite_ct=False):
"""Runs Niche using a configuration file
This will configure the model, run and output as specified.
Parameters:
config: string
path to a config file
overwrite_ct: boolean (False)
overwrite codetables using the values specified in
the configuration file.
"""
self.read_config_file(config, overwrite_ct=overwrite_ct)
# Set input values
with open(config, "r") as stream:
config_loaded = yaml.safe_load(stream)
# Run + write according to model options
options = {
k: config_loaded["model_options"][k]
for k in inspect.getfullargspec(self.run).args
if k in config_loaded["model_options"].keys()
}
self.run(**options)
overwrite = False
if "overwrite_files" in self._options and self._options["overwrite_files"]:
overwrite = True
if "flooding" in self._options:
for scen in self._options["flooding"]:
ct_nl = dict()
keys = set(Flooding.__init__.__code__.co_varnames) & set(
self._code_tables
)
for k in keys:
ct_nl[k] = self._code_tables[k]
fp = Flooding(name=scen["name"], **ct_nl)
depth_file = os.path.join(os.path.dirname(config), scen["depth"])
fp.calculate(
depth_file=depth_file,
period=scen["period"],
frequency=scen["frequency"],
duration=scen["duration"],
)
self.fp = fp.combine(self)
if "output_dir" in self._options:
self.fp.write(self._options["output_dir"], overwrite)
self._files_written.update(self.fp._files_written)
if "output_dir" in self._options:
output_dir = self._options["output_dir"]
self.write(output_dir, overwrite)
def _check_all_lower(self, input_array, a, b):
# We ignore comparison problems with np.nan (nodata)
warnings.simplefilter(action="ignore", category=RuntimeWarning)
higher = (
(input_array[a] > input_array[b])
& (input_array[a] != -99)
& (input_array[b] != -99)
)
warnings.simplefilter("default")
if np.any(higher):
# find out which cells have invalid values
bad_points = np.where(higher)
# convert these cells into the projection system
bad_points = self._context.transform * bad_points
print("Warning: Not all {} values are lower than {}".format(a, b))
print("coordinates with invalid values are:")
print(pd.DataFrame(list(bad_points)))
if self._options["strict_checks"]:
raise NicheException(
"Error: not all {} values are lower than {}".format(a, b)
)
def _check_input_files(self, full_model):
"""basic input checks (valid files etc)"""
# Load every input_file in the input_array
inputarray = dict()
for f in self._inputfiles:
dst = rasterio.open(self._inputfiles[f], "r")
nodata = dst.nodatavals[0]
window = self._context.get_read_window(SpatialContext(dst))
band = dst.read(1, window=window)
# if we have unsigned integers - switch to signed otherwise
# no data (-99) will fail.
if band.dtype.kind == "u":
band = band.astype(int)
if f in (
"nitrogen_animal",
"nitrogen_fertilizer",
"nitrogen_atmospheric",
"mhw",
"mlw",
"msw",
):
band = band.astype("float32")
# create a mask for no-data values, taking into account data-types
if band.dtype == "float32" and nodata is not None:
band[np.isclose(nodata, band)] = np.nan
else:
band[band == nodata] = -99
# convert old soil codes to new soil codes
if f == "soil_code" and np.all((band >= 10000)|(band==-99)|np.isnan(band)):
band[band>=10000] = np.round(band[band>=10000] / 10000)
if f == "soil_code" and band.dtype == "float32":
band[np.isnan(band)] = -99
band = band.astype(int)
inputarray[f] = band
# Load in all constant inputvalues
for f in self._inputvalues:
shape = (int(self._context.height), int(self._context.width))
inputarray[f] = np.full(shape, self._inputvalues[f])
# check if valid values are used in inputarrays
# check for valid datatypes - values will be checked in the low-level
# api (eg soil_code present in codetable)
self._check_all_lower(inputarray, "mhw", "mlw")
if "msw" in inputarray.keys():
self._check_all_lower(inputarray, "msw", "mlw")
self._check_all_lower(inputarray, "mhw", "msw")
if full_model and "nutrient_level" not in inputarray.keys():
with np.errstate(invalid="ignore"): # ignore NaN comparison errors
if np.any(
(inputarray["nitrogen_animal"] < 0)
| (inputarray["nitrogen_animal"] > 10000)
| (inputarray["nitrogen_fertilizer"] < 0)
| (inputarray["nitrogen_fertilizer"] > 10000)
| (inputarray["nitrogen_atmospheric"] < 0)
| (inputarray["nitrogen_atmospheric"] > 10000)
):
raise NicheException("Error: nitrogen values must be >0 and <10000")
# if all is successful:
self._inputarray = inputarray
[docs]
def run(self, full_model=True, deviation=False, strict_checks=True):
"""Run the niche model
Runs niche Vlaanderen model. Requires that the necessary input values
have been added using set_input.
Parameters
----------
full_model: bool
Uses the full niche model. The simple model only uses mhw,
mlw and soil_code as input.
deviation: bool
Create the maps with the difference between the needed MHW
and MLW and the actual MHW/MLW for a vegetation type.
strict_checks: bool
By default running a model will fail if impossible combinations
of MxW exist somewhere in the input files. By disabling strict
checks models can still be run. It will still emit a warning.
Note that this is provided to be backwards compatibility and
it is recommended to fix the data rather than disabling this.
"""
self._options["full_model"] = full_model
self._options["deviation"] = deviation
self._options["strict_checks"] = strict_checks
if full_model:
required_input = set(_minimal_input)
given_input = set(self._inputfiles.keys()) | set(self._inputvalues.keys())
if "nutrient_level" not in given_input:
required_input |= set(_input_nutrient_level)
if "acidity" not in given_input:
required_input |= set(_input_acidity)
missing_keys = required_input - given_input
if len(missing_keys) > 0:
print("Different keys are missing: ")
print(missing_keys)
raise NicheException("Error, different obliged keys are missing")
self._check_input_files(full_model)
if full_model:
if "nutrient_level" not in self._inputarray:
ct_nl = dict()
keys = set(NutrientLevel.__init__.__code__.co_varnames) & set(
self._code_tables
)
for k in keys:
ct_nl[k] = self._code_tables[k]
nl = NutrientLevel(**ct_nl)
self._abiotic["nutrient_level"] = nl.calculate(
soil_code=self._inputarray["soil_code"],
msw=self._inputarray["msw"],
nitrogen_atmospheric=self._inputarray["nitrogen_atmospheric"],
nitrogen_animal=self._inputarray["nitrogen_animal"],
nitrogen_fertilizer=self._inputarray["nitrogen_fertilizer"],
management=self._inputarray["management"],
inundation=self._inputarray["inundation_nutrient"],
)
if "acidity" not in self._inputarray:
ct_acidity = dict()
keys = set(Acidity.__init__.__code__.co_varnames) & set(
self._code_tables
)
for k in keys:
ct_acidity[k] = self._code_tables[k]
acidity = Acidity(**ct_acidity)
self._abiotic["acidity"] = acidity.calculate(
self._inputarray["soil_code"],
self._inputarray["mlw"],
self._inputarray["inundation_acidity"],
self._inputarray["seepage"],
self._inputarray["minerality"],
self._inputarray["rainwater"],
)
ct_veg = dict()
keys = set(Vegetation.__init__.__code__.co_varnames) & set(self._code_tables)
for k in keys:
ct_veg[k] = self._code_tables[k]
vegetation = Vegetation(**ct_veg)
if "inundation_vegetation" not in self._inputarray:
self._inputarray["inundation_vegetation"] = None
if "management_vegetation" not in self._inputarray:
self._inputarray["management_vegetation"] = None
veg_arguments = dict(
soil_code=self._inputarray["soil_code"],
mhw=self._inputarray["mhw"],
mlw=self._inputarray["mlw"],
)
if full_model:
veg_arguments.update(
inundation=self._inputarray["inundation_vegetation"],
management=self._inputarray["management_vegetation"],
)
if "nutrient_level" in self._inputarray:
veg_arguments["nutrient_level"] = self._inputarray["nutrient_level"]
else:
veg_arguments["nutrient_level"] = self._abiotic["nutrient_level"]
if "acidity" in self._inputarray:
veg_arguments["acidity"] = self._inputarray["acidity"]
else:
veg_arguments["acidity"] = self._abiotic["acidity"]
(
self._vegetation,
self.occurrence,
self._vegetation_detail,
) = vegetation.calculate(full_model=full_model, **veg_arguments)
if deviation:
self._deviation = vegetation.calculate_deviation(
self._inputarray["soil_code"],
self._inputarray["mhw"],
self._inputarray["mlw"],
)
[docs]
def write(self, folder, overwrite_files=False, detailed_files=False):
"""Saves the model results to a folder
Saves the model results to a folder. Files will be written as geotiff.
Vegetation files have names V1 ... V28
Abiotic files are exported as well (nutrient_level.tif and
acidity.tif) if they were not input files.
Parameters
----------
folder: string
Output folder to which files will be written. Parent directory must
already exist.
overwrite_files: bool
Overwrite files when saving.
Note writing will fail if any of the files to be written already
exists.
detailed_files : bool
Save detailed information on factor affecting vegetation possibility
"""
if not self.vegetation_calculated:
raise NicheException("A valid run must be done before writing the output.")
folder = str(folder)
self._options["output_dir"] = folder
if not os.path.exists(folder):
os.makedirs(folder)
params = dict(
driver="GTiff",
height=self._context.height,
width=self._context.width,
crs=self._context.crs,
transform=self._context.transform,
count=1,
dtype="uint8",
nodata=255,
compress="DEFLATE",
)
prefix = ""
if self.name != "":
prefix = self.name + "_"
files = {
"summary": folder + "/" + prefix + "summary.csv",
"log": "{}/{}log.txt".format(folder, prefix),
}
for vi in self._vegetation:
path = "{}/{}V{:02d}.tif".format(folder, prefix, vi)
files[vi] = path
for vi in self._abiotic:
path = "{}/{}{}.tif".format(folder, prefix, vi)
files[vi] = path
for i in self._deviation:
path = "{}/{}{}.tif".format(folder, prefix, i)
files[i] = path
if detailed_files:
for vi in self._vegetation_detail:
path = "{}/{}V{:02d}_detail.tif".format(folder, prefix, vi)
files["%02d_detail" % vi] = path
for key in files:
if os.path.exists(files[key]):
if overwrite_files:
self._log.info("Warning: file {} already exists".format(files[key]))
else:
raise NicheException("File {} already exists".format(files[key]))
# write a summary file containing the table of the model
self.table.to_csv(files["summary"], index=False)
for vi in self._vegetation:
with rasterio.open(files[vi], "w", **params) as dst:
dst.write(self._vegetation[vi], 1)
self._files_written[vi] = os.path.normpath(files[vi])
# also save the abiotic grids
for vi in self._abiotic:
with rasterio.open(files[vi], "w", **params) as dst:
dst.write(self._abiotic[vi], 1)
self._files_written[vi] = os.path.normpath(files[vi])
if detailed_files:
# write legend file
legend = VegSuitable.legend()
pd.DataFrame({"legend": pd.Series(legend)}).to_csv(folder + "/" + prefix + "legend_detail.csv")
# and the actual grids
for vi in self._vegetation_detail:
filename = files["%02d_detail" % vi]
with rasterio.open(filename, "w", **params) as dst:
dst.write(self._vegetation_detail[vi], 1)
self._files_written["%02d_detail" % vi] = os.path.normpath(filename)
# deviation
params.update(dtype="float64", nodata=-99999)
for i in self._deviation:
with rasterio.open(files[i], "w", **params) as dst:
band = self._deviation[i]
band[band == np.nan] = -99999
dst.write(band, 1)
self._files_written[i] = os.path.normpath(files[i])
with open(files["log"], "w") as f:
f.write(self.__repr__())
[docs]
def plot(self, key, ax=None, fixed_scale=True):
"""Plots the result or input of a Niche object
Creates a plot of an input layer of a Niche object or of the result
of a Niche object.
Note that depending on your matplotlib environment you may still have
to call the matplotlib show method to actually show the plot::
>>> myniche.plot(11)
>>> import matplotlib.pyplot as plt
>>> plt.show()
Parameters
==========
key: veg_code (1..28) or input_code
key of the vegetation type that should be plotted, eg myniche.plot(7)
or key of the input layer you want to plot eg myniche("mhw").
If deviation was calculated with the model, this can also be plotted,
by using `input_code_veg_code` eg `mhw_14`
ax: `matplotlib.axes.Axes`_
optional axis parameter. Can be specified when you want to plot
different plots in one layout
fixed_scale: boolean (default: True)
Use a fixed scale
Returns
=======
ax: `matplotlib.axes.Axes`_
Can be used to update the plot (eg change the
title).
"""
try:
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import Normalize
except (ImportError, RuntimeError): # pragma: no cover
msg = "Could not import matplotlib\n"
msg += "matplotlib required for plotting functions"
raise ImportError(msg)
norm = None
v = None
# Inform user that data is not available to plot
# (key is present, but value is None)
if key not in self._inputfiles and key in self._inputarray:
if not self._inputarray[key]:
raise KeyError(f"{key} data is not available to plot.")
if key in self._inputfiles and key not in self._inputarray:
# if set_input has been done, but no model run yet
# in this case we will open the file and fetch the data
with rasterio.open(self._inputfiles[key], "r") as dst:
window = self._context.get_read_window(SpatialContext(dst))
v = dst.read(1, window=window)
v = ma.masked_equal(v, dst.nodatavals[0])
title = key
if key in self._inputarray:
v = self._inputarray[key]
v = ma.masked_equal(v, -99)
title = key
if key in self._abiotic:
v = self._abiotic[key]
v = ma.masked_equal(v, 255)
title = key
if key in self._vegetation.keys():
v = self._vegetation[key]
v = ma.masked_equal(v, 255)
title = "{} ({})".format(self._vegcode2name(key), key)
norm = Normalize(0, 1)
if key in self._deviation:
v = self._deviation[key]
title = key
if fixed_scale:
norm = Normalize(-50, 50)
if key in {"mhw", "mlw", "msw"} and fixed_scale:
norm = Normalize(200, 0)
if v is None:
raise NicheException("Invalid key specified")
if ax is None:
fig, ax = plt.subplots()
((a, b), (c, d)) = self._context.extent
mpl_extent = (a, c, d, b)
im = plt.imshow(v, extent=mpl_extent, norm=norm, interpolation="none")
if self.name != "":
title = self.name + " " + title
ax.set_title(title)
if key in self._vegetation:
labels = ["not present", "present"]
values = [0, 1]
colors = [im.cmap(value / (len(values) - 1)) for value in values]
patches = [mpatches.Patch(color=colors[i], label=labels[i]) for i in values]
plt.legend(
handles=patches, bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.0
)
else:
plt.colorbar()
return ax
[docs]
def plot_detail(self, key, limit_legend=True, cmap="tab20"):
"""Detailed plot for a vegetation type
key: veg_code (1..28)
key of the vegetation type that should be plotted
limit_legend: boolean
limits the legend to the types present in the figure
cmap: colormap
colormap to use for the maps. Note that 9 combinations of
presence/absence are possible, so keep this in mind if changing
from the default value.
"""
try:
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import matplotlib
except (ImportError, RuntimeError): # pragma: no cover
msg = "Could not import matplotlib\n"
msg += "matplotlib required for plotting functions"
raise ImportError(msg)
if key not in self._vegetation.keys():
raise NicheException("invalid key for plot, must be a number of a vegetation type")
title = "{} ({})".format(self._vegcode2name(key), key)
v = self._vegetation_detail[key]
v = ma.masked_equal(v, 255)
((a, b), (c, d)) = self._context.extent
mpl_extent = (a, c, d, b)
fig, ax = plt.subplots()
legend = VegSuitable.legend()
legend_keys = np.array(list(legend.keys()))
# convert vegetation matrix to legend item matrix
v_un = np.digitize(v, legend_keys, right=True)
v_un = ma.masked_equal(v_un, len(legend))
plt.imshow(
v_un,
extent=mpl_extent,
cmap=cmap,
interpolation="None",
vmin=0,
vmax=len(VegSuitable.possible()) + 1,
)
if limit_legend:
present = ma.unique(v_un[~v_un.mask])
patches = [
mpatches.Patch(color=matplotlib.colormaps[cmap](i), label=legend[legend_keys[i]])
for i in present
]
else:
patches = [
mpatches.Patch(color=matplotlib.colormaps[cmap](i), label=legend[j])
for i, j in enumerate(legend)
]
plt.legend(handles=patches, bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.0)
if self.name != "":
title = self.name + " " + title
ax.set_title(title)
return ax
@property
def table(self):
"""Dataframe containing the potential area (ha) per vegetation type"""
return self._table()
def _table(self, detail=False):
"""Dataframe containing the potential area (ha) per vegetation type
detail: boolean
add info why vegetation is present, rather than just present/not present
"""
if not self.vegetation_calculated:
raise NicheException(
"Error: You must run niche prior to requesting the " "result table"
)
td = list()
if detail is False:
presence = dict({0: "not present", 1: "present", 255: "no data"})
for i in self._vegetation:
vi = pd.Series(self._vegetation[i].flatten())
rec = vi.value_counts() * self._context.cell_area / 10000
for a in rec.index:
td.append((i, presence[a], rec[a]))
else:
legend = VegSuitable.legend()
legend[255] = "no data"
for i in self._vegetation_detail:
vi = pd.Series(self._vegetation_detail[i].flatten())
rec = vi.value_counts() * self._context.cell_area / 10000
for a in rec.index:
td.append((i, legend[a], rec[a]))
df = pd.DataFrame(td, columns=["vegetation", "presence", "area_ha"])
return df
[docs]
def zonal_stats(
self, vectors, outside=True, attribute=None, vegetation_types=None, upscale=1
):
"""Calculates zonal statistics using vectors
Parameters
==========
vectors: path to a vector source or geo-like python objects
you can specify a path to a vector file eg "../test.shp", or pass
geo objects from other python functions.
Note that the vector should be in the same coordinate system as the
raster.
outside: bool (default: True)
report values outside shapes as well. The area which is not covered
by any shapefile will get shape_id -1.
attribute: string(default None):
attribute of the vector source that will be exported along in the
table.
vegetation_types: List | None
optional list of vegetation types (as integer number) for which the
statistics must be calculated. Calculation will happen for all
niche vegetation types by default.
upscale : int
upscaling factor: decrease the cell size by this factor to increase
the resolution
Returns
=======
table: pandas.DataFrame
"""
td = dict()
# Ignore the warnings from rasterstats - code must be adjusted
# in that package - not in our code.
#warnings.simplefilter(action="ignore", category=FutureWarning)
#warnings.simplefilter(action="ignore", category=DeprecationWarning)
presence = dict({0: "not present", 1: "present", 255: "no data"})
if vegetation_types is None:
vegetation_types = self._vegetation.keys()
if len(vegetation_types) == 0:
raise NicheException("Can not calculate zonal statistics for "
"empty vegetation list")
logger.debug(f"vegetation_types: {vegetation_types}")
logger.debug(f"upscaling to {upscale}")
for i in tqdm(vegetation_types):
# Note we use -99 as nodata value to make sure the true nodata
# value (255) is part of the result table.
if upscale == 1:
raster = self._vegetation[i]
affine = self._context.transform
else:
# based on
# https://rasterio.readthedocs.io/en/latest/topics/resampling.html
raster = (
self._vegetation[i].repeat(upscale, axis=0).repeat(upscale, axis=1)
)
affine = self._context.transform * self._context.transform.scale(
self._context.width / raster.shape[1],
self._context.height / raster.shape[0],
)
td[i] = rasterstats.zonal_stats(
vectors=vectors,
raster=raster,
affine=affine,
categorical=True,
nodata=Vegetation.nodata_veg,
geojson_out=attribute is not None,
)
warnings.simplefilter("default")
ti = []
attribute_list = []
for vi in td:
for shape_i, rec in enumerate(td[vi]):
if attribute is not None:
rec = rec["properties"]
for a in presence:
pixels = rec.get(a) if rec.get(a) is not None else 0
ti.append(
(
int(vi),
shape_i,
presence[a],
pixels * self._context.cell_area / 10000 / (upscale ** 2),
)
)
if attribute is not None:
attribute_list.append(rec[attribute])
df = pd.DataFrame(ti, columns=["vegetation", "shape_id", "presence", "area_ha"])
if outside:
# calculate area outside polygons
a = pd.concat(
[
self.table.groupby(by=["vegetation", "presence"])["area_ha"].sum(),
df.groupby(by=["vegetation", "presence"])["area_ha"].sum(),
],
axis=1,
)
a = a.reset_index().fillna(0)
a.columns = ["vegetation", "presence", "all", "inshape"]
a["area_ha"] = a["all"] - a["inshape"]
a["shape_id"] = -1
a = a[["vegetation", "shape_id", "presence", "area_ha"]]
df = pd.concat([df, a])
attribute_list.extend(a["shape_id"])
if attribute is not None:
df[attribute] = attribute_list
return df
def _vegcode2name(self, vegcode):
"""Converts a vegetation code to a name
Uses ct_vegetation columns veg_code and veg_type"""
if not hasattr(self, "_vegcode2namedict"):
if "ct_vegetation" in self._code_tables:
ct_vegetation = self._code_tables["ct_vegetation"]
else:
ct_vegetation = package_resource(
["system_tables"], "niche_vegetation.csv")
ct_vegetation = pd.read_csv(ct_vegetation)
subtable = ct_vegetation[["veg_code", "veg_type"]]
veg_dict = subtable.set_index("veg_code").to_dict()["veg_type"]
self._vegcode2namedict = veg_dict
return self._vegcode2namedict[vegcode]
@property
def vegetation_calculated(self):
return len(self._vegetation) > 0
def _clear_result(self):
"""Clears calculated vegetation"""
self._vegetation.clear()
self._deviation.clear()
def indent(s, pre):
return pre + s.replace("\n", "\n" + pre)
[docs]
class NicheDelta(object):
"""Class containing the difference between two niche runs
The difference can be visualized using the plot method. It is also
possible to derive a table with the area's according to each group.
See the tutorial at: `Comparing Niche classes`_
"""
_values = [0, 1, 2, 3, 4]
_labels = [
"not present in both models",
"present in both models",
"only in model 1",
"only in model 2",
"nodata in one model",
]
name = ""
def __init__(self, n1, n2):
self._delta = dict()
self._log = logging.getLogger("niche_vlaanderen")
if n1._context is None or n2._context is None:
raise NicheException(
"No extent in Niche object. Please run both models prior to "
"calculating a delta."
)
if n1._context != n2._context:
raise NicheException(
"Spatial contexts differ, can not make a delta\n"
"Context 1 %s\n"
"Context 2 %s" % (n1._context, n2._context)
)
self._context = n1._context
if len(n1._vegetation) == 0 or len(n2._vegetation) == 0:
raise NicheException(
"No vegetation in Niche object. Please run both models prior "
"to calculating a delta."
)
# the error below should not occur as we check the context, but
# better safe than sorry
if n1._vegetation[1].size != n2._vegetation[1].size: # pragma: no cover
raise NicheException("Arrays have different size.")
if len(n1._vegetation) != len(n2._vegetation):
raise NicheException("Niche vegetation objects have different length.")
for vi in n1._vegetation:
n1v = n1._vegetation[vi].flatten()
n2v = n2._vegetation[vi].flatten()
res = np.full(n1v.size, 4, dtype="uint8")
res[((n1v == 0) & (n2v == 0))] = 0
res[((n1v == 1) & (n2v == 1))] = 1
res[((n1v == 1) & (n2v == 0))] = 2
res[((n1v == 0) & (n2v == 1))] = 3
res[((n1v == 255) & (n2v == 255))] = 255
res.resize(n1._vegetation[vi].shape)
self._delta[vi] = ma.masked_equal(res, 255)
self._n1 = n1
[docs]
def write(self, folder, overwrite_files=False):
"""Writes the difference grids to grid files.
The differences are coded using these values:
* 0: "not present in both models"
* 1: "present in both models"
* 2: "only in model 1"
* 3: "only in model 2"
* 4: "nodata in one model"
Parameters
==========
folder: path
Path to which the output files will be written.
overwrite_files: bool
Whether files should be overwritten on save.
"""
if not os.path.exists(folder):
os.makedirs(folder)
params = dict(
driver="GTiff",
height=self._context.height,
width=self._context.width,
crs=self._context.crs,
transform=self._context.transform,
count=1,
dtype="uint8",
nodata=255,
compress="DEFLATE",
)
prefix = ""
if self.name != "":
prefix = self.name + "_"
files = {
"summary": "{}/{}delta_summary.csv".format(folder, prefix),
"legend": "{}/{}legend_delta.csv".format(folder, prefix),
}
for vi in self._delta:
files[vi] = "{}/{}D{}.tif".format(folder, prefix, vi)
for key in files:
if os.path.exists(files[key]):
if overwrite_files:
self._log.warning(
"Warning: file {} already exists".format(files[key])
)
else:
raise NicheException("File {} already exists".format(files[key]))
for vi in self._delta:
with rasterio.open(files[vi], "w", **params) as dst:
dst.write(self._delta[vi], 1)
# Also the resulting table is written
self.table.to_csv(files["summary"], index=False)
# And a small file containing the legend
legend = pd.DataFrame(dict(code=self._values, labels=self._labels))
legend.to_csv(files["legend"], index=False)
[docs]
def plot(self, key, ax=None):
"""
Plots the difference between two classes
Parameters
==========
key: number
The vegetation code (1-28)
"""
try:
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import Normalize
except (ImportError, RuntimeError): # pragma: no cover
msg = "Could not import matplotlib\n"
msg += "matplotlib required for plotting functions"
raise ImportError(msg)
if ax is None:
fig, ax = plt.subplots()
((a, b), (c, d)) = self._context.extent
mpl_extent = (a, c, d, b)
im = plt.imshow(
self._delta[key],
extent=mpl_extent,
norm=Normalize(0, max(self._values)),
interpolation="none",
)
if self.name != "":
title = "{} ({}-{})".format(self.name, self._n1._vegcode2name(key), key)
else:
title = "{} ({})".format(self._n1._vegcode2name(key), key)
ax.set_title(title)
labels = self._labels
values = self._values
colors = [im.cmap(value / (len(values) - 1)) for value in values]
patches = [mpatches.Patch(color=colors[i], label=labels[i]) for i in values]
plt.legend(handles=patches, bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.0)
return ax
@property
def table(self):
"""Return a summarized dataframe for a NicheDelta object
Returns
=======
df: `pandas.DataFrame`_
"""
td = list()
for i in self._delta:
vi = pd.Series(self._delta[i].flatten())
rec = vi.value_counts()
for a in rec.index:
td.append(
(i, self._labels[int(a)], rec[a] * self._context.cell_area / 10000)
)
df = pd.DataFrame(td, columns=["vegetation", "presence", "area_ha"])
return df
def conductivity2minerality(conductivity, minerality):
"""Convert a grid with conductivity to a grid of minerality
Helper function that converts a grid with conductivity values
to a grid of minerality values (where conductivity > 500).
Supplied for backwards compatibility with the original niche vlaanderen
model.
Parameters
==========
conductivity: filename
Original grid with conductivity values.
minerality: filename
New grid where minerality values are stored. This will be a geotiff, so
extension .tif is recommended.
"""
# we will ignore future warnings from rasterio
with warnings.catch_warnings():
warnings.simplefilter("ignore", FutureWarning)
with rasterio.open(conductivity, "r") as src:
band = src.read(1)
band = np.where(band > 500, 1, 0)
profile = src.profile
profile["driver"] = "GTiff"
profile["compress"] = "DEFLATE"
band = band.astype(profile["dtype"])
with rasterio.open(minerality, "w", **profile) as dst:
dst.write(band, 1)