Coverage for fss\common\util\snowflake.py: 72%
39 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-11 19:09 +0800
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-11 19:09 +0800
1"""Snowflake util to generate unique id"""
import os
import threading
import time
from typing import Generator
# Custom epoch in milliseconds since the Unix epoch (2022-01-01T00:00:00Z).
# Timestamps stored inside an id are offsets from this value.
API_EPOCH = 1640995200000

# Bit layout of an id, from most to least significant:
#   timestamp offset | process id (5 bits) | worker id (5 bits) | sequence (12 bits)
worker_id_bits = 5
process_id_bits = 5
# Largest value that fits in each field (all bits of the field set).
max_worker_id = (1 << worker_id_bits) - 1
max_process_id = (1 << process_id_bits) - 1
sequence_bits = 12
# Left shifts that move each field to its position inside the id.
process_id_shift = sequence_bits + worker_id_bits
worker_id_shift = sequence_bits
timestamp_left_shift = sequence_bits + worker_id_bits + process_id_bits
# Mask used to wrap the per-millisecond sequence counter.
sequence_mask = (1 << sequence_bits) - 1
def snowflake_to_timestamp(_id):
    """Return the creation time encoded in a snowflake id.

    Args:
        _id: An integer id as produced by ``generator``.

    Returns:
        The creation time as Unix time in seconds (a float, with
        millisecond resolution).
    """
    # Drop the process/worker/sequence bits to recover the millisecond
    # offset, using the same shift the generator applied when packing it.
    milliseconds = (_id >> timestamp_left_shift) + API_EPOCH
    return milliseconds / 1000
def generator(
    worker_id: int = 1,
    process_id: int = os.getpid() % 31,
    sleep=lambda x: time.sleep(x),
) -> Generator[int, None, None]:
    """Yield an endless stream of snowflake ids.

    Each id packs, from most to least significant bits: the number of
    milliseconds since ``API_EPOCH``, ``process_id``, ``worker_id`` and a
    per-millisecond sequence counter.

    Args:
        worker_id: Value for the worker field, ``0..max_worker_id``.
        process_id: Value for the process field, ``0..max_process_id``.
            NOTE: the default is computed once, when this module is
            imported, so it is not refreshed in processes forked later.
        sleep: Callable taking a duration in seconds; used to wait when
            the clock moved backwards or the sequence is exhausted.

    Yields:
        The next id as an ``int``.

    Raises:
        AssertionError: On the first ``next()`` call, if ``worker_id`` or
            ``process_id`` is out of range (the checks live in the
            generator body, so they do not run at call time).
    """
    assert 0 <= worker_id <= max_worker_id
    assert 0 <= process_id <= max_process_id

    last_timestamp = -1
    sequence = 0

    while True:
        timestamp = int(time.time() * 1000)

        if last_timestamp > timestamp:
            # The clock moved backwards. Timestamps are in milliseconds
            # but ``sleep`` takes seconds, so convert before waiting.
            sleep((last_timestamp - timestamp) / 1000)
            continue

        if last_timestamp == timestamp:
            sequence = (sequence + 1) & sequence_mask
            if sequence == 0:
                # Every sequence value for this millisecond has been used.
                # Park the counter on its maximum so that a retry within
                # the same millisecond wraps to zero and waits again, then
                # sleep one millisecond for the clock to advance.
                sequence = sequence_mask
                sleep(0.001)
                continue
        else:
            sequence = 0

        last_timestamp = timestamp

        yield (
            ((timestamp - API_EPOCH) << timestamp_left_shift)
            | (process_id << process_id_shift)
            | (worker_id << worker_id_shift)
            | sequence
        )
# Single generator shared by every snowflake_id() call, created on first use.
_shared_generator = None
# A generator cannot be advanced from two threads at once, so calls are
# serialized through this lock.
_shared_generator_lock = threading.Lock()


def snowflake_id() -> int:
    """Return the next snowflake id from a shared generator.

    One generator is reused across calls so that its sequence counter and
    last-timestamp state persist. Building a new generator per call would
    reset the sequence to zero every time, and two calls within the same
    millisecond would return the same id.

    Returns:
        The next id as an ``int``.
    """
    global _shared_generator
    with _shared_generator_lock:
        if _shared_generator is None:
            _shared_generator = generator()
        return next(_shared_generator)