Coverage for utils.py: 53%
145 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-20 20:51 +0100
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-20 20:51 +0100
1"""Generic helpers module for developers"""
2import datetime as dt
3import logging
4import os
5import time
6from collections import defaultdict
7from itertools import chain
9import numpy as np
11try:
12 from deepdiff import DeepDiff
14 ISDEEPDIFF = True
15except ImportError:
16 ISDEEPDIFF = False
def project_point_fast(point, ga, gb):
    """
    Project `point` orthogonally onto the line passing through `ga` and `gb`.

    Return a tuple ``(projection, t)`` such that
    ``projection == ga + t * (gb - ga)``.

    >>> ga = np.array([1.5, 5.5, 0])
    >>> gb = np.array([6, -1.5, 0])
    >>> proj, t = project_point_fast(np.array([9, -1, 0]), ga, gb)

    `proj` is the calculated projected point:

    >>> proj
    array([ 6.64981949, -2.51083032,  0.        ])

    `t` is the ga-gb range factor:

    * t < 0: the projection lies 'before' ga
    * t > 1: the projection lies 'after' gb

    >>> round(float(t), 6)
    1.144404
    """
    direction = gb - ga
    offset = point - ga
    # scalar projection of (point - ga) on the ga->gb direction, expressed
    # as a fraction of the ga->gb length
    t = np.sum(offset * direction) / np.sum(direction**2)
    return ga + t * direction, t
def project_point(point, ga, gb, strategy="on_extend", return_t=False):
    r"""
    Project a point onto a segment [ga, gb] with different strategies:

    * 'strict': calculate the projection only if the projected point lies
      between ga and gb; raise ``ValueError`` otherwise.
    * 'on_extend': if the projection is "outside" of the segment,
      calculate its position on the "extended" segment.
    * 'on_end': if the projection is "outside" of the segment,
      return the extremity (ga or gb) closest to the theoretical projection.

        + ga
         \
          \
           \         + point
            +
             \
              + gb

    All of those strategies lead to the same result if the projection lies
    between ga and gb.

    :param point: numpy array, the point to project
    :param ga: numpy array, first extremity of the segment
    :param gb: numpy array, second extremity of the segment
    :param strategy: one of 'strict', 'on_extend' (default) or 'on_end'
    :param return_t: if True, return ``(projection, t)`` instead of the
        projection alone; ``t`` is the range factor described in
        `project_point_fast` (clamped to [0, 1] when 'on_end' applies)
    :raises ValueError: if `strategy` is unknown, or if `strategy` is
        'strict' and the projection falls outside of the segment

    >>> ga = np.array([1.5, 5.5, 0])
    >>> gb = np.array([6, -1.5, 0])

    For a point whose projection lies on the ga-gb segment, the strategy
    is irrelevant:

    >>> project_point(np.array([2, 1.5, 0]), ga, gb)
    array([3.46570397, 2.44223827, 0.        ])

    >>> project_point(np.array([9, -1, 0]), ga, gb, strategy='strict')
    Traceback (most recent call last):
        ...
    ValueError: point [9, -1, 0] does not project onto ga-gb line segment

    >>> project_point(np.array([9, -1, 0]), ga, gb, strategy='on_extend')
    array([ 6.64981949, -2.51083032,  0.        ])

    >>> project_point(np.array([9, -1, 0]), ga, gb, strategy='on_end')
    array([ 6. , -1.5,  0. ])
    """
    # Reject typos early: an unknown strategy used to be silently handled
    # like 'on_extend'.
    if strategy not in ("strict", "on_extend", "on_end"):
        raise ValueError(
            f"unknown strategy {strategy!r}: shall be one of "
            "('strict', 'on_extend', 'on_end')"
        )
    projection, t = project_point_fast(point, ga, gb)
    is_outside = t > 1 or t < 0
    if is_outside and strategy == "strict":
        raise ValueError(
            f"point {point.tolist()} does not project onto ga-gb line segment"
        )
    if is_outside and strategy == "on_end":
        # snap to the closest extremity: t=0 -> ga, t=1 -> gb
        t = max(0, min(1, t))
        projection = ga + t * (gb - ga)
    if return_t:
        return projection, t
    return projection
