Coverage for C:\src\imod-python\imod\mf6\multimodel\modelsplitter.py: 98%
53 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 14:15 +0200
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 14:15 +0200
1from typing import List, NamedTuple
3import numpy as np
5from imod.logging import logger
6from imod.mf6.auxiliary_variables import (
7 expand_transient_auxiliary_variables,
8 remove_expanded_auxiliary_variables_from_dataset,
9)
10from imod.mf6.boundary_condition import BoundaryCondition
11from imod.mf6.hfb import HorizontalFlowBarrierBase
12from imod.mf6.model import Modflow6Model
13from imod.mf6.utilities.clip import clip_by_grid
14from imod.mf6.utilities.grid import get_active_domain_slice
15from imod.mf6.wel import Well
16from imod.typing import GridDataArray
17from imod.typing.grid import is_unstructured, ones_like
19HIGH_LEVEL_PKGS = (HorizontalFlowBarrierBase, Well)
22class PartitionInfo(NamedTuple):
23 active_domain: GridDataArray
24 id: int
27def create_partition_info(submodel_labels: GridDataArray) -> List[PartitionInfo]:
28 """
29 A PartitionInfo is used to partition a model or package. The partition info's of a domain are created using a
30 submodel_labels array. The submodel_labels provided as input should have the same shape as a single layer of the
31 model grid (all layers are split the same way), and contains an integer value in each cell. Each cell in the
32 model grid will end up in the submodel with the index specified by the corresponding label of that cell. The
33 labels should be numbers between 0 and the number of partitions.
34 """
35 _validate_submodel_label_array(submodel_labels)
37 unique_labels = np.unique(submodel_labels.values)
39 partition_infos = []
40 for label_id in unique_labels:
41 active_domain = submodel_labels.where(submodel_labels.values == label_id)
42 active_domain = ones_like(active_domain).where(active_domain.notnull(), 0)
43 active_domain = active_domain.astype(submodel_labels.dtype)
45 submodel_partition_info = PartitionInfo(
46 id=label_id, active_domain=active_domain
47 )
48 partition_infos.append(submodel_partition_info)
50 return partition_infos
53def _validate_submodel_label_array(submodel_labels: GridDataArray) -> None:
54 unique_labels = np.unique(submodel_labels.values)
56 if not (
57 len(unique_labels) == unique_labels.max() + 1
58 and unique_labels.min() == 0
59 and np.issubdtype(submodel_labels.dtype, np.integer)
60 ):
61 raise ValueError(
62 "The submodel_label array should be integer and contain all the numbers between 0 and the number of "
63 "partitions minus 1."
64 )
67def slice_model(partition_info: PartitionInfo, model: Modflow6Model) -> Modflow6Model:
68 """
69 This function slices a Modflow6Model. A sliced model is a model that consists of packages of the original model
70 that are sliced using the domain_slice. A domain_slice can be created using the
71 :func:`imod.mf6.modelsplitter.create_domain_slices` function.
72 """
73 modelclass = type(model)
74 new_model = modelclass(**model._options)
75 domain_slice2d = get_active_domain_slice(partition_info.active_domain)
76 if is_unstructured(model.domain):
77 new_idomain = model.domain.sel(domain_slice2d)
78 else:
79 # store the original layer dimension
80 layer = model.domain.layer
82 sliced_domain_2D = partition_info.active_domain.sel(domain_slice2d)
83 # drop the dimensions we don't need from the 2D domain slice. It may have a single
84 # layer so we drop that as well
85 sliced_domain_2D = sliced_domain_2D.drop_vars(
86 ["dx", "dy", "layer"], errors="ignore"
87 )
88 # create the desired coodinates: the original layer dimension,and the x/y coordinates of the slice.
89 coords = dict(layer=layer, **sliced_domain_2D.coords)
91 # the new idomain is the selection on our coodinates and only the part active in sliced_domain_2D
92 new_idomain = model.domain.sel(coords).where(sliced_domain_2D, other=0)
94 for pkg_name, package in model.items():
95 if isinstance(package, BoundaryCondition):
96 remove_expanded_auxiliary_variables_from_dataset(package)
98 sliced_package = clip_by_grid(package, partition_info.active_domain)
100 sliced_package = sliced_package.mask(new_idomain)
101 # The masking can result in packages with AllNoData.Therefore we'll have
102 # to drop these packages.
103 if not sliced_package.is_empty():
104 new_model[pkg_name] = sliced_package
105 else:
106 logger.info(
107 f"package {pkg_name} removed in partition {partition_info.id}, because all empty"
108 )
110 if isinstance(package, BoundaryCondition):
111 expand_transient_auxiliary_variables(package)
112 return new_model