From cde94d3287db83ccebfde2c045a0bf8045053771 Mon Sep 17 00:00:00 2001 From: dnslin Date: Fri, 12 Dec 2025 10:01:15 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E4=B8=8B=E8=BD=BD?= =?UTF-8?q?=E6=9A=82=E5=81=9C=E7=AD=89aria2=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CLAUDE.md | 14 +- pyproject.toml | 1 + src/aria2/__init__.py | 5 +- src/aria2/rpc.py | 222 ++++++++++++++++++++++++ src/core/__init__.py | 2 + src/core/exceptions.py | 4 + src/telegram/app.py | 5 + src/telegram/handlers.py | 348 +++++++++++++++++++++++++++++++++++++- src/telegram/keyboards.py | 103 +++++++++++ uv.lock | 2 + 10 files changed, 693 insertions(+), 13 deletions(-) create mode 100644 src/aria2/rpc.py create mode 100644 src/telegram/keyboards.py diff --git a/CLAUDE.md b/CLAUDE.md index 31682be..ee65df0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -29,11 +29,11 @@ Copy `.env.example` to `.env` and set: Three-layer design: -- `src/telegram/` - Bot interface (handlers.py defines commands, app.py runs polling) -- `src/aria2/` - aria2 management (installer.py downloads/configures, service.py manages systemd) +- `src/telegram/` - Bot interface (handlers.py defines commands, keyboards.py builds inline keyboards, app.py runs polling) +- `src/aria2/` - aria2 management (installer.py downloads/configures, service.py manages systemd, rpc.py communicates with aria2) - `src/core/` - Shared utilities (constants, config dataclasses, exceptions, system detection) -Flow: Telegram command → `Aria2BotAPI` handler → `Aria2Installer` or `Aria2ServiceManager` → system +Flow: Telegram command → `Aria2BotAPI` handler → `Aria2Installer` or `Aria2ServiceManager` or `Aria2RpcClient` → system/aria2 ## Key Paths (defined in src/core/constants.py) @@ -44,5 +44,11 @@ Flow: Telegram command → `Aria2BotAPI` handler → `Aria2Installer` or `Aria2S ## Bot Commands -/install, /uninstall, /start, /stop, /restart, /status, /logs, /clear_logs, /set_secret, /reset_secret, /help +服务管理: /install, /uninstall, /start, /stop, /restart, /status, /logs, /clear_logs, /set_secret, /reset_secret + +下载管理: /add , /list, /stats + +其他: /help + +支持发送 .torrent 文件直接添加下载任务 diff --git a/pyproject.toml b/pyproject.toml index c10ea3f..9f72808 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ requires-python = ">=3.13" dependencies = [ "python-dotenv>=1.2.1", "python-telegram-bot>=21.0", + "httpx>=0.27.0", ] [project.scripts] diff --git a/src/aria2/__init__.py b/src/aria2/__init__.py index 5aae1ef..e6fdc84 100644 --- a/src/aria2/__init__.py +++ b/src/aria2/__init__.py @@ -1,5 +1,6 @@ -"""Aria2 operations module - installer and service management.""" +"""Aria2 operations module - installer, service management, and RPC client.""" from src.aria2.installer import Aria2Installer from src.aria2.service import Aria2ServiceManager +from src.aria2.rpc import Aria2RpcClient, DownloadTask -__all__ = ["Aria2Installer", "Aria2ServiceManager"] +__all__ = ["Aria2Installer", "Aria2ServiceManager", "Aria2RpcClient", "DownloadTask"] diff --git a/src/aria2/rpc.py b/src/aria2/rpc.py new file mode 100644 index 0000000..323774a --- /dev/null +++ b/src/aria2/rpc.py @@ -0,0 +1,222 @@ +"""aria2 JSON-RPC 2.0 客户端""" +from __future__ import annotations + +import base64 +import json +import uuid +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import httpx + +from src.core.exceptions import RpcError +from src.utils.logger import get_logger + +logger = get_logger("rpc") + + +def _format_size(size: int) -> str: + """格式化字节大小""" + for unit in ("B", "KB", "MB", "GB"): + if size < 1024: + return f"{size:.1f}{unit}" + size /= 1024 + return f"{size:.1f}TB" + + +@dataclass +class DownloadTask: + """下载任务数据类""" + gid: str + status: str # active, waiting, paused, error, complete, removed + name: str + total_length: int + completed_length: int + download_speed: int + upload_speed: int = 0 + error_message: str = "" + dir: str = "" + + @property + def progress(self) -> float: + """计算下载进度百分比""" + if self.total_length == 0: + return 0.0 + return (self.completed_length / self.total_length) * 100 + + @property + def progress_bar(self) -> str: + """生成进度条""" + pct = int(self.progress / 10) + return "█" * pct + "░" * (10 - pct) + + @property + def speed_str(self) -> str: + """格式化下载速度""" + return _format_size(self.download_speed) + "/s" + + @property + def size_str(self) -> str: + """格式化文件大小""" + return f"{_format_size(self.completed_length)}/{_format_size(self.total_length)}" + + +class Aria2RpcClient: + """aria2 RPC 客户端""" + + def __init__(self, host: str = "localhost", port: int = 6800, secret: str = ""): + self.url = f"http://{host}:{port}/jsonrpc" + self.secret = secret + + async def _call(self, method: str, params: list | None = None) -> Any: + """发送 RPC 请求""" + payload = { + "jsonrpc": "2.0", + "id": str(uuid.uuid4()), + "method": method, + "params": [], + } + # 添加 token 认证 + if self.secret: + payload["params"].append(f"token:{self.secret}") + if params: + payload["params"].extend(params) + + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(self.url, json=payload) + data = resp.json() + except httpx.ConnectError: + raise RpcError("aria2 服务可能未运行,请先使用 /start 命令启动服务") from None + except httpx.RequestError as e: + raise RpcError(f"RPC 请求失败: {e}") from e + except json.JSONDecodeError as e: + raise RpcError(f"RPC 响应解析失败: {e}") from e + + if "error" in data: + raise RpcError(data["error"].get("message", "未知错误")) + return data.get("result") + + # === 添加任务 === + + async def add_uri(self, uri: str) -> str: + """添加 URL 下载任务,返回 GID""" + result = await self._call("aria2.addUri", [[uri]]) + logger.info(f"添加下载任务: {uri[:50]}..., GID={result}") + return result + + async def add_torrent(self, torrent_data: bytes) -> str: + """添加种子下载任务,返回 GID""" + b64_data = base64.b64encode(torrent_data).decode("utf-8") + result = await self._call("aria2.addTorrent", [b64_data]) + logger.info(f"添加种子任务, GID={result}") + return result + + # === 任务控制 === + + async def pause(self, gid: str) -> str: + """暂停任务""" + return await self._call("aria2.pause", [gid]) + + async def unpause(self, gid: str) -> str: + """恢复任务""" + return await self._call("aria2.unpause", [gid]) + + async def remove(self, gid: str) -> str: + """删除任务(仅从队列移除)""" + return await self._call("aria2.remove", [gid]) + + async def force_remove(self, gid: str) -> str: + """强制删除任务""" + return await self._call("aria2.forceRemove", [gid]) + + async def remove_download_result(self, gid: str) -> str: + """删除已完成/错误任务的记录""" + return await self._call("aria2.removeDownloadResult", [gid]) + + # === 查询任务 === + + async def get_status(self, gid: str) -> DownloadTask: + """获取单个任务状态""" + keys = ["gid", "status", "totalLength", "completedLength", + "downloadSpeed", "uploadSpeed", "files", "errorMessage", "dir"] + result = await self._call("aria2.tellStatus", [gid, keys]) + return self._parse_task(result) + + async def get_active(self) -> list[DownloadTask]: + """获取活动任务列表""" + keys = ["gid", "status", "totalLength", "completedLength", + "downloadSpeed", "uploadSpeed", "files", "dir"] + result = await self._call("aria2.tellActive", [keys]) + return [self._parse_task(t) for t in result] + + async def get_waiting(self, offset: int = 0, num: int = 100) -> list[DownloadTask]: + """获取等待/暂停任务列表""" + keys = ["gid", "status", "totalLength", "completedLength", + "downloadSpeed", "uploadSpeed", "files", "dir"] + result = await self._call("aria2.tellWaiting", [offset, num, keys]) + return [self._parse_task(t) for t in result] + + async def get_stopped(self, offset: int = 0, num: int = 100) -> list[DownloadTask]: + """获取已停止任务列表(完成/错误)""" + keys = ["gid", "status", "totalLength", "completedLength", + "downloadSpeed", "uploadSpeed", "files", "errorMessage", "dir"] + result = await self._call("aria2.tellStopped", [offset, num, keys]) + return [self._parse_task(t) for t in result] + + async def get_global_stat(self) -> dict: + """获取全局统计""" + return await self._call("aria2.getGlobalStat") + + # === 文件操作 === + + async def get_files(self, gid: str) -> list[dict]: + """获取任务文件列表""" + return await self._call("aria2.getFiles", [gid]) + + def delete_files(self, task: DownloadTask) -> bool: + """删除任务对应的文件(同步方法)""" + if not task.dir or not task.name: + return False + try: + file_path = Path(task.dir) / task.name + if file_path.exists(): + if file_path.is_dir(): + import shutil + shutil.rmtree(file_path) + else: + file_path.unlink() + logger.info(f"已删除文件: {file_path}") + return True + except OSError as e: + logger.error(f"删除文件失败: {e}") + return False + + # === 内部方法 === + + def _parse_task(self, data: dict) -> DownloadTask: + """解析任务数据""" + # 从 files 中提取文件名 + name = "未知文件" + if data.get("files"): + path = data["files"][0].get("path", "") + if path: + name = path.split("/")[-1] + elif data["files"][0].get("uris"): + uris = data["files"][0]["uris"] + if uris: + uri = uris[0].get("uri", "") + name = uri.split("/")[-1].split("?")[0] or uri[:30] + + return DownloadTask( + gid=data.get("gid", ""), + status=data.get("status", "unknown"), + name=name[:40] if len(name) > 40 else name, # 截断文件名 + total_length=int(data.get("totalLength", 0)), + completed_length=int(data.get("completedLength", 0)), + download_speed=int(data.get("downloadSpeed", 0)), + upload_speed=int(data.get("uploadSpeed", 0)), + error_message=data.get("errorMessage", ""), + dir=data.get("dir", ""), + ) diff --git a/src/core/__init__.py b/src/core/__init__.py index 2145487..00e5850 100644 --- a/src/core/__init__.py +++ b/src/core/__init__.py @@ -20,6 +20,7 @@ from src.core.exceptions import ( ConfigError, ServiceError, NotInstalledError, + RpcError, ) from src.core.config import Aria2Config, BotConfig from src.core.system import ( @@ -49,6 +50,7 @@ __all__ = [ "ConfigError", "ServiceError", "NotInstalledError", + "RpcError", "Aria2Config", "BotConfig", "detect_os", diff --git a/src/core/exceptions.py b/src/core/exceptions.py index 6c28c31..5469fe3 100644 --- a/src/core/exceptions.py +++ b/src/core/exceptions.py @@ -27,3 +27,7 @@ class ServiceError(Aria2Error): class NotInstalledError(Aria2Error): """aria2 未安装""" + + +class RpcError(Aria2Error): + """RPC 调用失败""" diff --git a/src/telegram/app.py b/src/telegram/app.py index cb76a6b..e7461fe 100644 --- a/src/telegram/app.py +++ b/src/telegram/app.py @@ -13,6 +13,11 @@ from src.utils import setup_logger # Bot 命令列表,用于 Telegram 命令自动补全 BOT_COMMANDS = [ + # 下载管理 + BotCommand("add", "添加下载任务"), + BotCommand("list", "查看下载列表"), + BotCommand("stats", "全局下载统计"), + # 服务管理 BotCommand("install", "安装 aria2"), BotCommand("uninstall", "卸载 aria2"), BotCommand("start", "启动 aria2 服务"), diff --git a/src/telegram/handlers.py b/src/telegram/handlers.py index 501a8e6..775648f 100644 --- a/src/telegram/handlers.py +++ b/src/telegram/handlers.py @@ -2,7 +2,7 @@ from __future__ import annotations from telegram import Update -from telegram.ext import ContextTypes, CommandHandler +from telegram.ext import ContextTypes, CommandHandler, CallbackQueryHandler, MessageHandler, filters from src.utils.logger import get_logger @@ -13,12 +13,23 @@ from src.core import ( ServiceError, DownloadError, ConfigError, + RpcError, is_aria2_installed, get_aria2_version, generate_rpc_secret, ARIA2_CONF, ) from src.aria2 import Aria2Installer, Aria2ServiceManager +from src.aria2.rpc import Aria2RpcClient, DownloadTask, _format_size +from src.telegram.keyboards import ( + STATUS_EMOJI, + build_list_type_keyboard, + build_task_keyboard, + build_task_list_keyboard, + build_delete_confirm_keyboard, + build_detail_keyboard, + build_after_add_keyboard, +) logger = get_logger("handlers") @@ -36,6 +47,15 @@ class Aria2BotAPI: self.config = config or Aria2Config() self.installer = Aria2Installer(self.config) self.service = Aria2ServiceManager() + self._rpc: Aria2RpcClient | None = None + + def _get_rpc_client(self) -> Aria2RpcClient: + """获取或创建 RPC 客户端""" + if self._rpc is None: + secret = self._get_rpc_secret() + port = self._get_rpc_port() or 6800 + self._rpc = Aria2RpcClient(port=port, secret=secret) + return self._rpc async def _reply(self, update: Update, context: ContextTypes.DEFAULT_TYPE, text: str, **kwargs): if update.effective_message: @@ -285,6 +305,7 @@ class Aria2BotAPI: async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: logger.info(f"收到 /help 命令 - {_get_user_info(update)}") commands = [ + "*服务管理*", "/install - 安装 aria2", "/uninstall - 卸载 aria2", "/start - 启动 aria2 服务", @@ -292,17 +313,322 @@ class Aria2BotAPI: "/restart - 重启 aria2 服务", "/status - 查看 aria2 状态", "/logs - 查看最近日志", - "/clear_logs - 清空日志", - "/set_secret <密钥> - 设置自定义 RPC 密钥", - "/reset_secret - 重新生成随机 RPC 密钥", + "/clear\\_logs - 清空日志", + "/set\\_secret <密钥> - 设置 RPC 密钥", + "/reset\\_secret - 重新生成 RPC 密钥", + "", + "*下载管理*", + "/add - 添加下载任务", + "/list - 查看下载列表", + "/stats - 全局下载统计", + "", "/help - 显示此帮助", ] - await self._reply(update, context, "可用命令:\n" + "\n".join(commands)) + await self._reply(update, context, "可用命令:\n" + "\n".join(commands), parse_mode="Markdown") + + # === 下载管理命令 === + + async def add_download(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """/add - 添加下载任务""" + logger.info(f"收到 /add 命令 - {_get_user_info(update)}") + if not context.args: + await self._reply(update, context, "用法: /add \n支持 HTTP/HTTPS/磁力链接") + return + + url = context.args[0] + try: + rpc = self._get_rpc_client() + gid = await rpc.add_uri(url) + task = await rpc.get_status(gid) + # 转义文件名中的 Markdown 特殊字符 + safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`") + text = f"✅ 任务已添加\n📄 {safe_name}\n🆔 GID: `{gid}`" + keyboard = build_after_add_keyboard(gid) + await self._reply(update, context, text, parse_mode="Markdown", reply_markup=keyboard) + logger.info(f"/add 命令执行成功, GID={gid} - {_get_user_info(update)}") + except RpcError as e: + logger.error(f"/add 命令执行失败: {e} - {_get_user_info(update)}") + await self._reply(update, context, f"❌ 添加失败: {e}") + + async def handle_torrent(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """处理用户发送的种子文件""" + logger.info(f"收到种子文件 - {_get_user_info(update)}") + document = update.message.document + if not document or not document.file_name.endswith(".torrent"): + return + + try: + file = await context.bot.get_file(document.file_id) + torrent_data = await file.download_as_bytearray() + rpc = self._get_rpc_client() + gid = await rpc.add_torrent(bytes(torrent_data)) + task = await rpc.get_status(gid) + # 转义文件名中的 Markdown 特殊字符 + safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`") + text = f"✅ 种子任务已添加\n📄 {safe_name}\n🆔 GID: `{gid}`" + keyboard = build_after_add_keyboard(gid) + await self._reply(update, context, text, parse_mode="Markdown", reply_markup=keyboard) + logger.info(f"种子任务添加成功, GID={gid} - {_get_user_info(update)}") + except RpcError as e: + logger.error(f"种子任务添加失败: {e} - {_get_user_info(update)}") + await self._reply(update, context, f"❌ 添加种子失败: {e}") + + async def list_downloads(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """/list - 查看下载列表""" + logger.info(f"收到 /list 命令 - {_get_user_info(update)}") + try: + rpc = self._get_rpc_client() + stat = await rpc.get_global_stat() + active_count = int(stat.get("numActive", 0)) + waiting_count = int(stat.get("numWaiting", 0)) + stopped_count = int(stat.get("numStopped", 0)) + + keyboard = build_list_type_keyboard(active_count, waiting_count, stopped_count) + await self._reply(update, context, "📥 选择查看类型:", reply_markup=keyboard) + except RpcError as e: + logger.error(f"/list 命令执行失败: {e} - {_get_user_info(update)}") + await self._reply(update, context, f"❌ 获取列表失败: {e}") + + async def global_stats(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """/stats - 全局下载统计""" + logger.info(f"收到 /stats 命令 - {_get_user_info(update)}") + try: + rpc = self._get_rpc_client() + stat = await rpc.get_global_stat() + text = ( + "📊 *全局统计*\n" + f"⬇️ 下载速度: {_format_size(int(stat.get('downloadSpeed', 0)))}/s\n" + f"⬆️ 上传速度: {_format_size(int(stat.get('uploadSpeed', 0)))}/s\n" + f"▶️ 活动任务: {stat.get('numActive', 0)}\n" + f"⏳ 等待任务: {stat.get('numWaiting', 0)}\n" + f"⏹️ 已停止: {stat.get('numStopped', 0)}" + ) + await self._reply(update, context, text, parse_mode="Markdown") + except RpcError as e: + logger.error(f"/stats 命令执行失败: {e} - {_get_user_info(update)}") + await self._reply(update, context, f"❌ 获取统计失败: {e}") + + # === Callback Query 处理 === + + async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """处理 Inline Keyboard 回调""" + query = update.callback_query + + try: + await query.answer() + except Exception as e: + logger.warning(f"回调应答失败 (可忽略): {e}") + + data = query.data + if not data: + return + + parts = data.split(":") + action = parts[0] + + try: + rpc = self._get_rpc_client() + + if action == "list": + await self._handle_list_callback(query, rpc, parts) + elif action == "pause": + await self._handle_pause_callback(query, rpc, parts[1]) + elif action == "resume": + await self._handle_resume_callback(query, rpc, parts[1]) + elif action == "delete": + await self._handle_delete_callback(query, parts[1]) + elif action == "confirm_del": + await self._handle_confirm_delete_callback(query, rpc, parts[1], parts[2]) + elif action == "detail": + await self._handle_detail_callback(query, rpc, parts[1]) + elif action == "stats": + await self._handle_stats_callback(query, rpc) + elif action == "cancel": + await query.edit_message_text("❌ 操作已取消") + + except RpcError as e: + await query.edit_message_text(f"❌ 操作失败: {e}") + + async def _handle_list_callback(self, query, rpc: Aria2RpcClient, parts: list) -> None: + """处理列表相关回调""" + if parts[1] == "menu": + stat = await rpc.get_global_stat() + keyboard = build_list_type_keyboard( + int(stat.get("numActive", 0)), + int(stat.get("numWaiting", 0)), + int(stat.get("numStopped", 0)), + ) + await query.edit_message_text("📥 选择查看类型:", reply_markup=keyboard) + return + + list_type = parts[1] + page = int(parts[2]) if len(parts) > 2 else 1 + + if list_type == "active": + tasks = await rpc.get_active() + title = "▶️ 活动任务" + elif list_type == "waiting": + tasks = await rpc.get_waiting() + title = "⏳ 等待任务" + else: # stopped + tasks = await rpc.get_stopped() + title = "✅ 已完成/错误" + + await self._send_task_list(query, tasks, page, list_type, title) + + async def _send_task_list(self, query, tasks: list[DownloadTask], page: int, list_type: str, title: str) -> None: + """发送任务列表""" + page_size = 5 + total_pages = max(1, (len(tasks) + page_size - 1) // page_size) + start = (page - 1) * page_size + page_tasks = tasks[start:start + page_size] + + if not tasks: + keyboard = build_task_list_keyboard(1, 1, list_type) + await query.edit_message_text(f"{title}\n\n📭 暂无任务", reply_markup=keyboard) + return + + lines = [f"{title} ({page}/{total_pages})\n"] + for t in page_tasks: + emoji = STATUS_EMOJI.get(t.status, "❓") + lines.append(f"{emoji} {t.name}") + lines.append(f" {t.progress_bar} {t.progress:.1f}%") + lines.append(f" {t.size_str} | {t.speed_str}") + # 添加操作按钮提示 + if t.status == "active": + lines.append(f" ⏸ /pause\\_{t.gid[:8]}") + elif t.status in ("paused", "waiting"): + lines.append(f" ▶️ /resume\\_{t.gid[:8]}") + lines.append(f" 📋 详情: 点击下方按钮\n") + + # 为每个任务添加操作按钮 + task_buttons = [] + for t in page_tasks: + row = [] + if t.status == "active": + row.append({"text": f"⏸ {t.gid[:6]}", "callback_data": f"pause:{t.gid}"}) + elif t.status in ("paused", "waiting"): + row.append({"text": f"▶️ {t.gid[:6]}", "callback_data": f"resume:{t.gid}"}) + row.append({"text": f"🗑 {t.gid[:6]}", "callback_data": f"delete:{t.gid}"}) + row.append({"text": f"📋 {t.gid[:6]}", "callback_data": f"detail:{t.gid}"}) + task_buttons.append(row) + + # 构建完整键盘 + from telegram import InlineKeyboardButton, InlineKeyboardMarkup + keyboard_rows = [] + for row in task_buttons: + keyboard_rows.append([InlineKeyboardButton(b["text"], callback_data=b["callback_data"]) for b in row]) + + # 添加翻页按钮 + nav_buttons = [] + if page > 1: + nav_buttons.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"list:{list_type}:{page - 1}")) + if page < total_pages: + nav_buttons.append(InlineKeyboardButton("➡️ 下一页", callback_data=f"list:{list_type}:{page + 1}")) + if nav_buttons: + keyboard_rows.append(nav_buttons) + + keyboard_rows.append([InlineKeyboardButton("🔙 返回列表", callback_data="list:menu")]) + + await query.edit_message_text("\n".join(lines), reply_markup=InlineKeyboardMarkup(keyboard_rows)) + + async def _handle_pause_callback(self, query, rpc: Aria2RpcClient, gid: str) -> None: + """处理暂停回调""" + await rpc.pause(gid) + task = await rpc.get_status(gid) + safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`") + keyboard = build_task_keyboard(gid, task.status) + await query.edit_message_text(f"⏸️ 任务已暂停\n📄 {safe_name}\n🆔 GID: `{gid}`", + parse_mode="Markdown", reply_markup=keyboard) + + async def _handle_resume_callback(self, query, rpc: Aria2RpcClient, gid: str) -> None: + """处理恢复回调""" + await rpc.unpause(gid) + task = await rpc.get_status(gid) + safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`") + keyboard = build_task_keyboard(gid, task.status) + await query.edit_message_text(f"▶️ 任务已恢复\n📄 {safe_name}\n🆔 GID: `{gid}`", + parse_mode="Markdown", reply_markup=keyboard) + + async def _handle_delete_callback(self, query, gid: str) -> None: + """处理删除确认回调""" + keyboard = build_delete_confirm_keyboard(gid) + await query.edit_message_text(f"⚠️ 确认删除任务?\n🆔 GID: `{gid}`", + parse_mode="Markdown", reply_markup=keyboard) + + async def _handle_confirm_delete_callback(self, query, rpc: Aria2RpcClient, gid: str, delete_file: str) -> None: + """处理确认删除回调""" + task = None + try: + task = await rpc.get_status(gid) + except RpcError: + pass + + # 尝试删除任务 + try: + await rpc.remove(gid) + except RpcError: + try: + await rpc.force_remove(gid) + except RpcError: + pass + try: + await rpc.remove_download_result(gid) + except RpcError: + pass + + # 如果需要删除文件 + file_deleted = False + if delete_file == "1" and task: + file_deleted = rpc.delete_files(task) + + msg = f"🗑️ 任务已删除\n🆔 GID: `{gid}`" + if delete_file == "1": + msg += f"\n📁 文件: {'已删除' if file_deleted else '删除失败或不存在'}" + + await query.edit_message_text(msg, parse_mode="Markdown") + + async def _handle_detail_callback(self, query, rpc: Aria2RpcClient, gid: str) -> None: + """处理详情回调""" + task = await rpc.get_status(gid) + emoji = STATUS_EMOJI.get(task.status, "❓") + safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`") + text = ( + f"📋 *任务详情*\n" + f"📄 文件: {safe_name}\n" + f"🆔 GID: `{task.gid}`\n" + f"📊 状态: {emoji} {task.status}\n" + f"📈 进度: {task.progress_bar} {task.progress:.1f}%\n" + f"📦 大小: {task.size_str}\n" + f"⬇️ 下载: {task.speed_str}\n" + f"⬆️ 上传: {_format_size(task.upload_speed)}/s" + ) + if task.error_message: + text += f"\n❌ 错误: {task.error_message}" + + keyboard = build_detail_keyboard(gid, task.status) + await query.edit_message_text(text, parse_mode="Markdown", reply_markup=keyboard) + + async def _handle_stats_callback(self, query, rpc: Aria2RpcClient) -> None: + """处理统计回调""" + stat = await rpc.get_global_stat() + text = ( + "📊 *全局统计*\n" + f"⬇️ 下载速度: {_format_size(int(stat.get('downloadSpeed', 0)))}/s\n" + f"⬆️ 上传速度: {_format_size(int(stat.get('uploadSpeed', 0)))}/s\n" + f"▶️ 活动任务: {stat.get('numActive', 0)}\n" + f"⏳ 等待任务: {stat.get('numWaiting', 0)}\n" + f"⏹️ 已停止: {stat.get('numStopped', 0)}" + ) + from telegram import InlineKeyboardButton, InlineKeyboardMarkup + keyboard = InlineKeyboardMarkup([[InlineKeyboardButton("🔙 返回列表", callback_data="list:menu")]]) + await query.edit_message_text(text, parse_mode="Markdown", reply_markup=keyboard) -def build_handlers(api: Aria2BotAPI) -> list[CommandHandler]: - """构建 CommandHandler 列表""" +def build_handlers(api: Aria2BotAPI) -> list: + """构建 Handler 列表""" return [ + # 服务管理命令 CommandHandler("install", api.install), CommandHandler("uninstall", api.uninstall), CommandHandler("start", api.start_service), @@ -314,4 +640,12 @@ def build_handlers(api: Aria2BotAPI) -> list[CommandHandler]: CommandHandler("set_secret", api.set_secret), CommandHandler("reset_secret", api.reset_secret), CommandHandler("help", api.help_command), + # 下载管理命令 + CommandHandler("add", api.add_download), + CommandHandler("list", api.list_downloads), + CommandHandler("stats", api.global_stats), + # 种子文件处理 + MessageHandler(filters.Document.FileExtension("torrent"), api.handle_torrent), + # Callback Query 处理 + CallbackQueryHandler(api.handle_callback), ] diff --git a/src/telegram/keyboards.py b/src/telegram/keyboards.py new file mode 100644 index 0000000..1c3a024 --- /dev/null +++ b/src/telegram/keyboards.py @@ -0,0 +1,103 @@ +"""Telegram 键盘构建工具""" +from __future__ import annotations + +from telegram import InlineKeyboardButton, InlineKeyboardMarkup + +# 状态 emoji 映射 +STATUS_EMOJI = { + "active": "⬇️", + "waiting": "⏳", + "paused": "⏸️", + "complete": "✅", + "error": "❌", + "removed": "🗑️", +} + + +def build_list_type_keyboard(active_count: int, waiting_count: int, stopped_count: int) -> InlineKeyboardMarkup: + """构建列表类型选择键盘""" + return InlineKeyboardMarkup([ + [ + InlineKeyboardButton(f"▶️ 活动 ({active_count})", callback_data="list:active:1"), + InlineKeyboardButton(f"⏳ 等待 ({waiting_count})", callback_data="list:waiting:1"), + ], + [ + InlineKeyboardButton(f"✅ 已完成 ({stopped_count})", callback_data="list:stopped:1"), + InlineKeyboardButton("📊 统计", callback_data="stats"), + ], + ]) + + +def build_task_keyboard(gid: str, status: str) -> InlineKeyboardMarkup: + """构建单个任务的操作按钮""" + buttons = [] + + if status == "active": + buttons.append(InlineKeyboardButton("⏸ 暂停", callback_data=f"pause:{gid}")) + elif status in ("paused", "waiting"): + buttons.append(InlineKeyboardButton("▶️ 恢复", callback_data=f"resume:{gid}")) + + buttons.append(InlineKeyboardButton("🗑 删除", callback_data=f"delete:{gid}")) + buttons.append(InlineKeyboardButton("📋 详情", callback_data=f"detail:{gid}")) + + return InlineKeyboardMarkup([buttons]) + + +def build_task_list_keyboard(page: int, total_pages: int, list_type: str) -> InlineKeyboardMarkup | None: + """构建任务列表的翻页按钮""" + nav_buttons = [] + + if page > 1: + nav_buttons.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"list:{list_type}:{page - 1}")) + if page < total_pages: + nav_buttons.append(InlineKeyboardButton("➡️ 下一页", callback_data=f"list:{list_type}:{page + 1}")) + + # 返回按钮 + back_button = [InlineKeyboardButton("🔙 返回列表", callback_data="list:menu")] + + rows = [] + if nav_buttons: + rows.append(nav_buttons) + rows.append(back_button) + + return InlineKeyboardMarkup(rows) + + +def build_delete_confirm_keyboard(gid: str) -> InlineKeyboardMarkup: + """构建删除确认按钮(含是否删除文件选项)""" + return InlineKeyboardMarkup([ + [ + InlineKeyboardButton("✅ 仅删任务", callback_data=f"confirm_del:{gid}:0"), + InlineKeyboardButton("🗑 删任务+文件", callback_data=f"confirm_del:{gid}:1"), + ], + [ + InlineKeyboardButton("❌ 取消", callback_data="cancel"), + ], + ]) + + +def build_detail_keyboard(gid: str, status: str) -> InlineKeyboardMarkup: + """构建详情页面的操作按钮""" + buttons = [] + + if status == "active": + buttons.append(InlineKeyboardButton("⏸ 暂停", callback_data=f"pause:{gid}")) + elif status in ("paused", "waiting"): + buttons.append(InlineKeyboardButton("▶️ 恢复", callback_data=f"resume:{gid}")) + + buttons.append(InlineKeyboardButton("🗑 删除", callback_data=f"delete:{gid}")) + + return InlineKeyboardMarkup([ + buttons, + [InlineKeyboardButton("🔙 返回列表", callback_data="list:menu")], + ]) + + +def build_after_add_keyboard(gid: str) -> InlineKeyboardMarkup: + """构建添加任务后的操作按钮""" + return InlineKeyboardMarkup([ + [ + InlineKeyboardButton("📋 查看详情", callback_data=f"detail:{gid}"), + InlineKeyboardButton("📥 查看列表", callback_data="list:menu"), + ], + ]) diff --git a/uv.lock b/uv.lock index bc7b7ed..e198cdd 100644 --- a/uv.lock +++ b/uv.lock @@ -19,12 +19,14 @@ name = "aria2bot" version = "0.1.0" source = { virtual = "." } dependencies = [ + { name = "httpx" }, { name = "python-dotenv" }, { name = "python-telegram-bot" }, ] [package.metadata] requires-dist = [ + { name = "httpx", specifier = ">=0.27.0" }, { name = "python-dotenv", specifier = ">=1.2.1" }, { name = "python-telegram-bot", specifier = ">=21.0" }, ]