def calcdiff(args, exclude_meta=True, **kwargs):
    """
    Compare two objects with DeepDiff and return the resulting diff.

    :param args: a pair ``(d1, d2)`` of objects to compare
    :param exclude_meta: if True (default), the path ``root['meta']`` is
        excluded from the comparison
    :param kwargs: additional keyword arguments forwarded to ``DeepDiff``
    :returns: the ``DeepDiff`` object
    :raises RuntimeError: if the optional DeepDiff library is not installed

    Paths matching ``root['_...']``, ``root[...]['_...']`` or ``_cached...``
    are always excluded; dict/defaultdict and list/tuple are treated as
    equivalent types, and NaN values compare equal.
    """
    d1, d2 = args
    if not ISDEEPDIFF:
        msg = "optional DeepDiff lib is not installed"
        raise RuntimeError(msg)
    # the same path used to be listed twice; once is enough
    exclude_paths = ["root['meta']"] if exclude_meta else None
    return DeepDiff(
        d1,
        d2,
        exclude_regex_paths=[
            r"root\['_.*'\]",
            r"root\[.*'\]\['_.*'\]",
            r"_cached.*",
        ],
        exclude_paths=exclude_paths,
        ignore_type_in_groups=((dict, defaultdict), (list, tuple)),
        ignore_nan_inequality=True,
        **kwargs,
    )
class Chronos:
    """
    Easily sequence timing in your function:

    >>> c = Chronos()
    >>> c.click('hello world')
    hello world: 0:00:00.0...
    >>> time.sleep(0.05)
    >>> c.click('another event')
    another event: 0:00:00.05...
    """

    def __init__(self):
        """Record the creation time as the first ("starts") event."""
        self._starts = dt.datetime.now()
        # `reltime` is a timedelta for every event, including the first one
        # (it used to be the int 0 here and a timedelta everywhere else).
        self._events = [
            {"abstime": self._starts, "reltime": dt.timedelta(0), "event": "starts"}
        ]

    def click(self, event):
        """
        Register `event` and print the time elapsed since the instance
        was created, as ``<event>: <elapsed>``.
        """
        now = dt.datetime.now()
        reltime = now - self._events[0]["abstime"]
        self._events.append({"abstime": now, "reltime": reltime, "event": event})
        print(f"{event}: {reltime}")
157# ============================================================================
158# numpy structured arrays
159# ============================================================================
def dic2array(data):
    """
    Convert a dictionary of columns to a numpy structured array.

    Each key of `data` becomes a field; the field format is chosen from the
    Python types found in the column:

    * any ``str`` value -> ``'<U1'``
    * otherwise any ``None``, ``float`` or ``np.float64`` -> ``'<f8'``
    * otherwise any ``int`` or ``np.int64`` -> ``'<i8'``
    * anything else (including an empty column) -> ``'<U1'``

    NOTE: ``'<U1'`` keeps a single character, so string columns are
    truncated to their first character (``None`` becomes ``'N'``, as shown
    below).

    >>> data ={'x': [1.5, 3., 1., 9.7],
    ...        'y': [2.5, 4, None, 5.2],
    ...        'z': [1, 1, 0, 0],
    ...        'w': ['A', 'd', 'B', None],
    ...        'v': [1, 2, None, None],
    ...        'u': [None, None, None, None]
    ...       }
    >>> dic2array(data)
    array([(1.5, 2.5, 1, 'A',  1., nan), (3. , 4. , 1, 'd',  2., nan),
           (1. , nan, 0, 'B', nan, nan), (9.7, 5.2, 0, 'N', nan, nan)],
          dtype=[('x', '<f8'), ('y', '<f8'), ('z', '<i8'), ('w', '<U1'), ('v', '<f8'), ('u', '<f8')])
    >>> import pandas as pd
    >>> pd.DataFrame(dic2array(data))
         x    y  z  w    v   u
    0  1.5  2.5  1  A  1.0 NaN
    1  3.0  4.0  1  d  2.0 NaN
    2  1.0  NaN  0  B  NaN NaN
    3  9.7  5.2  0  N  NaN NaN
    >>> pd.DataFrame(dic2array(data)).dtypes
    x    float64
    y    float64
    z      int64
    w     object
    v    float64
    u    float64
    dtype: object
    """
    values = []
    names = []
    formats = []
    none_type = type(None)
    for key, column in data.items():
        types = {type(item) for item in column}
        if str in types:
            fmt = "<U1"
        elif none_type in types or float in types or np.float64 in types:
            # None is stored as NaN, hence a float field
            fmt = "<f8"
        elif int in types or np.int64 in types:
            fmt = "<i8"
        else:
            # Unsupported or empty column: fall back to the string format.
            # A leftover `breakpoint()` used to stop the program here.
            fmt = "<U1"
        formats.append(fmt)
        values.append(tuple(column))
        names.append(key)
    # transpose the columns into one tuple per row
    rows = list(zip(*values))
    return np.array(rows, dtype={"names": names, "formats": formats})
