Coverage for C:\src\imod-python\imod\msw\pkgbase.py: 34%

58 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-08 10:26 +0200

1import abc 

2from pathlib import Path 

3from typing import Union 

4 

5import numpy as np 

6import pandas as pd 

7import xarray as xr 

8 

9from imod.msw.fixed_format import format_fixed_width 

10 

11 

class MetaSwapPackage(abc.ABC):
    """
    MetaSwapPackage is used to share methods for Metaswap packages.

    It is not meant to be used directly, only to inherit from, to implement new
    packages.

    The methods of this base class read several attributes that are not
    defined here, so inheriting packages have to provide them:

    * ``_pkg_id``: used in the messages raised by ``sel`` and ``isel``.
    * ``_file_name``: name of the file created by ``write``.
    * ``_metadata_dict``: mapping of variable name to a metadata object with
      ``min_value`` and ``max_value`` attributes; its key order determines the
      column order of the written file.
    * ``_with_subunit``, ``_without_subunit``, ``_to_fill``: iterables of
      variable names, handled separately in ``_render`` and ``_pkgcheck``.
    """

    __slots__ = "_pkg_id"

    def __init__(self):
        self.dataset = xr.Dataset()

    def __getitem__(self, key):
        """Return the variable ``key`` of the underlying dataset."""
        return self.dataset.__getitem__(key)

    def __setitem__(self, key, value):
        """Store ``value`` as variable ``key`` in the underlying dataset."""
        self.dataset.__setitem__(key, value)

    def _raise_selection_not_supported(self, method):
        """
        Raise the NotImplementedError shared by ``sel`` and ``isel``.

        ``method`` is the name of the Dataset method ("sel" or "isel") that is
        suggested to the user as an alternative.
        """
        # ``type(self)`` instead of the bare ``__class__`` cell: the latter
        # always refers to the class in which the method is defined, so the
        # hint would name MetaSwapPackage rather than the concrete package.
        raise NotImplementedError(
            "Selection on packages not yet supported. "
            "To make a selection on the xr.Dataset, "
            f"call {self._pkg_id}.dataset.{method} instead. "
            "You can create a new package with a selection by calling: "
            f"{type(self).__name__}(**{self._pkg_id}.dataset.{method}(**selection))"
        )

    def isel(self, *args, **kwargs):
        """
        Not supported; always raises NotImplementedError.

        Arguments are accepted, and ignored, so that a call written in the
        style of ``xr.Dataset.isel`` reaches the explanatory error instead of
        failing with a TypeError on the signature.
        """
        self._raise_selection_not_supported("isel")

    def sel(self, *args, **kwargs):
        """
        Not supported; always raises NotImplementedError.

        Arguments are accepted, and ignored, so that a call written in the
        style of ``xr.Dataset.sel`` reaches the explanatory error instead of
        failing with a TypeError on the signature.
        """
        self._raise_selection_not_supported("sel")

    # The DataArray annotation is quoted so it is not evaluated when the
    # method is defined.
    def write(
        self, directory: Union[str, Path], index: np.ndarray, svat: "xr.DataArray"
    ):
        """
        Write MetaSWAP package to its corresponding fixed format file. This has
        the `.inp` extension.

        The file is created as ``directory / self._file_name``; ``index`` and
        ``svat`` are passed on unchanged to ``self._render``.
        """
        directory = Path(directory)

        filename = directory / self._file_name
        with open(filename, "w") as f:
            self._render(f, index, svat)

    def _check_range(self, dataframe):
        """
        Check that the provided data does not exceed MetaSWAP's ranges. These
        ranges are specified in ``self._metadata_dict`` for each variable.

        Raises
        ------
        ValueError
            If any value in a column is smaller than the ``min_value`` or
            larger than the ``max_value`` of that column's metadata.
        """
        for varname in dataframe:
            metadata = self._metadata_dict[varname]
            min_value = metadata.min_value
            max_value = metadata.max_value
            column = dataframe[varname]
            if (column < min_value).any() or (column > max_value).any():
                raise ValueError(
                    f"{varname}: not all values are within range ({min_value}-{max_value})."
                )

    def write_dataframe_fixed_width(self, file, dataframe):
        """
        Write dataframe to fixed format file.

        Columns are matched to the entries of ``self._metadata_dict`` by
        position, not by name: the n-th metadata entry formats the n-th
        column. Every row is terminated with a newline.
        """
        # itertuples() puts the row index at position 0, so the data columns
        # start at position 1. The pairing does not depend on the row, so it
        # is built once.
        positioned_metadata = list(enumerate(self._metadata_dict.values(), start=1))
        for row in dataframe.itertuples():
            for position, metadata in positioned_metadata:
                content = format_fixed_width(row[position], metadata)
                file.write(content)
            file.write("\n")

    def _index_da(self, da, index):
        """
        Helper method that converts a DataArray to a 1d numpy array, and
        subsequently applies ``index`` to it.
        """
        return da.values.ravel()[index]

    def _render(self, file, index, svat):
        """
        Collect the data to be written in a DataFrame, check its ranges, and
        call ``self.write_dataframe_fixed_width``.
        """
        data_dict = {"svat": svat.values.ravel()[index]}

        subunit = svat.coords["subunit"]

        for var in self._with_subunit:
            data_dict[var] = self._index_da(self.dataset[var], index)

        for var in self._without_subunit:
            # Give the variable a subunit dimension before flattening, using
            # the subunit coordinate of ``svat``.
            da = self.dataset[var].expand_dims(subunit=subunit)
            data_dict[var] = self._index_da(da, index)

        for var in self._to_fill:
            data_dict[var] = ""

        # The column order follows the metadata dict, because the writer
        # matches columns to metadata by position.
        dataframe = pd.DataFrame(
            data=data_dict, columns=list(self._metadata_dict.keys())
        )

        self._check_range(dataframe)

        return self.write_dataframe_fixed_width(file, dataframe)

    def _pkgcheck(self):
        """
        Method to do package checks. The base class version checks if provided
        data has a subunit coordinate or not.

        Raises
        ------
        ValueError
            If a variable listed in ``self._with_subunit`` lacks a "subunit"
            coordinate, or a variable listed in ``self._without_subunit`` has
            one.
        """
        for var in self._with_subunit:
            if "subunit" not in self.dataset[var].coords:
                raise ValueError(
                    f"Variable '{var}' in {self.__class__} should contain "
                    "'subunit' coordinate"
                )
        for var in self._without_subunit:
            if "subunit" in self.dataset[var].coords:
                raise ValueError(
                    f"Variable '{var}' in {self.__class__} should not "
                    "contain 'subunit' coordinate"
                )