Coverage for C:\src\imod-python\imod\mf6\hfb.py: 94%

318 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 14:15 +0200

1import abc 

2import copy 

3import textwrap 

4import typing 

5from copy import deepcopy 

6from enum import Enum 

7from typing import TYPE_CHECKING, Optional, Tuple 

8 

9import cftime 

10import numpy as np 

11import xarray as xr 

12import xugrid as xu 

13from fastcore.dispatch import typedispatch 

14 

15from imod.logging import init_log_decorator 

16from imod.mf6.boundary_condition import BoundaryCondition 

17from imod.mf6.interfaces.ilinedatapackage import ILineDataPackage 

18from imod.mf6.mf6_hfb_adapter import Mf6HorizontalFlowBarrier 

19from imod.mf6.package import Package 

20from imod.mf6.utilities.grid import broadcast_to_full_domain 

21from imod.mf6.utilities.regrid import RegridderType 

22from imod.schemata import EmptyIndexesSchema 

23from imod.typing import GridDataArray 

24from imod.util.imports import MissingOptionalModule 

25 

26if TYPE_CHECKING: 

27 import geopandas as gpd 

28else: 

29 try: 

30 import geopandas as gpd 

31 except ImportError: 

32 gpd = MissingOptionalModule("geopandas") 

33 

34try: 

35 import shapely 

36except ImportError: 

37 shapely = MissingOptionalModule("shapely") 

38 

39 

40@typedispatch # type: ignore[no-redef] 

41def _derive_connected_cell_ids( 

42 idomain: xr.DataArray, grid: xu.Ugrid2d, edge_index: np.ndarray 

43): 

44 """ 

45 Derive the cell ids of the connected cells of an edge on a structured grid. 

46 

47 Parameters 

48 ---------- 

49 idomain: xr.DataArray 

50 The active domain 

51 grid : 

52 The unstructured grid of the domain 

53 edge_index : 

54 The indices of the edges from which the connected cell ids are computed 

55 

56 Returns A dataset containing the cell_id1 and cell_id2 data variables. The cell dimensions are stored in the 

57 cell_dims coordinates. 

58 ------- 

59 

60 """ 

61 edge_faces = grid.edge_face_connectivity 

62 cell2d = edge_faces[edge_index] 

63 

64 shape = (idomain["y"].size, idomain["x"].size) 

65 row_1, column_1 = np.unravel_index(cell2d[:, 0], shape) 

66 row_2, column_2 = np.unravel_index(cell2d[:, 1], shape) 

67 

68 cell_ids = xr.Dataset() 

69 

70 cell_ids["cell_id1"] = xr.DataArray( 

71 np.array([row_1 + 1, column_1 + 1]).T, 

72 coords={ 

73 "edge_index": edge_index, 

74 "cell_dims1": ["row_1", "column_1"], 

75 }, 

76 ) 

77 

78 cell_ids["cell_id2"] = xr.DataArray( 

79 np.array([row_2 + 1, column_2 + 1]).T, 

80 coords={ 

81 "edge_index": edge_index, 

82 "cell_dims2": ["row_2", "column_2"], 

83 }, 

84 ) 

85 

86 return cell_ids 

87 

88 

89@typedispatch # type: ignore[no-redef] 

90def _derive_connected_cell_ids( 

91 _: xu.UgridDataArray, grid: xu.Ugrid2d, edge_index: np.ndarray 

92): 

93 """ 

94 Derive the cell ids of the connected cells of an edge on an unstructured grid. 

95 

96 Parameters 

97 ---------- 

98 grid : 

99 The unstructured grid of the domain 

100 edge_index : 

101 The indices of the edges from which the connected cell ids are computed 

102 

103 Returns A dataset containing the cell_id1 and cell_id2 data variables. The cell dimensions are stored in the 

104 cell_dims coordinates. 

105 ------- 

106 

107 """ 

108 edge_faces = grid.edge_face_connectivity 

109 cell2d = edge_faces[edge_index] 

110 

111 cell2d_1 = cell2d[:, 0] 

112 cell2d_2 = cell2d[:, 1] 

113 

114 cell_ids = xr.Dataset() 

115 

116 cell_ids["cell_id1"] = xr.DataArray( 

117 np.array([cell2d_1 + 1]).T, 

118 coords={ 

119 "edge_index": edge_index, 

120 "cell_dims1": ["cell2d_1"], 

121 }, 

122 ) 

123 

124 cell_ids["cell_id2"] = xr.DataArray( 

125 np.array([cell2d_2 + 1]).T, 

126 coords={ 

127 "edge_index": edge_index, 

128 "cell_dims2": ["cell2d_2"], 

129 }, 

130 ) 

131 

132 return cell_ids 

133 

134 

135def to_connected_cells_dataset( 

136 idomain: GridDataArray, 

137 grid: xu.Ugrid2d, 

138 edge_index: np.ndarray, 

139 edge_values: dict, 

140) -> xr.Dataset: 

