Coverage for readers/op2.py: 19%
131 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-20 20:51 +0100
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-20 20:51 +0100
"""
interface with OP2 results
"""

# ============================================================================
# ⚠ NOTE ⚠
# ----------------------------------------------------------------------------
# For now, use pyNastran
# ----------------------------------------------------------------------------
# nic@alcazar -- Monday 2 December 2019, 14:08:12 (UTC+0100)
# mercurial: 36fcd4a16748 tip
# ============================================================================
import glob
import logging
import multiprocessing as mp
import os
import time

import numpy as np
import pandas as pd
# pyNastran is a hard requirement: abort module import (and the whole
# process) when it is missing rather than failing later at read time.
try:
    from pyNastran.op2.op2 import OP2
except ImportError as exc:
    logging.warning("pyNastran not installed.")
    import sys

    # NOTE(review): exit code 0 reports *success* to the shell/pytest,
    # which contradicts the intent stated below — confirm whether a
    # non-zero code (or re-raising) was meant.
    sys.exit(0)  # make pytest fails..
# Default pandas options applied by ``set_options`` (key -> value for
# ``pd.set_option``).
PD_OPTIONS = {"precision": 3}
# Default numpy print options applied by ``set_options``
# (kwargs for ``np.set_printoptions``).
NP_PRINTOPTIONS = {"precision": 3, "threshold": 20}
def set_options(pd_options=None, np_printoptions=None):
    """Configure pandas display options and numpy print options.

    Parameters
    ----------
    pd_options : dict, optional
        Mapping passed key-by-key to ``pd.set_option``. Falls back to
        module-level ``PD_OPTIONS`` when empty or None.
    np_printoptions : dict, optional
        Keyword arguments for ``np.set_printoptions``. Falls back to
        module-level ``NP_PRINTOPTIONS`` when empty or None.
    """
    if not pd_options:
        pd_options = PD_OPTIONS
    for key, value in pd_options.items():
        pd.set_option(key, value)
    if not np_printoptions:
        np_printoptions = NP_PRINTOPTIONS
    # fix: the original looped over the items but passed the *whole* dict to
    # np.set_printoptions on every iteration — a single call is sufficient.
    np.set_printoptions(**np_printoptions)
