feat: MySQL datasource dynamically adds user and dynamically reloads push targets support
This commit is contained in:
+140
-63
@@ -91,6 +91,24 @@ class DataSource(metaclass=abc.ABCMeta):
|
|||||||
raise DataSourceException(f"不存在的 UID: {uid}")
|
raise DataSourceException(f"不存在的 UID: {uid}")
|
||||||
return up
|
return up
|
||||||
|
|
||||||
|
def get_bot(self, qq: int) -> Bot:
|
||||||
|
"""
|
||||||
|
根据 QQ 获取 Bot 实例
|
||||||
|
|
||||||
|
Args:
|
||||||
|
qq: 需要获取 Bot 的 QQ
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Bot 实例
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
DataSourceException: QQ 不存在
|
||||||
|
"""
|
||||||
|
bot = next((b for b in self.bots if b.qq == qq), None)
|
||||||
|
if bot is None:
|
||||||
|
raise DataSourceException(f"不存在的 QQ: {qq}")
|
||||||
|
return bot
|
||||||
|
|
||||||
def get_ups_by_target(self, target_id: int, target_type: PushType) -> List[Up]:
|
def get_ups_by_target(self, target_id: int, target_type: PushType) -> List[Up]:
|
||||||
"""
|
"""
|
||||||
根据推送目标号码和推送目标类型获取 Up 实例列表
|
根据推送目标号码和推送目标类型获取 Up 实例列表
|
||||||
@@ -272,6 +290,82 @@ class MySQLDataSource(DataSource):
|
|||||||
except pymysql.err.Error as ex:
|
except pymysql.err.Error as ex:
|
||||||
raise DataSourceException(f"从 MySQL 中读取配置时发生了错误 {ex}")
|
raise DataSourceException(f"从 MySQL 中读取配置时发生了错误 {ex}")
|
||||||
|
|
||||||
|
async def __load_targets(self, uid: int) -> List[PushTarget]:
|
||||||
|
"""
|
||||||
|
从 MySQL 中读取指定 UID 的推送配置
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid: 要读取配置的 UID
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
推送目标列表
|
||||||
|
"""
|
||||||
|
live_on = await self.__query(
|
||||||
|
"SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` "
|
||||||
|
"FROM `groups` AS `g` LEFT JOIN `live_on` AS `l` "
|
||||||
|
"ON g.`uid` = l.`uid` AND g.`index` = l.`index` "
|
||||||
|
f"WHERE g.`uid` = {uid} "
|
||||||
|
"ORDER BY g.`index`"
|
||||||
|
)
|
||||||
|
live_off = await self.__query(
|
||||||
|
"SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` "
|
||||||
|
"FROM `groups` AS `g` LEFT JOIN `live_off` AS `l` "
|
||||||
|
"ON g.`uid` = l.`uid` AND g.`index` = l.`index` "
|
||||||
|
f"WHERE g.`uid` = {uid} "
|
||||||
|
"ORDER BY g.`index`"
|
||||||
|
)
|
||||||
|
live_report = await self.__query(
|
||||||
|
"SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, "
|
||||||
|
"`enabled`, `logo`, `logo_base64`, `time`, `fans_change`, `fans_medal_change`, `guard_change`, "
|
||||||
|
"`danmu`, `box`, `gift`, `sc`, `guard`, "
|
||||||
|
"`danmu_ranking`, `box_ranking`, `box_profit_ranking`, `gift_ranking`, `sc_ranking`, "
|
||||||
|
"`guard_list`, `box_profit_diagram`, `danmu_diagram`, `box_diagram`, `gift_diagram`, "
|
||||||
|
"`sc_diagram`, `guard_diagram`, `danmu_cloud` "
|
||||||
|
"FROM `groups` AS `g` LEFT JOIN `live_report` AS `l` "
|
||||||
|
"ON g.`uid` = l.`uid` AND g.`index` = l.`index` "
|
||||||
|
f"WHERE g.`uid` = {uid} "
|
||||||
|
"ORDER BY g.`index`"
|
||||||
|
)
|
||||||
|
dynamic_update = await self.__query(
|
||||||
|
"SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` "
|
||||||
|
"FROM `groups` AS `g` LEFT JOIN `dynamic_update` AS `d` "
|
||||||
|
"ON g.`uid` = d.`uid` AND g.`index` = d.`index` "
|
||||||
|
f"WHERE g.`uid` = {uid} "
|
||||||
|
"ORDER BY g.`index`"
|
||||||
|
)
|
||||||
|
|
||||||
|
targets = []
|
||||||
|
for i, target in enumerate(live_on):
|
||||||
|
if all((live_on[i]["enabled"], live_on[i]["message"])):
|
||||||
|
on = LiveOn(**live_on[i])
|
||||||
|
else:
|
||||||
|
on = LiveOn()
|
||||||
|
if all((live_off[i]["enabled"], live_off[i]["message"])):
|
||||||
|
off = LiveOff(**live_off[i])
|
||||||
|
else:
|
||||||
|
off = LiveOff()
|
||||||
|
if live_report[i]["enabled"]:
|
||||||
|
report = LiveReport(**live_report[i])
|
||||||
|
else:
|
||||||
|
report = LiveReport()
|
||||||
|
if all((dynamic_update[i]["enabled"], dynamic_update[i]["message"])):
|
||||||
|
update = DynamicUpdate(**dynamic_update[i])
|
||||||
|
else:
|
||||||
|
update = DynamicUpdate()
|
||||||
|
|
||||||
|
targets.append(
|
||||||
|
PushTarget(
|
||||||
|
id=target["num"],
|
||||||
|
type=target["type"],
|
||||||
|
live_on=on,
|
||||||
|
live_off=off,
|
||||||
|
live_report=report,
|
||||||
|
dynamic_update=update
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return targets
|
||||||
|
|
||||||
async def load(self):
|
async def load(self):
|
||||||
"""
|
"""
|
||||||
从 MySQL 中初始化配置
|
从 MySQL 中初始化配置
|
||||||
@@ -296,69 +390,7 @@ class MySQLDataSource(DataSource):
|
|||||||
for now_user in bot_users:
|
for now_user in bot_users:
|
||||||
uid = now_user.get("uid")
|
uid = now_user.get("uid")
|
||||||
|
|
||||||
live_on = await self.__query(
|
targets = await self.__load_targets(uid)
|
||||||
"SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` "
|
|
||||||
"FROM `groups` AS `g` LEFT JOIN `live_on` AS `l` "
|
|
||||||
"ON g.`uid` = l.`uid` AND g.`index` = l.`index` "
|
|
||||||
f"WHERE g.`uid` = {uid} "
|
|
||||||
"ORDER BY g.`index`"
|
|
||||||
)
|
|
||||||
live_off = await self.__query(
|
|
||||||
"SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` "
|
|
||||||
"FROM `groups` AS `g` LEFT JOIN `live_off` AS `l` "
|
|
||||||
"ON g.`uid` = l.`uid` AND g.`index` = l.`index` "
|
|
||||||
f"WHERE g.`uid` = {uid} "
|
|
||||||
"ORDER BY g.`index`"
|
|
||||||
)
|
|
||||||
live_report = await self.__query(
|
|
||||||
"SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, "
|
|
||||||
"`enabled`, `logo`, `logo_base64`, `time`, `fans_change`, `fans_medal_change`, `guard_change`, "
|
|
||||||
"`danmu`, `box`, `gift`, `sc`, `guard`, "
|
|
||||||
"`danmu_ranking`, `box_ranking`, `box_profit_ranking`, `gift_ranking`, `sc_ranking`, "
|
|
||||||
"`guard_list`, `box_profit_diagram`, `danmu_diagram`, `box_diagram`, `gift_diagram`, "
|
|
||||||
"`sc_diagram`, `guard_diagram`, `danmu_cloud` "
|
|
||||||
"FROM `groups` AS `g` LEFT JOIN `live_report` AS `l` "
|
|
||||||
"ON g.`uid` = l.`uid` AND g.`index` = l.`index` "
|
|
||||||
f"WHERE g.`uid` = {uid} "
|
|
||||||
"ORDER BY g.`index`"
|
|
||||||
)
|
|
||||||
dynamic_update = await self.__query(
|
|
||||||
"SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` "
|
|
||||||
"FROM `groups` AS `g` LEFT JOIN `dynamic_update` AS `d` "
|
|
||||||
"ON g.`uid` = d.`uid` AND g.`index` = d.`index` "
|
|
||||||
f"WHERE g.`uid` = {uid} "
|
|
||||||
"ORDER BY g.`index`"
|
|
||||||
)
|
|
||||||
|
|
||||||
targets = []
|
|
||||||
for i, target in enumerate(live_on):
|
|
||||||
if all((live_on[i]["enabled"], live_on[i]["message"])):
|
|
||||||
on = LiveOn(**live_on[i])
|
|
||||||
else:
|
|
||||||
on = LiveOn()
|
|
||||||
if all((live_off[i]["enabled"], live_off[i]["message"])):
|
|
||||||
off = LiveOff(**live_off[i])
|
|
||||||
else:
|
|
||||||
off = LiveOff()
|
|
||||||
if live_report[i]["enabled"]:
|
|
||||||
report = LiveReport(**live_report[i])
|
|
||||||
else:
|
|
||||||
report = LiveReport()
|
|
||||||
if all((dynamic_update[i]["enabled"], dynamic_update[i]["message"])):
|
|
||||||
update = DynamicUpdate(**dynamic_update[i])
|
|
||||||
else:
|
|
||||||
update = DynamicUpdate()
|
|
||||||
|
|
||||||
targets.append(
|
|
||||||
PushTarget(
|
|
||||||
id=target["num"],
|
|
||||||
type=target["type"],
|
|
||||||
live_on=on,
|
|
||||||
live_off=off,
|
|
||||||
live_report=report,
|
|
||||||
dynamic_update=update
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
ups.append(Up(uid=uid, targets=targets))
|
ups.append(Up(uid=uid, targets=targets))
|
||||||
|
|
||||||
@@ -366,3 +398,48 @@ class MySQLDataSource(DataSource):
|
|||||||
|
|
||||||
super().format_data()
|
super().format_data()
|
||||||
logger.success(f"成功从 MySQL 中导入了 {len(self.get_up_list())} 个 UP 主")
|
logger.success(f"成功从 MySQL 中导入了 {len(self.get_up_list())} 个 UP 主")
|
||||||
|
|
||||||
|
async def reload_targets(self, up: Union[int, Up]):
|
||||||
|
"""
|
||||||
|
重新从 MySQL 中读取特定 Up 的推送配置
|
||||||
|
|
||||||
|
Args:
|
||||||
|
up: 需要重载配置的 Up 实例或其 UID
|
||||||
|
"""
|
||||||
|
if isinstance(up, int):
|
||||||
|
try:
|
||||||
|
up = self.get_up(up)
|
||||||
|
except DataSourceException:
|
||||||
|
logger.warning(f"重载配置时出现异常, UID: {up} 不存在")
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(f"开始从 MySQL 中重载 {up.uname} (UID: {up.uid}, 房间号: {up.room_id}) 的推送配置")
|
||||||
|
|
||||||
|
if not self.__pool:
|
||||||
|
await self.__connect()
|
||||||
|
|
||||||
|
up.targets = await self.__load_targets(up.uid)
|
||||||
|
|
||||||
|
super().format_data()
|
||||||
|
logger.success(f"已成功重载 {up.uname} (UID: {up.uid}, 房间号: {up.room_id}) 的推送配置")
|
||||||
|
|
||||||
|
async def load_new(self, uid: int):
|
||||||
|
"""
|
||||||
|
从 MySQL 中追加读取指定 UID 的用户
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid: 需要追加读取配置的 UID
|
||||||
|
"""
|
||||||
|
user = await self.__query(f"SELECT * FROM `bot` WHERE uid = {uid}")
|
||||||
|
if len(user) == 0:
|
||||||
|
logger.error(f"载入 UID: {uid} 的推送配置失败, UID 不存在")
|
||||||
|
raise DataSourceException(f"载入 UID: {uid} 的推送配置失败, UID 不存在")
|
||||||
|
|
||||||
|
bot = user[0].get("bot")
|
||||||
|
targets = await self.__load_targets(uid)
|
||||||
|
up = Up(uid=uid, targets=targets)
|
||||||
|
self.get_bot(bot).ups.append(up)
|
||||||
|
super().format_data()
|
||||||
|
logger.success(f"已成功载入 UID: {uid} 的推送配置")
|
||||||
|
|
||||||
|
await up.connect()
|
||||||
|
|||||||
@@ -155,6 +155,14 @@ class Up(BaseModel):
|
|||||||
else:
|
else:
|
||||||
logger.success(f"已成功连接到 {self.uname} 的直播间 {self.room_id}")
|
logger.success(f"已成功连接到 {self.uname} 的直播间 {self.room_id}")
|
||||||
|
|
||||||
|
if not await redis.exists_live_status(self.room_id):
|
||||||
|
room_info = await self.__live_room.get_room_play_info()
|
||||||
|
status = room_info["live_status"]
|
||||||
|
await redis.set_live_status(self.room_id, status)
|
||||||
|
if status == 1:
|
||||||
|
start_time = room_info["live_time"]
|
||||||
|
await redis.set_live_start_time(self.room_id, start_time)
|
||||||
|
|
||||||
self.__is_reconnect = True
|
self.__is_reconnect = True
|
||||||
|
|
||||||
@self.__room.on("LIVE")
|
@self.__room.on("LIVE")
|
||||||
|
|||||||
@@ -207,6 +207,10 @@ async def zunionstore(dest: str, source: Union[str, List[str]]):
|
|||||||
|
|
||||||
# 直播间状态,0:未开播,1:正在直播,2:轮播
|
# 直播间状态,0:未开播,1:正在直播,2:轮播
|
||||||
|
|
||||||
|
async def exists_live_status(room_id: int) -> bool:
|
||||||
|
return await hexists("LiveStatus", room_id)
|
||||||
|
|
||||||
|
|
||||||
async def get_live_status(room_id: int) -> int:
|
async def get_live_status(room_id: int) -> int:
|
||||||
return await hgeti("LiveStatus", room_id)
|
return await hgeti("LiveStatus", room_id)
|
||||||
|
|
||||||
@@ -217,6 +221,10 @@ async def set_live_status(room_id: int, status: int):
|
|||||||
|
|
||||||
# 直播开始时间
|
# 直播开始时间
|
||||||
|
|
||||||
|
async def exists_live_start_time(room_id: int) -> bool:
|
||||||
|
return await hexists("StartTime", room_id)
|
||||||
|
|
||||||
|
|
||||||
async def get_live_start_time(room_id: int) -> int:
|
async def get_live_start_time(room_id: int) -> int:
|
||||||
return await hgeti("StartTime", room_id)
|
return await hgeti("StartTime", room_id)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user