def dic2array_legacy(data, nb_entries=None, None2NaN=True):
    """
    Convert a dictionary of columns to a numpy structured array.

    :param data: dict ``{fieldname: column}``; every column must provide a
        ``copy()`` method and support item access/assignment by index
    :param nb_entries: number of rows to build; defaults to the length of
        the first column
    :param None2NaN: if True (default), ``None`` values found in the first
        `nb_entries` rows are replaced by NaN
    :returns: a structured array whose field formats are the dtypes numpy
        infers for each (converted) column

    >>> data ={'x': [1.5, 3., 1., 9.7],
    ...        'y': [2.5, 4, None, 5.2],
    ...        'z': [1, 1, 0, 0]}
    >>> dic2array_legacy(data)
    array([(1.5, 2.5, 1), (3. , 4. , 1), (1. , nan, 0), (9.7, 5.2, 0)],
          dtype=[('x', '<f8'), ('y', '<f8'), ('z', '<i8')])
    >>> data  # data has not been modified
    {'x': [1.5, 3.0, 1.0, 9.7], 'y': [2.5, 4, None, 5.2], 'z': [1, 1, 0, 0]}
    """
    # work on copies so that the caller's columns are left untouched
    columns = {key: column.copy() for key, column in data.items()}
    if not nb_entries:
        first_key = list(columns)[0]
        nb_entries = len(columns[first_key])
    if None2NaN:
        for column in columns.values():
            for ix in range(nb_entries):
                if column[ix] is None:
                    # `np.NaN` was removed in NumPy 2.0; `np.nan` works on
                    # every NumPy version
                    column[ix] = np.nan
    rows = [
        tuple(column[ix] for column in columns.values()) for ix in range(nb_entries)
    ]
    # let numpy infer the dtype of each column once None has been replaced
    formats = [np.array(column).dtype.str for column in columns.values()]
    names = list(columns)
    return np.array(rows, dtype={"names": names, "formats": formats})
def array2dic(array, astype=None):
    """Convert a structured array to a regular dictionary ``{field: column}``.

    If `astype` is not provided, the returned values remain numpy 1D arrays;
    otherwise each column is passed through `astype` (e.g. ``list``).

    >>> data ={'x': [1.5, 3., 1., 9.7],
    ...        'y': [2.5, 4, 3, 5.2],
    ...        'z': [1, 1, 0, 0]}
    >>> array2dic(dic2array(data), astype=list)
    {'x': [1.5, 3.0, 1.0, 9.7], 'y': [2.5, 4.0, 3.0, 5.2], 'z': [1, 1, 0, 0]}
    """
    fields = array.dtype.names
    if astype:
        return {field: astype(array[field]) for field in fields}
    return {field: array[field] for field in fields}
def bunch_legacy(dic):
    """Given a dict {k: iterable}, return the frozenset of all items found
    in its values. A value which is not iterable is taken as a single item.

    >>> bunch_legacy({'a': {4, 5, 6}, 'b': set((1, 4, 7))})
    frozenset({1, 4, 5, 6, 7})
    """
    collected = set()
    for value in dic.values():
        # wrap scalars so that they can be merged like any other iterable
        items = value if hasattr(value, "__iter__") else (value,)
        collected.update(items)
    return frozenset(collected)