141 """ 

142 Converts a cell edge grid with values defined on the edges to a dataset with the cell ids of the connected cells, 

143 the layer of the cells and the value of the edge. The connected cells are returned in cellid notation e.g.(row, 

144 column) for structured grid, (mesh2d_nFaces) for unstructured grids. 

145 

146 Parameters 

147 ---------- 

148 idomain: GridDataArray 

149 active domain 

150 grid: xu.Ugrid2d, 

151 unstructured grid containing the edge to cell array 

152 edge_index: np.ndarray 

153 indices of the edges for which the edge values will be converted to values in the connected cells 

154 edge_values: dict 

155 dictionary containing the value name and the edge values that are applied on the edges identified by the 

156 edge_index 

157 

158 Returns 

159 ---------- 

160 a dataset containing: 

161 - cell_id1 

162 - cell_id2 

163 - layer 

164 - value name 

165 

166 """ 

167 barrier_dataset = _derive_connected_cell_ids(idomain, grid, edge_index) 

168 

169 for name, values in edge_values.items(): 

170 barrier_dataset[name] = xr.DataArray( 

171 values, 

172 dims=["layer", "edge_index"], 

173 coords={ 

174 "edge_index": edge_index, 

175 "layer": values.coords["layer"], 

176 }, 

177 ) 

178 

179 barrier_dataset = ( 

180 barrier_dataset.stack(cell_id=("layer", "edge_index"), create_index=False) 

181 .drop_vars("edge_index") 

182 .reset_coords() 

183 ) 

184 

185 return barrier_dataset.dropna("cell_id") 

186 

187 

188def _fraction_layer_overlap( 

189 snapped_dataset: xu.UgridDataset, 

190 edge_index: np.ndarray, 

191 top: xu.UgridDataArray, 

192 bottom: xu.UgridDataArray, 

193): 

194 """ 

195 Computes the fraction a barrier occupies inside a layer. 

196 """ 

197 left, right = snapped_dataset.ugrid.grid.edge_face_connectivity[edge_index].T 

198 top_mean = _mean_left_and_right(top, left, right) 

199 bottom_mean = _mean_left_and_right(bottom, left, right) 

200 

201 n_layer, n_edge = top_mean.shape 

202 layer_bounds = np.empty((n_edge, n_layer, 2), dtype=float) 

203 layer_bounds[..., 0] = typing.cast(np.ndarray, bottom_mean.values).T 

204 layer_bounds[..., 1] = typing.cast(np.ndarray, top_mean.values).T 

205 

206 hfb_bounds = np.empty((n_edge, n_layer, 2), dtype=float) 

207 hfb_bounds[..., 0] = ( 

208 snapped_dataset["zbottom"].values[edge_index].reshape(n_edge, 1) 

209 ) 

210 hfb_bounds[..., 1] = snapped_dataset["ztop"].values[edge_index].reshape(n_edge, 1) 

211 

212 overlap = _vectorized_overlap(hfb_bounds, layer_bounds) 

213 height = layer_bounds[..., 1] - layer_bounds[..., 0] 

214 # Avoid runtime warnings when diving by 0: 

215 height[height <= 0] = np.nan 

216 fraction = (overlap / height).T 

217 

218 return xr.ones_like(top_mean) * fraction 

219 

220 

221def _mean_left_and_right( 

222 cell_values: xu.UgridDataArray, left: np.ndarray, right: np.ndarray 

223) -> xr.Dataset: 

224 """ 

225 This method computes the mean value of cell pairs. The left array specifies the first cell, the right array 

226 the second cells. The mean is computed by (first_cell+second_cell/2.0) 

227 

228 Parameters 

229 ---------- 

230 cell_values: xu.UgridDataArray 

231 The array containing the data values of all the cells 

232 left : 

233 The array containing indices to the first cells 

234 right : 

235 The array containing indices to the second cells 

236 

237 Returns 

238 ------- 

239 The means of the cells 

240 

241 """ 

242 facedim = cell_values.ugrid.grid.face_dimension 

243 uda_left = cell_values.ugrid.obj.isel({facedim: left}).drop_vars(facedim) 

244 uda_right = cell_values.ugrid.obj.isel({facedim: right}).drop_vars(facedim) 

245 

246 return xr.concat((uda_left, uda_right), dim="two").mean("two") 

247 

248 

249def _vectorized_overlap(bounds_a: np.ndarray, bounds_b: np.ndarray): 

250 """ 

251 Vectorized overlap computation. Returns the overlap of 2 vectors along the same axis. 

252 If there is no overlap zero will be returned. 

253 

254 b1 

255 

256 a1 | 

257 ▲ | 

258 | | 

259 | ▼ 

260 ▼ b0 

261 a0 

262 

263 To compute the overlap of the 2 vectors the maximum of a0,b0, is subtracted from the minimum of a1,b1. 

264 

265 Compare with: 

266 

267 overlap = max(0, min(a[1], b[1]) - max(a[0], b[0])) 

268 """ 

