Coverage for readers/bulk.py: 36% (350 statements)
coverage.py v7.7.0, created at 2025-03-20 20:51 +0100

"""
First level module
"""

import hashlib
import linecache
import logging
import os
import re
import time
from collections import defaultdict
from io import StringIO
from multiprocessing import Pool
from pprint import pprint as pp

from nastranio import cards
from nastranio.constants import BULK, CASE, COMMENTS, EXEC, META, PARAMS, SUMMARY
from nastranio.pylib import autoconvert, autoconvert_array


def coroutine(func):
    def starter(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen

    return starter
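
# Illustrative usage (not part of the original module): the decorator primes the
# generator, so callers can .send() right away without an explicit next() call.
#
#     @coroutine
#     def echo():
#         while True:
#             item = yield
#             print(item)
#
#     sink = echo()       # already advanced to the first `yield`
#     sink.send("hello")  # prints "hello"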


def log(txt):
    pp(txt)


@coroutine
def route_line(card, comment):
    """route comments and regular lines
    to provided branches
    """
    in_commentblock = False
    while True:
        line = yield
        if line.startswith("$"):
            comment.send(line)
            if not in_commentblock:
                # open new comment block
                in_commentblock = True
        else:
            # pipe data to regular branch
            card.send(line)
            if in_commentblock:
                # end of previous comment block
                in_commentblock = False
                # close previous comment block
                comment.send(-1)
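
# Illustrative sketch (not part of the original module): with two sinks,
# '$'-prefixed lines go to `comment` and other lines go to `card`; a -1
# sentinel is sent to `comment` when a comment block is closed by data.
#
#     router = route_line(card=void(), comment=void())
#     router.send("$ some comment")  # routed to the comment branch
#     router.send("GRID    1")       # routed to the card branch, closes the block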


# Unfold
@coroutine
def unfold(output):
    """
    store and unfold continued lines before
    sending them to output"""
    block = []
    BLOCK_TO_BE_SORTED = False
    LOGFILE = None  # '/home/nic/unfold.log'
    while True:
        try:
            line = yield
            # split short-format style
            data = [line[i : i + 8].strip() for i in range(0, len(line), 8)]
            nb_fields = len(data)
            # by default: single line
            # IS_CONTINUING means "current line continues block[-1]"
            IS_CONTINUING = False
            # ===============================================
            # check for continuation
            # NX Quick Reference Guide, page 983
            # ===============================================
            if len(block) > 0:
                if len(block[-1]) == 10:
                    # small field format 1:
                    # line(n-1, field 10) in {'+', '*'}
                    # and line(n, field 1) == '+'
                    if block[-1][9] in "*+" and data[0] == "+":
                        IS_CONTINUING = True
                    # small field format 2:
                    # continuation if both line(n-1, field 10)
                    # and line(n, field 1) are blank
                    elif block[-1][9] == "" and data[0] == "":
                        IS_CONTINUING = True
                    # small field format 3:
                    # continuation if both line(n-1, field 10)
                    # and line(n, field 1) are non-blank
                    elif block[-1][9] != "" and data[0] != "":
                        IS_CONTINUING = True
                        BLOCK_TO_BE_SORTED = True
                else:
                    # len(block[-1]) != 10:
                    # line n-1 was clipped after n fields;
                    # we therefore need to assume a blank 10th field
                    # FORMAT 2
                    if data[0] == "":
                        IS_CONTINUING = True
            # -----------------------------------------------
            if IS_CONTINUING:
                block.append(data)
            else:
                # end of block: flush it!
                if len(block) > 0:
                    unfolded = _unfold(block, data, BLOCK_TO_BE_SORTED, LOGFILE)
                    BLOCK_TO_BE_SORTED = False
                    output.send(unfolded)
                    block = []
                # and fill it with the new line
                block.append(data)
        except GeneratorExit:
            # flush remaining data
            if data:
                unfolded = _unfold(block, data, BLOCK_TO_BE_SORTED, LOGFILE)
                output.send(unfolded)
            raise


def _unfold(block, data, BLOCK_TO_BE_SORTED, LOGFILE):
    if len(block) > 2 and BLOCK_TO_BE_SORTED:
        # need to sort lines as per FORMAT 3
        sorted_block = block[:1]  # 1st line ok
        cont_char = block[0][-1]
        # build a dict with cont'd chars as keys
        datadic = {data[0]: data for data in block[1:]}
        for i in data[1:]:
            if cont_char not in datadic:
                continue  # this was last line
            next_line = datadic[cont_char]
            cont_char = next_line[-1]
            sorted_block.append(next_line)
        block = sorted_block
    unfolded = []
    for row in block:
        unfolded += row
    if LOGFILE:
        with open(LOGFILE, "a") as fh:
            fh.write(" ".join(unfolded) + "\n")
    return unfolded
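
# Illustrative sketch (not part of the original module): a short-format card
# followed by a blank-field-1 continuation (format 2) is merged into a single
# flat list of 8-character fields before being sent downstream.
#
#     sink = void()
#     uf = unfold(sink)
#     uf.send("CQUAD4  1       1       1       2       3       4")
#     uf.send("        401")  # blank field 1 => continuation of the line above
#     uf.close()              # flushes the merged field list to `sink`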


@coroutine
def comments_sink(output, container):
    """collect block of comments
    a block ends when data == -1
    """
    block = []
    while True:
        data = yield
        if data == -1:
            output.send("\n".join(block))
            container[COMMENTS.title].append("\n".join(block))
            block = []
        else:
            block.append(data)
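
# Illustrative sketch (not part of the original module; a plain list stands in
# for the real container entry): comment lines are buffered until the -1
# sentinel emitted by route_line, then stored as a single joined block.
#
#     container = {COMMENTS.title: []}
#     sink = comments_sink(void(), container)
#     sink.send("$ first line")
#     sink.send("$ second line")
#     sink.send(-1)  # container[COMMENTS.title] == ["$ first line\n$ second line"]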


@coroutine
def printer(prefix="", suffix=""):
    while True:
        line = yield
        msg = "{prefix}{line}{suffix}".format(prefix=prefix, line=line, suffix=suffix)
        print(msg, end="")


@coroutine
def void():
    while True:
        line = yield


@coroutine
def counter(output, name, callback=None):
    _nb_packets = 0
    _packets = []
    try:
        while True:
            data = yield
            _nb_packets += 1
            output.send(data)
            _packets.append(data)
    except GeneratorExit:
        # send the last received line
        # output.send(data)
        # print('counter [%s]: %d' % (name, i))
        if callback:
            return callback(name, _nb_packets, _packets)
        return


@coroutine
def strip(output, chars=None):
    while True:
        data = yield
        output.send(data.strip(chars))


@coroutine
def route_section(exec, case, bulk, bulk_only=False):
    if bulk_only:
        SECTION = "BULK"
    else:
        SECTION = "EXEC"  # will go through EXEC/CASE/BULK
    while True:
        line = yield
        line_ = line.strip()
        if line_.startswith("ENDDATA"):
            continue
        if line == "CEND":
            # switch to CASE
            SECTION = "CASE"
            continue
        elif line_ == "BEGIN BULK":
            # switch to BULK
            SECTION = "BULK"
            continue
        if SECTION == "EXEC":
            exec.send(line)
        elif SECTION == "CASE":
            case.send(line)
        elif SECTION == "BULK":
            bulk.send(line)
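
# Illustrative sketch (not part of the original module): lines are forwarded to
# the executive-control branch until "CEND", then to the case-control branch
# until "BEGIN BULK", then to the bulk branch; "ENDDATA" lines are dropped.
#
#     router = route_section(exec=void(), case=void(), bulk=void())
#     router.send("SOL 101")     # -> exec branch
#     router.send("CEND")        # switch to CASE
#     router.send("SUBCASE 1")   # -> case branch
#     router.send("BEGIN BULK")  # switch to BULK
#     router.send("GRID    1")   # -> bulk branch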


@coroutine
def process_bulk(output, container):
    while True:
        line = yield
        line_ = line.strip()
        if line_.startswith("PARAM"):
            _, key, value = line_.split(",")
            container[PARAMS.title][key] = autoconvert(value)
            continue
        # regular bulk line
        output.send(line)
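
# Illustrative sketch (not part of the original module; assumes autoconvert maps
# "-1" to the integer -1): comma-separated PARAM cards are consumed here and
# stored in the container; any other bulk line is passed through unchanged.
#
#     container = {PARAMS.title: {}}
#     pb = process_bulk(void(), container)
#     pb.send("PARAM,POST,-1")  # container[PARAMS.title]["POST"] == -1
#     pb.send("GRID    1")      # forwarded downstream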


@coroutine
def process_exec(output, container):
    while True:
        line = yield
        line_ = line.strip()
        key, *value = line_.split(" ")
        container[EXEC.title][key] = autoconvert(" ".join(value))


@coroutine
def process_case(output, container):
    SUBCASEKEY = "default"  # the default section precedes all subcase definitions
    SUBCASEID = -1
    while True:
        line = yield
        # save all the `CASE` lines
        # interpret
        if line.strip().startswith("SUBCASE"):
            SUBCASEID = int(line.split()[-1].strip())
            SUBCASEKEY = "SUBCASE %d" % SUBCASEID
            continue
        # sometimes, "TITLE = Callback Val=0"
        param, *value = [txt.strip() for txt in line.split("=")]
        if SUBCASEKEY not in container[CASE.title]:
            container[CASE.title][SUBCASEKEY] = {"id": SUBCASEID}
        try:
            container[CASE.title][SUBCASEKEY][param] = autoconvert("=".join(value))
        except ValueError:
            # e.g. SET 1 = 1, 1003
            logging.warning(f'cannot process param "{param} = {value}"')
            continue
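
# Illustrative sketch (not part of the original module): case-control lines are
# grouped under "default" until a SUBCASE statement is met, then under
# "SUBCASE <id>"; each "KEY = VALUE" line becomes a dict entry.
#
#     container = {CASE.title: {}}
#     pc = process_case(void(), container)
#     pc.send("TITLE = demo")  # -> container[CASE.title]["default"]["TITLE"]
#     pc.send("SUBCASE 1")
#     pc.send("LOAD = 2")      # -> container[CASE.title]["SUBCASE 1"]["LOAD"]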


@coroutine
def register_card(output, container):
    bulk = container[BULK.title]
    summary = container[SUMMARY.title]
    while True:
        fields = yield
        # remove card name from fields
        try:
            card_name, fields = fields[0], fields[1:]
        except IndexError:
            logging.critical("cannot dump fields %s", fields)
            raise
        container_card_entry = bulk.get(card_name)
        if not container_card_entry:
            # get the <CARD> object from cards module, and instantiate:
            card_inst = cards.__dict__.get(card_name, cards.DefaultCard)(name=card_name)
            # prepare a container in bulk
            bulk[card_name] = card_inst
            summary[card_inst.type].add(card_name)
            if hasattr(card_inst, "dim"):
                summary[card_inst.dim].add(card_name)
            if hasattr(card_inst, "shape"):
                summary[card_inst.shape].add(card_name)
            container_card_entry = bulk.get(card_name)
        try:
            fields = autoconvert_array(fields)  # [autoconvert(f) for f in fields]
        except ValueError:
            raise ValueError(f"cannot convert {container_card_entry.name}: {fields=}")
        container_card_entry.append_fields_list(fields)
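
# Illustrative sketch (not part of the original module; `container` is assumed
# to be initialized as in _process_file()): the first time a card name is met,
# a card object is looked up in nastranio.cards (falling back to DefaultCard),
# stored in the bulk container and indexed in the summary; later field lists
# for the same card are appended to that instance.
#
#     rc = register_card(void(), container)
#     rc.send(["GRID", "1", "", "0.0", "0.0", "0.0"])
#     rc.send(["GRID", "2", "", "1.0", "0.0", "0.0"])
#     # container[BULK.title]["GRID"] now holds both field lists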


def get_nb_lines(filename):
    with open(filename, "r") as fh:
        for nb_rows, l in enumerate(fh):
            pass
    nb_rows += 1
    return nb_rows


def _split_filename(filename, nbprocs):
    """split a file and return a list of dicts like:

    [{'slice': 0, 'nb_lines': 215974, 'fh': <_io.StringIO at 0x7f8c069b3168>},
     {'slice': 1, 'nb_lines': 215975, 'fh': <_io.StringIO at 0x7f8c069c3a68>},
     {'slice': 2, 'nb_lines': 215975, 'fh': <_io.StringIO at 0x7f8c069c3e58>},
     {'slice': 3, 'nb_lines': 215977, 'fh': <_io.StringIO at 0x7f8c069d7438>}]

    The function makes sure to split the file right above a new card entry.
    """
    OKREGEX = re.compile(r"^(\$)?[A-Z]+\d*")  # can we split above?
    nb_rows = get_nb_lines(filename)
    # compute the number of processes depending on the number of lines
    if nbprocs == "auto":
        if nb_rows <= 100:
            nbprocs = 1
        elif nb_rows <= 500:
            nbprocs = 2
        elif nb_rows <= 1000:
            nbprocs = 3
        elif nb_rows <= 10000:
            nbprocs = 4
        elif nb_rows <= 100000:
            nbprocs = 8
        else:
            nbprocs = 16
        logging.info("automatically set nb of processes to %d", nbprocs)
    # ------------------------------------------------------------------------
    # the first split SHALL go at least up to "CEND"
    with open(filename, "r") as fh:
        for cend_line, line in enumerate(fh):
            if "CEND" in line:
                break
    # ------------------------------------------------------------------------
    # prepare the split
    logging.info("split a %d lines file to %d chunks", nb_rows, nbprocs)
    nblines_per_split = nb_rows // nbprocs
    targets = []
    last = 0
    for splitnb in range(nbprocs)[:-1]:
        last = splitnb * nblines_per_split + nblines_per_split
        # ensure that CEND is included in the first chunk
        if splitnb == 0 and last < cend_line:
            last = cend_line
        while True:
            previous = linecache.getline(filename, last - 1)
            target = linecache.getline(filename, last)
            # append previous line as target row nb
            if OKREGEX.match(target) and not previous.startswith("$"):
                targets.append(last - 1)  # split above
                break
            last += 1
    targets.append(nb_rows)
    fhs = []
    prevstop = 0
    nextstop = targets.pop(0)
    fh_buffer = StringIO()
    buffer = []
    slicenb = 0
    rel_line_nb = 0
    with open(filename, "r") as fh:
        for linenb, line in enumerate(fh):
            if linenb == nextstop:
                # we reached the last line of the current buffer;
                # the current line will be the first of the next split
                buffer_id = len(fhs)
                fh_buffer.writelines(buffer)
                fh_buffer.seek(0, 0)
                fhs.append(fh_buffer)
                fh_buffer = StringIO()
                # flush buffer
                buffer = []
                slicenb += 1
                rel_line_nb = 0
                try:
                    prevstop = nextstop
                    nextstop = targets.pop(0)
                except IndexError:
                    # last run...
                    pass
            buffer.append(line)
            rel_line_nb += 1
    fh_buffer.writelines(buffer)
    fh_buffer.seek(0, 0)
    fhs.append(fh_buffer)
    logging.info("prepared %d jobs" % len(fhs))

    return nbprocs, nb_rows, fhs
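
# Illustrative usage (not part of the original module; the path is hypothetical):
# the deck is cut into chunks whose boundaries fall right above a new card
# (OKREGEX match, previous line not a comment), so a folded card is never split
# across two workers; the first chunk always reaches at least the "CEND" line.
#
#     nbprocs, nb_rows, fhs = _split_filename("/path/to/model.bdf", nbprocs="auto")
#     chunks = [buf.read() for buf in fhs]  # one StringIO buffer per worker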


def _process_file(filehdata):
    """
    Pipeline creation and burning
    """
    # prepare an advanced counter reporting for cards
    fileh, fileh_nb, progress = filehdata

    def cnt_detailed_callback(name, nb_packets, packets):
        if len(packets) > 0:
            if isinstance(packets[0], str):
                first = packets[0][:17]
            elif isinstance(packets[0], list):
                first = " ".join(packets[0][:2])
            else:
                first = packets[0]
            if isinstance(packets[-1], str):
                last = packets[-1][:17]
            elif isinstance(packets[-1], list):
                last = " ".join(packets[-1][:2])
            else:
                last = packets[-1]
            msg = f'counter [{name}] {nb_packets} items:: "{first}" -> "{last}"'
        else:
            first = None
            last = None
            msg = f"counter [{name}] {nb_packets}"
        logging.info(msg)

    counter_callback = None
    # ========================================================================
    # initialize container
    # ========================================================================
    container = {}
    for section in (EXEC, PARAMS, COMMENTS, BULK, CASE, META, SUMMARY):
        container[section.title] = section.type(*section.args)
    # ========================================================================
    # build pipeline
    # ========================================================================
    # end-points of the pipeline:
    null = void()
    pr1 = printer(suffix="\n\n")
    pr2 = printer(prefix="BULK: ", suffix="\n\n")
    # ------------------------------------------------------------------------
    # main branches:
    comments = counter(
        comments_sink(null, container), f"Buffer #{fileh_nb} comments", callback=None
    )

    regular = route_section(
        exec=process_exec(void, container),
        case=process_case(void, container),
        bulk=process_bulk(
            unfold(
                counter(
                    register_card(null, container),
                    f"Buffer #{fileh_nb} cards",
                    callback=cnt_detailed_callback,
                )
            ),
            container,
        ),  # / process_bulk
        bulk_only=fileh_nb > 0,
    )

    # ------------------------------------------------------------------------
    # the whole pipeline (except for pid)
    pipe = counter(
        strip(route_line(regular, comments), "\n"),
        f"Buffer #{fileh_nb} all packets",
        callback=cnt_detailed_callback,
    )

    # pump data into the pipeline
    for line in fileh:
        if line.strip() != "":
            pipe.send(line)
    return container


def md5(fname):
    """credit:
    https://stackoverflow.com/a/3431838
    """
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def read_buffer(fh, progress=True):
    # skip _split_filename
    start = time.time()
    nbprocs = 1
    nb_rows = sum(1 for line in fh)
    fh.seek(0)
    fhs = [fh]
    res = _read(nbprocs, nb_rows, fhs, start, progress, source="buffer")
    return res
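
# Illustrative usage (not part of the original module): parse an in-memory deck
# without touching the filesystem; _read() returns one container per chunk.
#
#     from io import StringIO
#     deck = StringIO("SOL 101\nCEND\nBEGIN BULK\nGRID    1\nENDDATA\n")
#     res = read_buffer(deck, progress=False)
#     container = res[0]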


def read_bulk(filename, nbprocs="auto", progress=True):
    """
    Main function to parse a NASTRAN bulk file.
    An existing container may be provided. If not, a new container will be
    created and returned.
    """
    filename = os.path.expanduser(filename)

    start = time.time()
    # ========================================================================
    # prepare multi-process parsing
    # ========================================================================
    nbprocs, nb_total, fhs = _split_filename(filename, nbprocs)
    return _read(
        nbprocs, nb_total, fhs, start, progress, source=os.path.abspath(filename)
    )
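
# Illustrative usage (not part of the original module; the path is hypothetical):
#
#     res = read_bulk("~/models/wing.bdf", nbprocs="auto", progress=False)
#     meta = res[0][META.title]        # source, nbprocs, nbrows, elapsed
#     bulk_cards = res[0][BULK.title]  # card name -> card instance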


def _read(nbprocs, nb_total, data, start, progress, source):
    # =========================================================================
    # # summarize first and last lines
    # for fh in data:
    #     lines = fh.readlines()
    #     print(lines[0], lines[-1])
    #     fh.seek(0)
    # append data ID
    data = [(datai, i, progress) for i, datai in enumerate(data)]
    # ========================================================================
    # parse file
    # ========================================================================
    if progress:
        import tqdm.autonotebook as tqdm

        _progress = tqdm.tqdm
    else:
        _progress = lambda x, total=None, desc=None: x

    if nbprocs == 1:
        res = [_process_file(data[0])]
    else:
        with Pool(nbprocs) as pool:
            res = list(
                _progress(
                    pool.imap(_process_file, data), total=len(data), desc="parsing bulk"
                )
            )
    stop = time.time()
    delta = stop - start
    # ------------------------------------------------------------------------
    # add a few metadata
    res[0][META.title].update(
        {
            "source": source,
            # "source_md5": md5(filename),
            "nbprocs": nbprocs,
            "nbrows": nb_total,
            "elapsed": round(delta, 3),
        }
    )
    # ========================================================================
    # return either a Registry or the raw results
    # ========================================================================
    msg = f"processed {nb_total:,} lines in {delta:.2f} sec."
    logging.info(msg)
    return res