diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..18ef8b9 --- /dev/null +++ b/__init__.py @@ -0,0 +1,514 @@ +from nonebot import on_command, get_driver +from nonebot.adapters.onebot.v11 import Bot, MessageSegment, MessageEvent, Event +from nonebot.plugin import PluginMetadata +from nonebot import logger +from fastapi import APIRouter, Request + +import httpx +import io +import asyncio +from PIL import Image, ImageDraw, ImageFont, ImageFilter +from playwright.async_api import async_playwright + +# -------------------- 插件元信息 -------------------- +__plugin_meta__ = PluginMetadata( + name="maimai helper", + description="神秘功能", + usage="见 /maihelp", + extra={ + "version": "5.2", + "author": "xiaoxin" + } +) + +# -------------------- 全局配置 -------------------- +API_BASE = "" +HELP_BG_API = "" +STATUS_URL_XIN = "" +STATUS_URL_SBGA = "" + +# -------------------- 帮助 -------------------- +async def fetch_background_bytes(retries: int = 3, timeout: int = 10) -> bytes | None: + headers = {"User-Agent": "Mozilla/5.0 (MaimaiBot/5.2)"} + for _ in range(retries): + try: + async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client: + r = await client.get(HELP_BG_API, headers=headers) + if r.status_code == 200 and "image" in r.headers.get("content-type", ""): + return r.content + except Exception: + await asyncio.sleep(0.3) + return None + +# -------------------- 字体加载 -------------------- +def load_font(title_size=38, text_size=24): + paths = [ + "./fonts/msyh.ttc", + "msyh.ttc", + "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", + "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.otf", + "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + ] + for p in paths: + try: + title = ImageFont.truetype(p, title_size) + text = ImageFont.truetype(p, text_size) + return title, text + except Exception: + continue + return ImageFont.load_default(), ImageFont.load_default() + +# -------------------- 帮助图渲染 -------------------- +def render_help_image(bg_bytes: bytes | None) -> bytes: + W, H = 1920, 1080 + + # 背景 + if bg_bytes: + try: + bg = Image.open(io.BytesIO(bg_bytes)).convert("RGBA") + ratio = bg.width / bg.height + target = W / H + if ratio > target: + new_h = H + new_w = int(new_h * ratio) + else: + new_w = W + new_h = int(new_w / ratio) + bg = bg.resize((new_w, new_h), Image.LANCZOS) + left = max((new_w - W) // 2, 0) + top = max((new_h - H) // 2, 0) + bg = bg.crop((left, top, left + W, top + H)) + except: + bg = Image.new("RGBA", (W, H), (50, 50, 60, 255)) + else: + bg = Image.new("RGBA", (W, H), (50, 50, 60, 255)) + + draw = ImageDraw.Draw(bg) + title_font, text_font = load_font() + + # 数据:两个列布局 + sections_left = [ + ("账号相关", [ + "/bind <二维码字符串> 绑定舞萌状态", + "/info 查看简略信息", + "/user 查看账号信息", + "/fbind 绑定水鱼 token", + "/funbind 解绑水鱼 token", + ]), + ("状态", [ + "/网站状态 网站状态", + "/舞萌状态 舞萌状态", + ]) + ] + + sections_right = [ + ("成绩相关", [ + "/up 上传成绩到水鱼", + "/del 删除成绩", + "/upload 上传成绩", + ]), + ("其他", [ + "/unban <时间> 解黑屋(测试)", + "/lock <类型> 解锁物品", + ]) + ] + + # ====== 绘制标题 ====== + title = "乌蒙DX神秘功能帮助" + bbox = draw.textbbox((0, 0), title, font=title_font) + tw = bbox[2] - bbox[0] + th = bbox[3] - bbox[1] + draw.text(((W - tw) // 2, 80), title, font=title_font, fill=(0, 0, 0)) + + # 左右列区域 + left_x = 160 + right_x = W // 2 + 60 + start_y = 200 + line_gap = 65 + block_gap = 80 + + # ====== 绘制列函数 ====== + def draw_column(sections, x, y): + for sec_title, items in sections: + # 标题(加粗效果:描边) + draw.text((x - 2, y), sec_title, font=text_font, fill=(0, 0, 0)) + draw.text((x + 2, y), sec_title, font=text_font, fill=(0, 0, 0)) + draw.text((x, y - 2), sec_title, font=text_font, fill=(0, 0, 0)) + draw.text((x, y + 2), sec_title, font=text_font, fill=(0, 0, 0)) + draw.text((x, y), sec_title, font=text_font, fill=(0, 0, 0)) + y += line_gap + + # 内容行 + for line in items: + draw.text((x + 40, y), line, font=text_font, fill=(0, 0, 0)) + y += line_gap + + y += block_gap + return y + + # 绘制左右列 + draw_column(sections_left, left_x, start_y) + draw_column(sections_right, right_x, start_y) + + # 输出 + buf = io.BytesIO() + bg.convert("RGB").save(buf, "PNG") + buf.seek(0) + return buf.getvalue() + +# -------------------- HELP 指令 -------------------- +help_cmd = on_command("maihelp", aliases={"help", "/help"}, priority=5, block=True) + +@help_cmd.handle() +async def _help(event: MessageEvent): + try: + bg = await fetch_background_bytes() + img = render_help_image(bg) + await help_cmd.send(MessageSegment.image(img)) + except Exception as e: + await help_cmd.send(f"生成帮助图失败:{e}") + + +async def call_api(name: str, qq: str, params=None, timeout=20): + if params is None: + params = {} + params["user_qq"] = qq + + url = f"{API_BASE}/{name}" + + # ========= 日志(请求前) ========= + logger.info(f"[API-REQ] → {name}") + logger.info(f"[API-REQ] URL: {url}") + logger.info(f"[API-REQ] Params: {params}") + + try: + async with httpx.AsyncClient(timeout=timeout, verify=False) as client: + r = await client.post(url, json=params) + + # ========= 日志(响应) ========= + logger.info(f"[API-RESP] {name} status={r.status_code}") + logger.info(f"[API-RESP] Raw: {r.text}") + + data = r.json() + return data + + except Exception as e: + logger.error(f"[API-ERR] {name} 调用异常: {e}") + return {"success": False, "message": f"服务器连接失败:{e}"} + + +# -------------------- info -------------------- +info_cmd = on_command("/info", priority=5, block=True) + +@info_cmd.handle() +async def _(event: MessageEvent): + qq = event.get_user_id() + r = await call_api("info", qq) + if not r.get("success"): + await info_cmd.send(r.get("message", "未知错误")) + return + ui = r.get("user_info") + if isinstance(ui, dict): + lines = [f"{k}: {v}" for k, v in ui.items()] + await info_cmd.send("\n".join(lines)) + else: + await info_cmd.send(str(ui)) + +# -------------------- 上传成绩 -------------------- +upload_cmd = on_command("/up", priority=5, block=True) + +@upload_cmd.handle() +async def _(event: MessageEvent): + qq = event.get_user_id() + r = await call_api("up", qq, {}, timeout=120) + await upload_cmd.send(r.get("message", "返回异常")) + +# -------------------- fbind / funbind -------------------- +fbind_cmd = on_command("/fbind", priority=5, block=True) + +@fbind_cmd.handle() +async def _(event: MessageEvent): + qq = event.get_user_id() + parts = str(event.get_message()).split() + if len(parts) < 2: + await fbind_cmd.send("用法:/fbind ") + return + token = parts[1] + r = await call_api("fbind", qq, {"token": token}) + await fbind_cmd.send(r.get("message", "返回异常")) + +funbind_cmd = on_command("/funbind", priority=5, block=True) + +@funbind_cmd.handle() +async def _(event: MessageEvent): + qq = event.get_user_id() + r = await call_api("funbind", qq) + await funbind_cmd.send(r.get("message", "返回异常")) + +# -------------------- del 删除成绩 -------------------- +del_cmd = on_command("/del", priority=5, block=True) + +@del_cmd.handle() +async def _(event: MessageEvent): + qq = event.get_user_id() + p = str(event.get_message()).split() + if len(p) < 2: + await del_cmd.send("用法:/del ") + return + # 如果需要 level_id,可后续扩展 + r = await call_api("delete", qq, {"music_id": p[1]}) + await del_cmd.send(r.get("message", "返回异常")) + +# -------------------- lock 解锁 -------------------- +lock_cmd = on_command("/lock", priority=5, block=True) + +@lock_cmd.handle() +async def _(event: MessageEvent): + qq = event.get_user_id() + p = str(event.get_message()).split() + if len(p) < 3: + await lock_cmd.send("用法:/lock <类型> ") + return + r = await call_api("lock", qq, {"item_type": p[1], "item_id": p[2]}) + await lock_cmd.send(r.get("message", "返回异常")) + +# -------------------- unban -------------------- +unban_cmd = on_command("/黑屋", priority=5, block=True) + +@unban_cmd.handle() +async def _(event: MessageEvent): + qq = event.get_user_id() + p = str(event.get_message()).split() + if len(p) < 2: + await unban_cmd.send("用法:/unban <时间>(例如 16:30)") + return + r = await call_api("mai_unban", qq, {"login_time": p[1]}) + await unban_cmd.send(r.get("message", "开始解黑屋失败")) + +# -------------------- account (list / switch) -------------------- +acc_cmd = on_command("/acc", priority=5, block=True) + +@acc_cmd.handle() +async def _(event: MessageEvent): + qq = event.get_user_id() + p = str(event.get_message()).split() + if len(p) < 2: + await acc_cmd.send("用法:/acc list | switch ") + return + action = p[1] + if action == "list": + r = await call_api("account", qq, {"action": "list"}) + if not r.get("success"): + await acc_cmd.send(r.get("message", "查询失败")) + return + + accounts = r.get("accounts") or r.get("result", {}).get("accounts") + active = r.get("active") + + if not accounts: + await acc_cmd.send("未绑定任何账号") + return + + lines = [] + for i, a in enumerate(accounts): + username = a.get("username") or a.get("name") or "" + user_id = a.get("user_id") + is_active = (str(user_id) == str(active)) if active is not None else False + + if is_active: + lines.append(f"*{i+1}.{username}") + else: + lines.append(f"{i+1}.{username}") + + await acc_cmd.send("\n".join(lines)) + elif action == "switch": + if len(p) < 3 or not p[2].isdigit(): + await acc_cmd.send("用法:/acc switch <编号>") + return + idx = int(p[2]) + r = await call_api("account", qq, {"action": "switch", "index": idx}) + await acc_cmd.send(r.get("message", "切换返回异常")) + else: + await acc_cmd.send("未知操作,请使用 list 或 switch") + + # -------------------- user -------------------- +user_cmd = on_command("/user", priority=5, block=True) + +@user_cmd.handle() +async def _(event: MessageEvent): + qq = event.get_user_id() + r = await call_api("user", qq) + + if not r.get("success"): + await user_cmd.send(r.get("message", "查询失败")) + return + + data = r.get("user_info") + + if isinstance(data, dict): + msg = "\n".join([f"{k}: {v}" for k, v in data.items()]) + else: + msg = str(data) + + await user_cmd.send(msg) + +sp_cmd = on_command("/sp", priority=5, block=True) + +@sp_cmd.handle() +async def _(event: MessageEvent): + qq = event.user_id + parts = str(event.get_message()).split() + + superusers = get_driver().config.superusers or [] + is_true_admin = str(qq) in superusers + + # ========== 1. 查询自己 ========== + if len(parts) == 1: + r = await call_api("permission", qq, {"action": "get"}) + if not r.get("success"): + await sp_cmd.send(r.get("message")) + return + + perms = r.get("permissions") or {} + enabled = [k for k, v in perms.items() if v == 1] + + if not enabled: + await sp_cmd.send("你没有任何权限") + else: + await sp_cmd.send("拥有以下权限:\n" + "\n".join(enabled)) + return + + # ========== 2. 查询别人 ========== + if len(parts) == 2 and parts[1].isdigit(): + target = parts[1] + r = await call_api("permission", qq, {"action": "get", "user_qq": target}) + + if not r.get("success"): + await sp_cmd.send(r.get("message")) + return + + perms = r.get("permissions") or {} + enabled = [k for k, v in perms.items() if v == 1] + + if not enabled: + await sp_cmd.send(f"{target} 没有任何权限") + else: + await sp_cmd.send(f"{target} 拥有以下权限:\n" + "\n".join(enabled)) + return + + # ========== 3. 修改权限 ========== + if len(parts) == 4: + op = parts[1] + perm = parts[2] + target = parts[3] + + if not is_true_admin: + await sp_cmd.send("❌ 只有 Bot 超级管理员可以修改权限") + return + + if op not in ("add", "del"): + await sp_cmd.send("用法:/sp add|del <权限> ") + return + + if perm.lower() == "all" and op == "add": + r = await call_api("permission", qq, { + "action": "set", + "user_qq": target, + "perms": {"all": 1} + }) + await sp_cmd.send("已赋予 all 权限" if r.get("success") else r.get("message")) + return + + if perm not in ("upload", "up", "del", "unlock", "unban", "charge", "all"): + await sp_cmd.send("权限名无效") + return + + value = 1 if op == "add" else 0 + r = await call_api("permission", qq, { + "action": "set", + "user_qq": target, + "perms": {perm: value} + }) + + await sp_cmd.send("操作成功" if r.get("success") else r.get("message")) + return + + await sp_cmd.send("用法:/sp | /sp | /sp add|del <权限> | /sp add all ") + +# -------------------- 状态 -------------------- +site_status_cmd = on_command("网站状态", aliases={"/网站状态"}, priority=5, block=True) + +@site_status_cmd.handle() +async def _(event: Event): + try: + async with async_playwright() as p: + browser = await p.chromium.launch() + page = await browser.new_page() + await page.set_viewport_size({"width": 1280, "height": 720}) + await page.goto(STATUS_URL_XIN, wait_until="networkidle") + await page.wait_for_selector("#app", timeout=10000) + full_h = await page.evaluate("document.body.scrollHeight") + await page.set_viewport_size({"width": 1280, "height": full_h}) + img = await page.screenshot(full_page=True) + await browser.close() + await site_status_cmd.send(MessageSegment.image(img)) + except Exception as e: + await site_status_cmd.send(f"获取网站状态失败:{e}\n{traceback.format_exc()}") + +mai_status_cmd = on_command("舞萌状态", aliases={"/舞萌状态"}, priority=5, block=True) + +@mai_status_cmd.handle() +async def _(event: Event): + try: + async with async_playwright() as p: + browser = await p.chromium.launch() + page = await browser.new_page() + await page.set_viewport_size({"width": 1280, "height": 720}) + await page.goto(STATUS_URL_SBGA, wait_until="networkidle") + await page.wait_for_selector("#app", timeout=10000) + full_h = await page.evaluate("document.body.scrollHeight") + await page.set_viewport_size({"width": 1280, "height": full_h}) + img = await page.screenshot(full_page=True) + await browser.close() + await mai_status_cmd.send(MessageSegment.image(img)) + except Exception as e: + await mai_status_cmd.send(f"获取舞萌状态失败:{e}\n{traceback.format_exc()}") + +# -------------------- 回调路由 -------------------- +driver = get_driver() +app = driver.server_app +router = APIRouter() + +@router.post("/maimai/callback") +async def callback_endpoint(req: Request): + try: + data = await req.json() + except Exception: + return {"success": False, "message": "无法解析 JSON"} + + user_qq = data.get("user_qq") + message = data.get("message") + channel = data.get("channel", "private") + group_id = data.get("group_id") + + if not user_qq or not message: + return {"success": False, "message": "缺少 user_qq 或 message"} + + bots = nonebot.get_bots().values() + sent_any = False + for b in bots: + try: + if channel == "group" and group_id: + await b.call_api("send_group_msg", {"group_id": int(group_id), "message": message}) + else: + await b.call_api("send_private_msg", {"user_id": str(user_qq), "message": message}) + sent_any = True + except Exception: + continue + + if not sent_any: + return {"success": False, "message": "未能向任何机器人发送消息"} + + return {"success": True} + +app.include_router(router)