269 return np.maximum( 

270 0.0, 

271 np.minimum(bounds_a[..., 1], bounds_b[..., 1]) 

272 - np.maximum(bounds_a[..., 0], bounds_b[..., 0]), 

273 ) 

274 

275 

276class BarrierType(Enum): 

277 HydraulicCharacteristic = 0 

278 Multiplier = 1 

279 Resistance = 2 

280 

281 

282class HorizontalFlowBarrierBase(BoundaryCondition, ILineDataPackage): 

283 _pkg_id = "hfb" 

284 

285 _period_data = () 

286 _init_schemata = {} 

287 _write_schemata = {"geometry": [EmptyIndexesSchema()]} 

288 

289 _regrid_method: dict[str, Tuple[RegridderType, str]] = {} 

290 

291 def __init__( 

292 self, 

293 geometry: "gpd.GeoDataFrame", 

294 print_input: bool = False, 

295 ) -> None: 

296 dict_dataset = {"print_input": print_input} 

297 super().__init__(dict_dataset) 

298 

299 self.line_data = geometry 

300 

301 def get_regrid_methods(self) -> Optional[dict[str, Tuple[RegridderType, str]]]: 

302 return self._regrid_method 

303 

304 def _get_variable_names_for_gdf(self) -> list[str]: 

305 return [ 

306 self._get_variable_name(), 

307 "geometry", 

308 ] + self._get_vertical_variables() 

309 

310 @property 

311 def line_data(self) -> "gpd.GeoDataFrame": 

312 variables_for_gdf = self._get_variable_names_for_gdf() 

313 return gpd.GeoDataFrame( 

314 self.dataset[variables_for_gdf].to_dataframe(), 

315 geometry="geometry", 

316 ) 

317 

318 @line_data.setter 

319 def line_data(self, value: "gpd.GeoDataFrame") -> None: 

320 variables_for_gdf = self._get_variable_names_for_gdf() 

321 self.dataset = self.dataset.merge( 

322 value.to_xarray(), overwrite_vars=variables_for_gdf, join="right" 

323 ) 

324 

325 def render(self, directory, pkgname, globaltimes, binary): 

326 raise NotImplementedError( 

327 f"""{self.__class__.__name__} is a grid-agnostic package and does not have a render method. To render the 

328 package, first convert to a Modflow6 package by calling pkg.to_mf6_pkg()""" 

329 ) 

330 

331 def _netcdf_encoding(self): 

332 return {"geometry": {"dtype": "str"}} 

333 

334 @classmethod 

335 def from_file(cls, path, **kwargs): 

336 instance = super().from_file(path, **kwargs) 

337 instance.dataset["geometry"] = shapely.wkt.loads(instance.dataset["geometry"]) 

338 

339 return instance 

340 

341 def _compute_barrier_values( 

342 self, snapped_dataset, edge_index, idomain, top, bottom, k 

343 ): 

344 raise NotImplementedError() 

345 

346 def to_mf6_pkg( 

347 self, 

348 idomain: GridDataArray, 

349 top: GridDataArray, 

350 bottom: GridDataArray, 

351 k: GridDataArray, 

352 validate: bool = False, 

353 ) -> Mf6HorizontalFlowBarrier: 

354 """ 

355 Write package to Modflow 6 package. 

356 

357 Based on the model grid, top and bottoms, the layers in which the barrier belong are computed. If the 

358 barrier only partially occupies a layer an effective resistance or hydraulic conductivity for that layer is 

359 calculated. This calculation is skipped for the Multiplier type. 

360 

361 Parameters 

362 ---------- 

363 idomain: GridDataArray 

364 Grid with active cells. 

365 top: GridDataArray 

366 Grid with top of model layers. 

367 bottom: GridDataArray 

368 Grid with bottom of model layers. 

369 k: GridDataArray 

370 Grid with hydraulic conductivities. 

371 validate: bool 

372 Run validation before converting 

373 

374 Returns 

375 ------- 

376 

377 """ 

378 if validate: 

379 self._validate(self._write_schemata) 

380 

381 top, bottom = broadcast_to_full_domain(idomain, top, bottom) 

382 k = idomain * k 

383 unstructured_grid, top, bottom, k = ( 

384 self.__to_unstructured(idomain, top, bottom, k) 

385 if isinstance(idomain, xr.DataArray) 

386 else [idomain, top, bottom, k] 

387 ) 

388 snapped_dataset, edge_index = self.__snap_to_grid(idomain) 

389 edge_index = self.__remove_invalid_edges(unstructured_grid, edge_index) 

390 

391 barrier_values = self._compute_barrier_values( 

392 snapped_dataset, edge_index, idomain, top, bottom, k 

393 ) 

394 barrier_values = self.__remove_edge_values_connected_to_inactive_cells( 

395 barrier_values, unstructured_grid, edge_index 

396 ) 

397 

398 if (barrier_values.size == 0) | np.isnan(barrier_values).all(): 

