Coverage for C:\src\imod-python\imod\flow\pkgbase.py: 78%
186 statements
import abc
import pathlib

import cftime
import jinja2
import numpy as np
import xarray as xr

import imod
from imod.flow import timeutil
from imod.util.nested_dict import initialize_nested_dict, set_nested


class Package(
    abc.ABC
):  # TODO: Abstract base class really necessary? Are we using abstract methods?
    """
    Base package for the different iMODFLOW packages. Package is used to share
    methods for specific packages with no time component.

    It is not meant to be used directly, only to inherit from, to implement new
    packages.

    Every package contains a ``_pkg_id`` for identification. Used to check for
    duplicate entries, or to group multiple systems together (riv, ghb, drn).

    The ``_template_projectfile`` attribute is the template for a section of the
    projectfile. This is filled in based on the metadata from the DataArrays that
    are within the Package.
    """

    _template_projectfile = jinja2.Template(
        "0001, ({{pkg_id}}), 1, {{name}}, {{variable_order}}\n"
        '{{"{:03d}".format(variable_order|length)}}, {{"{:03d}".format(n_entry)}}\n'
        "{%- for variable in variable_order%}\n"  # Preserve variable order
        "{%- for layer, value in package_data[variable].items()%}\n"
        "{%- if value is string %}\n"
        # If string then assume path:
        # 1 indicates the layer is activated
        # 2 indicates the second element of the final two elements should be read
        # 1.000 is the multiplication factor
        # 0.000 is the addition factor
        # -9999 indicates there is no data, following iMOD usual practice
        '1, 2, {{"{:03d}".format(layer)}}, 1.000, 0.000, -9999., {{value}}\n'
        "{%- else %}\n"
        # Else assume a constant value is provided
        '1, 1, {{"{:03d}".format(layer)}}, 1.000, 0.000, {{value}}, ""\n'
        "{%- endif %}\n"
        "{%- endfor %}\n"
        "{%- endfor %}\n"
    )
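    # Illustrative sketch of what this template might render to for a
    # hypothetical two-layer package with pkg_id "khv" and a single variable
    # "kh", where the values are paths to IDF files (names, counts, and paths
    # are assumptions for illustration only):
    #
    #   0001, (khv), 1, HorizontalHydraulicConductivity, ['kh']
    #   001, 002
    #   1, 2, 001, 1.000, 0.000, -9999., /abs/path/kh_l1.idf
    #   1, 2, 002, 1.000, 0.000, -9999., /abs/path/kh_l2.idf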
    def __init__(self):
        self.dataset = xr.Dataset()

    def __getitem__(self, key):
        return self.dataset.__getitem__(key)

    @classmethod
    def from_file(cls, path, **kwargs):
        """
        Loads an imod-flow package from a file (currently only netcdf and zarr are supported).
        Note that it is expected that this file was saved with imod.flow.Package.save(),
        as the checks upon package initialization are not done again!

        Parameters
        ----------
        path : str, pathlib.Path
            Path to the file.
        **kwargs : keyword arguments
            Arbitrary keyword arguments forwarded to ``xarray.open_dataset()``, or
            ``xarray.open_zarr()``.
            Refer to the examples.

        Returns
        -------
        package : imod.flow.Package
            Returns a package with data loaded from file.

        Examples
        --------

        To load a package from a file, e.g. a River package:

        >>> river = imod.flow.River.from_file("river.nc")

        For large datasets, you likely want to process it in chunks. You can
        forward keyword arguments to ``xarray.open_dataset()`` or
        ``xarray.open_zarr()``:

        >>> river = imod.flow.River.from_file("river.nc", chunks={"time": 1})

        Refer to the xarray documentation for the possible keyword arguments.
        """
        # Throw error if user tries to use old functionality
        if "cache" in kwargs:
            if kwargs["cache"] is not None:
                raise NotImplementedError(
                    "Caching functionality in pkg.from_file() is removed."
                )

        path = pathlib.Path(path)

        # See https://stackoverflow.com/a/2169191
        # We expect the data in the netcdf has been saved as a package,
        # thus the checks run by __init__ and __setitem__ do not have
        # to be called again.
        return_cls = cls.__new__(cls)

        if path.suffix in (".zip", ".zarr"):
            # TODO: seems like a bug? Remove str() call if fixed in xarray/zarr
            return_cls.dataset = xr.open_zarr(str(path), **kwargs)
        else:
            return_cls.dataset = xr.open_dataset(path, **kwargs)

        return return_cls
    # TODO:
    # def __getattribute__(self, name):
    #     "implement the: https://github.com/xgcm/xgcm/issues/225#issuecomment-762248339"
    #     pass

    def _pkgcheck(self, **kwargs):
        pass

    def _check_if_nan_in_active_cells(self, active_cells=None, vars_to_check=None):
        """Check if there are any nans in the active domain"""
        for var in vars_to_check:
            if (active_cells & np.isnan(self.dataset[var])).any():
                raise ValueError(
                    f"Active cells in ibound may not have a nan value in {var}"
                )

    def _check_positive(self, varnames):
        for var in varnames:
            # Take care with nan values
            if (self[var] < 0).any():
                raise ValueError(f"{var} in {self} must be positive")

    def _is_periodic(self):
        # Periodic stresses are defined for all variables
        first_var = list(self.dataset.data_vars)[0]
        return "stress_periodic" in self.dataset[first_var].attrs

    def _hastime(self):
        return (self._pkg_id == "wel" and "time" in self.dataset) or (
            "time" in self.dataset.coords
        )

    def isel(self):
        raise NotImplementedError(
            f"Selection on packages not yet supported. "
            f"To make a selection on the xr.Dataset, call {self._pkg_id}.dataset.isel instead. "
            f"You can create a new package with a selection by calling {__class__.__name__}(**{self._pkg_id}.dataset.isel(**selection))"
        )

    def sel(self):
        raise NotImplementedError(
            f"Selection on packages not yet supported. "
            f"To make a selection on the xr.Dataset, call {self._pkg_id}.dataset.sel instead. "
            f"You can create a new package with a selection by calling {__class__.__name__}(**{self._pkg_id}.dataset.sel(**selection))"
        )
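    # A minimal usage sketch of the workaround suggested in the messages above,
    # assuming a River package loaded from a hypothetical "river.nc":
    #
    #   >>> river = imod.flow.River.from_file("river.nc")
    #   >>> selection = {"time": slice("2000-01-01", "2000-12-31")}
    #   >>> river_sel = imod.flow.River(**river.dataset.sel(**selection))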
    def compose(self, directory, globaltimes, nlayer, composition=None, **ignored):
        """
        Composes the package; boundary conditions override this method with
        their own implementation.

        Parameters
        ----------
        directory : str
            Path to working directory, where files will be written.
            Necessary to generate the paths for the projectfile.
        globaltimes : list  # TODO: make this an *arg, change order.
            Not used, only included to comply with BoundaryCondition.compose
        nlayer : int
            Number of layers
        **ignored
            Contains keyword arguments unused for packages
        """
        composition = initialize_nested_dict(3)

        for varname in self._variable_order:
            composition[self._pkg_id][varname] = self._compose_values_layer(
                varname, directory, nlayer
            )

        return composition
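    # Illustrative shape of the composition returned above, for a hypothetical
    # package with pkg_id "khv", one variable "kh", and two layers written as
    # IDF files:
    #
    #   {"khv": {"kh": {1: "/abs/path/kh_l1.idf", 2: "/abs/path/kh_l2.idf"}}}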
    def _compose_path(self, d, pattern=None):
        """
        Construct a filename, following the iMOD conventions.
        Returns an absolute path.

        Parameters
        ----------
        d : dict
            dict of parts (time, layer) for the filename.
        pattern : string or re.pattern
            Format string used to construct the filename.

        Returns
        -------
        str
            Absolute path.

        """
        return str(imod.util.path.compose(d, pattern).resolve())

    def _compose_values_layer(self, varname, directory, nlayer, time=None):
        """
        Composes paths to files, or gets the appropriate scalar value for
        a single variable in a dataset.

        Parameters
        ----------
        varname : str
            variable name of the DataArray
        directory : str
            Path to working directory, where files will be written.
            Necessary to generate the paths for the projectfile.
        nlayer : int
            Number of layers
        time : datetime-like, optional
            Time corresponding to the value.

        Returns
        -------
        values : dict
            Dictionary containing the {layer number: path to file}.
            Alternatively: {layer number: scalar value}. The layer number may be
            a wildcard (e.g. '?').
        """
        pattern = "{name}"

        values = initialize_nested_dict(1)
        da = self[varname]

        d = {"directory": directory, "name": varname, "extension": ".idf"}

        if time is not None:
            if isinstance(time, (np.datetime64, cftime.datetime)):
                d["time"] = time
                pattern += "_{time:%Y%m%d%H%M%S}"
            else:
                d["timestr"] = time
                pattern += "_{timestr}"

        # Scalar value or not?
        # If it's a scalar value we can immediately write it;
        # otherwise we have to write a file path.
        if "y" not in da.coords and "x" not in da.coords:
            idf = False
        else:
            idf = True

        if "layer" not in da.coords:
            if idf:
                pattern += "{extension}"
                for layer in range(1, nlayer + 1):  # 1-based indexing
                    values[layer] = self._compose_path(d, pattern=pattern)
            else:
                for layer in range(1, nlayer + 1):  # 1-based indexing
                    values[layer] = da.values[()]

        else:
            pattern += "_l{layer}{extension}"
            for layer in np.atleast_1d(da.coords["layer"].values):
                if idf:
                    d["layer"] = layer
                    values[layer] = self._compose_path(d, pattern=pattern)
                else:
                    dim_sel = {}
                    if "layer" in da.dims:
                        dim_sel["layer"] = layer
                    if "time" in da.dims:
                        dim_sel["time"] = time
                    values[layer] = da.sel(**dim_sel).values[()]

        return values
    def _compose_values_time(self, varname, globaltimes):
        da = self.dataset[varname]
        values = {}

        if "time" in da.coords:
            package_times = da.coords["time"].values

            starts = timeutil.forcing_starts(package_times, globaltimes)
            for itime, start in enumerate(starts):
                # TODO: this now fails on a non-dim time too
                # solution roughly the same as for layer above?
                values[start] = da.isel(time=itime).values[()]
        else:
            values["steady-state"] = da.values[()]

        return values
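    # The dict returned above maps either forcing start labels (as produced by
    # timeutil.forcing_starts) or the literal key "steady-state" to the
    # corresponding values, e.g. (hypothetical): {"steady-state": 0.001}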
    def _render_projectfile(self, **kwargs):
        """
        Returns
        -------
        rendered : str
            The rendered projectfile part;
            if part of a PkgGroup: for a single boundary condition system.
        """
        return self._template_projectfile.render(**kwargs)

    def save(self, directory):
        for name, da in self.dataset.data_vars.items():  # pylint: disable=no-member
            if "y" in da.coords and "x" in da.coords:
                path = pathlib.Path(directory).joinpath(name)
                imod.idf.save(path, da)
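    # Usage sketch, reusing the from_file example above (paths are hypothetical):
    #
    #   >>> river = imod.flow.River.from_file("river.nc")
    #   >>> river.save("model/river")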

class BoundaryCondition(Package, abc.ABC):
    """
    BoundaryCondition is used to share methods for specific stress packages
    with a time component.

    It is not meant to be used directly, only to inherit from, to implement new
    packages.
    """

    _template_projectfile = jinja2.Template(
        # Specify amount of timesteps for a package
        # 1 indicates if package is active or not
        '{{"{:04d}".format(package_data|length)}}, ({{pkg_id}}), 1, {{name}}, {{variable_order}}\n'
        "{%- for time_key, time_data in package_data.items()%}\n"
        # Specify stress period
        # Specify amount of variables and entries(nlay, nsys) to be expected
        "{{times[time_key]}}\n"
        '{{"{:03d}".format(time_data|length)}}, {{"{:03d}".format(n_entry)}}\n'
        "{%- for variable in variable_order%}\n"  # Preserve variable order
        "{%- for system, system_data in time_data[variable].items() %}\n"
        "{%- for layer, value in system_data.items() %}\n"
        "{%- if value is string %}\n"
        # If string then assume path:
        # 1 indicates the layer is activated
        # 2 indicates the second element of the final two elements should be read
        # 1.000 is the multiplication factor
        # 0.000 is the addition factor
        # -9999 indicates there is no data, following iMOD usual practice
        '1, 2, {{"{:03d}".format(layer)}}, 1.000, 0.000, -9999., {{value}}\n'
        "{%- else %}\n"
        # Else assume a constant value is provided
        '1, 1, {{"{:03d}".format(layer)}}, 1.000, 0.000, {{value}}, ""\n'
        "{%- endif %}\n"
        "{%- endfor %}\n"
        "{%- endfor %}\n"
        "{%- endfor %}\n"
        "{%- endfor %}\n"
    )
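    # Illustrative sketch of a single rendered stress period for one system and
    # one layer (package id, header counts, timestamp format, and paths are
    # assumptions for illustration only):
    #
    #   0002, (riv), 1, River, ['conductance', 'stage']
    #   2000-01-01 00:00:00
    #   002, 001
    #   1, 2, 001, 1.000, 0.000, -9999., /abs/path/conductance_20000101000000_l1.idf
    #   1, 2, 001, 1.000, 0.000, -9999., /abs/path/stage_20000101000000_l1.idf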
    def repeat_stress(self, use_cftime=False, **repeats):
        """
        Repeat stress periods.

        Parameters
        ----------
        use_cftime: bool
            Whether to force datetimes to cftime or not.
        **repeats: dict of datetime - datetime pairs
            Keyword arguments with a variable name as keyword and a dict as
            value. This dict contains a datetime as key, which maps to another,
            already existing datetime in the BoundaryCondition for which the
            data should be repeated.

        """
        # This is a generic implementation of repeat_stress in iMOD-WQ.
        # Genericity in this module is possible because
        # of the existence of self._variable_order.

        # Check first if all the provided repeats are actually
        # arguments of the package
        self._varnames_in_variable_order(repeats.keys())

        # Loop over variable order
        for varname in self._variable_order:
            if varname in repeats.keys():
                self._repeat_stress(varname, repeats[varname], use_cftime=use_cftime)
            else:  # Default to None, like in WQ implementation
                self._repeat_stress(varname, None, use_cftime=use_cftime)
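    # Usage sketch (dates and variable names are hypothetical): repeat the data
    # of 2000-01-01 for 2001-01-01, for the "stage" and "conductance" variables
    # of a River package:
    #
    #   >>> repeats = {np.datetime64("2001-01-01"): np.datetime64("2000-01-01")}
    #   >>> river.repeat_stress(stage=repeats, conductance=repeats)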
    def _repeat_stress(self, varname, value, use_cftime):
        if value is not None:
            if varname not in self.dataset:
                raise ValueError(
                    f"{varname} does not occur in {self}\n cannot add stress_repeats"
                )
            if "time" not in self[varname].coords:
                raise ValueError(
                    f"{varname} in {self}\n does not have dimension time, cannot add stress_repeats."
                )

            # Replace both key and value by the right datetime type
            d = {
                imod.util.time.to_datetime_internal(
                    k, use_cftime
                ): imod.util.time.to_datetime_internal(v, use_cftime)
                for k, v in value.items()
            }
            self[varname].attrs["stress_repeats"] = d
    def periodic_stress(
        self,
        periods,
        use_cftime=False,
    ):
        """
        Periodic stress periods.

        Adds periodic stresses to each variable in the package. iMODFLOW will
        then implicitly repeat these.

        The iMOD manual currently states: 'A PERIOD repeats until another time
        definition is more close to the current time step'.

        Parameters
        ----------
        periods: dict of datetime - string pairs
            Contains a datetime as key, which maps to a period label. This will
            be used for the entire package.
        use_cftime: bool
            Whether to force datetimes to cftime or not.

        Examples
        --------
        The following example assigns a higher head to the summer period than
        the winter period. iMODFLOW will switch to period "summer" once
        'xxxx-04-01' has passed, and to "winter" once 'xxxx-10-01' has passed.

        >>> times = [np.datetime64('2000-04-01'), np.datetime64('2000-10-01')]

        >>> head_periodic = xr.DataArray([2., 1.], coords={"time": times}, dims=["time"])

        >>> timemap = {times[0]: "summer", times[1]: "winter"}

        >>> ghb = GeneralHeadBoundary(head = head_periodic, conductance = 10.)
        >>> ghb.periodic_stress(timemap)

        """
        if "time" not in self.dataset.coords:
            raise ValueError(
                f"{self} does not have dimension time, cannot add stress_periodic."
            )

        if self.dataset.coords["time"].size != len(periods):
            raise ValueError(
                f"{self} does not have the same amount of timesteps as the number of periods."
            )

        # Replace both key and value by the right datetime type
        d = {
            imod.util.time.to_datetime_internal(k, use_cftime): v
            for k, v in periods.items()
        }

        for varname in self._variable_order:
            self.dataset[varname].attrs["stress_periodic"] = d
    def _varnames_in_variable_order(self, varnames):
        """Check if varname in _variable_order"""
        for varname in varnames:
            if varname not in self._variable_order:
                raise ValueError(
                    f"{varname} not recognized for {self}, choose one of {self._variable_order}"
                )

    def _get_runfile_times(self, da, globaltimes, ds_times=None):
        if ds_times is None:
            ds_times = self.dataset.coords["time"].values

        if "stress_repeats" in da.attrs:
            stress_repeats_keys = np.array(list(da.attrs["stress_repeats"].keys()))
            stress_repeats_values = np.array(list(da.attrs["stress_repeats"].values()))
            package_times, inds = np.unique(
                np.concatenate([ds_times, stress_repeats_keys]), return_index=True
            )
            # Times to write in the runfile
            runfile_times = np.concatenate([ds_times, stress_repeats_values])[inds]
            starts = timeutil.forcing_starts(package_times, globaltimes)
        elif "stress_periodic" in da.attrs:
            runfile_times = package_times = ds_times
            starts = [da.attrs["stress_periodic"][time] for time in ds_times]
        else:
            runfile_times = package_times = ds_times
            starts = timeutil.forcing_starts(package_times, globaltimes)

        return runfile_times, starts
    def compose(
        self,
        directory,
        globaltimes,
        nlayer,
        system_index=1,
        pkggroup_time=None,
    ):
        """
        Composes all variables for one system.

        Parameters
        ----------
        globaltimes : list, np.array
            Holds the global times, i.e. the combined unique times of every
            boundary condition that are used to define the stress periods. The
            times of the BoundaryCondition do not have to match all the global
            times. When a globaltime is not present in the BoundaryCondition,
            the value of the first previous available time is filled in. The
            effective result is a forward fill in time.
        directory : str
            Path to working directory, where files will be written. Necessary
            to generate the paths for the projectfile.
        nlayer : int
            Number of layers
        system_index : int
            System number. Defaults to 1; package groups use it to number
            their systems.
        pkggroup_time : optional, list, np.array
            Holds the package group times. Packages in one group need to be
            synchronized for iMODFLOW.

        Returns
        -------
        composition : defaultdict
            A nested dictionary following the projectfile hierarchy:
            ``{_pkg_id : {stress_period : {varname : {system_index : {lay_nr : value}}}}}``
            where 'value' can be a scalar or a path to a file.
            The stress period number may be the wildcard '?'.
        """
        composition = initialize_nested_dict(5)

        for data_var in self._variable_order:
            keys_ls, values = self._compose_values_timelayer(
                data_var,
                globaltimes,
                directory,
                nlayer,
                system_index=system_index,
                pkggroup_times=pkggroup_time,
            )
            for keys, value in zip(keys_ls, values):
                set_nested(composition, keys, value)

        return composition
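    # Illustrative shape of the composition returned above (stress period
    # label, pkg_id, and path are hypothetical):
    #
    #   {"riv": {"2000-01-01": {"stage": {1: {1: "/abs/path/stage_l1.idf"}}}}}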
    def _compose_values_timelayer(
        self,
        varname,
        globaltimes,
        directory,
        nlayer,
        system_index=1,
        pkggroup_times=None,
    ):
        """
        Composes paths to files, or gets the appropriate scalar value for a
        single variable in a dataset.

        Parameters
        ----------
        varname : str
            variable name of the DataArray
        globaltimes : list, np.array
            Holds the global times, i.e. the combined unique times of every
            boundary condition that are used to define the stress periods. The
            times of the BoundaryCondition do not have to match all the global
            times. When a globaltime is not present in the BoundaryCondition,
            the value of the first previous available time is filled in. The
            effective result is a forward fill in time.
        directory : str
            Path to working directory, where files will be written. Necessary
            to generate the paths for the projectfile.
        nlayer : int
            Number of layers
        system_index : int
            System number. Defaults to 1; package groups use it to number
            their systems.
        pkggroup_times : optional, list, np.array
            Holds the package group times. Packages in one group need to be
            synchronized for iMODFLOW.

        Returns
        -------
        keys : list of lists
            Contains the keys for the nested dict in the right order
        values : list
            List with composed layers

        """
        values = []
        keys = []

        da = self[varname]

        # Check if time defined for one variable in package
        # iMODFLOW's projectfile requires all variables for every timestep
        if self._hastime():
            compose_with_time = True
            times_for_path, starts = self._get_runfile_times(
                da, globaltimes, ds_times=pkggroup_times
            )
        # Catch case where the package has no time, but another
        # package in the group has. So the path has no time, but
        # needs a time entry in the composition
        elif pkggroup_times is not None:
            compose_with_time = True
            _, starts = self._get_runfile_times(
                da, globaltimes, ds_times=pkggroup_times
            )
            times_for_path = [None] * len(starts)
        else:
            compose_with_time = False

        if compose_with_time:
            for time, start in zip(times_for_path, starts):
                composed_layers = self._compose_values_layer(
                    varname, directory, nlayer, time=time
                )
                values.append(composed_layers)
                keys.append([self._pkg_id, start, varname, system_index])

        else:
            composed_layers = self._compose_values_layer(
                varname, directory, nlayer, time=None
            )
            values.append(composed_layers)
            keys.append([self._pkg_id, "steady-state", varname, system_index])

        return keys, values
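    # The two lists returned above are aligned pairwise: each key list
    # [pkg_id, stress period or "steady-state", varname, system_index] is
    # paired with a {layer: value} dict produced by _compose_values_layer.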

class TopBoundaryCondition(BoundaryCondition, abc.ABC):
    """
    Abstract base class for boundary conditions that are only assigned to
    the first layer, namely the Recharge and EvapoTranspiration package.
    """

    _template_projectfile = jinja2.Template(
        # Specify amount of timesteps for a package
        # 1 indicates if package is active or not
        '{{"{:04d}".format(package_data|length)}}, ({{pkg_id}}), 1, {{name}}, {{variable_order}}\n'
        "{%- for time_key, time_data in package_data.items()%}\n"
        # Specify stress period
        # Specify amount of variables and entries(nlay, nsys) to be expected
        "{{times[time_key]}}\n"
        '{{"{:03d}".format(time_data|length)}}, {{"{:03d}".format(n_entry)}}\n'
        "{%- for variable in variable_order%}\n"  # Preserve variable order
        "{%- for system, system_data in time_data[variable].items() %}\n"
        # Recharge only applied to first layer
        "{%- set value = system_data[1]%}\n"
        "{%- if value is string %}\n"
        # If string then assume path:
        # 1 indicates the layer is activated
        # 2 indicates the second element of the final two elements should be read
        # 001 indicates recharge is applied to the first layer
        # 1.000 is the multiplication factor
        # 0.000 is the addition factor
        # -9999 indicates there is no data, following iMOD usual practice
        "1, 2, 001, 1.000, 0.000, -9999., {{value}}\n"
        "{%- else %}\n"
        # Else assume a constant value is provided
        '1, 1, 001, 1.000, 0.000, {{value}}, ""\n'
        "{%- endif %}\n"
        "{%- endfor %}\n"
        "{%- endfor %}\n"
        "{%- endfor %}\n"
    )

    def _select_first_layer_composition(self, composition):
        """Select the first layer in an existing composition."""
        composition_first_layer = initialize_nested_dict(5)

        # Loop over nested dict, it is not pretty
        for a, aa in composition[self._pkg_id].items():
            for b, bb in aa.items():
                for c, cc in bb.items():
                    composition_first_layer[a][b][c][1] = cc[1]
        return composition_first_layer
    def compose(
        self,
        directory,
        globaltimes,
        nlayer,
    ):
        composition = super().compose(
            directory,
            globaltimes,
            nlayer,
        )

        composition[self._pkg_id] = self._select_first_layer_composition(composition)

        return composition