Coverage for readers/bulk.py: 36%

350 statements  

coverage.py v7.7.0, created at 2025-03-20 20:51 +0100

1""" 

2First level module 

3""" 

4 

5import hashlib 

6import linecache 

7import logging 

8import os 

9import re 

10import time 

11from collections import defaultdict 

12from io import StringIO 

13from multiprocessing import Pool 

14from pprint import pprint as pp 

15 

16from nastranio import cards 

17from nastranio.constants import BULK, CASE, COMMENTS, EXEC, META, PARAMS, SUMMARY 

18from nastranio.pylib import autoconvert, autoconvert_array 

19 

20 

def coroutine(func):
    """Decorator: prime a coroutine by advancing it to its first yield."""

    def starter(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen

    return starter


def log(txt):
    pp(txt)
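# Illustrative usage sketch (not part of the original module): the @coroutine
# decorator primes a generator so callers can .send() to it immediately, without
# an explicit next() call.
def _example_coroutine_priming():
    @coroutine
    def echo(prefix):
        while True:
            line = yield
            print(prefix + line)

    sink = echo("got: ")
    sink.send("CQUAD4  1")  # handled right away; the decorator already advanced the generator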

@coroutine
def route_line(card, comment):
    """route comments and regular lines to the provided branches"""
    in_commentblock = False
    while True:
        line = yield
        if line.startswith("$"):
            comment.send(line)
            if not in_commentblock:
                # open a new comment block
                in_commentblock = True
        else:
            # pipe data to the regular branch
            card.send(line)
            if in_commentblock:
                # end of previous comment block
                in_commentblock = False
                # close previous comment block
                comment.send(-1)
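# Illustrative sketch (not part of the original module): route_line() splits an
# incoming stream into a comment branch ('$' lines) and a card branch, and emits
# a -1 sentinel on the comment branch when data closes a comment block.
def _example_route_line():
    pipe = route_line(
        card=printer(prefix="CARD>    ", suffix="\n"),
        comment=printer(prefix="COMMENT> ", suffix="\n"),
    )
    pipe.send("$ shell properties")  # -> COMMENT> $ shell properties
    pipe.send("PSHELL  1       1")   # -> CARD> PSHELL ..., plus -1 sent on the comment branch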

# Unfold
@coroutine
def unfold(output):
    """
    store and unfold continuation lines before
    sending to output"""
    block = []
    BLOCK_TO_BE_SORTED = False
    LOGFILE = None  # '/home/nic/unfold.log'
    while True:
        try:
            line = yield
            # split short-format style
            data = [line[i : i + 8].strip() for i in range(0, len(line), 8)]
            nb_fields = len(data)
            # by default: single line
            # IS_CONTINUING means "current line continues block[-1]"
            IS_CONTINUING = False
            # ===============================================
            # check for continuation
            # NX Quick Reference Guide, page 983
            # ===============================================
            if len(block) > 0:
                if len(block[-1]) == 10:
                    # small-field format 1:
                    # line(n-1, field 10) in {'+', '*'}
                    # and line(n, field 1) == '+'
                    if block[-1][9] in "*+" and data[0] == "+":
                        IS_CONTINUING = True
                    # small-field format 2:
                    # continuation if both line(n-1, field 10)
                    # and line(n, field 1) are blank
                    elif block[-1][9] == "" and data[0] == "":
                        IS_CONTINUING = True
                    # small-field format 3:
                    # continuation if both line(n-1, field 10)
                    # and line(n, field 1) carry non-blank continuation markers;
                    # the block will then need to be sorted
                    elif block[-1][9] != "" and data[0] != "":
                        IS_CONTINUING = True
                        BLOCK_TO_BE_SORTED = True
                else:
                    # len(block[-1]) != 10:
                    # line n-1 was clipped after n fields,
                    # so we need to assume a blank 10th field
                    # FORMAT 2
                    if data[0] == "":
                        IS_CONTINUING = True
            # -----------------------------------------------
            if IS_CONTINUING:
                block.append(data)
            else:
                # end of block. flush it!
                if len(block) > 0:
                    unfolded = _unfold(block, data, BLOCK_TO_BE_SORTED, LOGFILE)
                    BLOCK_TO_BE_SORTED = False
                    output.send(unfolded)
                    block = []
                # and fill it with the new line
                block.append(data)
        except GeneratorExit:
            # flush remaining data
            if block:
                unfolded = _unfold(block, data, BLOCK_TO_BE_SORTED, LOGFILE)
                output.send(unfolded)
            raise

def _unfold(block, data, BLOCK_TO_BE_SORTED, LOGFILE):
    if len(block) > 2 and BLOCK_TO_BE_SORTED:
        # need to sort lines as per FORMAT 3
        sorted_block = block[:1]  # 1st line ok
        cont_char = block[0][-1]
        # build a dict with continuation chars as keys
        datadic = {row[0]: row for row in block[1:]}
        for i in data[1:]:
            if cont_char not in datadic:
                continue  # this was the last line
            next_line = datadic[cont_char]
            cont_char = next_line[-1]
            sorted_block.append(next_line)
        block = sorted_block
    unfolded = []
    for row in block:
        unfolded += row
    if LOGFILE:
        with open(LOGFILE, "a") as fh:
            fh.write(" ".join(unfolded) + "\n")
    return unfolded
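# Illustrative sketch (not part of the original module): unfold() buffers the
# 8-character fields of a card and its continuation lines, then emits one flat
# field list per card (here a small-field card continued with a blank field 1).
def _example_unfold():
    uf = unfold(printer(suffix="\n"))
    uf.send("CQUAD4  1       100     1       2       3       4")
    uf.send("        5       6")  # blank field 1 -> continuation of the line above
    uf.close()                    # flushes the buffered card: prints the unfolded field list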

@coroutine
def comments_sink(output, container):
    """collect blocks of comments;
    a block ends when data == -1
    """
    block = []
    while True:
        data = yield
        if data == -1:
            output.send("\n".join(block))
            container[COMMENTS.title].append("\n".join(block))
            block = []
        else:
            block.append(data)

@coroutine
def printer(prefix="", suffix=""):
    while True:
        line = yield
        msg = "{prefix}{line}{suffix}".format(prefix=prefix, line=line, suffix=suffix)
        print(msg, end="")

@coroutine
def void():
    while True:
        line = yield

@coroutine
def counter(output, name, callback=None):
    _nb_packets = 0
    _packets = []
    try:
        while True:
            data = yield
            _nb_packets += 1
            output.send(data)
            _packets.append(data)
    except GeneratorExit:
        # send the last received line
        # output.send(data)
        # print('counter [%s]: %d' % (name, i))
        if callback:
            return callback(name, _nb_packets, _packets)
        return
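# Illustrative sketch (not part of the original module): counter() forwards
# packets downstream while counting them; closing it triggers the optional
# callback with the counter name, the packet count and the collected packets.
def _example_counter():
    cnt = counter(void(), "demo", callback=lambda name, n, packets: print(name, n))
    cnt.send("GRID    1")
    cnt.send("GRID    2")
    cnt.close()  # prints "demo 2"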

@coroutine
def strip(output, chars=None):
    while True:
        data = yield
        output.send(data.strip(chars))

@coroutine
def route_section(exec, case, bulk, bulk_only=False):
    if bulk_only:
        SECTION = "BULK"
    else:
        SECTION = "EXEC"  # will go through EXEC/CASE/BULK
    while True:
        line = yield
        line_ = line.strip()
        if line_.startswith("ENDDATA"):
            continue
        if line_ == "CEND":
            # switch to CASE
            SECTION = "CASE"
            continue
        elif line_ == "BEGIN BULK":
            # switch to BULK
            SECTION = "BULK"
            continue
        if SECTION == "EXEC":
            exec.send(line)
        elif SECTION == "CASE":
            case.send(line)
        elif SECTION == "BULK":
            bulk.send(line)
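# Illustrative sketch (not part of the original module): route_section() starts in
# the EXEC section (unless bulk_only), switches to CASE on "CEND" and to BULK on
# "BEGIN BULK", and silently drops "ENDDATA".
def _example_route_section():
    rs = route_section(
        exec=printer(prefix="EXEC> ", suffix="\n"),
        case=printer(prefix="CASE> ", suffix="\n"),
        bulk=printer(prefix="BULK> ", suffix="\n"),
    )
    rs.send("SOL 101")      # -> EXEC> SOL 101
    rs.send("CEND")         # switch to CASE (the separator itself is not forwarded)
    rs.send("ECHO = NONE")  # -> CASE> ECHO = NONE
    rs.send("BEGIN BULK")   # switch to BULK
    rs.send("GRID    1")    # -> BULK> GRID    1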

@coroutine
def process_bulk(output, container):
    while True:
        line = yield
        line_ = line.strip()
        if line_.startswith("PARAM"):
            _, key, value = line_.split(",")
            container[PARAMS.title][key] = autoconvert(value)
            continue
        # regular bulk line
        output.send(line)
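# Illustrative sketch (not part of the original module): process_bulk() captures
# free-field "PARAM,<name>,<value>" lines into the PARAMS section and forwards
# every other bulk line downstream untouched.
def _example_process_bulk():
    container = {PARAMS.title: {}}
    pb = process_bulk(printer(prefix="BULK> ", suffix="\n"), container)
    pb.send("PARAM,POST,-1")  # captured: container[PARAMS.title]["POST"] == autoconvert("-1")
    pb.send("GRID    1")      # regular line -> BULK> GRID    1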

@coroutine
def process_exec(output, container):
    while True:
        line = yield
        line_ = line.strip()
        key, *value = line_.split(" ")
        container[EXEC.title][key] = autoconvert(" ".join(value))
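# Illustrative sketch (not part of the original module): process_exec() stores each
# executive-control statement under its first word in the EXEC section.
def _example_process_exec():
    container = {EXEC.title: {}}
    pe = process_exec(void(), container)
    pe.send("SOL 101")  # container[EXEC.title]["SOL"] == autoconvert("101")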

@coroutine
def process_case(output, container):
    SUBCASEKEY = "default"  # the default section precedes all subcase definitions
    SUBCASEID = -1
    while True:
        line = yield
        # save and interpret all the `CASE` lines
        if line.strip().startswith("SUBCASE"):
            SUBCASEID = int(line.split()[-1].strip())
            SUBCASEKEY = "SUBCASE %d" % SUBCASEID
            continue
        # sometimes, "TITLE = Callback Val=0"
        param, *value = [txt.strip() for txt in line.split("=")]
        if SUBCASEKEY not in container[CASE.title]:
            container[CASE.title][SUBCASEKEY] = {"id": SUBCASEID}
        try:
            container[CASE.title][SUBCASEKEY][param] = autoconvert("=".join(value))
        except ValueError:
            # e.g. SET 1 = 1, 1003
            logging.warning(f'cannot process param "{param} = {value}"')
            continue
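# Illustrative sketch (not part of the original module): process_case() groups case
# control parameters under the current "SUBCASE <id>" key ("default" before any
# SUBCASE statement).
def _example_process_case():
    container = {CASE.title: {}}
    pc = process_case(void(), container)
    pc.send("SUBCASE 1")
    pc.send("  LOAD = 10")
    # container[CASE.title] == {"SUBCASE 1": {"id": 1, "LOAD": autoconvert("10")}}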

@coroutine
def register_card(output, container):
    bulk = container[BULK.title]
    summary = container[SUMMARY.title]
    while True:
        fields = yield
        # remove the card name from the fields
        try:
            card_name, fields = fields[0], fields[1:]
        except IndexError:
            logging.critical("cannot dump fields %s", fields)
            raise
        container_card_entry = bulk.get(card_name)
        if not container_card_entry:
            # get the <CARD> object from the cards module, and instantiate it:
            card_inst = cards.__dict__.get(card_name, cards.DefaultCard)(name=card_name)
            # prepare a container in bulk
            bulk[card_name] = card_inst
            summary[card_inst.type].add(card_name)
            if hasattr(card_inst, "dim"):
                summary[card_inst.dim].add(card_name)
            if hasattr(card_inst, "shape"):
                summary[card_inst.shape].add(card_name)
            container_card_entry = bulk.get(card_name)
        try:
            fields = autoconvert_array(fields)  # [autoconvert(f) for f in fields]
        except ValueError:
            raise ValueError(f"cannot convert {container_card_entry.name}: {fields=}")
        container_card_entry.append_fields_list(fields)
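# Illustrative sketch (not part of the original module), assuming SUMMARY.type
# builds a defaultdict(set)-like mapping as used in _process_file(): register_card()
# creates one card container per card name and appends the converted fields to it.
def _example_register_card():
    container = {BULK.title: {}, SUMMARY.title: defaultdict(set)}
    rc = register_card(void(), container)
    rc.send(["GRID", "1", "", "0.0", "0.0", "0.0"])
    # container[BULK.title]["GRID"] is a cards.GRID (or cards.DefaultCard) instance
    # holding the converted fields; the summary now lists "GRID" under its card type.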

def get_nb_lines(filename):
    nb_rows = -1  # stays -1 for an empty file
    with open(filename, "r") as fh:
        for nb_rows, _ in enumerate(fh):
            pass
    nb_rows += 1
    return nb_rows

def _split_filename(filename, nbprocs):
    """split a file and return ``(nbprocs, nb_rows, fhs)``, where ``fhs`` is a
    list of in-memory chunks (one StringIO buffer per chunk).

    The function makes sure to split the file right above a new card entry.
    """

    OKREGEX = re.compile(r"^(\$)?[A-Z]+\d*")  # can we split above?
    nb_rows = get_nb_lines(filename)
    # calculate the number of processes depending on the number of lines
    if nbprocs == "auto":
        if nb_rows <= 100:
            nbprocs = 1
        elif nb_rows <= 500:
            nbprocs = 2
        elif nb_rows <= 1000:
            nbprocs = 3
        elif nb_rows <= 10000:
            nbprocs = 4
        elif nb_rows <= 100000:
            nbprocs = 8
        else:
            nbprocs = 16

        logging.info("automatically set nb of processes to %d", nbprocs)
    # ------------------------------------------------------------------------
    # first split SHALL go at least up to "CEND"
    with open(filename, "r") as fh:
        for cend_line, line in enumerate(fh):
            if "CEND" in line:
                break
    # ------------------------------------------------------------------------
    # prepare the split
    logging.info("split a %d-line file into %d chunks", nb_rows, nbprocs)
    nblines_per_split = nb_rows // nbprocs
    targets = []
    last = 0
    for splitnb in range(nbprocs)[:-1]:
        last = splitnb * nblines_per_split + nblines_per_split
        # ensure that CEND is included in the first chunk
        if splitnb == 0 and last < cend_line:
            last = cend_line
        while True:
            previous = linecache.getline(filename, last - 1)
            target = linecache.getline(filename, last)
            # append the previous line as target row nb
            if OKREGEX.match(target) and not previous.startswith("$"):
                targets.append(last - 1)  # split above
                break
            last += 1
    targets.append(nb_rows)
    fhs = []
    prevstop = 0
    nextstop = targets.pop(0)
    fh_buffer = StringIO()
    buffer = []
    slicenb = 0
    rel_line_nb = 0
    with open(filename, "r") as fh:
        for linenb, line in enumerate(fh):
            if linenb == nextstop:
                # we reached the last line of the current buffer;
                # the current line will be the first of the next split
                buffer_id = len(fhs)
                fh_buffer.writelines(buffer)
                fh_buffer.seek(0, 0)
                fhs.append(fh_buffer)
                fh_buffer = StringIO()
                # flush buffer
                buffer = []
                slicenb += 1
                rel_line_nb = 0
                try:
                    prevstop = nextstop
                    nextstop = targets.pop(0)
                except IndexError:
                    # last run...
                    pass
            buffer.append(line)
            rel_line_nb += 1
    fh_buffer.writelines(buffer)
    fh_buffer.seek(0, 0)
    fhs.append(fh_buffer)
    logging.info("prepared %d jobs", len(fhs))

    return nbprocs, nb_rows, fhs
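# Illustrative sketch (not part of the original module): _split_filename() guarantees
# that every chunk boundary falls right above a new card entry, and that the first
# chunk reaches at least the "CEND" line.
def _example_split_filename():
    import tempfile

    deck = "SOL 101\nCEND\nBEGIN BULK\nGRID    1\nENDDATA\n"
    with tempfile.NamedTemporaryFile("w", suffix=".bdf", delete=False) as tmp:
        tmp.write(deck)
    nbprocs, nb_rows, chunks = _split_filename(tmp.name, nbprocs="auto")
    # -> nbprocs == 1 and a single StringIO chunk for such a small deck; nb_rows == 5
    os.unlink(tmp.name)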

def _process_file(filehdata):
    """
    Pipeline creation and burning
    """
    # prepare an advanced counter-reporting callback for cards
    fileh, fileh_nb, progress = filehdata

    def cnt_detailed_callback(name, nb_packets, packets):
        if len(packets) > 0:
            if isinstance(packets[0], str):
                first = packets[0][:17]
            elif isinstance(packets[0], list):
                first = " ".join(packets[0][:2])
            else:
                first = packets[0]
            if isinstance(packets[-1], str):
                last = packets[-1][:17]
            elif isinstance(packets[-1], list):
                last = " ".join(packets[-1][:2])
            else:
                last = packets[-1]
            msg = f'counter [{name}] {nb_packets} items:: "{first}" -> "{last}"'
        else:
            first = None
            last = None
            msg = f"counter [{name}] {nb_packets}"
        logging.info(msg)

    counter_callback = None
    # ========================================================================
    # initialize container
    # ========================================================================
    container = {}
    for section in (EXEC, PARAMS, COMMENTS, BULK, CASE, META, SUMMARY):
        container[section.title] = section.type(*section.args)
    # ========================================================================
    # build pipeline
    # ========================================================================
    # end-points of the pipeline:
    null = void()
    pr1 = printer(suffix="\n\n")
    pr2 = printer(prefix="BULK: ", suffix="\n\n")
    # ------------------------------------------------------------------------
    # main branches:
    comments = counter(
        comments_sink(null, container), f"Buffer #{fileh_nb} comments", callback=None
    )

    regular = route_section(
        exec=process_exec(null, container),
        case=process_case(null, container),
        bulk=process_bulk(
            unfold(
                counter(
                    register_card(null, container),
                    f"Buffer #{fileh_nb} cards",
                    callback=cnt_detailed_callback,
                )
            ),
            container,
        ),  # / process_bulk
        bulk_only=fileh_nb > 0,
    )

    # ------------------------------------------------------------------------
    # the whole pipeline (except for pid)
    pipe = counter(
        strip(route_line(regular, comments), "\n"),
        f"Buffer #{fileh_nb} all packets",
        callback=cnt_detailed_callback,
    )

    # pump data into the pipeline
    for line in fileh:
        if line.strip() != "":
            pipe.send(line)
    return container
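# Data flow built by _process_file() above (one pipeline per buffer):
#
#   counter("all packets")
#     -> strip("\n")
#       -> route_line
#            |- '$' lines  -> counter("comments") -> comments_sink -> container[COMMENTS]
#            '- data lines -> route_section
#                               |- EXEC -> process_exec -> container[EXEC]
#                               |- CASE -> process_case -> container[CASE]
#                               '- BULK -> process_bulk (PARAM -> container[PARAMS])
#                                            -> unfold -> counter("cards")
#                                                 -> register_card -> container[BULK]/[SUMMARY]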

def md5(fname):
    """credit:
    https://stackoverflow.com/a/3431838
    """
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def read_buffer(fh, progress=True):
    # skip _split_filename
    start = time.time()
    nbprocs = 1
    nb_rows = sum(1 for line in fh)
    fh.seek(0)
    fhs = [fh]
    res = _read(nbprocs, nb_rows, fhs, start, progress, source="buffer")
    return res
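# Illustrative sketch (not part of the original module): read_buffer() parses an
# already-open text buffer in a single process and returns the raw per-buffer results.
def _example_read_buffer():
    deck = StringIO("SOL 101\nCEND\nBEGIN BULK\nPARAM,POST,-1\nENDDATA\n")
    res = read_buffer(deck, progress=False)
    # res is a one-element list of section containers, e.g.
    # res[0][PARAMS.title] == {"POST": autoconvert("-1")} and res[0][META.title]["nbrows"] == 5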

def read_bulk(filename, nbprocs="auto", progress=True):
    """
    Main entry point to parse a NASTRAN bulk file.

    The file is split into chunks which are parsed in parallel; the list of
    per-chunk containers is returned.
    """
    filename = os.path.expanduser(filename)

    start = time.time()
    # ========================================================================
    # prepare multi-process parsing
    # ========================================================================
    nbprocs, nb_total, fhs = _split_filename(filename, nbprocs)
    return _read(
        nbprocs, nb_total, fhs, start, progress, source=os.path.abspath(filename)
    )

def _read(nbprocs, nb_total, data, start, progress, source):
    # =========================================================================
    # # summarize first and last lines
    # for fh in data:
    #     lines = fh.readlines()
    #     print(lines[0], lines[-1])
    #     fh.seek(0)
    # append data ID
    data = [(datai, i, progress) for i, datai in enumerate(data)]
    # ========================================================================
    # parse file
    # ========================================================================

    if progress:
        import tqdm.autonotebook as tqdm

        _progress = tqdm.tqdm
    else:
        _progress = lambda x, total=None, desc=None: x

    if nbprocs == 1:
        res = [_process_file(data[0])]
    else:
        with Pool(nbprocs) as pool:
            res = list(
                _progress(
                    pool.imap(_process_file, data), total=len(data), desc="parsing bulk"
                )
            )
    stop = time.time()
    delta = stop - start
    # ------------------------------------------------------------------------
    # add a few metadata entries
    res[0][META.title].update(
        {
            "source": source,
            # "source_md5": md5(filename),
            "nbprocs": nbprocs,
            "nbrows": nb_total,
            "elapsed": round(delta, 3),
        }
    )
    # ========================================================================
    # return either a Registry or the raw results
    # ========================================================================
    msg = f"processed {nb_total:,} lines in {delta:.2f} sec."
    logging.info(msg)
    return res