399 raise ValueError( 

400 textwrap.dedent( 

401 """ 

402 No barriers could be assigned to cell edges, 

403 this is caused by either one of the following: 

404 \t- Barriers fall completely outside the model grid 

405 \t- Barriers were assigned to the edge of the model domain 

406 \t- Barriers were assigned to edges of inactive cells 

407 """ 

408 ) 

409 ) 

410 

411 barrier_dataset = typing.cast( 

412 xr.Dataset, 

413 to_connected_cells_dataset( 

414 idomain, 

415 unstructured_grid.ugrid.grid, 

416 edge_index, 

417 { 

418 "hydraulic_characteristic": self.__to_hydraulic_characteristic( 

419 barrier_values 

420 ) 

421 }, 

422 ), 

423 ) 

424 

425 barrier_dataset["print_input"] = self.dataset["print_input"] 

426 

427 return Mf6HorizontalFlowBarrier(**barrier_dataset.data_vars) 

428 

429 def is_empty(self) -> bool: 

430 if super().is_empty(): 

431 return True 

432 

433 linestrings = self.dataset["geometry"] 

434 only_empty_lines = all(ls.is_empty for ls in linestrings.values) 

435 return only_empty_lines 

436 

437 def _resistance_layer( 

438 self, 

439 snapped_dataset: xu.UgridDataset, 

440 edge_index: np.ndarray, 

441 idomain: xu.UgridDataArray, 

442 ) -> xr.DataArray: 

443 """ 

444 Returns layered xarray with barrier resistance distributed over layers 

445 """ 

446 hfb_resistance = snapped_dataset[self._get_variable_name()].values[edge_index] 

447 hfb_layer = snapped_dataset["layer"].values[edge_index] 

448 nlay = idomain.layer.size 

449 model_layer = np.repeat(idomain.layer.values, hfb_resistance.size).reshape( 

450 (nlay, hfb_resistance.size) 

451 ) 

452 data = np.where(model_layer == hfb_layer, hfb_resistance, np.nan) 

453 return xr.DataArray( 

454 data=data, 

455 coords={ 

456 "layer": np.arange(nlay) + 1, 

457 }, 

458 dims=["layer", "mesh2d_nFaces"], 

459 ) 

460 

461 def _resistance_layer_overlap( 

462 self, 

463 snapped_dataset: xu.UgridDataset, 

464 edge_index: np.ndarray, 

465 top: xu.UgridDataArray, 

466 bottom: xu.UgridDataArray, 

467 k: xu.UgridDataArray, 

468 ) -> xr.DataArray: 

469 """ 

470 Computes the effective value of a barrier that partially overlaps a cell in the z direction. 

471 A barrier always lies on an edge in the xy-plane, however in doesn't have to fully extend in the z-direction to 

472 cover the entire layer. This method computes the effective resistance in that case. 

473 

474 Barrier 

475 ...................................... ▲ ▲ 

476 . @@@@ . | | 

477 . @Rb@ . | Lb | 

478 . Cell1 @@@@ Cell2 . ▼ | H 

479 . : : . | 

480 . : : . | 

481 .................:..:................. ▼ 

482 k1 k2 

483 

484 The effective value of a partially emerged barrier in a layer is computed by: 

485 c_total = 1.0 / (fraction / Rb + (1.0 - fraction) / c_aquifer) 

486 c_aquifer = 1.0 / k_mean = 1.0 / ((k1 + k2) / 2.0) 

487 fraction = Lb / H 

488 

489 """ 

490 left, right = snapped_dataset.ugrid.grid.edge_face_connectivity[edge_index].T 

491 k_mean = _mean_left_and_right(k, left, right) 

492 

493 resistance = self.__to_resistance( 

494 snapped_dataset[self._get_variable_name()] 

495 ).values[edge_index] 

496 

497 fraction = _fraction_layer_overlap(snapped_dataset, edge_index, top, bottom) 

498 

499 c_aquifer = 1.0 / k_mean 

500 inverse_c = (fraction / resistance) + ((1.0 - fraction) / c_aquifer) 

501 c_total = 1.0 / inverse_c 

502 

503 return self.__from_resistance(c_total) 

504 

505 def __to_resistance(self, value: xu.UgridDataArray) -> xu.UgridDataArray: 

506 match self._get_barrier_type(): 

507 case BarrierType.HydraulicCharacteristic: 

508 return 1.0 / value # type: ignore 

509 case BarrierType.Multiplier: 

510 return -1.0 / value # type: ignore 

511 case BarrierType.Resistance: 

512 return value 

513 

514 raise ValueError(r"Unknown barrier type {barrier_type}") 

515 

516 def __from_resistance(self, resistance: xr.DataArray) -> xr.DataArray: 

517 match self._get_barrier_type(): 

518 case BarrierType.HydraulicCharacteristic: 

519 return 1.0 / resistance 

520 case BarrierType.Multiplier: 

521 return -1.0 / resistance 

522 case BarrierType.Resistance: 

523 return resistance 

524 

525 raise ValueError(r"Unknown barrier type {barrier_type}") 

526 

527 def __to_hydraulic_characteristic(self, value: xr.DataArray) -> xr.DataArray: 