def bunch(dic):
    """Given a dict {k: iterable}, return the frozenset of all items found
    in its values.

    >>> bunch({'a': {4, 5, 6}, 'b': set((1, 4, 7))})
    frozenset({1, 4, 5, 6, 7})
    """
    return frozenset().union(*dic.values())
def object_attributes(obj, mode, blacklist=None):
    """
    List the attribute names of `obj` (as reported by ``dir``), sorted,
    for a given category.

    Categories are based on the name prefix:

    * 'protected': names starting with ``__``
    * 'private': names starting with a single ``_``
    * 'public': all other names

    :param obj: the object to inspect
    :param mode: 'public', 'private', 'protected', 'both'
        (public + private) or 'all' (the three categories)
    :param blacklist: optional container of names to ignore
    :returns: sorted list of attribute names (possibly empty)
    :raises KeyError: if `mode` is not one of the values listed above
    """
    if not blacklist:
        blacklist = ()
    attrs = defaultdict(set)
    # `dir` yields plain names; unpacking them as `k, v` raised ValueError
    for name in dir(obj):
        if name in blacklist:
            continue
        if name.startswith("__"):
            attrs["protected"].add(name)
        elif name.startswith("_"):
            attrs["private"].add(name)
        else:
            attrs["public"].add(name)
    # ------------------------------------------------------------------------
    # Test against the known categories rather than `mode in attrs`: a valid
    # but empty category must return [] instead of raising KeyError.
    if mode in ("public", "private", "protected"):
        return sorted(attrs[mode])
    if mode == "both":
        return sorted(attrs["public"] | attrs["private"])
    if mode == "all":
        return sorted(attrs["public"] | attrs["private"] | attrs["protected"])
    raise KeyError(
        f'mode {mode} shall be one of {"public", "private", "protected", "both", "all"}'
    )
def check_path(filename, name="file") -> None:
    """
    Check that `filename` exists.

    :param filename: path to check
    :param name: label used for the path in the error message
    :raises TypeError: if ``os.path.exists`` rejects `filename` with a
        TypeError
    :raises FileNotFoundError: if `filename` does not exist
    """
    try:
        exists = os.path.exists(filename)
    except TypeError as err:
        raise TypeError(f"cannot find {name}={filename!r}\n") from err
    if not exists:
        # The former message had a third `%s` placeholder without a matching
        # argument, so building it raised TypeError instead of reporting the
        # missing path.
        raise FileNotFoundError(f"cannot find {name}={filename!r}")
def transform_dict_of_list(data):
    """Transform a dict of lists into a list of dicts.

    The number of returned items is the length of the first column. A
    longer column has its extra values ignored; a shorter one raises
    ``IndexError``. An empty `data` gives an empty list.

    >>> data = {'MIDi': [1002, 1003, 1009],
    ...         'SOUTi': ['NO', 'YES', 'YES'],
    ...         'Ti': [0.018, 0.339, 0.018],
    ...         'THETAi': [0.0, 0.0, 5.0]}
    >>> expected = [{'MIDi': 1002, 'SOUTi': 'NO', 'Ti': 0.018, 'THETAi': 0.0},
    ...             {'MIDi': 1003, 'SOUTi': 'YES', 'Ti': 0.339, 'THETAi': 0.0},
    ...             {'MIDi': 1009, 'SOUTi': 'YES', 'Ti': 0.018, 'THETAi': 5.0}]
    >>> transform_dict_of_list(data) == expected
    True
    >>> transform_dict_of_list({})
    []
    """
    if not data:
        return []
    # materialise every column once instead of once per cell
    columns = {fieldname: list(seq) for fieldname, seq in data.items()}
    nb_items = len(next(iter(columns.values())))
    return [
        {fieldname: column[item_no] for fieldname, column in columns.items()}
        for item_no in range(nb_items)
    ]
if __name__ == "__main__":
    # When executed as a script, run the doctests embedded in the docstrings
    # of this module; ELLIPSIS lets `...` match any text in expected outputs.
    from doctest import ELLIPSIS, testmod

    testmod(optionflags=ELLIPSIS)