上传文件至 /

nya~
This commit is contained in:
2025-11-19 16:20:27 +08:00
parent 13cb2a5572
commit e860292754

514
__init__.py Normal file
View File

@@ -0,0 +1,514 @@
from nonebot import on_command, get_driver
from nonebot.adapters.onebot.v11 import Bot, MessageSegment, MessageEvent, Event
from nonebot.plugin import PluginMetadata
from nonebot import logger
from fastapi import APIRouter, Request
import httpx
import io
import asyncio
from PIL import Image, ImageDraw, ImageFont, ImageFilter
from playwright.async_api import async_playwright
# -------------------- 插件元信息 --------------------
__plugin_meta__ = PluginMetadata(
name="maimai helper",
description="神秘功能",
usage="见 /maihelp",
extra={
"version": "5.2",
"author": "xiaoxin"
}
)
# -------------------- 全局配置 --------------------
API_BASE = ""
HELP_BG_API = ""
STATUS_URL_XIN = ""
STATUS_URL_SBGA = ""
# -------------------- 帮助 --------------------
async def fetch_background_bytes(retries: int = 3, timeout: int = 10) -> bytes | None:
headers = {"User-Agent": "Mozilla/5.0 (MaimaiBot/5.2)"}
for _ in range(retries):
try:
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
r = await client.get(HELP_BG_API, headers=headers)
if r.status_code == 200 and "image" in r.headers.get("content-type", ""):
return r.content
except Exception:
await asyncio.sleep(0.3)
return None
# -------------------- 字体加载 --------------------
def load_font(title_size=38, text_size=24):
paths = [
"./fonts/msyh.ttc",
"msyh.ttc",
"/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.otf",
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
]
for p in paths:
try:
title = ImageFont.truetype(p, title_size)
text = ImageFont.truetype(p, text_size)
return title, text
except Exception:
continue
return ImageFont.load_default(), ImageFont.load_default()
# -------------------- 帮助图渲染 --------------------
def render_help_image(bg_bytes: bytes | None) -> bytes:
W, H = 1920, 1080
# 背景
if bg_bytes:
try:
bg = Image.open(io.BytesIO(bg_bytes)).convert("RGBA")
ratio = bg.width / bg.height
target = W / H
if ratio > target:
new_h = H
new_w = int(new_h * ratio)
else:
new_w = W
new_h = int(new_w / ratio)
bg = bg.resize((new_w, new_h), Image.LANCZOS)
left = max((new_w - W) // 2, 0)
top = max((new_h - H) // 2, 0)
bg = bg.crop((left, top, left + W, top + H))
except:
bg = Image.new("RGBA", (W, H), (50, 50, 60, 255))
else:
bg = Image.new("RGBA", (W, H), (50, 50, 60, 255))
draw = ImageDraw.Draw(bg)
title_font, text_font = load_font()
# 数据:两个列布局
sections_left = [
("账号相关", [
"/bind <二维码字符串> 绑定舞萌状态",
"/info 查看简略信息",
"/user 查看账号信息",
"/fbind <token> 绑定水鱼 token",
"/funbind 解绑水鱼 token",
]),
("状态", [
"/网站状态 网站状态",
"/舞萌状态 舞萌状态",
])
]
sections_right = [
("成绩相关", [
"/up 上传成绩到水鱼",
"/del <musicID> 删除成绩",
"/upload 上传成绩",
]),
("其他", [
"/unban <时间> 解黑屋(测试)",
"/lock <类型> <itemID> 解锁物品",
])
]
# ====== 绘制标题 ======
title = "乌蒙DX神秘功能帮助"
bbox = draw.textbbox((0, 0), title, font=title_font)
tw = bbox[2] - bbox[0]
th = bbox[3] - bbox[1]
draw.text(((W - tw) // 2, 80), title, font=title_font, fill=(0, 0, 0))
# 左右列区域
left_x = 160
right_x = W // 2 + 60
start_y = 200
line_gap = 65
block_gap = 80
# ====== 绘制列函数 ======
def draw_column(sections, x, y):
for sec_title, items in sections:
# 标题(加粗效果:描边)
draw.text((x - 2, y), sec_title, font=text_font, fill=(0, 0, 0))
draw.text((x + 2, y), sec_title, font=text_font, fill=(0, 0, 0))
draw.text((x, y - 2), sec_title, font=text_font, fill=(0, 0, 0))
draw.text((x, y + 2), sec_title, font=text_font, fill=(0, 0, 0))
draw.text((x, y), sec_title, font=text_font, fill=(0, 0, 0))
y += line_gap
# 内容行
for line in items:
draw.text((x + 40, y), line, font=text_font, fill=(0, 0, 0))
y += line_gap
y += block_gap
return y
# 绘制左右列
draw_column(sections_left, left_x, start_y)
draw_column(sections_right, right_x, start_y)
# 输出
buf = io.BytesIO()
bg.convert("RGB").save(buf, "PNG")
buf.seek(0)
return buf.getvalue()
# -------------------- HELP 指令 --------------------
help_cmd = on_command("maihelp", aliases={"help", "/help"}, priority=5, block=True)
@help_cmd.handle()
async def _help(event: MessageEvent):
try:
bg = await fetch_background_bytes()
img = render_help_image(bg)
await help_cmd.send(MessageSegment.image(img))
except Exception as e:
await help_cmd.send(f"生成帮助图失败:{e}")
async def call_api(name: str, qq: str, params=None, timeout=20):
if params is None:
params = {}
params["user_qq"] = qq
url = f"{API_BASE}/{name}"
# ========= 日志(请求前) =========
logger.info(f"[API-REQ] → {name}")
logger.info(f"[API-REQ] URL: {url}")
logger.info(f"[API-REQ] Params: {params}")
try:
async with httpx.AsyncClient(timeout=timeout, verify=False) as client:
r = await client.post(url, json=params)
# ========= 日志(响应) =========
logger.info(f"[API-RESP] {name} status={r.status_code}")
logger.info(f"[API-RESP] Raw: {r.text}")
data = r.json()
return data
except Exception as e:
logger.error(f"[API-ERR] {name} 调用异常: {e}")
return {"success": False, "message": f"服务器连接失败:{e}"}
# -------------------- info --------------------
info_cmd = on_command("/info", priority=5, block=True)
@info_cmd.handle()
async def _(event: MessageEvent):
qq = event.get_user_id()
r = await call_api("info", qq)
if not r.get("success"):
await info_cmd.send(r.get("message", "未知错误"))
return
ui = r.get("user_info")
if isinstance(ui, dict):
lines = [f"{k}: {v}" for k, v in ui.items()]
await info_cmd.send("\n".join(lines))
else:
await info_cmd.send(str(ui))
# -------------------- 上传成绩 --------------------
upload_cmd = on_command("/up", priority=5, block=True)
@upload_cmd.handle()
async def _(event: MessageEvent):
qq = event.get_user_id()
r = await call_api("up", qq, {}, timeout=120)
await upload_cmd.send(r.get("message", "返回异常"))
# -------------------- fbind / funbind --------------------
fbind_cmd = on_command("/fbind", priority=5, block=True)
@fbind_cmd.handle()
async def _(event: MessageEvent):
qq = event.get_user_id()
parts = str(event.get_message()).split()
if len(parts) < 2:
await fbind_cmd.send("用法:/fbind <token>")
return
token = parts[1]
r = await call_api("fbind", qq, {"token": token})
await fbind_cmd.send(r.get("message", "返回异常"))
funbind_cmd = on_command("/funbind", priority=5, block=True)
@funbind_cmd.handle()
async def _(event: MessageEvent):
qq = event.get_user_id()
r = await call_api("funbind", qq)
await funbind_cmd.send(r.get("message", "返回异常"))
# -------------------- del 删除成绩 --------------------
del_cmd = on_command("/del", priority=5, block=True)
@del_cmd.handle()
async def _(event: MessageEvent):
qq = event.get_user_id()
p = str(event.get_message()).split()
if len(p) < 2:
await del_cmd.send("用法:/del <musicID>")
return
# 如果需要 level_id可后续扩展
r = await call_api("delete", qq, {"music_id": p[1]})
await del_cmd.send(r.get("message", "返回异常"))
# -------------------- lock 解锁 --------------------
lock_cmd = on_command("/lock", priority=5, block=True)
@lock_cmd.handle()
async def _(event: MessageEvent):
qq = event.get_user_id()
p = str(event.get_message()).split()
if len(p) < 3:
await lock_cmd.send("用法:/lock <类型> <itemID>")
return
r = await call_api("lock", qq, {"item_type": p[1], "item_id": p[2]})
await lock_cmd.send(r.get("message", "返回异常"))
# -------------------- unban --------------------
unban_cmd = on_command("/黑屋", priority=5, block=True)
@unban_cmd.handle()
async def _(event: MessageEvent):
qq = event.get_user_id()
p = str(event.get_message()).split()
if len(p) < 2:
await unban_cmd.send("用法:/unban <时间>(例如 16:30")
return
r = await call_api("mai_unban", qq, {"login_time": p[1]})
await unban_cmd.send(r.get("message", "开始解黑屋失败"))
# -------------------- account (list / switch) --------------------
acc_cmd = on_command("/acc", priority=5, block=True)
@acc_cmd.handle()
async def _(event: MessageEvent):
qq = event.get_user_id()
p = str(event.get_message()).split()
if len(p) < 2:
await acc_cmd.send("用法:/acc list | switch <index>")
return
action = p[1]
if action == "list":
r = await call_api("account", qq, {"action": "list"})
if not r.get("success"):
await acc_cmd.send(r.get("message", "查询失败"))
return
accounts = r.get("accounts") or r.get("result", {}).get("accounts")
active = r.get("active")
if not accounts:
await acc_cmd.send("未绑定任何账号")
return
lines = []
for i, a in enumerate(accounts):
username = a.get("username") or a.get("name") or ""
user_id = a.get("user_id")
is_active = (str(user_id) == str(active)) if active is not None else False
if is_active:
lines.append(f"*{i+1}.{username}")
else:
lines.append(f"{i+1}.{username}")
await acc_cmd.send("\n".join(lines))
elif action == "switch":
if len(p) < 3 or not p[2].isdigit():
await acc_cmd.send("用法:/acc switch <编号>")
return
idx = int(p[2])
r = await call_api("account", qq, {"action": "switch", "index": idx})
await acc_cmd.send(r.get("message", "切换返回异常"))
else:
await acc_cmd.send("未知操作,请使用 list 或 switch")
# -------------------- user --------------------
user_cmd = on_command("/user", priority=5, block=True)
@user_cmd.handle()
async def _(event: MessageEvent):
qq = event.get_user_id()
r = await call_api("user", qq)
if not r.get("success"):
await user_cmd.send(r.get("message", "查询失败"))
return
data = r.get("user_info")
if isinstance(data, dict):
msg = "\n".join([f"{k}: {v}" for k, v in data.items()])
else:
msg = str(data)
await user_cmd.send(msg)
sp_cmd = on_command("/sp", priority=5, block=True)
@sp_cmd.handle()
async def _(event: MessageEvent):
qq = event.user_id
parts = str(event.get_message()).split()
superusers = get_driver().config.superusers or []
is_true_admin = str(qq) in superusers
# ========== 1. 查询自己 ==========
if len(parts) == 1:
r = await call_api("permission", qq, {"action": "get"})
if not r.get("success"):
await sp_cmd.send(r.get("message"))
return
perms = r.get("permissions") or {}
enabled = [k for k, v in perms.items() if v == 1]
if not enabled:
await sp_cmd.send("你没有任何权限")
else:
await sp_cmd.send("拥有以下权限:\n" + "\n".join(enabled))
return
# ========== 2. 查询别人 ==========
if len(parts) == 2 and parts[1].isdigit():
target = parts[1]
r = await call_api("permission", qq, {"action": "get", "user_qq": target})
if not r.get("success"):
await sp_cmd.send(r.get("message"))
return
perms = r.get("permissions") or {}
enabled = [k for k, v in perms.items() if v == 1]
if not enabled:
await sp_cmd.send(f"{target} 没有任何权限")
else:
await sp_cmd.send(f"{target} 拥有以下权限:\n" + "\n".join(enabled))
return
# ========== 3. 修改权限 ==========
if len(parts) == 4:
op = parts[1]
perm = parts[2]
target = parts[3]
if not is_true_admin:
await sp_cmd.send("❌ 只有 Bot 超级管理员可以修改权限")
return
if op not in ("add", "del"):
await sp_cmd.send("用法:/sp add|del <权限> <QQ>")
return
if perm.lower() == "all" and op == "add":
r = await call_api("permission", qq, {
"action": "set",
"user_qq": target,
"perms": {"all": 1}
})
await sp_cmd.send("已赋予 all 权限" if r.get("success") else r.get("message"))
return
if perm not in ("upload", "up", "del", "unlock", "unban", "charge", "all"):
await sp_cmd.send("权限名无效")
return
value = 1 if op == "add" else 0
r = await call_api("permission", qq, {
"action": "set",
"user_qq": target,
"perms": {perm: value}
})
await sp_cmd.send("操作成功" if r.get("success") else r.get("message"))
return
await sp_cmd.send("用法:/sp | /sp <QQ> | /sp add|del <权限> <QQ> | /sp add all <QQ>")
# -------------------- 状态 --------------------
site_status_cmd = on_command("网站状态", aliases={"/网站状态"}, priority=5, block=True)
@site_status_cmd.handle()
async def _(event: Event):
try:
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.set_viewport_size({"width": 1280, "height": 720})
await page.goto(STATUS_URL_XIN, wait_until="networkidle")
await page.wait_for_selector("#app", timeout=10000)
full_h = await page.evaluate("document.body.scrollHeight")
await page.set_viewport_size({"width": 1280, "height": full_h})
img = await page.screenshot(full_page=True)
await browser.close()
await site_status_cmd.send(MessageSegment.image(img))
except Exception as e:
await site_status_cmd.send(f"获取网站状态失败:{e}\n{traceback.format_exc()}")
mai_status_cmd = on_command("舞萌状态", aliases={"/舞萌状态"}, priority=5, block=True)
@mai_status_cmd.handle()
async def _(event: Event):
try:
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.set_viewport_size({"width": 1280, "height": 720})
await page.goto(STATUS_URL_SBGA, wait_until="networkidle")
await page.wait_for_selector("#app", timeout=10000)
full_h = await page.evaluate("document.body.scrollHeight")
await page.set_viewport_size({"width": 1280, "height": full_h})
img = await page.screenshot(full_page=True)
await browser.close()
await mai_status_cmd.send(MessageSegment.image(img))
except Exception as e:
await mai_status_cmd.send(f"获取舞萌状态失败:{e}\n{traceback.format_exc()}")
# -------------------- 回调路由 --------------------
driver = get_driver()
app = driver.server_app
router = APIRouter()
@router.post("/maimai/callback")
async def callback_endpoint(req: Request):
try:
data = await req.json()
except Exception:
return {"success": False, "message": "无法解析 JSON"}
user_qq = data.get("user_qq")
message = data.get("message")
channel = data.get("channel", "private")
group_id = data.get("group_id")
if not user_qq or not message:
return {"success": False, "message": "缺少 user_qq 或 message"}
bots = nonebot.get_bots().values()
sent_any = False
for b in bots:
try:
if channel == "group" and group_id:
await b.call_api("send_group_msg", {"group_id": int(group_id), "message": message})
else:
await b.call_api("send_private_msg", {"user_id": str(user_qq), "message": message})
sent_any = True
except Exception:
continue
if not sent_any:
return {"success": False, "message": "未能向任何机器人发送消息"}
return {"success": True}
app.include_router(router)