Coverage for readers/gmsh/study.py: 12% (301 statements)
coverage.py v7.7.0, created at 2025-03-20 20:51 +0100

import json
import logging
import shutil
import subprocess
from collections import defaultdict
from copy import deepcopy
from pathlib import Path

import yaml

import nastranio.cards as cards
from nastranio.constants import BULK, CASE, COMMENTS, EXEC, META, PARAMS, SUMMARY
from nastranio.readers.gmsh import GmsHParser, user_input
from nastranio.registry import Registry


def _convert_keys_to_int(d: dict):
    """Recursively convert dictionary keys to integers when possible.

    Useful for JSON deserialization.
    """
    new_dict = {}
    for k, v in d.items():
        try:
            new_key = int(k)
        except ValueError:
            new_key = k
        if isinstance(v, dict):
            v = _convert_keys_to_int(v)
        new_dict[new_key] = v
    return new_dict
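
# Example (sketch): ``json.loads`` always produces string keys, so a payload
# such as '{"1": {"10": "fixed"}, "title": "demo"}' comes back string-keyed;
# _convert_keys_to_int restores the integer keys:
#   _convert_keys_to_int({"1": {"10": "fixed"}, "title": "demo"})
#   -> {1: {10: "fixed"}, "title": "demo"}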


class Study:
    def __init__(self, meshfile, autobuild=True):
        """
        Create a NASTRAN study based on a GMSH mesh file.

        If `autobuild` is True (default), build the NASTRAN model.
        Setting `autobuild` to False allows intercepting the GMSH
        mesh for modification (e.g. node renumbering) before building.
        """
        self._meshfile = Path(meshfile).expanduser()
        self.gmsh = GmsHParser(self._meshfile)
        logging.info("\n" + self.gmsh.info())
        if autobuild:
            self.build()
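
    # Typical usage (sketch; "plate.msh" and the surrounding file names are
    # hypothetical, and `run()` assumes mystran is available on the PATH):
    #   study = Study("plate.msh")       # parse the mesh and build the model
    #   study.to_nastran("plate.nas")    # write the NASTRAN bulk deck
    #   rundir = study.run()             # write the deck and run mystran in ./run/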

    def run(self, output="run", exist_ok=True):
        target = self._meshfile.parent / output
        target.mkdir(exist_ok=exist_ok)
        nasfile = target / (self._meshfile.stem + ".nas")
        self.to_nastran(nasfile)
        logging.info(f"wrote {nasfile}")
        MYSTRAN = shutil.which("mystran")
        if not MYSTRAN:
            logging.error("mystran executable not found in PATH")
            return target
        try:
            subprocess.check_output([MYSTRAN, nasfile], stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError:
            logging.error(f"mystran run failed for {nasfile}")
        return target

    def load_user_params(self, params=None):
        """
        `params` can be:
        * a valid dictionary
        * a path to a JSON/YAML parameters file
        * a JSON string
        * None (in which case <meshfile>.params.json will be searched for)
        """
        self.ui = user_input.UserInput()
        if isinstance(params, dict):
            return self.load_user_params_from_dict(params)
        # None, string or Path: it may be either a JSON string or a path
        # to a parameters file.
        try:
            self.load_user_params_from_file(params)
        except (FileNotFoundError, OSError):
            # not a readable file: assume a JSON string
            self.load_user_params_from_json_txt(params, fmt=".json")
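
    # Accepted `params` forms (sketch; the keys and file names shown are
    # illustrative, the real schema is defined by user_input.UserInput):
    #   study.load_user_params({"attributes": {...}})     # plain dict
    #   study.load_user_params("model.params.yml")        # path to a JSON/YAML file
    #   study.load_user_params('{"attributes": {...}}')   # JSON string
    #   study.load_user_params()  # falls back to <meshfile>.params.{json,yml,yaml}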

    def load_user_params_from_dict(self, dic):
        return self.ui.load_parameters(dic.copy())

    def load_user_params_from_json_txt(self, json_txt, fmt, verbose=True):
        if fmt == ".json":
            params = json.loads(json_txt)
            params = _convert_keys_to_int(params)
        elif fmt in (".yaml", ".yml"):
            params = yaml.load(json_txt, Loader=yaml.FullLoader)
        else:
            raise ValueError(f"unsupported parameters format {fmt!r}")
        if verbose:
            print(f"loading configuration from {fmt[1:]} string '{json_txt[:20]}...'")
        return self.load_user_params_from_dict(params)

    def _create_params_file(self, filepath):
        logging.warning(f"create {filepath} template file")
        params_data = self.get_ui_input_template()
        with open(filepath, "w") as fh:
            if filepath.suffix == ".json":
                fh.write(json.dumps(params_data, indent=4, sort_keys=False))
            elif filepath.suffix in (".yaml", ".yml"):
                fh.write(yaml.dump(params_data, indent=4, sort_keys=False))

    def load_user_params_from_file(self, filepath, verbose=False):
        if filepath is None:
            extensions = (".json", ".yml", ".yaml")
            for suffix in extensions:
                filepath = self._meshfile.parent / (
                    self._meshfile.stem + ".params" + suffix
                )
                if filepath.exists():
                    break
            else:
                # no parameters file found
                filepath = self._meshfile.parent / (
                    self._meshfile.stem + ".params.json"
                )
                self._create_params_file(filepath)
        filepath = Path(filepath)  # accept str paths as well as Path objects
        with open(filepath, "r") as fh:
            json_txt = fh.read()
        if verbose:
            print(f"loading configuration from {filepath}")
        return self.load_user_params_from_json_txt(
            json_txt, fmt=filepath.suffix, verbose=False
        )

    def orphan_nodes(self, clean=False):
        """Return orphan node IDs; if `clean` is True, delete them from the
        GMSH mesh and rebuild the model."""
        used_gids = set(self.reg.mesh.gid2eids())
        defined_gids = set(self.gmsh.nodes)
        orphans = defined_gids - used_gids
        if orphans:
            logging.warning(f"{orphans=}")
            if clean:
                for orphan_gid in orphans:
                    self.gmsh.nodes.pop(orphan_gid)
                self.build()
        return orphans
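
    # Example (sketch): nodes that belong to no element (e.g. left over from
    # meshing helper geometry) are reported by `study.orphan_nodes()`;
    # `study.orphan_nodes(clean=True)` also removes them and rebuilds the model.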

    def get_ui_input_template(self):
        default = self.ui.get_default_template()
        # -------------------------------------------------------------
        # process boundaries. Everything found goes to SID 1
        default["attributes"]["boundaries"][1] = {}
        _default_value = self.ui.get_default_boundary_input()
        master_key = "boundaries"
        find_in_grpname = "spc"
        self._update_default_master_key(
            master_key, find_in_grpname, _default_value, default
        )
        # -------------------------------------------------------------
        # process loading. Everything found goes to SID 1
        default["attributes"]["loading"][1] = {}
        _default_value = self.ui.get_default_loading_input()
        master_key = "loading"
        find_in_grpname = "load"
        self._update_default_master_key(
            master_key, find_in_grpname, _default_value, default
        )
        # -------------------------------------------------------------
        # process affectations
        for dim, grps in self.gmsh.dim2physical_group.items():
            if dim == 0:
                continue
            for grp in grps:
                if dim == 1:
                    default["attributes"]["affectations"][grp.name] = {
                        "PID": 1,
                        "GO/X1": 1.0,
                        "CID": 0,
                        "X2": 0.0,
                        "X3": 0.0,
                    }
                elif dim == 2:
                    default["attributes"]["affectations"][grp.name] = {"PID": 3}
        return default
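
    # The returned template roughly has the following shape (sketch; group
    # names such as "spc_edge", "load_tip" or "skin" are hypothetical and come
    # from the physical groups defined in the .msh file):
    #   {"attributes": {
    #       "boundaries":   {1: {"spc_edge": <default boundary input>}},
    #       "loading":      {1: {"load_tip": <default loading input>}},
    #       "affectations": {"stiffeners": {"PID": 1, "GO/X1": 1.0, "CID": 0,
    #                                       "X2": 0.0, "X3": 0.0},
    #                        "skin": {"PID": 3}}}}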

    def build(self, clean_orphan_nodes=False, bind_mesh_api=True):
        """
        mesh file (.msh)   ━───────┬───────> NASTRAN BULK
        params file (.yml) ━───────┘
        """
        logging.info("building NASTRAN model")
        if not hasattr(self, "ui"):
            self.load_user_params()
        if clean_orphan_nodes:
            self.orphan_nodes(clean=True)
        self.reg = Registry()
        self.reg.container = {
            META.title: {"source": self._meshfile},
            BULK.title: {},
        }
        self._make_exec()
        self._make_cases()
        self._make_params()
        self._make_materials()
        self._make_properties()
        self._make_boundaries()
        self._make_loading()
        self._make_nodes()
        self._make_elements()
        # finalize registry
        if bind_mesh_api:
            self.reg.bind_mesh_api()
        self.reg._build_summary()

    def to_nastran(self, target=None):
        if not target:
            target = self._meshfile.parent / (self._meshfile.stem + ".nas")
        elif target is str:
            # passing the `str` type requests the deck as a string
            target = None
        return self.reg.to_nastran(target)
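
    # Usage (sketch): `study.to_nastran()` writes "<meshfile stem>.nas" next to
    # the mesh file, `study.to_nastran("out.nas")` writes to an explicit path,
    # and `study.to_nastran(str)` is expected to return the deck as text (the
    # actual return value is delegated to Registry.to_nastran).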

    def _update_default_master_key(self, master_key, find, default_value, default):
        all_groups = self.gmsh.physical_groups
        for (grpdim, grpid), grp in all_groups.items():
            if find in grp.name.lower():
                bound = {grp.name: default_value}
                default["attributes"][master_key][1].update(bound)
        # no group name matched (e.g. no "spc"/"load" group): fall back to the
        # first physical group
        if not default["attributes"][master_key][1]:
            grp = next(iter(all_groups.values()))
            bound = {grp.name: default_value}
            default["attributes"][master_key][1].update(bound)

    def _make_nodes(self):
        self.reg.container[BULK.title]["GRID"] = cards.GRID()
        for gid, grid in self.gmsh.nodes.items():
            self.reg.container["bulk"]["GRID"].append_sparams(
                {"ID": gid, "X1": grid.x, "X2": grid.y, "X3": grid.z}
            )

    def _finish_make_items(self, carddata):
        card = carddata["card"]
        cardname = card.__name__
        params = carddata["params"]
        over, grpname = carddata.get("over", (None, None))
        repeat = {}
        if over and (repeated := carddata.get("repeated")):
            if over == "nodes":
                ids = self.gmsh.physical_group2gids[grpname]
            else:
                ids = self.gmsh.physical_group2eids[grpname]
            repeat = {repeated[0]: ids}
        elif repeated := carddata.get("repeated"):
            # e.g. PCOMP card has repeated fields without `over`
            repeat = repeated
        if cardname not in self.reg.container["bulk"]:
            self.reg.container["bulk"][cardname] = card()
        self.reg.bulk[cardname].append_sparams(params, **repeat)
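
    # A `carddata` item produced by user_input.UserInput roughly looks like the
    # following (sketch, inferred from how the fields are consumed above; the
    # values and group names are illustrative):
    #   {"card": cards.FORCE,                # nastranio card class
    #    "params": {"SID": 1, "N1": 100.0},  # scalar card fields
    #    "over": ("nodes", "load_tip"),      # optional: expand over a physical group
    #    "repeated": ["G"],                  # optional: field repeated for each id
    #    "missing": ["G"]}                   # fields still to be filled in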

    def _make_materials(self):
        # create material cards
        materials = self.ui.get_materials()
        for mid, material in materials.items():
            self._finish_make_items(material)

    def _make_properties(self):
        # create property cards
        properties = self.ui.get_properties()
        for pid, prop in properties.items():
            self._finish_make_items(prop)

    def _make_boundaries(self):
        # apply boundaries
        boundaries = self.ui.get_boundaries()
        for sid, constraints in boundaries.items():
            for constraint in constraints:
                self._finish_make_items(constraint)

    def _make_loading(self):
        self._loaded_gids = {}
        # apply loading
        loading = self.ui.get_loading()
        for sid, items in loading.items():
            sid2loaded_gids = defaultdict(list)
            self._loaded_gids[sid] = sid2loaded_gids
            for item in items:
                as_sum = item.get("unknown", {}).get("as_sum", False)
                if "over" in item:
                    gids = self.gmsh.physical_group2gids[item["over"][1]]
                    # only FORCE cards are allowed to be divided
                    if item["card"].__name__ == "FORCE" and as_sum:
                        item["params"]["N1"] /= len(gids)
                        item["params"]["N2"] /= len(gids)
                        item["params"]["N3"] /= len(gids)
                    for gid in gids:
                        _item = deepcopy(item)
                        _item["params"]["G"] = gid
                        _item["missing"].remove("G")
                        sid2loaded_gids[gid].append(_item)
                    # delegate _finish_make_items(_item) to the deduplicator
                else:
                    # e.g. GRAV
                    self._finish_make_items(item)
            self._loaded_gids[sid] = dict(sid2loaded_gids)
        self._make_loading_inspect_loaded_gids()

    def _make_loading_inspect_loaded_gids(self):
        duplicated_loading = defaultdict(dict)
        for sid, loaded_nodes in self._loaded_gids.items():
            for gid, items in loaded_nodes.items():
                if len(items) > 1:
                    duplicated_loading[sid][gid] = []
                    for item in items:
                        grpname = item["over"][1]
                        is_exclusive = item.get("unknown", {}).get("exclusive", False)
                        duplicated_loading[sid][gid].append((grpname, is_exclusive))
        duplicated_loading = dict(duplicated_loading)
        # ---------------------------------------------------------------------
        # process exclusivity
        if duplicated_loading:
            for sid, gids2loading_grp in list(duplicated_loading.items())[:]:
                for gid, grpnames in gids2loading_grp.copy().items():
                    dedup = defaultdict(set)
                    for grpname, is_exclusive in grpnames:
                        dedup[is_exclusive].add(grpname)
                    if True not in dedup:
                        continue
                    grpname_to_keep = dedup[True]
                    if len(grpname_to_keep) > 1:
                        raise ValueError(
                            f"several exclusivities detected for SID {sid} and GID {gid}: {grpname_to_keep}"
                        )
                    grpname_to_keep = next(iter(grpname_to_keep))
                    # remove unwanted items from _loaded_gids
                    to_pop = []
                    for unwanted_loading_grp in dedup[False]:
                        for i, item in enumerate(self._loaded_gids[sid][gid][:]):
                            item_grpname = item["over"][1]
                            if item_grpname == unwanted_loading_grp:
                                logging.info(
                                    f"loading SID{sid} GRID{gid}: ensure exclusivity for group "
                                    f"'{grpname_to_keep}' by removing loading group '{item_grpname}'"
                                )
                                to_pop.append(i)
                    for i in to_pop[::-1]:
                        self._loaded_gids[sid][gid].pop(i)
                        duplicated_loading[sid][gid].pop(i)
                    if len(duplicated_loading[sid][gid]) == 1:
                        duplicated_loading[sid].pop(gid)
        # ---------------------------------------------------------------------
        # create loading
        for sid, loaded_nodes in self._loaded_gids.items():
            for gid, items in loaded_nodes.items():
                for item in items:
                    self._finish_make_items(item)
        # finally clean duplicated_loading
        duplicated_loading = {
            sid: dup for sid, dup in duplicated_loading.items() if dup
        }
        # ---------------------------------------------------------------------
        # feedback
        if duplicated_loading:
            logging.debug(f"{duplicated_loading=}")
            logging.warning("duplicated loading (combined) is detected")
            logging.info("you may want `exclusive` to avoid load combination.")
            for sid, gids2loading_grp in duplicated_loading.items():
                msg = f"Duplicated loaded grids for SID {sid}:\n"
                for gid, grpnames in gids2loading_grp.items():
                    txt = " & ".join(["'%s'" % grp[0] for grp in grpnames])
                    msg += f" * GRID {gid}: {txt}\n"
                logging.info(msg)
        self._dup_gids_loading = duplicated_loading
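
    # Example (sketch, hypothetical group names): if some grids belong to both
    # the "load_pressure" and "load_edge" groups, their loads are combined by
    # default; flagging one group with `exclusive` keeps only that group's
    # loading on the shared grids and drops the other group's contribution.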

    def _make_elements(self):
        affect = self.ui.get_affectations()
        affected = set()
        for physicalName, item in affect.items():
            eids = self.gmsh.physical_group2eids[physicalName]
            for eid in eids:
                elt = self.gmsh.elements[eid]
                eid_item = deepcopy(item[elt.element_type])
                XID = eid_item["card"].XID_FIELDNAME
                eid_item["params"][XID] = eid
                eid_item["missing"].remove(XID)
                eid_item.pop("over")
                grids = dict(zip(eid_item["card"]._gids_header(), elt.gids))
                eid_item["params"].update(grids)
                eid_item.pop("missing")
                self._finish_make_items(eid_item)
                affected.add(eid)
        # give feedback about physical groups with non-affected elements
        unset = {}
        for physicalName, eids in self.gmsh.physical_group2eids.items():
            nb_unset = len(eids - affected)
            if nb_unset:
                unset[physicalName] = (nb_unset, len(eids))
        for physicalName, (nb_unset, nb_total) in unset.items():
            msg = (
                f"physical group '{physicalName}' has {nb_unset} "
                f"non-affected elements out of {nb_total}"
            )
            if "spc" not in physicalName.lower() and "load" not in physicalName.lower():
                logging.info(msg)
            else:
                logging.debug(msg)

    def _make_exec(self):
        self.reg.container["exec"] = self.ui.parameters["exec"]

    def _make_cases(self):
        cases = self.ui.parameters["cases"]
        boundary_sids = sorted(self.ui.get_boundaries().keys())
        loading_sids = sorted(self.ui.get_loading().keys())
        default_boundary_sid = boundary_sids.pop(0)
        # guard against an empty loading list to avoid an IndexError
        default_loading_sid = loading_sids[0] if loading_sids else None
        if loading_sids:
            for sid in loading_sids:
                cases[f"SUBCASE {sid}"] = {
                    "SUBTITLE": f"LCID#{sid}",
                    "SPC": default_boundary_sid,
                    "LOAD": sid,
                }
        else:
            cases["default"]["SPC"] = default_boundary_sid
            cases["default"]["LOAD"] = default_loading_sid
        self.reg.container["cases"] = cases
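
    # Resulting case control (sketch, illustrative SIDs): one SUBCASE per
    # loading SID, all reusing the first boundary SID as SPC, e.g.
    #   {"SUBCASE 1": {"SUBTITLE": "LCID#1", "SPC": 1, "LOAD": 1},
    #    "SUBCASE 2": {"SUBTITLE": "LCID#2", "SPC": 1, "LOAD": 2}}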

    def _make_params(self):
        self.reg.container["params"] = self.ui.parameters["params"]