import time import numpy as np from typing import Dict, TypedDict from apscheduler.schedulers.background import BackgroundScheduler from bilibili_api import Credential, Danmaku, sync from bilibili_api.live import LiveDanmaku, LiveRoom import config rng = np.random.default_rng() ROOMID = config.ROOMID # 凭证 credential = Credential( sessdata=config.SESSDATA, bili_jct=config.BILI_JCT ) # 监听直播间弹幕 monitor = LiveDanmaku(ROOMID, credential=credential) # 用来发送弹幕 sender = LiveRoom(ROOMID, credential=credential) # 发送用 UID UID = 1153907392 class Gift(TypedDict): '''用户赠送礼物列表 username: 用户名 last_gift_time: 最后一次赠送礼物时时间戳 gift_list: 赠送的礼物字典,格式 礼物名: 数量''' username: str last_gift_time: int gift_list: Dict[str, int] user_list: Dict[int, Gift] = dict() sched = BackgroundScheduler(timezone="Asia/Shanghai") # 定时任务 @monitor.on("DANMU_MSG") async def recv(event): # 发送者UID uid = event["data"]["info"][2][0] # 排除自己发送的弹幕 if uid == UID: return # 弹幕文本 msg = event["data"]["info"][1] if msg == "ping": await sender.send_danmaku(Danmaku("pang!")) @monitor.on("SEND_GIFT") async def on_gift(event): info = event['data']['data'] # print(info) uid = info['uid'] user = user_list.get(uid) if user: # 如果用户列表中有该用户 则更新他的礼物字典以及礼物时间戳 num = user['gift_list'].get(info['giftName'], 0) user['gift_list'][info['giftName']] = num + info['num'] user['last_gift_time'] = int(time.time()) print(f"update {uid}") else: # 不存在则以现在时间及礼物新建 Gift 对象 user_list[uid] = Gift( username=info['uname'], last_gift_time=int(time.time()), gift_list={info['giftName']: info['num']} ) print(f"new {uid}") # 开启一个监控 sched.add_job(check, 'interval', seconds=1, args=[uid], id=str(uid)) print(f"start monitoring {uid}") @monitor.on("PRAPERING") async def on_preparing(event): await sender.send_danmaku(Danmaku("欢迎魚魚准备上班,搬好小板凳了喵~ヽ(*・ω・)ノ ")) @monitor.on("LIVE") async def on_live_start(event): beijing_tz = timezone(timedelta(hours=8)) now = datetime.now(beijing_tz) current_hour = now.hour choices = ["直播开始啦,快来看魚宝!", "欢迎魚宝进入直播间!", "上班啦,魚宝今日顺利!"] if 11 <= current_hour < 14: choices.append("魚宝中午好,吃饭了喵?") elif 18 <= current_hour < 21: choices.append("魚宝晚上好,记得早点下班睡觉!") danmu = rng.choice(choices) await sender.send_danmaku(Danmaku(danmu)) def check(uid: int): '判断是否超过阈值并输出' user = user_list.get(uid) if user: if int(time.time()) - user.get('last_gift_time', 0) > 5: # 5 秒超时开始回复 sched.remove_job(str(uid)) # 移除该监控任务 print(f"watch {uid} end") # print(user_list.pop(uid)) # 释放用户 whatever = user_list.pop(uid) danmu = '感谢' + whatever['username'] + '赠送的' + '、'.join([str(v) + '个' + k for k, v in whatever['gift_list'].items()]) + '!' sync(sender.send_danmaku(Danmaku(danmu))) else: print(f"keep watching {uid}...") else: print(f"cannot find {uid}") if __name__ == '__main__': sched.start() # 启动定时任务 sync(monitor.connect()) # 连接直播间