Coverage for registry.py: 56%

203 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-03-20 20:51 +0100

1import json 

2import logging 

3import os 

4import re 

5from collections import defaultdict, namedtuple 

6from multiprocessing import Manager, Process 

7from pathlib import Path 

8 

9from numtools.serializer import Serializer 

10 

11from nastranio import cards as cards_mod 

12from nastranio.constants import ( 

13 BULK, 

14 CASE, 

15 COMMENTS, 

16 EXEC, 

17 META, 

18 PARAMS, 

19 SUMMARY, 

20 UNKNOWN, 

21) 

22from nastranio.decorators import timeit 

23from nastranio.mesh_api import Mesh 

24from nastranio.mesh_api.mod import Mod 

25from nastranio.readers.bulk import read_buffer, read_bulk 

26from nastranio.utils import calcdiff 

27 

28# ---------------------------------------------------------------------------- 

29# optional imports... 

30 

31# fmt: off 

# `msgpack` is optional: ISMSGPACK records whether the import succeeded.
try:
    import msgpack
except ImportError:
    ISMSGPACK = False
    logging.debug('`msgpack` not available')
else:
    ISMSGPACK = True

# `deepdiff` is optional too: ISDEEPDIFF records whether the import succeeded.
try:
    from deepdiff import DeepDiff
except ImportError:
    ISDEEPDIFF = False
    logging.debug('`deepdiff` not available')
else:
    ISDEEPDIFF = True

45# fmt: on 

46 

47 

48class Registry(Serializer): 

49 """ 

50 Main public entry point to Nastran Bulk file parsing. 

51 """ 

52 

53 def __init__(self): 

54 self.container = None 

55 

56 def _build_summary(self): 

57 """(re-)build from scratch a summary dict. Use full when 

58 registry is manually created 

59 """ 

60 _summary = defaultdict(set) 

61 for cardname, card in self.container["bulk"].items(): 

62 _summary[card.type].add(cardname) 

63 if hasattr(card, "shape"): 

64 _summary[card.shape].add(cardname) 

65 if hasattr(card, "dim"): 

66 _summary[card.dim].add(cardname) 

67 self.container["summary"] = dict(_summary) 

68 

def diff(self, other, exclude_meta=False):
    """Return what ``calcdiff`` reports between this registry and `other`.

    :param other: object exposing a ``container`` attribute
    :param exclude_meta: forwarded unchanged to ``calcdiff``
    """
    both = (self.container, other.container)
    return calcdiff(both, exclude_meta=exclude_meta)

71 

72 def __eq__(self, other): 

73 diff = calcdiff((self.container, other.container), exclude_meta=True) 

74 return len(diff) == 0 

75 

def comments(self):
    """Collect the FEMAP tags found in the registry comments.

    Each stored comment is split on newlines, and every line matching
    ``$ Femap <what> <id> : <title>`` is recorded.

    :returns: ``{what: {id: title}}`` where ``id`` is an ``int``; an empty
        dict when the container holds no comments section
    """
    section = COMMENTS.title
    if section not in self.container:
        return {}
    pattern = re.compile(
        r"^\$\s+Femap\s*(?P<what>(\w+\s+)+)\s*(?P<id>\d+)\s*:\s*(?P<title>.*)$"
    )
    tags = defaultdict(dict)
    for block in self.container[section]:
        # a stored comment may span several lines: inspect them one by one
        for line in block.split("\n"):
            found = pattern.match(line)
            if found is None:
                continue
            what = found.group("what").replace("with NX Nastran", "").strip()
            tags[what][int(found.group("id"))] = found.group("title").strip()
    return dict(tags)

98 

def check_consistancy(self, verbose=True):
    """Inspect the registry and report potential problems.

    The only check performed is the presence of cards listed under the
    ``UNKNOWN`` key of the container summary.

    :param verbose: when true, each issue is also sent to the ``logging``
        function named after its level
    :returns: list of ``(level, message)`` tuples
    """
    summary = self.container.get("summary", {})
    unknown = list(summary.get(UNKNOWN, ()))
    issues = [("warning", "Unknown cards: %s" % unknown)] if unknown else []
    if not verbose:
        return issues
    for level, message in issues:
        # `level` names a module-level logging function, e.g. logging.warning
        getattr(logging, level)(message)
    return issues

111 

def bind_mesh_api(self):
    """Attach a new ``Mesh`` as ``self.mesh`` and register ``self`` with it."""
    mesh = Mesh()
    # expose the mesh on the registry before handing the registry over to it
    self.mesh = mesh
    mesh.set_registry(self, calc_csys=True)

115 

def bind_mod_api(self, index_elems=("0d", "1d", "2d", "3d")):
    """Attach a ``nastranio.mesh_api.mod.Mod`` instance as ``self.mesh.mod``.

    :param index_elems: forwarded unchanged to ``Mod.set_registry``
    """
    mesh = self.mesh
    mesh.mod = Mod()
    mesh.mod.set_registry(self, index_elems=index_elems)
    logging.info("Modification Module loaded under `reg.mesh.mod`")

121 

122 # ======================================================================== 

