From c7187895c0b864f764327576ba76839f7f8c45ec Mon Sep 17 00:00:00 2001 From: LWR Date: Sat, 12 Aug 2023 11:06:18 +0800 Subject: [PATCH] feat: Backup live push support --- starbot/core/bot.py | 67 +++++++++++++++++++++++++++++++++------- starbot/core/live.py | 40 ++++++++++++++---------- starbot/core/room.py | 23 ++++++++++---- starbot/utils/config.py | 3 ++ starbot/utils/network.py | 9 +++++- starbot/utils/utils.py | 19 ++++++++++++ 6 files changed, 125 insertions(+), 36 deletions(-) diff --git a/starbot/core/bot.py b/starbot/core/bot.py index 3f5e409..bf5e603 100644 --- a/starbot/core/bot.py +++ b/starbot/core/bot.py @@ -19,7 +19,7 @@ from ..exception.DataSourceException import DataSourceException from ..exception.RedisException import RedisException from ..utils import redis, config from ..utils.network import request, get_session -from ..utils.utils import split_list, get_credential +from ..utils.utils import get_credential, get_live_info_by_uids class StarBot: @@ -50,10 +50,52 @@ class StarBot: self.__datasource = datasource Ariadne.options["StarBotDataSource"] = datasource + async def __backup_live_push(self): + infos_before = await get_live_info_by_uids(self.__datasource.get_uid_list()) + status_before = {} + for uid in infos_before: + status = infos_before[uid]["live_status"] + status_before[int(uid)] = status + + logger.success("备用直播推送模块已启动") + + while True: + await asyncio.sleep(10) + try: + infos_after = await get_live_info_by_uids(self.__datasource.get_uid_list()) + except Exception as ex: + logger.warning(f"备用直播推送模块数据抓取异常, 已忽略并继续 {ex}") + continue + for uid in infos_after: + now_status = infos_after[uid]["live_status"] + uid = int(uid) + if uid not in status_before: + status_before[uid] = now_status + continue + last_status = status_before[uid] + status_before[uid] = now_status + if now_status != last_status: + up = self.__datasource.get_up(uid) + if (not config.get("ONLY_CONNECT_NECESSARY_ROOM")) or up.is_need_connect(): + if now_status == 1: + # logger.warning(f"备用: {up.uname}({up.room_id}) 开播") + param = { + "data": { + "live_time": 0 + } + } + up.dispatch("LIVE", param) + if last_status == 1: + # logger.warning(f"备用: {up.uname}({up.room_id}) 下播") + param = {} + up.dispatch("PREPARING", param) + async def __main(self): """ StarBot 入口 """ + core_tasks = set() + logger.opt(colors=True, raw=True).info(f"{self.STARBOT_ASCII_LOGO}") if config.get("CHECK_VERSION"): try: @@ -129,14 +171,9 @@ class StarBot: return 5 # 通过 UID 列表批量获取信息 - info = {} - info_url = "https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids?uids[]=" - uids = [str(u) for u in self.__datasource.get_uid_list()] - uid_lists = split_list(uids, 100) - for lst in uid_lists: - info.update(await request("GET", info_url + "&uids[]=".join(lst))) - for uid in info: - base = info[uid] + infos = await get_live_info_by_uids(self.__datasource.get_uid_list()) + for uid in infos: + base = infos[uid] uid = int(uid) up = self.__datasource.get_up(uid) up.uname = base["uname"] @@ -171,12 +208,16 @@ class StarBot: except asyncio.exceptions.TimeoutError: logger.warning("等待连接所有直播间超时, 请检查是否存在未连接成功的直播间") + # 启动备用直播推送 + if config.get("BACKUP_LIVE_PUSH"): + core_tasks.add(asyncio.get_event_loop().create_task(self.__backup_live_push())) + # 启动动态推送模块 - asyncio.get_event_loop().create_task(dynamic_spider(self.__datasource)) + core_tasks.add(asyncio.get_event_loop().create_task(dynamic_spider(self.__datasource))) # 启动 HTTP API 服务 if config.get("USE_HTTP_API"): - asyncio.get_event_loop().create_task(http_init(self.__datasource)) + core_tasks.add(asyncio.get_event_loop().create_task(http_init(self.__datasource))) # 载入命令 logger.info("开始载入命令模块") @@ -229,7 +270,9 @@ class StarBot: except Exception as e: logger.exception(f"自动关注任务异常", e) - asyncio.create_task(auto_follow_task()) + follow_task = asyncio.create_task(auto_follow_task()) + core_tasks.add(follow_task) + follow_task.add_done_callback(lambda t: core_tasks.remove(t)) # 检测消息补发配置完整性 if config.get("BAN_RESEND") and config.get("MASTER_QQ") is None: diff --git a/starbot/core/live.py b/starbot/core/live.py index d48679e..9d9a61a 100644 --- a/starbot/core/live.py +++ b/starbot/core/live.py @@ -4,6 +4,7 @@ """ import asyncio +import base64 import json import struct import time @@ -21,7 +22,7 @@ from ..utils.AsyncEvent import AsyncEvent from ..utils.Credential import Credential from ..utils.Danmaku import Danmaku from ..utils.network import get_session, request -from ..utils.utils import get_api +from ..utils.utils import get_api, get_credential API = get_api("live") @@ -610,11 +611,12 @@ class LiveDanmaku(AsyncEvent): self.room_display_id = room_display_id self.live_time = 0 self.retry_after = retry_after + self.__uid = config.get("LOGIN_UID") self.__room_real_id = None self.__status = 0 self.__ws = None self.__tasks = [] - self.__heartbeat_timer = 30.0 + self.__heartbeat_timer = 60.0 self.err_reason = "" def get_status(self) -> int: @@ -670,6 +672,7 @@ class LiveDanmaku(AsyncEvent): # 获取真实房间号和开播时间 logger.debug(f"正在获取直播间 {self.room_display_id} 的真实房间号") info = await room.get_room_play_info() + self.__uid = info["uid"] self.__room_real_id = info["room_id"] self.live_time = info["live_time"] logger.debug(f"获取成功, 真实房间号: {self.__room_real_id}") @@ -711,14 +714,13 @@ class LiveDanmaku(AsyncEvent): logger.debug(f"正在尝试连接直播间 {self.room_display_id} 的主机: {uri}") try: - async with session.ws_connect(uri) as ws: + async with session.ws_connect(uri, headers={"User-Agent": "Mozilla/5.0"}) as ws: @self.on('VERIFICATION_SUCCESSFUL') async def on_verification_successful(data): """ 连接成功,新建心跳任务 """ - self.__tasks.append( - asyncio.create_task(self.__heartbeat(ws))) + self.__tasks.append(asyncio.create_task(self.__heartbeat(ws))) self.__ws = ws logger.debug(f"连接直播间 {self.room_display_id} 的主机成功, 准备发送认证信息") @@ -726,7 +728,7 @@ class LiveDanmaku(AsyncEvent): async for msg in ws: if msg.type == aiohttp.WSMsgType.BINARY: - logger.debug(f'收到直播间 {self.room_display_id} 的原始数据: {msg.data}') + # logger.debug(f'收到直播间 {self.room_display_id} 的原始数据: {msg.data}') await self.__handle_data(msg.data) elif msg.type == aiohttp.WSMsgType.ERROR: @@ -744,8 +746,7 @@ class LiveDanmaku(AsyncEvent): # 正常断开情况下跳出循环 if self.__status != self.STATUS_CLOSED or self.err_reason: # 非用户手动调用关闭,触发重连 - raise LiveException( - '非正常关闭连接' if not self.err_reason else self.err_reason) + raise LiveException('非正常关闭连接' if not self.err_reason else self.err_reason) else: break @@ -766,7 +767,7 @@ class LiveDanmaku(AsyncEvent): 处理数据 """ data = self.__unpack(data) - logger.debug(f"直播间 {self.room_display_id} 收到信息: {data}") + # logger.debug(f"直播间 {self.room_display_id} 收到信息: {data}") for info in data: callback_info = { @@ -786,7 +787,7 @@ class LiveDanmaku(AsyncEvent): elif info["datapack_type"] == LiveDanmaku.DATAPACK_TYPE_HEARTBEAT_RESPONSE: # 心跳包反馈,返回直播间人气 - logger.debug(f"直播间 {self.room_display_id} 收到心跳包反馈") + # logger.debug(f"直播间 {self.room_display_id} 收到心跳包反馈") # 重置心跳计时器 self.__heartbeat_timer = 30.0 callback_info["type"] = 'VIEW' @@ -811,9 +812,9 @@ class LiveDanmaku(AsyncEvent): logger.warning(f"直播间 {self.room_display_id} 检测到未知的数据包类型, 无法处理") async def __send_verify_data(self, ws: ClientWebSocketResponse, token: str): - uid = config.get("LOGIN_UID") - verify_data = {"uid": uid, "roomid": self.__room_real_id, - "protover": 3, "platform": "web", "type": 2, "key": token} + # uid = config.get("LOGIN_UID") + verify_data = {"uid": self.__uid, "roomid": self.__room_real_id, "protover": 3, + "buvid": config.get("BUVID3"), "platform": "web", "type": 2, "key": token} data = json.dumps(verify_data).encode() await self.__send(data, self.PROTOCOL_VERSION_HEARTBEAT, self.DATAPACK_TYPE_VERIFY, ws) @@ -821,12 +822,17 @@ class LiveDanmaku(AsyncEvent): """ 定时发送心跳包 """ - heartbeat = self.__pack(b'[object Object]', - self.PROTOCOL_VERSION_HEARTBEAT, self.DATAPACK_TYPE_HEARTBEAT) + heartbeat = self.__pack(b'[object Object]', self.PROTOCOL_VERSION_HEARTBEAT, self.DATAPACK_TYPE_HEARTBEAT) + heartbeat_url = "https://live-trace.bilibili.com/xlive/rdata-interface/v1/heartbeat/webHeartBeat?pf=web&hb=" + hb = str(base64.b64encode(f"60|{self.room_display_id}|1|0".encode("utf-8")), "utf-8") while True: if self.__heartbeat_timer == 0: - logger.debug(f"直播间 {self.room_display_id} 发送心跳包") + # logger.debug(f"直播间 {self.room_display_id} 发送心跳包") await ws.send_bytes(heartbeat) + try: + await request("GET", heartbeat_url, {"hb": hb, "pf": "web"}, credential=get_credential()) + except Exception as ex: + pass elif self.__heartbeat_timer <= -30: # 视为已异常断开连接,发布 TIMEOUT 事件 self.dispatch('TIMEOUT') @@ -841,7 +847,7 @@ class LiveDanmaku(AsyncEvent): 自动打包并发送数据 """ data = self.__pack(data, protocol_version, datapack_type) - logger.debug(f'直播间 {self.room_display_id} 发送原始数据: {data}') + # logger.debug(f'直播间 {self.room_display_id} 发送原始数据: {data}') await ws.send_bytes(data) @staticmethod diff --git a/starbot/core/room.py b/starbot/core/room.py index 275ce74..55aa2e1 100644 --- a/starbot/core/room.py +++ b/starbot/core/room.py @@ -72,6 +72,9 @@ class Up(BaseModel): def status(self): return 6 if not self.__room else self.__room.get_status() + def dispatch(self, event, data): + self.__room.dispatch(event, data) + def inject_bot(self, bot): self.__bot = bot @@ -82,6 +85,9 @@ class Up(BaseModel): def is_connecting(self): return (self.__room is not None) and (self.__room.get_status() != 2) + def is_need_connect(self): + return any([self.__any_live_on_enabled(), self.__any_live_off_enabled(), self.__any_live_report_enabled()]) + def __any_live_on_enabled(self): return any(map(lambda conf: conf.enabled, map(lambda group: group.live_on, self.targets))) @@ -109,11 +115,9 @@ class Up(BaseModel): self.room_id = user_info["room_id"] # 开播推送开关和下播推送开关均处于关闭状态时跳过连接直播间,以节省性能 - if config.get("ONLY_CONNECT_NECESSARY_ROOM"): - if not any([self.__any_live_on_enabled(), self.__any_live_off_enabled(), - self.__any_live_report_enabled()]): - logger.warning(f"{self.uname} 的开播, 下播和直播报告开关均处于关闭状态, 跳过连接直播间") - return + if config.get("ONLY_CONNECT_NECESSARY_ROOM") and not self.is_need_connect(): + logger.warning(f"{self.uname} 的开播, 下播和直播报告开关均处于关闭状态, 跳过连接直播间") + return if self.__connecting: logger.warning(f"{self.uname} ( UID: {self.uid} ) 的直播间正在连接中, 跳过重复连接") @@ -144,7 +148,6 @@ class Up(BaseModel): now_status = room_info["live_status"] if now_status != last_status: - await redis.set_live_status(self.room_id, now_status) if now_status == 1: logger.warning(f"直播间 {self.room_id} 断线期间开播") param = { @@ -176,12 +179,16 @@ class Up(BaseModel): 开播事件 """ logger.debug(f"{self.uname} (LIVE): {event}") + # logger.warning(f"{self.uname}: 开播事件") locked = False room_info = {} # 是否为真正开播 if "live_time" in event["data"]: + if await redis.get_live_status(self.room_id) == 1: + return + await redis.set_live_status(self.room_id, 1) # 是否为主播网络波动断线重连 @@ -248,6 +255,10 @@ class Up(BaseModel): 下播事件 """ logger.debug(f"{self.uname} (PREPARING): {event}") + # logger.warning(f"{self.uname}: 下播事件") + + if await redis.get_live_status(self.room_id) == 0: + return await redis.set_live_status(self.room_id, 0) await redis.set_live_end_time(self.room_id, int(time.time())) diff --git a/starbot/utils/config.py b/starbot/utils/config.py index c7c37a2..bdaca77 100644 --- a/starbot/utils/config.py +++ b/starbot/utils/config.py @@ -40,6 +40,9 @@ DEFAULT_CONFIG = { "BILI_JCT": None, "BUVID3": None, + # 是否启用备用轮询式直播推送,建议仅在默认直播推送出现问题时启用 + "BACKUP_LIVE_PUSH": False, + # 是否自动关注打开了动态推送但没有关注的用户,推荐打开,否则无法获取未关注用户的动态更新信息 "AUTO_FOLLOW_OPENED_DYNAMIC_UPDATE_UP": True, diff --git a/starbot/utils/network.py b/starbot/utils/network.py index e0270f9..9a69df1 100644 --- a/starbot/utils/network.py +++ b/starbot/utils/network.py @@ -170,6 +170,9 @@ async def request(method: str, except ServerDisconnectedError: await asyncio.sleep(0.5) continue + except NetworkException: + await asyncio.sleep(0.5) + continue def get_session() -> aiohttp.ClientSession: @@ -182,7 +185,11 @@ def get_session() -> aiohttp.ClientSession: loop = asyncio.get_running_loop() session = __session_pool.get(loop, None) if session is None: - session = aiohttp.ClientSession(loop=loop, connector=TCPConnector(loop=loop, limit=0)) + session = aiohttp.ClientSession( + headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36 Core/1.94.201.400 QQBrowser/11.9.5325.400" + }, loop=loop, connector=TCPConnector(loop=loop, limit=0, verify_ssl=False) + ) __session_pool[loop] = session return session diff --git a/starbot/utils/utils.py b/starbot/utils/utils.py index 9477139..38cc9ed 100644 --- a/starbot/utils/utils.py +++ b/starbot/utils/utils.py @@ -151,6 +151,25 @@ def mask_rounded_rectangle(img: Image.Image, radius: int = 10) -> Image.Image: return img +async def get_live_info_by_uids(uids: List[int]) -> Dict[str, Any]: + """ + 根据 UID 列表批量获取直播间信息 + + Args: + uids: UID 列表 + + Returns: + 直播间信息 + """ + infos = {} + info_url = "https://api.live.bilibili.com/room/v1/Room/get_status_info_by_uids?uids[]=" + uids = [str(u) for u in uids] + uid_lists = split_list(uids, 100) + for lst in uid_lists: + infos.update(await request("GET", info_url + "&uids[]=".join(lst))) + return infos + + async def get_unames_and_faces_by_uids(uids: List[str]) -> Tuple[List[str], List[Image.Image]]: """ 根据 UID 列表批量获取昵称和头像图片