Coverage for readers/gmsh/mixins.py: 12%

82 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-03-20 20:51 +0100

1import logging 

2from functools import wraps 

3 

4from numtools.intzip import hzip 

5 

6 

class Modificator:
    """GmsHParser mixin class dedicated to modification.

    The host class is expected to provide:

    * ``nodes``: dict ``gid -> node`` where a node exposes
      ``_replace(gid=...)``;
    * ``elements``: dict ``eid -> element`` where an element exposes
      ``gids`` and ``_replace(gids=...)``;
    * ``gid2eids``: lookup ``gid -> iterable of eids`` using that node;
    * ``physical_group2gids``: lookup ``physical group name -> gids``;
    * ``_make_gid_eid_type()`` and ``_make_eid_gid_physicalGroups()``,
      called after every renumbering pass.
      NOTE(review): presumably they rebuild the lookup tables above --
      confirm against the host class.
    """

    def post_init(self):
        """Create the set of locked gids if it does not exist yet."""
        if not hasattr(self, "_locked_gids"):
            self._locked_gids = set()

    def lock_gids(self, gids):
        """Protect ``gids`` (a single id or an iterable of ids) from renumbering."""
        # make sure the lock registry exists, even when the host class never
        # called ``post_init()`` before the first lock request
        self.post_init()
        if isinstance(gids, (int, float)):
            gids = {gids}
        self._locked_gids |= set(gids)

    def renumber_nodes(self, /, mapping_in, lockin=False):
        """Renumber nodes based on ``mapping_in`` {current_node_id -> new_node_id}.

        Requests concerning unknown or locked nodes are dropped (the latter
        with a warning). A node currently sitting on a requested target id,
        and which is not renumbered itself, is moved to a free id.

        The renumbering is done in two passes: every impacted node is first
        moved to a temporary free id, then moved to its final id.

        :param mapping_in: dict {current gid -> requested gid}
        :param lockin: if True, the requested target gids get locked
        :returns: the executed mapping {old gid -> new gid}, including the
            nodes that had to be moved out of the way
        :raises ValueError: if several nodes are mapped to the same target
        """
        self.post_init()

        # discard non-existing nodes from requirements
        mapping_in = {n1: n2 for n1, n2 in mapping_in.items() if n1 in self.nodes}
        logging.info(f"requested {mapping_in=}")
        # discard locked nodes
        forbidden = set(mapping_in) & self._locked_gids
        if forbidden:
            logging.warning(
                f"gids {hzip(forbidden)} are locked. They won't be renumbered"
            )
            mapping_in = {
                n1: n2 for n1, n2 in mapping_in.items() if n1 not in forbidden
            }

        # two nodes cannot end up with the same gid. This is checked explicitly
        # (not with ``assert``) so that it still holds under ``python -O``.
        targets = set(mapping_in.values())
        if len(targets) != len(mapping_in):
            raise ValueError("several nodes are mapped to the same target gid")
        # NOTE(review): an existing *locked* node sitting on a requested target
        # is still moved out of the way below -- confirm this is intended.

        next_free = max(self.nodes, default=0) + 1

        def _get_free_id():
            # Free ids must not be requested targets: otherwise a node parked
            # on such an id would be overwritten during the second pass.
            nonlocal next_free
            while next_free in targets:
                next_free += 1
            newgid = next_free
            next_free += 1
            return newgid

        buffer = {}  # pass 1: current gid -> temporary free gid
        way_back = {}  # pass 2: temporary gid -> requested gid
        for n_source, n_target in mapping_in.items():
            intermediate_gid = _get_free_id()
            buffer[n_source] = intermediate_gid
            way_back[intermediate_gid] = n_target
            # the target is occupied by a node which is not renumbered by the
            # request: move it to a free id, where it will stay
            if n_target in self.nodes and n_target not in mapping_in:
                buffer[n_target] = _get_free_id()

        # provide appropriate executed mapping
        mapping = {k: way_back.get(v, v) for k, v in buffer.items()}

        self._renumber_apply_mapping(buffer)
        self._renumber_apply_mapping(way_back)

        if lockin:
            self._locked_gids |= targets
            logging.info(f"locked gids {hzip(targets)}")
        return mapping

    def _renumber_apply_mapping(self, dic):
        """Apply ``dic`` {gid -> new gid} to elements and nodes.

        The mapping is applied simultaneously: a new gid may be equal to
        another key of ``dic`` (e.g. swapping two nodes).

        :returns: set of the modified element ids
        """
        # reassign elements
        modified_elts = set()
        for gid in dic:
            modified_elts.update(self.gid2eids[gid])
        for eid in modified_elts:
            elt = self.elements[eid]
            # translate the whole connectivity at once, so that a gid created
            # by this mapping is never translated a second time
            new_gids = tuple(dic.get(gid, gid) for gid in elt.gids)
            self.elements[eid] = elt._replace(gids=new_gids)
        # renumber nodes: pop all the old gids before inserting the new ones
        nodes_buffer = {}
        for gid, newgid in dic.items():
            node = self.nodes.pop(gid)
            nodes_buffer[newgid] = node._replace(gid=newgid)
        self.nodes.update(nodes_buffer)
        self._make_gid_eid_type()
        self._make_eid_gid_physicalGroups()
        return modified_elts

    def renumber_nodes_by_grp(self, physical_name, startat=1, lockin=False):
        """Renumber the nodes of group ``physical_name`` consecutively from ``startat``.

        New ids follow the iteration order of
        ``physical_group2gids[physical_name]``.

        :returns: the executed mapping, see ``renumber_nodes``
        """
        gids = self.physical_group2gids[physical_name]
        mapping = {gid: newgid for newgid, gid in enumerate(gids, start=startat)}
        return self.renumber_nodes(mapping_in=mapping, lockin=lockin)

    def renumber_nodes_offset(self, offset, lockin=False):
        """Shift every node id by ``offset``.

        :returns: the executed mapping, see ``renumber_nodes``
        """
        mapping = {k: k + offset for k in self.nodes}
        return self.renumber_nodes(mapping_in=mapping, lockin=lockin)