Coverage for fss\starter\system\service\impl\user_service_impl.py: 66%

38 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-11 19:09 +0800

1"""User domain service impl""" 

2 

3from datetime import timedelta 

4from typing import Optional 

5 

6from fss.common.cache.cache import get_cache_client, Cache 

7from fss.common.config import configs 

8from fss.common.enum.enum import TokenTypeEnum 

9from fss.common.schema.schema import Token 

10from fss.common.service.impl.service_impl import ServiceImpl 

11from fss.common.util import security 

12from fss.common.util.security import verify_password 

13from fss.starter.system.enum.system import SystemResponseCode, SystemConstantCode 

14from fss.starter.system.exception.system import SystemException 

15from fss.starter.system.mapper.user_mapper import UserMapper 

16from fss.starter.system.model.user_do import UserDO 

17from fss.starter.system.schema.user_schema import UserQuery, LoginCmd 

18from fss.starter.system.service.user_service import UserService 

19 

20 

21class UserServiceImpl(ServiceImpl[UserMapper, UserDO], UserService): 

22 def __init__(self, mapper: UserMapper): 

23 super().__init__(mapper) 

24 

25 async def find_by_id(self, id: int) -> Optional[UserQuery]: 

26 """ 

27 Retrieval user through user id 

28 :param id: user id 

29 :return: user or none 

30 """ 

31 user_do = await self.mapper.select_by_id(id=id) 

32 if user_do: 

33 return UserQuery(**user_do.model_dump()) 

34 else: 

35 return None 

36 

37 async def login(self, loginCmd: LoginCmd) -> Token: 

38 """ 

39 Do log in 

40 :param loginCmd: loginCmd 

41 :return: access token and refresh token 

42 """ 

43 username: str = loginCmd.username 

44 userDO: UserDO = await self.mapper.get_user_by_username(username=username) 

45 if not userDO or not await verify_password(loginCmd.password, userDO.password): 

46 raise SystemException( 

47 SystemResponseCode.AUTH_FAILED.code, SystemResponseCode.AUTH_FAILED.msg 

48 ) 

49 access_token_expires = timedelta(minutes=configs.access_token_expire_minutes) 

50 refresh_token_expires = timedelta(minutes=configs.refresh_token_expire_minutes) 

51 access_token = await security.create_token( 

52 subject=userDO.id, 

53 expires_delta=access_token_expires, 

54 token_type=TokenTypeEnum.access, 

55 ) 

56 refresh_token = await security.create_token( 

57 subject=userDO.id, 

58 expires_delta=refresh_token_expires, 

59 token_type=TokenTypeEnum.refresh, 

60 ) 

61 token = Token( 

62 access_token=access_token, 

63 expired_at=int(access_token_expires.total_seconds()), 

64 token_type="bearer", 

65 refresh_token=refresh_token, 

66 re_expired_at=int(refresh_token_expires.total_seconds()), 

67 ) 

68 cache_client: Cache = await get_cache_client() 

69 await cache_client.set( 

70 f"{SystemConstantCode.USER_KEY.msg}{userDO.id}", 

71 access_token, 

72 access_token_expires, 

73 ) 

74 return token 

75 

76 

77def get_user_service() -> UserService: 

78 return UserServiceImpl(UserMapper(UserDO))