feat: 增加下载暂停等aria2功能

This commit is contained in:
dnslin
2025-12-12 10:01:15 +08:00
parent cf1d98610e
commit cde94d3287
10 changed files with 693 additions and 13 deletions

View File

@@ -29,11 +29,11 @@ Copy `.env.example` to `.env` and set:
Three-layer design:
- `src/telegram/` - Bot interface (handlers.py defines commands, app.py runs polling)
- `src/aria2/` - aria2 management (installer.py downloads/configures, service.py manages systemd)
- `src/telegram/` - Bot interface (handlers.py defines commands, keyboards.py builds inline keyboards, app.py runs polling)
- `src/aria2/` - aria2 management (installer.py downloads/configures, service.py manages systemd, rpc.py communicates with aria2)
- `src/core/` - Shared utilities (constants, config dataclasses, exceptions, system detection)
Flow: Telegram command → `Aria2BotAPI` handler → `Aria2Installer` or `Aria2ServiceManager` → system
Flow: Telegram command → `Aria2BotAPI` handler → `Aria2Installer` or `Aria2ServiceManager` or `Aria2RpcClient` → system/aria2
## Key Paths (defined in src/core/constants.py)
@@ -44,5 +44,11 @@ Flow: Telegram command → `Aria2BotAPI` handler → `Aria2Installer` or `Aria2S
## Bot Commands
/install, /uninstall, /start, /stop, /restart, /status, /logs, /clear_logs, /set_secret, /reset_secret, /help
服务管理: /install, /uninstall, /start, /stop, /restart, /status, /logs, /clear_logs, /set_secret, /reset_secret
下载管理: /add <URL>, /list, /stats
其他: /help
支持发送 .torrent 文件直接添加下载任务

View File

@@ -7,6 +7,7 @@ requires-python = ">=3.13"
dependencies = [
"python-dotenv>=1.2.1",
"python-telegram-bot>=21.0",
"httpx>=0.27.0",
]
[project.scripts]

View File

@@ -1,5 +1,6 @@
"""Aria2 operations module - installer and service management."""
"""Aria2 operations module - installer, service management, and RPC client."""
from src.aria2.installer import Aria2Installer
from src.aria2.service import Aria2ServiceManager
from src.aria2.rpc import Aria2RpcClient, DownloadTask
__all__ = ["Aria2Installer", "Aria2ServiceManager"]
__all__ = ["Aria2Installer", "Aria2ServiceManager", "Aria2RpcClient", "DownloadTask"]

222
src/aria2/rpc.py Normal file
View File

@@ -0,0 +1,222 @@
"""aria2 JSON-RPC 2.0 客户端"""
from __future__ import annotations
import base64
import json
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import httpx
from src.core.exceptions import RpcError
from src.utils.logger import get_logger
logger = get_logger("rpc")
def _format_size(size: int) -> str:
"""格式化字节大小"""
for unit in ("B", "KB", "MB", "GB"):
if size < 1024:
return f"{size:.1f}{unit}"
size /= 1024
return f"{size:.1f}TB"
@dataclass
class DownloadTask:
"""下载任务数据类"""
gid: str
status: str # active, waiting, paused, error, complete, removed
name: str
total_length: int
completed_length: int
download_speed: int
upload_speed: int = 0
error_message: str = ""
dir: str = ""
@property
def progress(self) -> float:
"""计算下载进度百分比"""
if self.total_length == 0:
return 0.0
return (self.completed_length / self.total_length) * 100
@property
def progress_bar(self) -> str:
"""生成进度条"""
pct = int(self.progress / 10)
return "" * pct + "" * (10 - pct)
@property
def speed_str(self) -> str:
"""格式化下载速度"""
return _format_size(self.download_speed) + "/s"
@property
def size_str(self) -> str:
"""格式化文件大小"""
return f"{_format_size(self.completed_length)}/{_format_size(self.total_length)}"
class Aria2RpcClient:
"""aria2 RPC 客户端"""
def __init__(self, host: str = "localhost", port: int = 6800, secret: str = ""):
self.url = f"http://{host}:{port}/jsonrpc"
self.secret = secret
async def _call(self, method: str, params: list | None = None) -> Any:
"""发送 RPC 请求"""
payload = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": method,
"params": [],
}
# 添加 token 认证
if self.secret:
payload["params"].append(f"token:{self.secret}")
if params:
payload["params"].extend(params)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(self.url, json=payload)
data = resp.json()
except httpx.ConnectError:
raise RpcError("aria2 服务可能未运行,请先使用 /start 命令启动服务") from None
except httpx.RequestError as e:
raise RpcError(f"RPC 请求失败: {e}") from e
except json.JSONDecodeError as e:
raise RpcError(f"RPC 响应解析失败: {e}") from e
if "error" in data:
raise RpcError(data["error"].get("message", "未知错误"))
return data.get("result")
# === 添加任务 ===
async def add_uri(self, uri: str) -> str:
"""添加 URL 下载任务,返回 GID"""
result = await self._call("aria2.addUri", [[uri]])
logger.info(f"添加下载任务: {uri[:50]}..., GID={result}")
return result
async def add_torrent(self, torrent_data: bytes) -> str:
"""添加种子下载任务,返回 GID"""
b64_data = base64.b64encode(torrent_data).decode("utf-8")
result = await self._call("aria2.addTorrent", [b64_data])
logger.info(f"添加种子任务, GID={result}")
return result
# === 任务控制 ===
async def pause(self, gid: str) -> str:
"""暂停任务"""
return await self._call("aria2.pause", [gid])
async def unpause(self, gid: str) -> str:
"""恢复任务"""
return await self._call("aria2.unpause", [gid])
async def remove(self, gid: str) -> str:
"""删除任务(仅从队列移除)"""
return await self._call("aria2.remove", [gid])
async def force_remove(self, gid: str) -> str:
"""强制删除任务"""
return await self._call("aria2.forceRemove", [gid])
async def remove_download_result(self, gid: str) -> str:
"""删除已完成/错误任务的记录"""
return await self._call("aria2.removeDownloadResult", [gid])
# === 查询任务 ===
async def get_status(self, gid: str) -> DownloadTask:
"""获取单个任务状态"""
keys = ["gid", "status", "totalLength", "completedLength",
"downloadSpeed", "uploadSpeed", "files", "errorMessage", "dir"]
result = await self._call("aria2.tellStatus", [gid, keys])
return self._parse_task(result)
async def get_active(self) -> list[DownloadTask]:
"""获取活动任务列表"""
keys = ["gid", "status", "totalLength", "completedLength",
"downloadSpeed", "uploadSpeed", "files", "dir"]
result = await self._call("aria2.tellActive", [keys])
return [self._parse_task(t) for t in result]
async def get_waiting(self, offset: int = 0, num: int = 100) -> list[DownloadTask]:
"""获取等待/暂停任务列表"""
keys = ["gid", "status", "totalLength", "completedLength",
"downloadSpeed", "uploadSpeed", "files", "dir"]
result = await self._call("aria2.tellWaiting", [offset, num, keys])
return [self._parse_task(t) for t in result]
async def get_stopped(self, offset: int = 0, num: int = 100) -> list[DownloadTask]:
"""获取已停止任务列表(完成/错误)"""
keys = ["gid", "status", "totalLength", "completedLength",
"downloadSpeed", "uploadSpeed", "files", "errorMessage", "dir"]
result = await self._call("aria2.tellStopped", [offset, num, keys])
return [self._parse_task(t) for t in result]
async def get_global_stat(self) -> dict:
"""获取全局统计"""
return await self._call("aria2.getGlobalStat")
# === 文件操作 ===
async def get_files(self, gid: str) -> list[dict]:
"""获取任务文件列表"""
return await self._call("aria2.getFiles", [gid])
def delete_files(self, task: DownloadTask) -> bool:
"""删除任务对应的文件(同步方法)"""
if not task.dir or not task.name:
return False
try:
file_path = Path(task.dir) / task.name
if file_path.exists():
if file_path.is_dir():
import shutil
shutil.rmtree(file_path)
else:
file_path.unlink()
logger.info(f"已删除文件: {file_path}")
return True
except OSError as e:
logger.error(f"删除文件失败: {e}")
return False
# === 内部方法 ===
def _parse_task(self, data: dict) -> DownloadTask:
"""解析任务数据"""
# 从 files 中提取文件名
name = "未知文件"
if data.get("files"):
path = data["files"][0].get("path", "")
if path:
name = path.split("/")[-1]
elif data["files"][0].get("uris"):
uris = data["files"][0]["uris"]
if uris:
uri = uris[0].get("uri", "")
name = uri.split("/")[-1].split("?")[0] or uri[:30]
return DownloadTask(
gid=data.get("gid", ""),
status=data.get("status", "unknown"),
name=name[:40] if len(name) > 40 else name, # 截断文件名
total_length=int(data.get("totalLength", 0)),
completed_length=int(data.get("completedLength", 0)),
download_speed=int(data.get("downloadSpeed", 0)),
upload_speed=int(data.get("uploadSpeed", 0)),
error_message=data.get("errorMessage", ""),
dir=data.get("dir", ""),
)

View File

@@ -20,6 +20,7 @@ from src.core.exceptions import (
ConfigError,
ServiceError,
NotInstalledError,
RpcError,
)
from src.core.config import Aria2Config, BotConfig
from src.core.system import (
@@ -49,6 +50,7 @@ __all__ = [
"ConfigError",
"ServiceError",
"NotInstalledError",
"RpcError",
"Aria2Config",
"BotConfig",
"detect_os",

View File

@@ -27,3 +27,7 @@ class ServiceError(Aria2Error):
class NotInstalledError(Aria2Error):
"""aria2 未安装"""
class RpcError(Aria2Error):
"""RPC 调用失败"""

View File

@@ -13,6 +13,11 @@ from src.utils import setup_logger
# Bot 命令列表,用于 Telegram 命令自动补全
BOT_COMMANDS = [
# 下载管理
BotCommand("add", "添加下载任务"),
BotCommand("list", "查看下载列表"),
BotCommand("stats", "全局下载统计"),
# 服务管理
BotCommand("install", "安装 aria2"),
BotCommand("uninstall", "卸载 aria2"),
BotCommand("start", "启动 aria2 服务"),

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from telegram import Update
from telegram.ext import ContextTypes, CommandHandler
from telegram.ext import ContextTypes, CommandHandler, CallbackQueryHandler, MessageHandler, filters
from src.utils.logger import get_logger
@@ -13,12 +13,23 @@ from src.core import (
ServiceError,
DownloadError,
ConfigError,
RpcError,
is_aria2_installed,
get_aria2_version,
generate_rpc_secret,
ARIA2_CONF,
)
from src.aria2 import Aria2Installer, Aria2ServiceManager
from src.aria2.rpc import Aria2RpcClient, DownloadTask, _format_size
from src.telegram.keyboards import (
STATUS_EMOJI,
build_list_type_keyboard,
build_task_keyboard,
build_task_list_keyboard,
build_delete_confirm_keyboard,
build_detail_keyboard,
build_after_add_keyboard,
)
logger = get_logger("handlers")
@@ -36,6 +47,15 @@ class Aria2BotAPI:
self.config = config or Aria2Config()
self.installer = Aria2Installer(self.config)
self.service = Aria2ServiceManager()
self._rpc: Aria2RpcClient | None = None
def _get_rpc_client(self) -> Aria2RpcClient:
"""获取或创建 RPC 客户端"""
if self._rpc is None:
secret = self._get_rpc_secret()
port = self._get_rpc_port() or 6800
self._rpc = Aria2RpcClient(port=port, secret=secret)
return self._rpc
async def _reply(self, update: Update, context: ContextTypes.DEFAULT_TYPE, text: str, **kwargs):
if update.effective_message:
@@ -285,6 +305,7 @@ class Aria2BotAPI:
async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"收到 /help 命令 - {_get_user_info(update)}")
commands = [
"*服务管理*",
"/install - 安装 aria2",
"/uninstall - 卸载 aria2",
"/start - 启动 aria2 服务",
@@ -292,17 +313,322 @@ class Aria2BotAPI:
"/restart - 重启 aria2 服务",
"/status - 查看 aria2 状态",
"/logs - 查看最近日志",
"/clear_logs - 清空日志",
"/set_secret <密钥> - 设置自定义 RPC 密钥",
"/reset_secret - 重新生成随机 RPC 密钥",
"/clear\\_logs - 清空日志",
"/set\\_secret <密钥> - 设置 RPC 密钥",
"/reset\\_secret - 重新生成 RPC 密钥",
"",
"*下载管理*",
"/add <URL> - 添加下载任务",
"/list - 查看下载列表",
"/stats - 全局下载统计",
"",
"/help - 显示此帮助",
]
await self._reply(update, context, "可用命令:\n" + "\n".join(commands))
await self._reply(update, context, "可用命令:\n" + "\n".join(commands), parse_mode="Markdown")
# === 下载管理命令 ===
async def add_download(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""/add <url> - 添加下载任务"""
logger.info(f"收到 /add 命令 - {_get_user_info(update)}")
if not context.args:
await self._reply(update, context, "用法: /add <URL>\n支持 HTTP/HTTPS/磁力链接")
return
url = context.args[0]
try:
rpc = self._get_rpc_client()
gid = await rpc.add_uri(url)
task = await rpc.get_status(gid)
# 转义文件名中的 Markdown 特殊字符
safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
text = f"✅ 任务已添加\n📄 {safe_name}\n🆔 GID: `{gid}`"
keyboard = build_after_add_keyboard(gid)
await self._reply(update, context, text, parse_mode="Markdown", reply_markup=keyboard)
logger.info(f"/add 命令执行成功, GID={gid} - {_get_user_info(update)}")
except RpcError as e:
logger.error(f"/add 命令执行失败: {e} - {_get_user_info(update)}")
await self._reply(update, context, f"❌ 添加失败: {e}")
async def handle_torrent(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理用户发送的种子文件"""
logger.info(f"收到种子文件 - {_get_user_info(update)}")
document = update.message.document
if not document or not document.file_name.endswith(".torrent"):
return
try:
file = await context.bot.get_file(document.file_id)
torrent_data = await file.download_as_bytearray()
rpc = self._get_rpc_client()
gid = await rpc.add_torrent(bytes(torrent_data))
task = await rpc.get_status(gid)
# 转义文件名中的 Markdown 特殊字符
safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
text = f"✅ 种子任务已添加\n📄 {safe_name}\n🆔 GID: `{gid}`"
keyboard = build_after_add_keyboard(gid)
await self._reply(update, context, text, parse_mode="Markdown", reply_markup=keyboard)
logger.info(f"种子任务添加成功, GID={gid} - {_get_user_info(update)}")
except RpcError as e:
logger.error(f"种子任务添加失败: {e} - {_get_user_info(update)}")
await self._reply(update, context, f"❌ 添加种子失败: {e}")
async def list_downloads(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""/list - 查看下载列表"""
logger.info(f"收到 /list 命令 - {_get_user_info(update)}")
try:
rpc = self._get_rpc_client()
stat = await rpc.get_global_stat()
active_count = int(stat.get("numActive", 0))
waiting_count = int(stat.get("numWaiting", 0))
stopped_count = int(stat.get("numStopped", 0))
keyboard = build_list_type_keyboard(active_count, waiting_count, stopped_count)
await self._reply(update, context, "📥 选择查看类型:", reply_markup=keyboard)
except RpcError as e:
logger.error(f"/list 命令执行失败: {e} - {_get_user_info(update)}")
await self._reply(update, context, f"❌ 获取列表失败: {e}")
async def global_stats(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""/stats - 全局下载统计"""
logger.info(f"收到 /stats 命令 - {_get_user_info(update)}")
try:
rpc = self._get_rpc_client()
stat = await rpc.get_global_stat()
text = (
"📊 *全局统计*\n"
f"⬇️ 下载速度: {_format_size(int(stat.get('downloadSpeed', 0)))}/s\n"
f"⬆️ 上传速度: {_format_size(int(stat.get('uploadSpeed', 0)))}/s\n"
f"▶️ 活动任务: {stat.get('numActive', 0)}\n"
f"⏳ 等待任务: {stat.get('numWaiting', 0)}\n"
f"⏹️ 已停止: {stat.get('numStopped', 0)}"
)
await self._reply(update, context, text, parse_mode="Markdown")
except RpcError as e:
logger.error(f"/stats 命令执行失败: {e} - {_get_user_info(update)}")
await self._reply(update, context, f"❌ 获取统计失败: {e}")
# === Callback Query 处理 ===
async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 Inline Keyboard 回调"""
query = update.callback_query
try:
await query.answer()
except Exception as e:
logger.warning(f"回调应答失败 (可忽略): {e}")
data = query.data
if not data:
return
parts = data.split(":")
action = parts[0]
try:
rpc = self._get_rpc_client()
if action == "list":
await self._handle_list_callback(query, rpc, parts)
elif action == "pause":
await self._handle_pause_callback(query, rpc, parts[1])
elif action == "resume":
await self._handle_resume_callback(query, rpc, parts[1])
elif action == "delete":
await self._handle_delete_callback(query, parts[1])
elif action == "confirm_del":
await self._handle_confirm_delete_callback(query, rpc, parts[1], parts[2])
elif action == "detail":
await self._handle_detail_callback(query, rpc, parts[1])
elif action == "stats":
await self._handle_stats_callback(query, rpc)
elif action == "cancel":
await query.edit_message_text("❌ 操作已取消")
except RpcError as e:
await query.edit_message_text(f"❌ 操作失败: {e}")
async def _handle_list_callback(self, query, rpc: Aria2RpcClient, parts: list) -> None:
"""处理列表相关回调"""
if parts[1] == "menu":
stat = await rpc.get_global_stat()
keyboard = build_list_type_keyboard(
int(stat.get("numActive", 0)),
int(stat.get("numWaiting", 0)),
int(stat.get("numStopped", 0)),
)
await query.edit_message_text("📥 选择查看类型:", reply_markup=keyboard)
return
list_type = parts[1]
page = int(parts[2]) if len(parts) > 2 else 1
if list_type == "active":
tasks = await rpc.get_active()
title = "▶️ 活动任务"
elif list_type == "waiting":
tasks = await rpc.get_waiting()
title = "⏳ 等待任务"
else: # stopped
tasks = await rpc.get_stopped()
title = "✅ 已完成/错误"
await self._send_task_list(query, tasks, page, list_type, title)
async def _send_task_list(self, query, tasks: list[DownloadTask], page: int, list_type: str, title: str) -> None:
"""发送任务列表"""
page_size = 5
total_pages = max(1, (len(tasks) + page_size - 1) // page_size)
start = (page - 1) * page_size
page_tasks = tasks[start:start + page_size]
if not tasks:
keyboard = build_task_list_keyboard(1, 1, list_type)
await query.edit_message_text(f"{title}\n\n📭 暂无任务", reply_markup=keyboard)
return
lines = [f"{title} ({page}/{total_pages})\n"]
for t in page_tasks:
emoji = STATUS_EMOJI.get(t.status, "")
lines.append(f"{emoji} {t.name}")
lines.append(f" {t.progress_bar} {t.progress:.1f}%")
lines.append(f" {t.size_str} | {t.speed_str}")
# 添加操作按钮提示
if t.status == "active":
lines.append(f" ⏸ /pause\\_{t.gid[:8]}")
elif t.status in ("paused", "waiting"):
lines.append(f" ▶️ /resume\\_{t.gid[:8]}")
lines.append(f" 📋 详情: 点击下方按钮\n")
# 为每个任务添加操作按钮
task_buttons = []
for t in page_tasks:
row = []
if t.status == "active":
row.append({"text": f"{t.gid[:6]}", "callback_data": f"pause:{t.gid}"})
elif t.status in ("paused", "waiting"):
row.append({"text": f"▶️ {t.gid[:6]}", "callback_data": f"resume:{t.gid}"})
row.append({"text": f"🗑 {t.gid[:6]}", "callback_data": f"delete:{t.gid}"})
row.append({"text": f"📋 {t.gid[:6]}", "callback_data": f"detail:{t.gid}"})
task_buttons.append(row)
# 构建完整键盘
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
keyboard_rows = []
for row in task_buttons:
keyboard_rows.append([InlineKeyboardButton(b["text"], callback_data=b["callback_data"]) for b in row])
# 添加翻页按钮
nav_buttons = []
if page > 1:
nav_buttons.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"list:{list_type}:{page - 1}"))
if page < total_pages:
nav_buttons.append(InlineKeyboardButton("➡️ 下一页", callback_data=f"list:{list_type}:{page + 1}"))
if nav_buttons:
keyboard_rows.append(nav_buttons)
keyboard_rows.append([InlineKeyboardButton("🔙 返回列表", callback_data="list:menu")])
await query.edit_message_text("\n".join(lines), reply_markup=InlineKeyboardMarkup(keyboard_rows))
async def _handle_pause_callback(self, query, rpc: Aria2RpcClient, gid: str) -> None:
"""处理暂停回调"""
await rpc.pause(gid)
task = await rpc.get_status(gid)
safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
keyboard = build_task_keyboard(gid, task.status)
await query.edit_message_text(f"⏸️ 任务已暂停\n📄 {safe_name}\n🆔 GID: `{gid}`",
parse_mode="Markdown", reply_markup=keyboard)
async def _handle_resume_callback(self, query, rpc: Aria2RpcClient, gid: str) -> None:
"""处理恢复回调"""
await rpc.unpause(gid)
task = await rpc.get_status(gid)
safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
keyboard = build_task_keyboard(gid, task.status)
await query.edit_message_text(f"▶️ 任务已恢复\n📄 {safe_name}\n🆔 GID: `{gid}`",
parse_mode="Markdown", reply_markup=keyboard)
async def _handle_delete_callback(self, query, gid: str) -> None:
"""处理删除确认回调"""
keyboard = build_delete_confirm_keyboard(gid)
await query.edit_message_text(f"⚠️ 确认删除任务?\n🆔 GID: `{gid}`",
parse_mode="Markdown", reply_markup=keyboard)
async def _handle_confirm_delete_callback(self, query, rpc: Aria2RpcClient, gid: str, delete_file: str) -> None:
"""处理确认删除回调"""
task = None
try:
task = await rpc.get_status(gid)
except RpcError:
pass
# 尝试删除任务
try:
await rpc.remove(gid)
except RpcError:
try:
await rpc.force_remove(gid)
except RpcError:
pass
try:
await rpc.remove_download_result(gid)
except RpcError:
pass
# 如果需要删除文件
file_deleted = False
if delete_file == "1" and task:
file_deleted = rpc.delete_files(task)
msg = f"🗑️ 任务已删除\n🆔 GID: `{gid}`"
if delete_file == "1":
msg += f"\n📁 文件: {'已删除' if file_deleted else '删除失败或不存在'}"
await query.edit_message_text(msg, parse_mode="Markdown")
async def _handle_detail_callback(self, query, rpc: Aria2RpcClient, gid: str) -> None:
"""处理详情回调"""
task = await rpc.get_status(gid)
emoji = STATUS_EMOJI.get(task.status, "")
safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
text = (
f"📋 *任务详情*\n"
f"📄 文件: {safe_name}\n"
f"🆔 GID: `{task.gid}`\n"
f"📊 状态: {emoji} {task.status}\n"
f"📈 进度: {task.progress_bar} {task.progress:.1f}%\n"
f"📦 大小: {task.size_str}\n"
f"⬇️ 下载: {task.speed_str}\n"
f"⬆️ 上传: {_format_size(task.upload_speed)}/s"
)
if task.error_message:
text += f"\n❌ 错误: {task.error_message}"
keyboard = build_detail_keyboard(gid, task.status)
await query.edit_message_text(text, parse_mode="Markdown", reply_markup=keyboard)
async def _handle_stats_callback(self, query, rpc: Aria2RpcClient) -> None:
"""处理统计回调"""
stat = await rpc.get_global_stat()
text = (
"📊 *全局统计*\n"
f"⬇️ 下载速度: {_format_size(int(stat.get('downloadSpeed', 0)))}/s\n"
f"⬆️ 上传速度: {_format_size(int(stat.get('uploadSpeed', 0)))}/s\n"
f"▶️ 活动任务: {stat.get('numActive', 0)}\n"
f"⏳ 等待任务: {stat.get('numWaiting', 0)}\n"
f"⏹️ 已停止: {stat.get('numStopped', 0)}"
)
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
keyboard = InlineKeyboardMarkup([[InlineKeyboardButton("🔙 返回列表", callback_data="list:menu")]])
await query.edit_message_text(text, parse_mode="Markdown", reply_markup=keyboard)
def build_handlers(api: Aria2BotAPI) -> list[CommandHandler]:
"""构建 CommandHandler 列表"""
def build_handlers(api: Aria2BotAPI) -> list:
"""构建 Handler 列表"""
return [
# 服务管理命令
CommandHandler("install", api.install),
CommandHandler("uninstall", api.uninstall),
CommandHandler("start", api.start_service),
@@ -314,4 +640,12 @@ def build_handlers(api: Aria2BotAPI) -> list[CommandHandler]:
CommandHandler("set_secret", api.set_secret),
CommandHandler("reset_secret", api.reset_secret),
CommandHandler("help", api.help_command),
# 下载管理命令
CommandHandler("add", api.add_download),
CommandHandler("list", api.list_downloads),
CommandHandler("stats", api.global_stats),
# 种子文件处理
MessageHandler(filters.Document.FileExtension("torrent"), api.handle_torrent),
# Callback Query 处理
CallbackQueryHandler(api.handle_callback),
]

103
src/telegram/keyboards.py Normal file
View File

@@ -0,0 +1,103 @@
"""Telegram 键盘构建工具"""
from __future__ import annotations
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
# 状态 emoji 映射
STATUS_EMOJI = {
"active": "⬇️",
"waiting": "",
"paused": "⏸️",
"complete": "",
"error": "",
"removed": "🗑️",
}
def build_list_type_keyboard(active_count: int, waiting_count: int, stopped_count: int) -> InlineKeyboardMarkup:
"""构建列表类型选择键盘"""
return InlineKeyboardMarkup([
[
InlineKeyboardButton(f"▶️ 活动 ({active_count})", callback_data="list:active:1"),
InlineKeyboardButton(f"⏳ 等待 ({waiting_count})", callback_data="list:waiting:1"),
],
[
InlineKeyboardButton(f"✅ 已完成 ({stopped_count})", callback_data="list:stopped:1"),
InlineKeyboardButton("📊 统计", callback_data="stats"),
],
])
def build_task_keyboard(gid: str, status: str) -> InlineKeyboardMarkup:
"""构建单个任务的操作按钮"""
buttons = []
if status == "active":
buttons.append(InlineKeyboardButton("⏸ 暂停", callback_data=f"pause:{gid}"))
elif status in ("paused", "waiting"):
buttons.append(InlineKeyboardButton("▶️ 恢复", callback_data=f"resume:{gid}"))
buttons.append(InlineKeyboardButton("🗑 删除", callback_data=f"delete:{gid}"))
buttons.append(InlineKeyboardButton("📋 详情", callback_data=f"detail:{gid}"))
return InlineKeyboardMarkup([buttons])
def build_task_list_keyboard(page: int, total_pages: int, list_type: str) -> InlineKeyboardMarkup | None:
"""构建任务列表的翻页按钮"""
nav_buttons = []
if page > 1:
nav_buttons.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"list:{list_type}:{page - 1}"))
if page < total_pages:
nav_buttons.append(InlineKeyboardButton("➡️ 下一页", callback_data=f"list:{list_type}:{page + 1}"))
# 返回按钮
back_button = [InlineKeyboardButton("🔙 返回列表", callback_data="list:menu")]
rows = []
if nav_buttons:
rows.append(nav_buttons)
rows.append(back_button)
return InlineKeyboardMarkup(rows)
def build_delete_confirm_keyboard(gid: str) -> InlineKeyboardMarkup:
"""构建删除确认按钮(含是否删除文件选项)"""
return InlineKeyboardMarkup([
[
InlineKeyboardButton("✅ 仅删任务", callback_data=f"confirm_del:{gid}:0"),
InlineKeyboardButton("🗑 删任务+文件", callback_data=f"confirm_del:{gid}:1"),
],
[
InlineKeyboardButton("❌ 取消", callback_data="cancel"),
],
])
def build_detail_keyboard(gid: str, status: str) -> InlineKeyboardMarkup:
"""构建详情页面的操作按钮"""
buttons = []
if status == "active":
buttons.append(InlineKeyboardButton("⏸ 暂停", callback_data=f"pause:{gid}"))
elif status in ("paused", "waiting"):
buttons.append(InlineKeyboardButton("▶️ 恢复", callback_data=f"resume:{gid}"))
buttons.append(InlineKeyboardButton("🗑 删除", callback_data=f"delete:{gid}"))
return InlineKeyboardMarkup([
buttons,
[InlineKeyboardButton("🔙 返回列表", callback_data="list:menu")],
])
def build_after_add_keyboard(gid: str) -> InlineKeyboardMarkup:
"""构建添加任务后的操作按钮"""
return InlineKeyboardMarkup([
[
InlineKeyboardButton("📋 查看详情", callback_data=f"detail:{gid}"),
InlineKeyboardButton("📥 查看列表", callback_data="list:menu"),
],
])

2
uv.lock generated
View File

@@ -19,12 +19,14 @@ name = "aria2bot"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "httpx" },
{ name = "python-dotenv" },
{ name = "python-telegram-bot" },
]
[package.metadata]
requires-dist = [
{ name = "httpx", specifier = ">=0.27.0" },
{ name = "python-dotenv", specifier = ">=1.2.1" },
{ name = "python-telegram-bot", specifier = ">=21.0" },
]