528 match self._get_barrier_type(): 

529 case BarrierType.HydraulicCharacteristic: 

530 return value 

531 case BarrierType.Multiplier: 

532 return -1.0 * value 

533 case BarrierType.Resistance: 

534 return 1.0 / value 

535 

536 raise ValueError(r"Unknown barrier type {barrier_type}") 

537 

538 @abc.abstractmethod 

539 def _get_barrier_type(self) -> BarrierType: 

540 raise NotImplementedError 

541 

542 @abc.abstractmethod 

543 def _get_variable_name(self) -> str: 

544 raise NotImplementedError 

545 

546 @abc.abstractmethod 

547 def _get_vertical_variables(self) -> list: 

548 raise NotImplementedError 

549 

550 def clip_box( 

551 self, 

552 time_min: Optional[cftime.datetime | np.datetime64 | str] = None, 

553 time_max: Optional[cftime.datetime | np.datetime64 | str] = None, 

554 layer_min: Optional[int] = None, 

555 layer_max: Optional[int] = None, 

556 x_min: Optional[float] = None, 

557 x_max: Optional[float] = None, 

558 y_min: Optional[float] = None, 

559 y_max: Optional[float] = None, 

560 top: Optional[GridDataArray] = None, 

561 bottom: Optional[GridDataArray] = None, 

562 ) -> "HorizontalFlowBarrierBase": 

563 """ 

564 Clip a package by a bounding box (time, layer, y, x). 

565 

566 Slicing intervals may be half-bounded, by providing None: 

567 

568 * To select 500.0 <= x <= 1000.0: 

569 ``clip_box(x_min=500.0, x_max=1000.0)``. 

570 * To select x <= 1000.0: ``clip_box(x_min=None, x_max=1000.0)`` 

571 or ``clip_box(x_max=1000.0)``. 

572 * To select x >= 500.0: ``clip_box(x_min = 500.0, x_max=None.0)`` 

573 or ``clip_box(x_min=1000.0)``. 

574 

575 Parameters 

576 ---------- 

577 time_min: optional 

578 time_max: optional 

579 layer_min: optional, int 

580 layer_max: optional, int 

581 x_min: optional, float 

582 x_max: optional, float 

583 y_min: optional, float 

584 y_max: optional, float 

585 top: optional, GridDataArray 

586 bottom: optional, GridDataArray 

587 

588 Returns 

589 ------- 

590 sliced : Package 

591 """ 

592 cls = type(self) 

593 new = cls.__new__(cls) 

594 new.dataset = copy.deepcopy(self.dataset) 

595 return new 

596 

597 def mask(self, _) -> Package: 

598 """ 

599 The mask method is irrelevant for this package as it is 

600 grid-agnostic, instead this method retuns a copy of itself. 

601 """ 

602 return deepcopy(self) 

603 

604 @staticmethod 

605 def __to_unstructured( 

606 idomain: xr.DataArray, top: xr.DataArray, bottom: xr.DataArray, k: xr.DataArray 

607 ) -> Tuple[ 

608 xu.UgridDataArray, xu.UgridDataArray, xu.UgridDataArray, xu.UgridDataArray 

609 ]: 

610 unstruct = xu.UgridDataArray.from_structured(idomain) 

611 top = xu.UgridDataArray.from_structured(top) 

612 bottom = xu.UgridDataArray.from_structured(bottom) 

613 k = xu.UgridDataArray.from_structured(k) 

614 

615 return unstruct, top, bottom, k 

616 

617 def __snap_to_grid( 

618 self, idomain: GridDataArray 

619 ) -> Tuple[xu.UgridDataset, np.ndarray]: 

620 if "layer" in self.dataset: 

621 variable_names = [self._get_variable_name(), "geometry", "layer"] 

622 else: 

623 variable_names = [self._get_variable_name(), "geometry", "ztop", "zbottom"] 

624 barrier_dataframe = self.dataset[variable_names].to_dataframe() 

625 

626 snapped_dataset, _ = typing.cast( 

627 xu.UgridDataset, 

628 xu.snap_to_grid(barrier_dataframe, grid=idomain, max_snap_distance=0.5), 

629 ) 

630 edge_index = np.argwhere( 

631 snapped_dataset[self._get_variable_name()].notnull().values 

632 ).ravel() 

633 

634 return snapped_dataset, edge_index 

635 

636 @staticmethod 

637 def __remove_invalid_edges( 

638 unstructured_grid: xu.UgridDataArray, edge_index: np.ndarray 

639 ) -> np.ndarray: 

640 """ 

641 Remove invalid edges indices. An edge is considered invalid when: 

642 - it lies on an exterior boundary (face_connectivity equals the grid fill value) 

643 - The corresponding connected cells are inactive 

644 """ 

645 grid = unstructured_grid.ugrid.grid 

646 face_dimension = unstructured_grid.ugrid.grid.face_dimension 

647 face_connectivity = grid.edge_face_connectivity[edge_index] 

648 

