Coverage for C:\src\imod-python\imod\mf6\out\cbc.py: 93%
121 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-08 13:27 +0200
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-08 13:27 +0200
1"""
2Cell-by-cell flows
3"""
5import os
6import struct
7from collections import defaultdict
8from typing import Any, BinaryIO, Dict, List, NamedTuple, Tuple, Union
10import dask
11import numpy as np
12import xarray as xr
14from .common import FilePath, FloatArray
17class Imeth1Header(NamedTuple):
18 kstp: int
19 kper: int
20 text: str
21 ndim1: int
22 ndim2: int
23 ndim3: int
24 imeth: int
25 delt: float
26 pertim: float
27 totim: float
28 pos: int
31class Imeth6Header(NamedTuple):
32 kstp: int
33 kper: int
34 text: str
35 ndim1: int
36 ndim2: int
37 ndim3: int
38 imeth: int
39 delt: float
40 pertim: float
41 totim: float
42 pos: int
43 txt1id1: str
44 txt2id1: str
45 txt1id2: str
46 txt2id2: str
47 ndat: int
48 auxtxt: List[str]
49 nlist: int
52def read_common_cbc_header(f: BinaryIO) -> Dict[str, Any]:
53 """
54 Read the common part (shared by imeth=1 and imeth6) of a CBC header section.
55 """
56 content = {}
57 content["kstp"] = struct.unpack("i", f.read(4))[0]
58 content["kper"] = struct.unpack("i", f.read(4))[0]
59 content["text"] = f.read(16).decode("utf-8").strip().lower()
60 content["ndim1"] = struct.unpack("i", f.read(4))[0]
61 content["ndim2"] = struct.unpack("i", f.read(4))[0]
62 content["ndim3"] = struct.unpack("i", f.read(4))[0]
63 content["imeth"] = struct.unpack("i", f.read(4))[0]
64 content["delt"] = struct.unpack("d", f.read(8))[0]
65 content["pertim"] = struct.unpack("d", f.read(8))[0]
66 content["totim"] = struct.unpack("d", f.read(8))[0]
67 return content
70def read_imeth6_header(f: BinaryIO) -> Dict[str, Any]:
71 """
72 Read the imeth=6 specific data of a CBC header section.
73 """
74 content = {}
75 content["txt1id1"] = f.read(16).decode("utf-8").strip().lower()
76 content["txt2id1"] = f.read(16).decode("utf-8").strip().lower()
77 content["txt1id2"] = f.read(16).decode("utf-8").strip().lower()
78 content["txt2id2"] = f.read(16).decode("utf-8").strip().lower()
79 ndat = struct.unpack("i", f.read(4))[0]
80 content["ndat"] = ndat
81 content["auxtxt"] = [
82 f.read(16).decode("utf-8").strip().lower() for _ in range(ndat - 1)
83 ]
84 content["nlist"] = struct.unpack("i", f.read(4))[0]
85 return content
88def read_cbc_headers(
89 cbc_path: FilePath,
90) -> Dict[str, List[Union[Imeth1Header, Imeth6Header]]]:
91 """
92 Read all the header data from a cell-by-cell (.cbc) budget file.
94 All budget data for a MODFLOW6 model is stored in a single file. This
95 function collects all header data, as well as the starting byte position of
96 the actual budget data.
98 This function groups the headers per TEXT record (e.g. "flow-ja-face",
99 "drn", etc.). The headers are stored as a list of named tuples.
100 flow-ja-face, storage-ss, and storage-sy are written using IMETH=1, all
101 others with IMETH=6.
103 Parameters
104 ----------
105 cbc_path: str, pathlib.Path
106 Path to the budget file.
108 Returns
109 -------
110 headers: Dict[List[UnionImeth1Header, Imeth6Header]]
111 Dictionary containing a list of headers per TEXT record in the budget
112 file.
113 """
114 headers = defaultdict(list)
115 with open(cbc_path, "rb") as f:
116 filesize = os.fstat(f.fileno()).st_size
117 while f.tell() < filesize:
118 header = read_common_cbc_header(f)
119 if header["imeth"] == 1:
120 # Multiply by -1 because ndim3 is stored as a negative for some reason.
121 # (ndim3 is the integer size of the third dimension)
122 datasize = (
123 header["ndim1"] * header["ndim2"] * header["ndim3"] * -1
124 ) * 8
125 header["pos"] = f.tell()
126 key = header["text"]
127 headers[key].append(Imeth1Header(**header))
128 elif header["imeth"] == 6:
129 imeth6_header = read_imeth6_header(f)
130 datasize = imeth6_header["nlist"] * (8 + imeth6_header["ndat"] * 8)
131 header["pos"] = f.tell()
132 key = imeth6_header["txt2id2"]
133 # npf-key can be present multiple times in cases of saved saturation + specific discharge
134 if header["text"].startswith("data-"):
135 key = key + "-" + header["text"].replace("data-", "")
136 headers[key].append(Imeth6Header(**header, **imeth6_header))
137 else:
138 raise ValueError(
139 f"Invalid imeth value in CBC file {cbc_path}. "
140 f"Should be 1 or 6, received: {header['imeth']}."
141 )
142 # Skip the data
143 f.seek(datasize, 1)
144 return headers
147def read_imeth1_budgets(cbc_path: FilePath, count: int, pos: int) -> FloatArray:
148 """
149 Read the data for an imeth=1 budget section.
151 Parameters
152 ----------
153 cbc_path: str, pathlib.Path
154 count: int
155 number of values to read
156 pos:
157 position in the file where the data for a timestep starts
159 Returns
160 -------
161 1-D array of floats
162 """
163 with open(cbc_path, "rb") as f:
164 f.seek(pos)
165 timestep_budgets = np.fromfile(f, np.float64, count)
166 return timestep_budgets
169def open_imeth1_budgets(
170 cbc_path: FilePath, header_list: List[Imeth1Header]
171) -> xr.DataArray:
172 """
173 Open the data for an imeth==1 budget section. Data is read lazily per
174 timestep. The cell data is not spatially labelled.
176 Parameters
177 ----------
178 cbc_path: str, pathlib.Path
179 header_list: List[Imeth1Header]
181 Returns
182 -------
183 xr.DataArray with dims ("time", "linear_index")
184 """
185 # Gather times from the headers
186 dask_list = []
187 time = np.empty(len(header_list), dtype=np.float64)
188 for i, header in enumerate(header_list):
189 time[i] = header.totim
190 count = header.ndim1 * header.ndim2 * header.ndim3 * -1
191 a = dask.delayed(read_imeth1_budgets)(cbc_path, count, header.pos)
192 x = dask.array.from_delayed(a, shape=(count,), dtype=np.float64)
193 dask_list.append(x)
195 return xr.DataArray(
196 data=dask.array.stack(dask_list, axis=0),
197 coords={"time": time},
198 dims=("time", "linear_index"),
199 name=header_list[0].text,
200 )
203def expand_indptr(ia) -> np.ndarray:
204 n = np.diff(ia)
205 return np.repeat(np.arange(ia.size - 1), n)
208def open_face_budgets_as_flowja(
209 cbc_path: FilePath, header_list: List[Imeth1Header], grb_content: Dict[str, Any]
210) -> Tuple[xr.DataArray, xr.DataArray]:
211 flowja = open_imeth1_budgets(cbc_path, header_list)
212 flowja = flowja.rename({"linear_index": "connection"})
213 n = expand_indptr(grb_content["ia"])
214 m = grb_content["ja"] - 1
215 nm = xr.DataArray(
216 np.column_stack([n, m]),
217 coords={"cell": ["n", "m"]},
218 dims=["connection", "cell"],
219 )
220 return flowja, nm
223def read_imeth6_budgets(
224 cbc_path: FilePath, count: int, dtype: np.dtype, pos: int
225) -> Any:
226 """
227 Read the data for an imeth==6 budget section for a single timestep.
229 Returns a numpy structured array containing:
230 * id1: the model cell number
231 * id2: the boundary condition index
232 * budget: the budget terms
233 * and assorted auxiliary columns, if present
235 Parameters
236 ----------
237 cbc_path: str, pathlib.Path
238 count: int
239 number of values to read
240 dtype: numpy dtype
241 Data type of the structured array. Contains at least "id1", "id2", and "budget".
242 Optionally contains auxiliary columns.
243 pos:
244 position in the file where the data for a timestep starts
246 Returns
247 -------
248 Numpy structured array of type dtype
249 """
250 with open(cbc_path, "rb") as f:
251 f.seek(pos)
252 table = np.fromfile(f, dtype, count)
253 return table
256def read_imeth6_budgets_dense(
257 cbc_path: FilePath,
258 count: int,
259 dtype: np.dtype,
260 pos: int,
261 size: int,
262 shape: tuple,
263 return_variable: str,
264) -> FloatArray:
265 """
266 Read the data for an imeth==6 budget section.
268 Utilizes the shape information from the DIS GRB file to create a dense numpy
269 array. Always allocates for the entire domain (all layers, rows, columns).
271 Parameters
272 ----------
273 cbc_path: str, pathlib.Path
274 count: int
275 number of values to read
276 dtype: numpy dtype
277 Data type of the structured array. Contains at least "id1", "id2", and "budget".
278 Optionally contains auxiliary columns.
279 pos: int
280 position in the file where the data for a timestep starts
281 size: int
282 size of the entire model domain
283 shape: tuple[int, int, int]
284 Shape (nlayer, nrow, ncolumn) of entire model domain.
286 Returns
287 -------
288 Three-dimensional array of floats
289 """
290 # Allocates a dense array for the entire domain
291 out = np.zeros(size, dtype=np.float64)
292 table = read_imeth6_budgets(cbc_path, count, dtype, pos)
293 id1 = table["id1"] - 1 # Convert to 0 based index
294 out[id1] = table[return_variable]
295 return out.reshape(shape)