123 # input / output 

124 # ======================================================================== 

125 

126 # ------------------------------------------------------------------------ 

127 # NASTRAN BULKFILES 

128 

def from_bulkfile(self, filename, nbprocs="auto", progress=True):
    """Populate the registry from a NASTRAN bulk file.

    :param filename: path of the bulk file, or a file-like object (anything
        exposing ``seek``), which is rewound and read with ``read_buffer``
    :param nbprocs: forwarded to ``read_bulk`` when reading from a path;
        file-like objects are always read by ``read_buffer``, so a value
        greater than 1 only triggers a warning in that case
    :param progress: forwarded unchanged to the reader
    """
    self.container = None
    # Decide on the input kind up front, so that an AttributeError raised
    # *inside* the reader is not mistaken for "this is not a file object".
    if hasattr(filename, "seek"):
        filename.seek(0)
        if nbprocs != "auto" and nbprocs > 1:
            logging.warning("reading filelike object is monoproc")
        containers = read_buffer(filename, progress=progress)
    else:
        containers = read_bulk(
            filename=filename, nbprocs=nbprocs, progress=progress
        )
    # the reader returns a list of containers: fold them all into the first
    self.container = containers.pop(0)
    for cont in containers:
        self._merge_container(cont)
    # the mesh API is needed below (it provides the coordinate systems)
    self.bind_mesh_api()
    # =====================================================================
    # write all grids to CSYS 0
    # =====================================================================
    modgids = self._translate_grids_to_0()
    if modgids:
        # the count used to be logged verbatim as "{len(modgids)}"
        logging.warning("translated %d nodes to CSYS0", len(modgids))
    self.check_consistancy()

154 

155 def _translate_grids_to_0(self): 

156 """ 

157 ensure IDs described in carddata are in the same order as 

158 IDs got from GRID.coords() 

159 """ 

160 if len(self.container["bulk"]["GRID"]) == 0: 

161 return set() 

162 if set(self.container["bulk"]["GRID"].carddata["main"]["CP"]) == set((0,)): 

163 return set() 

164 coordsK = self.container["bulk"]["GRID"].coords(asdf=True) # as defined 

165 coords0 = self.container["bulk"]["GRID"].coords( 

166 csysreg=self.mesh.CSys, incsys=-1 

167 ) # coordinates in CSYS0 

168 gcdata = self.container["bulk"]["GRID"].carddata["main"] 

169 ids = gcdata["ID"] 

170 ids_coords = coords0[0] 

171 assert list(ids_coords) == ids 

172 gids_to_modify = coordsK[coordsK["csys"] != 0].index 

173 for gid in gids_to_modify: 

174 rowno = self.container["bulk"]["GRID"]._id2rowno[gid] 

175 gcdata["CP"][rowno] = 0 

176 xyz0 = coords0[1][rowno] 

177 gcdata["X1"][rowno] = xyz0[0] 

178 gcdata["X2"][rowno] = xyz0[1] 

179 gcdata["X3"][rowno] = xyz0[2] 

180 self.container["bulk"]["GRID"].clear_caches() 

181 return set(gids_to_modify) 

182 

def to_nastran(self, filename=None):
    """Dump the registry as a NASTRAN deck.

    The deck is assembled from the container sections, in this order:
    ``"exec"`` entries followed by ``CEND``, ``"cases"`` entries (only
    upper-case keys are written), ``BEGIN BULK`` with the ``"params"``
    entries and the ``"bulk"`` cards, and finally ``ENDDATA``.

    :param filename: when given, the deck is written to that path (``~`` is
        expanded) instead of being returned
    :returns: the expanded ``Path`` when `filename` is given, otherwise the
        deck as a string
    """
    container = self.container
    chunks = []
    # ---------------------------------------------------------------------
    # EXEC
    if "exec" in container:
        chunks.extend(f"{k} {v} \n" for k, v in container["exec"].items())
        chunks.append("CEND\n")
    # ---------------------------------------------------------------------
    # CASES
    if "cases" in container:
        for case_section, data in container["cases"].items():
            # entries of the "default" section are written without a header
            if case_section != "default":
                chunks.append(f"{case_section}\n")
            chunks.extend(
                f" {k} = {v}\n" for k, v in data.items() if k.isupper()
            )
    # ---------------------------------------------------------------------
    # BULK (parameters)
    # The guard used to look for a "param" key although parameters are
    # stored under "params": a container holding only parameters never got
    # its BEGIN BULK delimiter.
    if "params" in container or "bulk" in container:
        chunks.append("BEGIN BULK\n")
    chunks.extend(
        f"PARAM,{param},{value}\n"
        for param, value in container.get("params", {}).items()
    )
    # ---------------------------------------------------------------------
    # BULK (data)
    families = container.get("bulk", {})
    if families:
        # comments do not depend on the card family: collect them once
        comments = self.comments()
        for cards in families.values():
            chunks.append("\n".join(cards.to_nastran(comments=comments)))
            chunks.append("\n")
    # ---------------------------------------------------------------------
    # end
    chunks.append("ENDDATA")
    bulk = "".join(chunks)
    if filename:
        filename = Path(filename).expanduser()
        with open(filename, "w") as fh:
            fh.write(bulk)
        return filename
    return bulk