649 valid_edges = np.all(face_connectivity != grid.fill_value, axis=1) 

650 

651 connected_cells = -np.ones((len(edge_index), 2)) 

652 connected_cells[valid_edges, 0] = ( 

653 unstructured_grid.sel(layer=1) 

654 .loc[{face_dimension: face_connectivity[valid_edges, 0]}] 

655 .values 

656 ) 

657 connected_cells[valid_edges, 1] = ( 

658 unstructured_grid.sel(layer=1) 

659 .loc[{face_dimension: face_connectivity[valid_edges, 1]}] 

660 .values 

661 ) 

662 

663 valid = (connected_cells > 0).all(axis=1) 

664 

665 return edge_index[valid] 

666 

667 @staticmethod 

668 def __remove_edge_values_connected_to_inactive_cells( 

669 values, unstructured_grid: xu.UgridDataArray, edge_index: np.ndarray 

670 ): 

671 face_dimension = unstructured_grid.ugrid.grid.face_dimension 

672 

673 face_connectivity = unstructured_grid.ugrid.grid.edge_face_connectivity[ 

674 edge_index 

675 ] 

676 connected_cells_left = unstructured_grid.loc[ 

677 {face_dimension: face_connectivity[:, 0]} 

678 ] 

679 connected_cells_right = unstructured_grid.loc[ 

680 {face_dimension: face_connectivity[:, 1]} 

681 ] 

682 

683 return values.where( 

684 (connected_cells_left.drop(face_dimension) > 0) 

685 & (connected_cells_right.drop(face_dimension) > 0) 

686 ) 

687 

688 

689class HorizontalFlowBarrierHydraulicCharacteristic(HorizontalFlowBarrierBase): 

690 """ 

691 Horizontal Flow Barrier (HFB) package 

692 

693 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

694 that has type "HFB6" in the Name File. Only one HFB Package can be 

695 specified for a GWF model. 

696 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

697 

698 Parameters 

699 ---------- 

700 geometry: gpd.GeoDataFrame 

701 Dataframe that describes: 

702 - geometry: the geometries of the barriers, 

703 - hydraulic_characteristic: the hydraulic characteristic of the barriers 

704 - ztop: the top z-value of the barriers 

705 - zbottom: the bottom z-value of the barriers 

706 print_input: bool 

707 

708 Examples 

709 -------- 

710 

711 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

712 >>> barrier_y = [500.0, 250.0, 500.0] 

713 >>> barrier_gdf = gpd.GeoDataFrame( 

714 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

715 >>> data={ 

716 >>> "hydraulic_characteristic": [1e-3,], 

717 >>> "ztop": [10.0,], 

718 >>> "zbottom": [0.0,], 

719 >>> }, 

720 >>> ) 

721 >>> hfb = imod.mf6.HorizontalFlowBarrierHydraulicCharacteristic(barrier_gdf) 

722 

723 """ 

724 

725 @init_log_decorator() 

726 def __init__( 

727 self, 

728 geometry: "gpd.GeoDataFrame", 

729 print_input=False, 

730 ): 

731 super().__init__(geometry, print_input) 

732 

733 def _get_barrier_type(self): 

734 return BarrierType.HydraulicCharacteristic 

735 

736 def _get_variable_name(self) -> str: 

737 return "hydraulic_characteristic" 

738 

739 def _get_vertical_variables(self) -> list: 

740 return ["ztop", "zbottom"] 

741 

742 def _compute_barrier_values( 

743 self, snapped_dataset, edge_index, idomain, top, bottom, k 

744 ): 

745 barrier_values = self._resistance_layer_overlap( 

746 snapped_dataset, edge_index, top, bottom, k 

747 ) 

748 

749 return barrier_values 

750 

751 

752class LayeredHorizontalFlowBarrierHydraulicCharacteristic(HorizontalFlowBarrierBase): 

753 """ 

754 Horizontal Flow Barrier (HFB) package 

755 

756 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

757 that has type "HFB6" in the Name File. Only one HFB Package can be 

758 specified for a GWF model. 

759 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

760 

761 Parameters 

762 ---------- 

763 geometry: gpd.GeoDataFrame 

764 Dataframe that describes: 

765 - geometry: the geometries of the barriers, 

766 - hydraulic_characteristic: the hydraulic characteristic of the barriers 

767 - layer: model layer for the barrier 

768 print_input: bool 

769 

770 Examples 

771 -------- 

772 

773 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

774 >>> barrier_y = [500.0, 250.0, 500.0] 

775 >>> barrier_gdf = gpd.GeoDataFrame( 

776 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

777 >>> data={ 

778 >>> "hydraulic_characteristic": [1e-3,], 

779 >>> "layer": [1,] 

780 >>> }, 

781 >>> ) 

782 >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierHydraulicCharacteristic(barrier_gdf) 

783 

784 """ 

785 

786 @init_log_decorator() 

787 def __init__( 

788 self, 

789 geometry: "gpd.GeoDataFrame", 

790 print_input=False, 

791 ): 