class MultipleOP2:
    """Basic wrapper around the pyNastran ``OP2`` class.

    Aggregates subcases read from one or several op2 files and exposes
    results as pandas DataFrames, optionally renaming column headers from
    pyNastran ("op2") names to project ("nasca") names.
    """

    def __init__(self, mode="nx", autorename=True):
        """
        Parameters
        ----------
        mode : str
            pyNastran OP2 mode (passed to ``op2.set_mode``).
        autorename : bool
            If True, ``release_df`` renames DataFrame column headers
            using the ``_op2nasca`` mapping.
        """
        self._mode = mode
        self._autorename = autorename
        # lcid -> (op2 object, isubcase, title)
        self.subcases = {}
        # automatic renaming for DataFrames columns headers
        # fix: duplicated "NodeID" / "ElementID" keys removed (later
        # duplicates silently overwrote the earlier identical entries).
        self._op2nasca = {
            "lc": "lc",
            "NodeID": "gid",
            "ElementID": "eid",
            "ElementType": "source",
            "f1": "t1",
            "f2": "t2",
            "f3": "t3",
            "m1": "r1",
            "m2": "r2",
            "m3": "r3",
            "t12": "shear_xy_mat",
            "t1z": "shear_xz_mat",
            "t2z": "shear_yz_mat",
            "Item": "vector",
            "Layer": "ply_id",
            0: "FEA_value",
        }
        # reverse mapping: nasca header -> op2 header
        self._nasca2op = {v: k for k, v in self._op2nasca.items()}

    def _new_op2(self, debug=False, log=None, debug_file=None):
        """op2 boilerplate: build an ``OP2`` reader in the configured mode."""
        op2 = OP2(debug=debug, log=log, debug_file=debug_file)
        op2.set_mode(self._mode)
        return op2

    @property
    def lcs(self):
        """Copy of the subcases mapping (lcid -> (op2, isubcase, title))."""
        return self.subcases.copy()

    def read_op2(self, args):
        """Read a single op2 file. Can be called multiple times.

        Parameters
        ----------
        args : str or tuple
            Either a filename (single-process mode) or a
            ``(filename, shared_list)`` pair (multiprocess mode, where
            results are appended to the shared list instead of mutated
            onto ``self``).
        """
        if isinstance(args, str):
            multiprocess = False
            filename = args
        else:
            multiprocess = True
            filename = args[0]
            container = args[1]
        op2 = self._new_op2()
        op2.read_op2(
            filename, combine=True, build_dataframe=False, skip_undefined_matrices=True
        )
        if not multiprocess:
            # single-process reading: shift subcase ids past the highest
            # id already registered so successive files do not collide
            try:
                id_offset = max(self.subcases.keys())
            except ValueError:
                # fix: was a bare ``except:``; max() on an empty dict
                # raises ValueError
                id_offset = 0
            for isubcase in op2.subcase_key.keys():
                self.subcases[isubcase + id_offset] = (op2, isubcase, op2.title)
        else:
            # defer the merge to the parent process (see read_op2_in_dir)
            container.append((filename, op2, tuple(op2.subcase_key.keys())))

    def read_op2_in_dir(self, path, pattern="*.op2", multiprocess=False):
        """
        Read several op2 from dir, as per provided pattern. Default
        (``'*.op2'``) is to read all op2 from path.

        Files are processed in sorted-filename order so subcase ids are
        deterministic. With ``multiprocess=True`` the files are read by a
        worker pool and merged back here.
        """
        files = sorted(glob.glob(os.path.join(path, pattern)))
        start = time.time()
        if not multiprocess:
            for file in files:
                self.read_op2(file)
            stop = time.time()
        else:
            # multiprocess reading: workers append (filename, op2, lcids)
            # triples to a managed shared list
            with mp.Manager() as manager:
                op2s = manager.list()
                args = [(file, op2s) for file in files]
                with manager.Pool() as pool:
                    pool.map(self.read_op2, args)
                op2s = list(op2s)
            # sort op2s by filename to keep lcid numbering deterministic
            op2s = sorted(op2s, key=lambda x: x[0])
            offset = 0
            for (fname, op2, lcids) in op2s:
                for isubcase in lcids:
                    self.subcases[isubcase + offset] = (op2, isubcase, op2.title)
                offset += len(lcids)
            stop = time.time()
        logging.info(70 * "-")
        logging.info("read %d files in %.3f sec." % (len(files), stop - start))
        logging.info(70 * "-")

    def release_df(self, df):
        """Post-processing for released DataFrames: reset the index and,
        when ``autorename`` is on, rename columns to nasca headers and
        drop leftover ``level_*`` index columns."""
        if self._autorename:
            df = df.reset_index().rename(columns=self._op2nasca)
            cols = [c for c in df.columns if not c.startswith("level_")]
            df = df[cols]
            df.columns.name = None
            return df
        return df.reset_index()

    def result(self, attr, filter_v=None, autoclean=True):
        """Read one single result (pyNastran attribute ``attr``) from all
        op2s and all subcases, and return a DataFrame (or None when any
        subcase has no such result).

        Parameters
        ----------
        attr : str
            pyNastran result attribute name (e.g. ``"grid_point_forces"``).
        filter_v : dict, optional
            column -> allowed-values mapping applied row-wise.
        autoclean : bool
            If True, pass the result through ``release_df``.
        """
        _res = {}
        for lcid, (op2, isubcase, title) in self.subcases.items():
            # fix: the original passed ``end="..."`` to logging.info, which
            # is not a valid logging kwarg and raises TypeError (print-style
            # leftover); the suffix is now part of the message.
            logging.info(
                'recover "%s" for lcid %s "%s" (isubcase=%s)...',
                attr,
                lcid,
                title,
                isubcase,
            )
            res = getattr(op2, attr)
            res = res.get(isubcase)
            if not res:
                logging.info("no results. Return")
                return
            if res.dataframe is None:
                res.build_dataframe()
            res = res.dataframe.copy().reset_index()
            if filter_v:
                res = res[res[list(filter_v.keys())].isin(filter_v).all(axis=1)]
            _res[lcid] = res
            logging.info("ok")
        _res = pd.concat(_res, axis=0)
        _res.index.names = ["lc"] + _res.index.names[1:]
        if autoclean:
            return self.release_df(_res)
        return _res

    def results(self, attrs, filter_v=None):
        """Sequential ``self.result()`` calls over ``attrs``, concatenated
        into one DataFrame (None entries are dropped)."""
        dfs = [self.result(attr, filter_v=filter_v, autoclean=False) for attr in attrs]
        # eventually remove Nones
        dfs = [df for df in dfs if df is not None]
        if not dfs:
            # fix: pd.concat([]) raises ValueError; mirror result()'s
            # "no results" contract by returning None instead
            return None
        df = pd.concat(dfs)
        return self.release_df(df)

    def gpf(self, filter_v=None):
        """
        Read 'grid_point_forces' (filter keys/values given in nasca
        naming are translated to op2 naming first).
        """
        filter_v = self._translate_filter(filter_v)
        return self.result("grid_point_forces", filter_v=filter_v)

    def composites(self, filter_v=None):
        """Read composite stresses for CQUAD4/CTRIA3/CQUAD8 elements and
        return them as a single DataFrame."""
        attrs = (
            "cquad4_composite_stress",
            "ctria3_composite_stress",
            "cquad8_composite_stress",
        )
        filter_v = self._translate_filter(filter_v)
        df = self.results(attrs, filter_v=filter_v)
        return df

    def _translate_filter(self, filter_v):
        """Translate a nasca-named filter dict (keys and values) to op2
        naming via ``_nasca2op``; unknown names pass through unchanged."""
        if not filter_v:
            return
        translated = {}
        for k, values in filter_v.items():
            values = [self._nasca2op.get(v, v) for v in values]
            translated[self._nasca2op.get(k, k)] = values
        return translated