221 

222 # ------------------------------------------------------------------------ 

223 # from / to file 

224 

def from_file(self, fname, fmt="auto"):
    """Unserialize the registry from a msgpack or json file.

    :param fname: path of the file to read
    :param fmt: ``"json"``, ``"pack"`` or ``"auto"``; with ``"auto"`` the
        format is the file extension without its dot (``toto.json`` ->
        ``json``)
    :raises ValueError: when the format is neither ``json`` nor ``pack``
    """
    if fmt == "auto":
        _, extension = os.path.splitext(fname)
        fmt = extension[1:]
    # the current content is dropped even when the format is then rejected
    self.container = None
    if fmt not in ("json", "pack"):
        raise ValueError(f"extension/format {fmt} not handled")
    # json is read as text, msgpack as raw bytes
    mode = "r" if fmt == "json" else "rb"
    with open(fname, mode) as fh:
        payload = fh.read()
    if fmt == "json":
        self.from_json(payload)
    else:
        self.from_msgpack(payload)
    self.check_consistancy()

243 

def to_file(self, fname=None, fmt="pack"):
    """
    Serialize registry to file. Default is to save to msgpack.

    :param fname: file path. When omitted, the registry is saved besides
        its source file (``self.meta["source"]``), using `fmt` as extension
    :type fname: str or `None`
    :param fmt: file's format (and extension): ``"json"`` or ``"pack"``
    :type fmt: str

    :returns: actual file name
    :raises ValueError: when `fmt` is not a handled format. Nothing is
        written in that case (an unknown format used to be ignored
        silently while a file name was still returned).
    """
    if fmt not in ("json", "pack"):
        raise ValueError(f"extension/format {fmt} not handled")
    if not fname:
        stem, _ = os.path.splitext(self.meta["source"])
        fname = f"{stem}.{fmt}"
    # Serialize before opening the target, so that a failing serialization
    # does not leave an empty file behind.
    if fmt == "json":
        payload, mode = self.to_json(), "w"
    else:
        payload, mode = self.to_msgpack(), "wb"
    with open(fname, mode) as fh:
        fh.write(payload)
    return fname

268 

269 def __getattr__(self, attr): 

270 """ 

271 user-friendly method to access self.container 

272 """ 

273 # if attr in self.__dict__: 

274 # return getattr(self, attr) 

275 if attr in self.container: 

276 # logging.debug('proxy to self.container["%s"]', attr) 

277 return self.container[attr] 

278 for section, data in self.container.items(): 

279 if attr in data: 

280 # logging.debug('proxy to self.container["%s"]["%s"]', section, attr) 

281 return self.container[section][attr] 

282 raise AttributeError(attr) 

283 

284 def __getstate__(self): 

285 return {"container": self.container} 

286 

287 def __setstate__(self, state): 

288 self.__dict__ = state 

289 self.bind_mesh_api() 

290 # if self.container: 

291 # self.check_consistancy() 

292 

def _merge_container(self, container):
    """Merge `container` into ``self.container``.

    Useful to gather the partial containers produced by a multiprocess
    read. Sections are processed in the order EXEC, PARAMS, CASE,
    COMMENTS, BULK, SUMMARY; a section absent from `container` is skipped:

    * EXEC / PARAMS / CASE: ``dict.update`` (incoming values win)
    * COMMENTS: incoming comments are appended
    * SUMMARY: sets are merged key by key
    * BULK: unknown card families are adopted as-is, known ones are merged
      through their ``merge()`` method
    """
    mapping_titles = (EXEC.title, PARAMS.title, CASE.title)
    for section in (EXEC, PARAMS, CASE, COMMENTS, BULK, SUMMARY):
        title = section.title
        if title not in container:
            continue
        other_d = container[title]
        if title not in self.container:
            # Comments are handled as a list below, every other section as
            # a mapping. Creating a dict here for comments made the merge
            # fail with a TypeError.
            self.container[title] = [] if title == COMMENTS.title else {}
        self_d = self.container[title]
        logging.debug("merge %s (%d items)", title, len(other_d))
        if title in mapping_titles:
            self_d.update(other_d)
        elif title == COMMENTS.title:
            # regular lists
            self_d.extend(other_d)
        elif title == SUMMARY.title:
            for key, names in other_d.items():
                if key in self_d:
                    self_d[key] |= names
                else:
                    # key unknown so far (used to raise KeyError on a plain
                    # dict); copy so both containers do not share one set
                    self_d[key] = set(names)
        elif title == BULK.title:
            for family, cards in other_d.items():
                if family in self_d:
                    # family already exists: append the incoming entries
                    self_d[family].merge(cards)
                else:
                    self_d[family] = cards
                    logging.debug('added "%s" (%d items)', family, len(cards))

331 

332 

if __name__ == "__main__":
    # run this module's doctests when it is executed as a script
    from doctest import ELLIPSIS, testmod

    testmod(optionflags=ELLIPSIS)