feat: Add redis util

This commit is contained in:
LWR
2022-10-29 18:34:28 +08:00
parent 21b5ddbe56
commit 1f93b92dc0
4 changed files with 85 additions and 2 deletions
+11 -2
View File
@@ -2,8 +2,10 @@ import sys
from loguru import logger
from starbot.core.datasource import DataSource
from starbot.exception.DataSourceException import DataSourceException
from .datasource import DataSource
from ..exception.DataSourceException import DataSourceException
from ..exception.RedisException import RedisException
from ..utils import redis
class StarBot:
@@ -56,3 +58,10 @@ class StarBot:
except DataSourceException as ex:
logger.error(ex.msg)
return
# 连接 Redis
try:
await redis.init()
except RedisException as ex:
logger.error(ex.msg)
return
+15
View File
@@ -0,0 +1,15 @@
"""
Redis 异常
"""
from .ApiException import ApiException
class RedisException(ApiException):
"""
Redis 异常
"""
def __init__(self, msg: str):
super().__init__()
self.msg = msg
+1
View File
@@ -5,5 +5,6 @@ from .CredentialNoSessdataException import *
from .DataSourceException import *
from .LiveException import *
from .NetworkException import *
from .RedisException import *
from .ResponseCodeException import *
from .ResponseException import *
+58
View File
@@ -0,0 +1,58 @@
from typing import Any, Union
import aioredis
from loguru import logger
from ..exception.RedisException import RedisException
from ..utils import config
__redis: aioredis.client.Redis
async def init():
global __redis
logger.info("开始连接 Redis 数据库")
host = config.get("REDIS_HOST")
port = config.get("REDIS_PORT")
db = config.get("REDIS_DB")
username = config.get("REDIS_USERNAME")
password = config.get("REDIS_PASSWORD")
__redis = aioredis.from_url(f"redis://{host}:{port}/{db}", username=username, password=password)
try:
await __redis.ping()
except Exception as ex:
raise RedisException(f"连接 Redis 数据库失败, 请检查是否启动了 Redis 服务或提供的配置中连接参数是否正确 {ex}")
logger.success("成功连接 Redis 数据库")
# String
async def get(key: str) -> str:
return str(await __redis.get(key))
async def geti(key: str) -> int:
return int(await __redis.get(key))
# Hash
async def hexists(key: str, hkey: Union[str, int]) -> bool:
return await __redis.hexists(key, hkey)
async def hget(key: str, hkey: Union[str, int]) -> str:
return str(await __redis.hget(key, hkey))
async def hgeti(key: str, hkey: Union[str, int]) -> int:
return int(await __redis.hget(key, hkey))
async def hset(key: str, hkey: Union[str, int], value: Any):
await __redis.hset(key, hkey, value)
async def hset_ifnotexists(key: str, hkey: Union[str, int], value: Any):
if not await hexists(key, hkey):
await hset(key, hkey, value)