Coverage for C:\src\imod-python\imod\mf6\timedis.py: 83%
52 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-08 13:27 +0200
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-08 13:27 +0200
1import cftime
2import numpy as np
4from imod.logging import init_log_decorator
5from imod.mf6.package import Package
6from imod.schemata import DimsSchema, DTypeSchema
def iso8601(datetime):
    """
    Format a single timestamp as an ISO 8601 string.

    Parameters
    ----------
    datetime: np.datetime64 or cftime.datetime
        The timestamp to format.

    Returns
    -------
    str
        ``np.datetime_as_string(datetime)`` for numpy timestamps,
        ``datetime.isoformat()`` for cftime timestamps.

    Raises
    ------
    TypeError
        If ``datetime`` is neither a ``np.datetime64`` nor a
        ``cftime.datetime``.
    """
    kind = type(datetime)
    # numpy timestamps have no isoformat(); numpy provides its own formatter.
    if issubclass(kind, np.datetime64):
        return np.datetime_as_string(datetime)
    if issubclass(kind, cftime.datetime):
        return datetime.isoformat()
    raise TypeError(f"Expected np.datetime64 or cftime.datetime, got {kind}")
class TimeDiscretization(Package):
    """
    Timing for all models of the simulation is controlled by the Temporal
    Discretization (TDIS) Package.
    https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.0.4.pdf#page=17

    Parameters
    ----------
    timestep_duration: float
        is the length of a stress period. (PERLEN)
    n_timesteps: int, optional
        is the number of time steps in a stress period (nstp).
        Default value: 1
    timestep_multiplier: float, optional
        is the multiplier for the length of successive time steps. The length of
        a time step is calculated by multiplying the length of the previous time
        step by timestep_multiplier (TSMULT).
        Default value: 1.0
    validate: {True, False}
        Flag to indicate whether the package should be validated upon
        initialization. This raises a ValidationError if package input is
        provided in the wrong manner. Defaults to True.
    """

    _pkg_id = "tdis"
    _keyword_map = {}
    _template = Package._initialize_template(_pkg_id)

    # timestep_duration must have a "time" dimension; the other two variables
    # may either have a "time" dimension or be dimensionless (scalar).
    _init_schemata = {
        "timestep_duration": [DimsSchema("time"), DTypeSchema(np.floating)],
        "n_timesteps": [DimsSchema("time") | DimsSchema(), DTypeSchema(np.integer)],
        "timestep_multiplier": [
            DimsSchema("time") | DimsSchema(),
            DTypeSchema(np.floating),
        ],
    }

    _write_schemata = {}

    @init_log_decorator()
    def __init__(
        self,
        timestep_duration,
        n_timesteps=1,
        timestep_multiplier=1.0,
        validate: bool = True,
    ):
        """
        Store the three timing variables in the package dataset and run the
        init schemata validation, passing ``validate`` through.
        """
        dict_dataset = {
            "timestep_duration": timestep_duration,
            "n_timesteps": n_timesteps,
            "timestep_multiplier": timestep_multiplier,
        }
        super().__init__(dict_dataset)
        self._validate_init_schemata(validate)

    def render(self):
        """
        Render the package template and return the result.

        The template receives:

        * ``time_units``: always ``"days"``.
        * ``start_date_time``: the first value of the dataset's ``time``
          coordinate, formatted by ``iso8601``.
        * ``nper``: the number of entries in ``timestep_duration``.
        * ``perioddata``: a list with one
          ``(timestep_duration, n_timesteps, timestep_multiplier)`` tuple per
          stress period.
        """
        start_date_time = iso8601(self.dataset["time"].values[0])
        d = {
            "time_units": "days",
            "start_date_time": start_date_time,
        }
        timestep_duration = self.dataset["timestep_duration"]
        n_timesteps = self.dataset["n_timesteps"]
        timestep_multiplier = self.dataset["timestep_multiplier"]
        nper = timestep_duration.size  # scalar will also have size 1
        d["nper"] = nper

        # Broadcast everything to a 1D array of length nper so it's iterable;
        # multiplying by ones also fills in a scalar value for every period.
        broadcast_array = np.ones(nper, dtype=np.int32)
        timestep_duration = np.atleast_1d(timestep_duration) * broadcast_array
        n_timesteps = np.atleast_1d(n_timesteps) * broadcast_array
        timestep_multiplier = np.atleast_1d(timestep_multiplier) * broadcast_array

        # One (perlen, nstp, tsmult) tuple per stress period.
        d["perioddata"] = list(zip(timestep_duration, n_timesteps, timestep_multiplier))

        return self._template.render(d)

    def _structured_grid_dim_check(self, da):
        """
        Check that ``da`` is either a scalar or one-dimensional along "time".

        Raises
        ------
        ValueError
            If ``da`` is 1D with a dimension other than ``("time",)``, or if it
            has more than one dimension.
        """
        if da.ndim == 0:
            return  # Scalar, no check necessary
        elif da.ndim == 1:
            if da.dims != ("time",):
                raise ValueError(
                    f"1D DataArray dims can only be ('time',). "
                    f"Instead got {da.dims} for {da.name} in the "
                    f"{self.__class__.__name__} package. "
                )
        else:
            raise ValueError(
                f"Exceeded accepted amount of dimensions "
                f"for {da.name} in the "
                f"{self.__class__.__name__} package. "
                f"Got {da.dims}. Can be at max ('time', )."
            )

    def write(self, directory, name):
        """
        Render the package and write it to ``<directory>/<name>.tdis``.

        Parameters
        ----------
        directory:
            Target directory. It is combined with the file name through the
            ``/`` operator, so it must support that (e.g. a ``pathlib.Path``).
        name: str
            File name without the ``.tdis`` extension.
        """
        timedis_content = self.render()
        timedis_path = directory / f"{name}.tdis"
        with open(timedis_path, "w") as f:
            f.write(timedis_content)