792 super().__init__(geometry, print_input) 

793 

794 def _get_barrier_type(self): 

795 return BarrierType.HydraulicCharacteristic 

796 

797 def _get_variable_name(self) -> str: 

798 return "hydraulic_characteristic" 

799 

800 def _get_vertical_variables(self) -> list: 

801 return ["layer"] 

802 

803 def _compute_barrier_values( 

804 self, snapped_dataset, edge_index, idomain, top, bottom, k 

805 ): 

806 barrier_values = self._resistance_layer( 

807 snapped_dataset, 

808 edge_index, 

809 idomain, 

810 ) 

811 

812 return barrier_values 

813 

814 

815class HorizontalFlowBarrierMultiplier(HorizontalFlowBarrierBase): 

816 """ 

817 Horizontal Flow Barrier (HFB) package 

818 

819 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

820 that has type "HFB6" in the Name File. Only one HFB Package can be 

821 specified for a GWF model. 

822 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

823 

824 If parts of the barrier overlap a layer the multiplier is applied to the entire layer. 

825 

826 Parameters 

827 ---------- 

828 geometry: gpd.GeoDataFrame 

829 Dataframe that describes: 

830 - geometry: the geometries of the barriers, 

831 - multiplier: the multiplier of the barriers 

832 - ztop: the top z-value of the barriers 

833 - zbottom: the bottom z-value of the barriers 

834 print_input: bool 

835 

836 Examples 

837 -------- 

838 

839 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

840 >>> barrier_y = [500.0, 250.0, 500.0] 

841 >>> barrier_gdf = gpd.GeoDataFrame( 

842 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

843 >>> data={ 

844 >>> "multiplier": [1.5,], 

845 >>> "ztop": [10.0,], 

846 >>> "zbottom": [0.0,], 

847 >>> }, 

848 >>> ) 

849 >>> hfb = imod.mf6.HorizontalFlowBarrierMultiplier(barrier_gdf) 

850 

851 """ 

852 

853 @init_log_decorator() 

854 def __init__( 

855 self, 

856 geometry: "gpd.GeoDataFrame", 

857 print_input=False, 

858 ): 

859 super().__init__(geometry, print_input) 

860 

861 def _get_barrier_type(self): 

862 return BarrierType.Multiplier 

863 

864 def _get_variable_name(self) -> str: 

865 return "multiplier" 

866 

867 def _get_vertical_variables(self) -> list: 

868 return ["ztop", "zbottom"] 

869 

870 def _compute_barrier_values( 

871 self, snapped_dataset, edge_index, idomain, top, bottom, k 

872 ): 

873 fraction = _fraction_layer_overlap(snapped_dataset, edge_index, top, bottom) 

874 

875 barrier_values = ( 

876 fraction.where(fraction) 

877 * snapped_dataset[self._get_variable_name()].values[edge_index] 

878 ) 

879 

880 return barrier_values 

881 

882 

883class LayeredHorizontalFlowBarrierMultiplier(HorizontalFlowBarrierBase): 

884 """ 

885 Horizontal Flow Barrier (HFB) package 

886 

887 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

888 that has type "HFB6" in the Name File. Only one HFB Package can be 

889 specified for a GWF model. 

890 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

891 

892 If parts of the barrier overlap a layer the multiplier is applied to the entire layer. 

893 

894 Parameters 

895 ---------- 

896 geometry: gpd.GeoDataFrame 

897 Dataframe that describes: 

898 - geometry: the geometries of the barriers, 

899 - multiplier: the multiplier of the barriers 

900 - layer: model layer for the barrier 

901 print_input: bool 

902 

903 Examples 

904 -------- 

905 

906 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

907 >>> barrier_y = [500.0, 250.0, 500.0] 

908 >>> barrier_gdf = gpd.GeoDataFrame( 

909 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

910 >>> data={ 

911 >>> "multiplier": [1.5,], 

912 >>> "layer": [1,], 

913 >>> }, 

914 >>> ) 

915 >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierMultiplier(barrier_gdf) 

916 

917 """ 

918 

919 @init_log_decorator() 

920 def __init__( 

921 self, 

922 geometry: "gpd.GeoDataFrame", 

923 print_input=False, 

924 ): 

925 super().__init__(geometry, print_input) 

926 

927 def _get_barrier_type(self): 

928 return BarrierType.Multiplier 

929 

930 def _get_variable_name(self) -> str: 

931 return "multiplier" 

932 

933 def _get_vertical_variables(self) -> list: 

934 return ["layer"] 

935 

936 def _compute_barrier_values( 

937 self, snapped_dataset, edge_index, idomain, top, bottom, k 

938 ): 

939 barrier_values = self.__multiplier_layer(snapped_dataset, edge_index, idomain) 

940 

941 return barrier_values 

942 

943 def __multiplier_layer( 

944 self, 

945 snapped_dataset: xu.UgridDataset, 

946 edge_index: np.ndarray, 

947 idomain: xu.UgridDataArray, 

948 ) -> xr.DataArray: 

949 """ 

950 Returns layered xarray with barrier multiplier distrubuted over layers 

951 """ 

952 hfb_multiplier = snapped_dataset[self._get_variable_name()].values[edge_index] 

953 hfb_layer = snapped_dataset["layer"].values[edge_index] 

954 nlay = idomain.layer.size 

955 model_layer = np.repeat(idomain.layer.values, hfb_multiplier.shape[0]).reshape( 

956 (nlay, hfb_multiplier.shape[0]) 

957 ) 

958 data = np.where(model_layer == hfb_layer, hfb_multiplier, np.nan) 

959 return xr.DataArray( 

960 data=data, 

961 coords={ 

962 "layer": np.arange(nlay) + 1, 

963 }, 

964 dims=["layer", "mesh2d_nFaces"], 

965 ) 

966 

967 

968class HorizontalFlowBarrierResistance(HorizontalFlowBarrierBase): 

969 """ 

970 Horizontal Flow Barrier (HFB) package 

971 

972 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

973 that has type "HFB6" in the Name File. Only one HFB Package can be 

974 specified for a GWF model. 

975 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

976 

977 Parameters 

978 ---------- 

979 geometry: gpd.GeoDataFrame 

980 Dataframe that describes: 

981 - geometry: the geometries of the barriers, 

982 - resistance: the resistance of the barriers 

983 - ztop: the top z-value of the barriers 

984 - zbottom: the bottom z-value of the barriers 

985 print_input: bool 

986 

987 Examples 

988 -------- 

989 

990 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

991 >>> barrier_y = [500.0, 250.0, 500.0] 

992 >>> barrier_gdf = gpd.GeoDataFrame( 

993 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

994 >>> data={ 

995 >>> "resistance": [1e3,], 

996 >>> "ztop": [10.0,], 

997 >>> "zbottom": [0.0,], 

998 >>> }, 

999 >>> ) 

1000 >>> hfb = imod.mf6.HorizontalFlowBarrierResistance(barrier_gdf) 

1001 

1002 

1003 """ 

1004 

1005 @init_log_decorator() 

1006 def __init__( 

1007 self, 

1008 geometry: "gpd.GeoDataFrame", 

1009 print_input=False, 

1010 ): 

1011 super().__init__(geometry, print_input) 

1012 

1013 def _get_barrier_type(self): 

1014 return BarrierType.Resistance 

1015 

1016 def _get_variable_name(self) -> str: 

1017 return "resistance" 

1018 

1019 def _get_vertical_variables(self) -> list: 

1020 return ["ztop", "zbottom"] 

1021 

1022 def _compute_barrier_values( 

1023 self, snapped_dataset, edge_index, idomain, top, bottom, k 

1024 ): 

1025 barrier_values = self._resistance_layer_overlap( 

1026 snapped_dataset, edge_index, top, bottom, k 

1027 ) 

1028 

1029 return barrier_values 

1030 

1031 

1032class LayeredHorizontalFlowBarrierResistance(HorizontalFlowBarrierBase): 

1033 """ 

1034 Horizontal Flow Barrier (HFB) package 

1035 

1036 Input to the Horizontal Flow Barrier (HFB) Package is read from the file 

1037 that has type "HFB6" in the Name File. Only one HFB Package can be 

1038 specified for a GWF model. 

1039 https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf 

1040 

1041 Parameters 

1042 ---------- 

1043 geometry: gpd.GeoDataFrame 

1044 Dataframe that describes: 

1045 - geometry: the geometries of the barriers, 

1046 - resistance: the resistance of the barriers 

1047 - layer: model layer for the barrier 

1048 print_input: bool 

1049 

1050 Examples 

1051 -------- 

1052 

1053 >>> barrier_x = [-1000.0, 0.0, 1000.0] 

1054 >>> barrier_y = [500.0, 250.0, 500.0] 

1055 >>> barrier_gdf = gpd.GeoDataFrame( 

1056 >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], 

1057 >>> data={ 

1058 >>> "resistance": [1e3,], 

1059 >>> "layer": [2,], 

1060 >>> }, 

1061 >>> ) 

1062 >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierResistance(barrier_gdf) 

1063 

1064 

1065 """ 

1066 

1067 @init_log_decorator() 

1068 def __init__( 

1069 self, 

1070 geometry: "gpd.GeoDataFrame", 

1071 print_input=False, 

1072 ): 

1073 super().__init__(geometry, print_input) 

1074 

1075 def _get_barrier_type(self): 

1076 return BarrierType.Resistance 

1077 

1078 def _get_variable_name(self) -> str: 

1079 return "resistance" 

1080 

1081 def _get_vertical_variables(self) -> list: 

1082 return ["layer"] 

1083 

1084 def _compute_barrier_values( 

1085 self, snapped_dataset, edge_index, idomain, top, bottom, k 

1086 ): 

1087 barrier_values = self._resistance_layer( 

1088 snapped_dataset, 

1089 edge_index, 

1090 idomain, 

1091 ) 

1092 

1093 return barrier_values