7 Commits
v1.0 ... v1.3

Author SHA1 Message Date
dnslin
1c7c102638 refactor: 重构 handlers 模块为独立子模块 + 支持直接发送链接下载
- 将 handlers.py 拆分为多个独立模块:
  - base.py: 基础类和工具函数
  - service.py: 服务管理命令
  - download.py: 下载管理命令
  - callbacks.py: 回调处理
  - cloud_*.py: 云存储相关功能
  - app_ref.py: bot 实例引用管理

- 新增功能:支持直接发送链接触发下载
  - 直接发送 HTTP/HTTPS 链接自动添加下载
  - 直接发送磁力链接自动添加下载
  - 支持一条消息包含多个链接

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-15 17:28:48 +08:00
dnslin
d6f77d52a0 fix: 修复 多个存储策略的时候 自动删除冲突 2025-12-15 16:55:21 +08:00
dnslin
0d3c4dd05a fix: 调整版本 2025-12-15 16:30:42 +08:00
dnslin
64fa02d0b6 fix: 修复文件名过长的错误 2025-12-15 16:21:50 +08:00
dnslin
64b3740bb4 fix: 调整镜像版本 2025-12-15 16:16:37 +08:00
dnslin
b5983f57bd feat: 移除默认的aria2 2025-12-15 16:10:00 +08:00
dnslin
8d7b2667e1 style: 优化下docker-compose 2025-12-15 16:01:00 +08:00
19 changed files with 2463 additions and 1508 deletions

View File

@@ -18,11 +18,9 @@ FROM python:3.13-slim
LABEL maintainer="dnslin" LABEL maintainer="dnslin"
LABEL description="Aria2 Telegram Bot - 通过 Telegram 控制 aria2 下载" LABEL description="Aria2 Telegram Bot - 通过 Telegram 控制 aria2 下载"
# 安装 aria2 和必要工具 # 安装必要工具(aria2 通过 /install 命令下载)
RUN apt-get update && \ RUN apt-get update && \
apt-get install -y --no-install-recommends \ apt-get install -y --no-install-recommends ca-certificates \
aria2 \
ca-certificates \
&& rm -rf /var/lib/apt/lists/* \ && rm -rf /var/lib/apt/lists/* \
&& apt-get clean && apt-get clean
@@ -31,13 +29,13 @@ WORKDIR /app
# 从构建阶段复制虚拟环境 # 从构建阶段复制虚拟环境
COPY --from=builder /app/.venv /app/.venv COPY --from=builder /app/.venv /app/.venv
# 复制应用代码 # 复制应用代码和入口脚本
COPY src/ ./src/ COPY src/ ./src/
COPY main.py banner.txt ./ COPY main.py banner.txt docker-entrypoint.sh ./
RUN chmod +x docker-entrypoint.sh
# 创建必要目录和符号链接 # 创建必要目录
RUN mkdir -p /root/.local/bin /root/.config/aria2 /root/downloads && \ RUN mkdir -p /root/.local/bin /root/.config/aria2 /root/downloads
ln -s /usr/bin/aria2c /root/.local/bin/aria2c
# 设置环境变量 # 设置环境变量
ENV PATH="/app/.venv/bin:$PATH" ENV PATH="/app/.venv/bin:$PATH"
@@ -51,4 +49,4 @@ VOLUME ["/root/downloads", "/root/.config/aria2"]
EXPOSE 6800 EXPOSE 6800
# 启动命令 # 启动命令
CMD ["python", "main.py"] ENTRYPOINT ["./docker-entrypoint.sh"]

View File

@@ -79,7 +79,7 @@ uv run main.py
| 变量 | 说明 | | 变量 | 说明 |
| -------------------------------------- | -------------------------- | | -------------------------------------- | -------------------------- |
| `TELEGRAM_CHANNEL_ENABLED` | 启用频道存储true/false | | `TELEGRAM_CHANNEL_ENABLED` | 启用频道存储true/false |
| `TELEGRAM_CHANNEL_ID` | 频道 ID @username | | `TELEGRAM_CHANNEL_ID` | 频道 ID |
| `TELEGRAM_CHANNEL_AUTO_UPLOAD` | 下载完成后自动发送 | | `TELEGRAM_CHANNEL_AUTO_UPLOAD` | 下载完成后自动发送 |
| `TELEGRAM_CHANNEL_DELETE_AFTER_UPLOAD` | 发送后删除本地文件 | | `TELEGRAM_CHANNEL_DELETE_AFTER_UPLOAD` | 发送后删除本地文件 |

View File

@@ -1,7 +1,9 @@
services: services:
aria2bot: aria2bot:
build: . # 方式一:使用预构建镜像(推荐)
image: dnslin/aria2bot:latest image: dnslin/aria2bot:latest
# 方式二:本地构建(取消下行注释,注释上行)
# build: .
container_name: aria2bot container_name: aria2bot
restart: unless-stopped restart: unless-stopped
volumes: volumes:
@@ -14,3 +16,4 @@ services:
# 可选:暴露 aria2 RPC 端口(如需外部访问) # 可选:暴露 aria2 RPC 端口(如需外部访问)
# ports: # ports:
# - "6800:6800" # - "6800:6800"

8
docker-entrypoint.sh Normal file
View File

@@ -0,0 +1,8 @@
#!/bin/bash
# Docker 容器入口脚本
# 创建必要目录
mkdir -p /root/.local/bin /root/.config/aria2 /root/downloads
# 启动应用
exec python main.py

View File

@@ -257,10 +257,15 @@ class Aria2Installer:
try: try:
if ARIA2_CONFIG_DIR.exists(): if ARIA2_CONFIG_DIR.exists():
shutil.rmtree(ARIA2_CONFIG_DIR) # 删除配置目录中的文件,而不是整个目录(兼容 Docker 挂载卷)
logger.info(f"已删除配置目录: {ARIA2_CONFIG_DIR}") for item in ARIA2_CONFIG_DIR.iterdir():
if item.is_file():
item.unlink()
elif item.is_dir():
shutil.rmtree(item)
logger.info(f"已清空配置目录: {ARIA2_CONFIG_DIR}")
except Exception as exc: # noqa: PERF203 except Exception as exc: # noqa: PERF203
logger.error(f"删除配置目录失败: {exc}") logger.error(f"清空配置目录失败: {exc}")
errors.append(exc) errors.append(exc)
try: try:

View File

@@ -225,7 +225,7 @@ class Aria2RpcClient:
return DownloadTask( return DownloadTask(
gid=data.get("gid", ""), gid=data.get("gid", ""),
status=data.get("status", "unknown"), status=data.get("status", "unknown"),
name=name[:40] if len(name) > 40 else name, # 截断文件名 name=name, # 保留完整文件名,显示时再截断
total_length=int(data.get("totalLength", 0)), total_length=int(data.get("totalLength", 0)),
completed_length=int(data.get("completedLength", 0)), completed_length=int(data.get("completedLength", 0)),
download_speed=int(data.get("downloadSpeed", 0)), download_speed=int(data.get("downloadSpeed", 0)),

View File

@@ -1,12 +1,16 @@
"""Configuration dataclass for aria2bot.""" """Configuration dataclass for aria2bot."""
from __future__ import annotations from __future__ import annotations
import json
import os import os
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from src.core.constants import DOWNLOAD_DIR from src.core.constants import DOWNLOAD_DIR
# 云存储配置持久化文件路径
CLOUD_CONFIG_FILE = Path.home() / ".config" / "aria2bot" / "cloud_config.json"
@dataclass @dataclass
class Aria2Config: class Aria2Config:
@@ -111,3 +115,78 @@ class BotConfig:
onedrive=onedrive, onedrive=onedrive,
telegram_channel=telegram_channel, telegram_channel=telegram_channel,
) )
def save_cloud_config(onedrive: OneDriveConfig, telegram: TelegramChannelConfig) -> bool:
"""保存云存储配置到文件
Args:
onedrive: OneDrive 配置
telegram: Telegram 频道配置
Returns:
是否保存成功
"""
try:
CLOUD_CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
data = {
"onedrive": {
"auto_upload": onedrive.auto_upload,
"delete_after_upload": onedrive.delete_after_upload,
"remote_path": onedrive.remote_path,
},
"telegram_channel": {
"channel_id": telegram.channel_id,
"auto_upload": telegram.auto_upload,
"delete_after_upload": telegram.delete_after_upload,
}
}
CLOUD_CONFIG_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False))
return True
except Exception:
return False
def load_cloud_config() -> dict | None:
"""从文件加载云存储配置
Returns:
配置字典,如果文件不存在或解析失败则返回 None
"""
try:
if CLOUD_CONFIG_FILE.exists():
return json.loads(CLOUD_CONFIG_FILE.read_text())
except Exception:
pass
return None
def apply_saved_config(onedrive: OneDriveConfig, telegram: TelegramChannelConfig) -> None:
"""将保存的配置应用到配置对象(文件配置优先级低于环境变量)
只有当环境变量未设置时,才使用文件中的配置
"""
saved = load_cloud_config()
if not saved:
return
# 应用 OneDrive 配置(仅当环境变量未明确设置时)
if "onedrive" in saved:
od = saved["onedrive"]
# auto_upload: 如果环境变量未设置,使用文件配置
if not os.environ.get("ONEDRIVE_AUTO_UPLOAD"):
onedrive.auto_upload = od.get("auto_upload", False)
if not os.environ.get("ONEDRIVE_DELETE_AFTER_UPLOAD"):
onedrive.delete_after_upload = od.get("delete_after_upload", False)
if not os.environ.get("ONEDRIVE_REMOTE_PATH"):
onedrive.remote_path = od.get("remote_path", "/aria2bot")
# 应用 Telegram 频道配置
if "telegram_channel" in saved:
tg = saved["telegram_channel"]
if not os.environ.get("TELEGRAM_CHANNEL_ID"):
telegram.channel_id = tg.get("channel_id", "")
if not os.environ.get("TELEGRAM_CHANNEL_AUTO_UPLOAD"):
telegram.auto_upload = tg.get("auto_upload", False)
if not os.environ.get("TELEGRAM_CHANNEL_DELETE_AFTER_UPLOAD"):
telegram.delete_after_upload = tg.get("delete_after_upload", False)

View File

@@ -7,11 +7,13 @@ from telegram import Bot, BotCommand
from telegram.ext import Application from telegram.ext import Application
from src.core import BotConfig, is_aria2_installed from src.core import BotConfig, is_aria2_installed
from src.core.config import apply_saved_config
from src.aria2.service import Aria2ServiceManager, get_service_mode from src.aria2.service import Aria2ServiceManager, get_service_mode
from src.telegram.handlers import Aria2BotAPI, build_handlers from src.telegram.handlers import Aria2BotAPI, build_handlers
from src.telegram.handlers.app_ref import set_bot_instance, get_bot_instance
from src.utils import setup_logger from src.utils import setup_logger
# 全局 bot 实例,用于自动上传等功能发送消息 # 全局 bot 实例,用于自动上传等功能发送消息(保留兼容性)
_bot_instance: Bot | None = None _bot_instance: Bot | None = None
# Bot 命令列表,用于 Telegram 命令自动补全 # Bot 命令列表,用于 Telegram 命令自动补全
@@ -42,10 +44,15 @@ async def post_init(application: Application) -> None:
logger.info("Setting bot commands...") logger.info("Setting bot commands...")
await application.bot.set_my_commands(BOT_COMMANDS) await application.bot.set_my_commands(BOT_COMMANDS)
_bot_instance = application.bot _bot_instance = application.bot
# 设置到 app_ref 模块,供 handlers 使用
set_bot_instance(application.bot)
def create_app(config: BotConfig) -> Application: def create_app(config: BotConfig) -> Application:
"""创建 Telegram Application""" """创建 Telegram Application"""
# 应用保存的云存储配置
apply_saved_config(config.onedrive, config.telegram_channel)
builder = Application.builder().token(config.token).post_init(post_init) builder = Application.builder().token(config.token).post_init(post_init)
if config.api_base_url: if config.api_base_url:
builder = builder.base_url(config.api_base_url).base_file_url(config.api_base_url + "/file") builder = builder.base_url(config.api_base_url).base_file_url(config.api_base_url + "/file")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,115 @@
"""Telegram bot handlers 模块。"""
from __future__ import annotations
from functools import wraps
from telegram import Update
from telegram.ext import (
ContextTypes,
CommandHandler,
CallbackQueryHandler,
MessageHandler,
filters,
)
from .base import Aria2BotAPIBase, BUTTON_COMMANDS, _get_user_info, _validate_download_url
from .service import ServiceHandlersMixin
from .download import DownloadHandlersMixin
from .cloud_onedrive import OneDriveHandlersMixin
from .cloud_channel import TelegramChannelHandlersMixin
from .cloud_coordinator import CloudCoordinatorMixin
from .callbacks import CallbackHandlersMixin
class Aria2BotAPI(
CallbackHandlersMixin,
CloudCoordinatorMixin,
TelegramChannelHandlersMixin,
OneDriveHandlersMixin,
DownloadHandlersMixin,
ServiceHandlersMixin,
Aria2BotAPIBase,
):
"""Aria2 Bot API - 组合所有功能"""
pass
def build_handlers(api: Aria2BotAPI) -> list:
"""构建 Handler 列表"""
def wrap_with_permission(handler_func):
"""包装处理函数,添加权限检查"""
@wraps(handler_func)
async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not await api._check_permission(update, context):
return
return await handler_func(update, context)
return wrapped
# 构建按钮文本过滤器
button_pattern = (
"^("
+ "|".join(BUTTON_COMMANDS.keys()).replace("▶️", "▶️").replace("", "")
+ ")$"
)
return [
# 服务管理命令
CommandHandler("install", wrap_with_permission(api.install)),
CommandHandler("uninstall", wrap_with_permission(api.uninstall)),
CommandHandler("start", wrap_with_permission(api.start_service)),
CommandHandler("stop", wrap_with_permission(api.stop_service)),
CommandHandler("restart", wrap_with_permission(api.restart_service)),
CommandHandler("status", wrap_with_permission(api.status)),
CommandHandler("logs", wrap_with_permission(api.view_logs)),
CommandHandler("clear_logs", wrap_with_permission(api.clear_logs)),
CommandHandler("set_secret", wrap_with_permission(api.set_secret)),
CommandHandler("reset_secret", wrap_with_permission(api.reset_secret)),
CommandHandler("help", wrap_with_permission(api.help_command)),
CommandHandler("menu", wrap_with_permission(api.menu_command)),
# 下载管理命令
CommandHandler("add", wrap_with_permission(api.add_download)),
CommandHandler("list", wrap_with_permission(api.list_downloads)),
CommandHandler("stats", wrap_with_permission(api.global_stats)),
# 云存储命令
CommandHandler("cloud", wrap_with_permission(api.cloud_command)),
# Reply Keyboard 按钮文本处理也处理频道ID输入
MessageHandler(
filters.TEXT & filters.Regex(button_pattern),
wrap_with_permission(api.handle_text_message),
),
# 频道ID输入处理捕获 @channel 或 -100xxx 格式)
MessageHandler(
filters.TEXT & filters.Regex(r"^(@[\w]+|-?\d+)$"),
wrap_with_permission(api.handle_channel_id_input),
),
# OneDrive 认证回调 URL 处理
MessageHandler(
filters.TEXT & filters.Regex(r"^https://login\.microsoftonline\.com"),
wrap_with_permission(api.handle_auth_callback),
),
# 种子文件处理
MessageHandler(
filters.Document.FileExtension("torrent"),
wrap_with_permission(api.handle_torrent),
),
# 直接发送链接/磁力链接处理(放在最后,避免拦截其他文本消息)
MessageHandler(
filters.TEXT & ~filters.COMMAND & filters.Regex(r'(https?://|magnet:\?)'),
wrap_with_permission(api.handle_url_message),
),
# Callback Query 处理
CallbackQueryHandler(wrap_with_permission(api.handle_callback)),
]
__all__ = [
"Aria2BotAPI",
"build_handlers",
"BUTTON_COMMANDS",
"_get_user_info",
"_validate_download_url",
]

View File

@@ -0,0 +1,21 @@
"""Bot 实例引用,用于避免循环导入。"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from telegram import Bot
# 全局 bot 实例引用
_bot_instance: "Bot | None" = None
def set_bot_instance(bot: "Bot") -> None:
"""设置 bot 实例"""
global _bot_instance
_bot_instance = bot
def get_bot_instance() -> "Bot | None":
"""获取 bot 实例"""
return _bot_instance

View File

@@ -0,0 +1,220 @@
"""Telegram bot 基础类和工具方法。"""
from __future__ import annotations
import asyncio
from urllib.parse import urlparse
from telegram import Update
from telegram.ext import ContextTypes
from src.utils.logger import get_logger
from src.core import (
Aria2Config,
ARIA2_CONF,
)
from src.core.config import OneDriveConfig, TelegramChannelConfig, save_cloud_config
from src.aria2 import Aria2Installer, Aria2ServiceManager
from src.aria2.rpc import Aria2RpcClient
# Reply Keyboard 按钮文本到命令的映射
BUTTON_COMMANDS = {
"📥 下载列表": "list",
"📊 统计": "stats",
"▶️ 启动": "start",
"⏹ 停止": "stop",
"🔄 重启": "restart",
"📋 状态": "status",
"📜 日志": "logs",
"❓ 帮助": "help",
}
logger = get_logger("handlers")
def _get_user_info(update: Update) -> str:
"""获取用户信息用于日志"""
user = update.effective_user
if user:
return f"用户ID={user.id}, 用户名={user.username or 'N/A'}"
return "未知用户"
def _validate_download_url(url: str) -> tuple[bool, str]:
"""验证下载 URL 的有效性,防止恶意输入"""
# 检查 URL 长度
if len(url) > 2048:
return False, "URL 过长(最大 2048 字符)"
# 磁力链接直接通过
if url.startswith("magnet:"):
return True, ""
# 验证 HTTP/HTTPS URL
try:
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
return False, f"不支持的协议: {parsed.scheme or ''},仅支持 HTTP/HTTPS/磁力链接"
if not parsed.netloc:
return False, "无效的 URL 格式"
return True, ""
except Exception:
return False, "URL 解析失败"
class Aria2BotAPIBase:
"""Aria2 Bot API 基础类,包含初始化和工具方法"""
def __init__(
self,
config: Aria2Config | None = None,
allowed_users: set[int] | None = None,
onedrive_config: OneDriveConfig | None = None,
telegram_channel_config: TelegramChannelConfig | None = None,
api_base_url: str = "",
):
self.config = config or Aria2Config()
self.allowed_users = allowed_users or set()
self.installer = Aria2Installer(self.config)
self.service = Aria2ServiceManager()
self._rpc: Aria2RpcClient | None = None
self._auto_refresh_tasks: dict[str, asyncio.Task] = {} # chat_id:msg_id -> task
self._auto_uploaded_gids: set[str] = set() # 已自动上传的任务GID防止重复上传
self._download_monitors: dict[str, asyncio.Task] = {} # gid -> 监控任务
self._notified_gids: set[str] = set() # 已通知的 GID防止重复通知
# 云存储相关
self._onedrive_config = onedrive_config
self._onedrive = None
self._pending_auth: dict[int, dict] = {} # user_id -> flow
# Telegram 频道存储
self._telegram_channel_config = telegram_channel_config
self._telegram_channel = None
self._api_base_url = api_base_url
self._channel_uploaded_gids: set[str] = set() # 已上传到频道的 GID
self._pending_channel_input: dict[int, bool] = {} # 等待用户输入频道ID
async def _check_permission(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
"""检查用户权限,返回 True 表示有权限"""
# 未配置白名单时拒绝所有用户
if not self.allowed_users:
logger.warning(f"未配置 ALLOWED_USERS拒绝访问 - {_get_user_info(update)}")
await self._reply(update, context, "⚠️ Bot 未配置允许的用户,请联系管理员")
return False
user_id = update.effective_user.id if update.effective_user else None
if user_id and user_id in self.allowed_users:
return True
logger.warning(f"未授权访问 - {_get_user_info(update)}")
await self._reply(update, context, "🚫 您没有权限使用此 Bot")
return False
def _get_rpc_client(self) -> Aria2RpcClient:
"""获取或创建 RPC 客户端"""
if self._rpc is None:
secret = self._get_rpc_secret()
port = self._get_rpc_port() or 6800
self._rpc = Aria2RpcClient(port=port, secret=secret)
return self._rpc
def _get_onedrive_client(self):
"""获取或创建 OneDrive 客户端"""
if self._onedrive is None and self._onedrive_config and self._onedrive_config.enabled:
from src.cloud.onedrive import OneDriveClient
self._onedrive = OneDriveClient(self._onedrive_config)
return self._onedrive
def _get_telegram_channel_client(self, bot):
"""获取或创建 Telegram 频道客户端"""
if (
self._telegram_channel is None
and self._telegram_channel_config
and self._telegram_channel_config.enabled
):
from src.cloud.telegram_channel import TelegramChannelClient
is_local_api = bool(self._api_base_url)
self._telegram_channel = TelegramChannelClient(
self._telegram_channel_config, bot, is_local_api
)
return self._telegram_channel
def _recreate_telegram_channel_client(self, bot):
"""重新创建 Telegram 频道客户端(配置更新后调用)"""
self._telegram_channel = None
return self._get_telegram_channel_client(bot)
async def _delete_local_file(self, local_path, gid: str) -> tuple[bool, str]:
"""删除本地文件,返回 (成功, 消息)"""
import shutil
from pathlib import Path
if isinstance(local_path, str):
local_path = Path(local_path)
try:
if local_path.is_dir():
shutil.rmtree(local_path)
else:
local_path.unlink()
logger.info(f"已删除本地文件 GID={gid}: {local_path}")
return True, "🗑️ 本地文件已删除"
except Exception as e:
logger.error(f"删除本地文件失败 GID={gid}: {e}")
return False, f"⚠️ 删除本地文件失败: {e}"
def _save_cloud_config(self) -> bool:
"""保存云存储配置"""
if self._onedrive_config and self._telegram_channel_config:
return save_cloud_config(self._onedrive_config, self._telegram_channel_config)
return False
async def _reply(
self, update: Update, context: ContextTypes.DEFAULT_TYPE, text: str, **kwargs
):
if update.effective_message:
return await update.effective_message.reply_text(text, **kwargs)
if update.effective_chat:
return await context.bot.send_message(
chat_id=update.effective_chat.id, text=text, **kwargs
)
return None
async def _delayed_delete_messages(self, messages: list, delay: int = 5) -> None:
"""延迟删除多条消息"""
try:
await asyncio.sleep(delay)
for msg in messages:
try:
await msg.delete()
except Exception as e:
logger.warning(f"删除消息失败: {e}")
logger.debug("已删除敏感认证消息")
except Exception as e:
logger.warning(f"延迟删除任务失败: {e}")
def _get_rpc_secret(self) -> str:
if self.config.rpc_secret:
return self.config.rpc_secret
if ARIA2_CONF.exists():
try:
for line in ARIA2_CONF.read_text(encoding="utf-8", errors="ignore").splitlines():
stripped = line.strip()
if stripped.startswith("rpc-secret="):
secret = stripped.split("=", 1)[1].strip()
if secret:
self.config.rpc_secret = secret
return secret
except OSError:
return ""
return ""
def _get_rpc_port(self) -> int | None:
if ARIA2_CONF.exists():
try:
for line in ARIA2_CONF.read_text(encoding="utf-8", errors="ignore").splitlines():
stripped = line.strip()
if stripped.startswith("rpc-listen-port="):
port_str = stripped.split("=", 1)[1].strip()
if port_str.isdigit():
return int(port_str)
except OSError:
return None
return self.config.rpc_port

View File

@@ -0,0 +1,723 @@
"""回调处理。"""
from __future__ import annotations
import asyncio
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ContextTypes
from src.utils.logger import get_logger
from src.core import RpcError
from src.aria2.rpc import Aria2RpcClient, DownloadTask, _format_size
from src.telegram.keyboards import (
STATUS_EMOJI,
build_list_type_keyboard,
build_delete_confirm_keyboard,
build_cloud_settings_keyboard,
build_detail_keyboard_with_upload,
build_onedrive_menu_keyboard,
build_telegram_channel_menu_keyboard,
build_telegram_channel_settings_keyboard,
build_cloud_menu_keyboard,
)
from .base import BUTTON_COMMANDS, _get_user_info
logger = get_logger("handlers.callbacks")
class CallbackHandlersMixin:
"""回调处理 Mixin"""
async def handle_text_message(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""处理文本消息包括频道ID输入和按钮点击"""
# 先检查是否是频道ID输入
if await self.handle_channel_id_input(update, context):
return
# 然后检查是否是按钮点击
await self.handle_button_text(update, context)
async def handle_button_text(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""处理 Reply Keyboard 按钮点击"""
text = update.message.text
if text not in BUTTON_COMMANDS:
return
cmd = BUTTON_COMMANDS[text]
handler_map = {
"list": self.list_downloads,
"stats": self.global_stats,
"start": self.start_service,
"stop": self.stop_service,
"restart": self.restart_service,
"status": self.status,
"logs": self.view_logs,
"help": self.help_command,
}
if cmd in handler_map:
await handler_map[cmd](update, context)
async def handle_callback(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""处理 Inline Keyboard 回调"""
query = update.callback_query
try:
await query.answer()
except Exception as e:
logger.warning(f"回调应答失败 (可忽略): {e}")
data = query.data
if not data:
return
parts = data.split(":")
if not parts:
await query.edit_message_text("❌ 无效操作")
return
action = parts[0]
# 安全检查:验证回调数据格式,防止索引越界
required_parts = {
"pause": 2,
"resume": 2,
"delete": 2,
"detail": 2,
"refresh": 2,
"confirm_del": 3,
"cancel_del": 3,
}
if action in required_parts and len(parts) < required_parts[action]:
await query.edit_message_text("❌ 无效操作")
return
# 点击非详情相关按钮时,停止该消息的自动刷新
if action not in ("detail", "refresh", "pause", "resume"):
key = f"{query.message.chat_id}:{query.message.message_id}"
self._stop_auto_refresh(key)
try:
rpc = self._get_rpc_client()
if action == "list":
await self._handle_list_callback(query, rpc, parts)
elif action == "pause":
await self._handle_pause_callback(query, rpc, parts[1])
elif action == "resume":
await self._handle_resume_callback(query, rpc, parts[1])
elif action == "delete":
await self._handle_delete_callback(query, parts[1])
elif action == "confirm_del":
await self._handle_confirm_delete_callback(query, rpc, parts[1], parts[2])
elif action == "detail":
await self._handle_detail_callback(query, rpc, parts[1])
elif action == "refresh":
await self._handle_detail_callback(query, rpc, parts[1])
elif action == "stats":
await self._handle_stats_callback(query, rpc)
elif action == "cancel":
await query.edit_message_text("❌ 操作已取消")
# 云存储相关回调
elif action == "cloud":
await self._handle_cloud_callback(query, update, context, parts)
elif action == "upload":
await self._handle_upload_callback(query, update, context, parts)
except RpcError as e:
await query.edit_message_text(f"❌ 操作失败: {e}")
async def _handle_list_callback(
self, query, rpc: Aria2RpcClient, parts: list
) -> None:
"""处理列表相关回调"""
if parts[1] == "menu":
stat = await rpc.get_global_stat()
keyboard = build_list_type_keyboard(
int(stat.get("numActive", 0)),
int(stat.get("numWaiting", 0)),
int(stat.get("numStopped", 0)),
)
await query.edit_message_text("📥 选择查看类型:", reply_markup=keyboard)
return
list_type = parts[1]
page = int(parts[2]) if len(parts) > 2 else 1
if list_type == "active":
tasks = await rpc.get_active()
title = "▶️ 活动任务"
elif list_type == "waiting":
tasks = await rpc.get_waiting()
title = "⏳ 等待任务"
else: # stopped
tasks = await rpc.get_stopped()
title = "✅ 已完成/错误"
await self._send_task_list(query, tasks, page, list_type, title)
async def _send_task_list(
self, query, tasks: list[DownloadTask], page: int, list_type: str, title: str
) -> None:
"""发送任务列表"""
page_size = 5
total_pages = max(1, (len(tasks) + page_size - 1) // page_size)
start = (page - 1) * page_size
page_tasks = tasks[start : start + page_size]
if not tasks:
from src.telegram.keyboards import build_task_list_keyboard
keyboard = build_task_list_keyboard(1, 1, list_type)
await query.edit_message_text(f"{title}\n\n📭 暂无任务", reply_markup=keyboard)
return
lines = [f"{title} ({page}/{total_pages})\n"]
for t in page_tasks:
emoji = STATUS_EMOJI.get(t.status, "")
lines.append(f"{emoji} {t.name}")
lines.append(f" {t.progress_bar} {t.progress:.1f}%")
lines.append(f" {t.size_str} | {t.speed_str}")
# 添加操作按钮提示
if t.status == "active":
lines.append(f" ⏸ /pause\\_{t.gid[:8]}")
elif t.status in ("paused", "waiting"):
lines.append(f" ▶️ /resume\\_{t.gid[:8]}")
lines.append(f" 📋 详情: 点击下方按钮\n")
# 为每个任务添加操作按钮
task_buttons = []
for t in page_tasks:
row = []
if t.status == "active":
row.append(
{"text": f"{t.gid[:6]}", "callback_data": f"pause:{t.gid}"}
)
elif t.status in ("paused", "waiting"):
row.append(
{"text": f"▶️ {t.gid[:6]}", "callback_data": f"resume:{t.gid}"}
)
row.append({"text": f"🗑 {t.gid[:6]}", "callback_data": f"delete:{t.gid}"})
row.append({"text": f"📋 {t.gid[:6]}", "callback_data": f"detail:{t.gid}"})
task_buttons.append(row)
# 构建完整键盘
keyboard_rows = []
for row in task_buttons:
keyboard_rows.append(
[
InlineKeyboardButton(b["text"], callback_data=b["callback_data"])
for b in row
]
)
# 添加翻页按钮
nav_buttons = []
if page > 1:
nav_buttons.append(
InlineKeyboardButton(
"⬅️ 上一页", callback_data=f"list:{list_type}:{page - 1}"
)
)
if page < total_pages:
nav_buttons.append(
InlineKeyboardButton(
"➡️ 下一页", callback_data=f"list:{list_type}:{page + 1}"
)
)
if nav_buttons:
keyboard_rows.append(nav_buttons)
keyboard_rows.append(
[InlineKeyboardButton("🔙 返回列表", callback_data="list:menu")]
)
await query.edit_message_text(
"\n".join(lines), reply_markup=InlineKeyboardMarkup(keyboard_rows)
)
async def _handle_pause_callback(
self, query, rpc: Aria2RpcClient, gid: str
) -> None:
"""处理暂停回调,然后返回详情页继续刷新"""
await rpc.pause(gid)
await self._handle_detail_callback(query, rpc, gid)
async def _handle_resume_callback(
self, query, rpc: Aria2RpcClient, gid: str
) -> None:
"""处理恢复回调,然后返回详情页继续刷新"""
await rpc.unpause(gid)
await self._handle_detail_callback(query, rpc, gid)
async def _handle_delete_callback(self, query, gid: str) -> None:
"""处理删除确认回调"""
keyboard = build_delete_confirm_keyboard(gid)
await query.edit_message_text(
f"⚠️ 确认删除任务?\n🆔 GID: `{gid}`",
parse_mode="Markdown",
reply_markup=keyboard,
)
async def _handle_confirm_delete_callback(
self, query, rpc: Aria2RpcClient, gid: str, delete_file: str
) -> None:
"""处理确认删除回调"""
task = None
try:
task = await rpc.get_status(gid)
except RpcError:
pass
# 尝试删除任务
try:
await rpc.remove(gid)
except RpcError:
try:
await rpc.force_remove(gid)
except RpcError:
pass
try:
await rpc.remove_download_result(gid)
except RpcError:
pass
# 如果需要删除文件(使用 asyncio.to_thread 避免阻塞事件循环)
file_deleted = False
if delete_file == "1" and task:
file_deleted = await asyncio.to_thread(rpc.delete_files, task)
msg = f"🗑️ 任务已删除\n🆔 GID: `{gid}`"
if delete_file == "1":
msg += f"\n📁 文件: {'已删除' if file_deleted else '删除失败或不存在'}"
await query.edit_message_text(msg, parse_mode="Markdown")
def _stop_auto_refresh(self, key: str) -> None:
"""停止自动刷新任务并等待清理"""
if key in self._auto_refresh_tasks:
task = self._auto_refresh_tasks.pop(key)
task.cancel()
# 注意:这里不等待任务完成,因为是同步方法
# 任务会在 finally 块中自行清理
async def _handle_detail_callback(
self, query, rpc: Aria2RpcClient, gid: str
) -> None:
"""处理详情回调,启动自动刷新"""
chat_id = query.message.chat_id
msg_id = query.message.message_id
key = f"{chat_id}:{msg_id}"
# 停止该消息之前的刷新任务
self._stop_auto_refresh(key)
# 启动新的自动刷新任务
task = asyncio.create_task(
self._auto_refresh_detail(query.message, rpc, gid, key)
)
self._auto_refresh_tasks[key] = task
async def _auto_refresh_detail(
self, message, rpc: Aria2RpcClient, gid: str, key: str
) -> None:
"""自动刷新详情页面"""
from .app_ref import get_bot_instance
try:
last_text = ""
for _ in range(60): # 最多刷新 2 分钟
try:
task = await rpc.get_status(gid)
except RpcError:
break
emoji = STATUS_EMOJI.get(task.status, "")
safe_name = (
task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
)
text = (
f"📋 *任务详情*\n"
f"📄 文件: {safe_name}\n"
f"🆔 GID: `{task.gid}`\n"
f"📊 状态: {emoji} {task.status}\n"
f"📈 进度: {task.progress_bar} {task.progress:.1f}%\n"
f"📦 大小: {task.size_str}\n"
f"⬇️ 下载: {task.speed_str}\n"
f"⬆️ 上传: {_format_size(task.upload_speed)}/s"
)
if task.error_message:
text += f"\n❌ 错误: {task.error_message}"
# 检查是否显示上传按钮
show_onedrive = (
task.status == "complete"
and self._onedrive_config
and self._onedrive_config.enabled
)
show_channel = (
task.status == "complete"
and self._telegram_channel_config
and self._telegram_channel_config.enabled
)
keyboard = build_detail_keyboard_with_upload(
gid, task.status, show_onedrive, show_channel
)
# 只有内容变化时才更新
if text != last_text:
try:
await message.edit_text(
text, parse_mode="Markdown", reply_markup=keyboard
)
last_text = text
except Exception as e:
logger.warning(f"编辑消息失败 (GID={gid}): {e}")
break
# 任务完成或出错时停止刷新
if task.status in ("complete", "error", "removed"):
# 任务完成时检查是否需要自动上传(使用协调上传)
if task.status == "complete" and gid not in self._auto_uploaded_gids:
_bot_instance = get_bot_instance()
need_onedrive = (
self._onedrive_config
and self._onedrive_config.enabled
and self._onedrive_config.auto_upload
)
need_telegram = (
self._telegram_channel_config
and self._telegram_channel_config.enabled
and self._telegram_channel_config.auto_upload
)
if need_onedrive or need_telegram:
self._auto_uploaded_gids.add(gid)
self._channel_uploaded_gids.add(gid)
asyncio.create_task(
self._coordinated_auto_upload(
message.chat_id, gid, task, _bot_instance
)
)
break
await asyncio.sleep(2)
finally:
self._auto_refresh_tasks.pop(key, None)
async def _handle_stats_callback(self, query, rpc: Aria2RpcClient) -> None:
"""处理统计回调"""
stat = await rpc.get_global_stat()
text = (
"📊 *全局统计*\n"
f"⬇️ 下载速度: {_format_size(int(stat.get('downloadSpeed', 0)))}/s\n"
f"⬆️ 上传速度: {_format_size(int(stat.get('uploadSpeed', 0)))}/s\n"
f"▶️ 活动任务: {stat.get('numActive', 0)}\n"
f"⏳ 等待任务: {stat.get('numWaiting', 0)}\n"
f"⏹️ 已停止: {stat.get('numStopped', 0)}"
)
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("🔙 返回列表", callback_data="list:menu")]]
)
await query.edit_message_text(text, parse_mode="Markdown", reply_markup=keyboard)
# === 云存储回调处理 ===
async def _handle_cloud_callback(
self, query, update: Update, context: ContextTypes.DEFAULT_TYPE, parts: list
) -> None:
"""处理云存储相关回调"""
if len(parts) < 2:
await query.edit_message_text("❌ 无效操作")
return
sub_action = parts[1]
# 主菜单
if sub_action == "menu":
keyboard = build_cloud_menu_keyboard()
await query.edit_message_text(
"☁️ *云存储管理*\n\n选择要配置的云存储:",
parse_mode="Markdown",
reply_markup=keyboard,
)
# OneDrive 相关
elif sub_action == "onedrive":
await self._handle_onedrive_callback(
query, update, context, parts[2:] if len(parts) > 2 else []
)
# Telegram 频道相关
elif sub_action == "telegram":
await self._handle_telegram_channel_callback(
query, update, context, parts[2:] if len(parts) > 2 else []
)
# 兼容旧的回调格式
elif sub_action == "auth":
await self.cloud_auth(update, context)
elif sub_action == "status":
await self._handle_onedrive_callback(query, update, context, ["status"])
elif sub_action == "settings":
await self._handle_onedrive_callback(query, update, context, ["settings"])
elif sub_action == "logout":
await self._handle_onedrive_callback(query, update, context, ["logout"])
elif sub_action == "toggle":
await self._handle_onedrive_callback(
query, update, context, ["toggle"] + parts[2:]
)
async def _handle_onedrive_callback(
self, query, update: Update, context: ContextTypes.DEFAULT_TYPE, parts: list
) -> None:
"""处理 OneDrive 相关回调"""
action = parts[0] if parts else "menu"
if action == "menu":
keyboard = build_onedrive_menu_keyboard()
await query.edit_message_text(
"☁️ *OneDrive 设置*", parse_mode="Markdown", reply_markup=keyboard
)
elif action == "auth":
await self.cloud_auth(update, context)
elif action == "status":
client = self._get_onedrive_client()
if not client:
await query.edit_message_text("❌ OneDrive 未配置")
return
is_auth = await client.is_authenticated()
auto_upload = (
self._onedrive_config.auto_upload if self._onedrive_config else False
)
delete_after = (
self._onedrive_config.delete_after_upload
if self._onedrive_config
else False
)
remote_path = (
self._onedrive_config.remote_path
if self._onedrive_config
else "/aria2bot"
)
text = (
"☁️ *OneDrive 状态*\n\n"
f"🔐 认证状态: {'✅ 已认证' if is_auth else '❌ 未认证'}\n"
f"📤 自动上传: {'✅ 开启' if auto_upload else '❌ 关闭'}\n"
f"🗑️ 上传后删除: {'✅ 开启' if delete_after else '❌ 关闭'}\n"
f"📁 远程路径: `{remote_path}`"
)
keyboard = build_onedrive_menu_keyboard()
await query.edit_message_text(
text, parse_mode="Markdown", reply_markup=keyboard
)
elif action == "settings":
auto_upload = (
self._onedrive_config.auto_upload if self._onedrive_config else False
)
delete_after = (
self._onedrive_config.delete_after_upload
if self._onedrive_config
else False
)
keyboard = build_cloud_settings_keyboard(auto_upload, delete_after)
await query.edit_message_text(
"⚙️ *OneDrive 设置*\n\n点击切换设置:",
parse_mode="Markdown",
reply_markup=keyboard,
)
elif action == "logout":
client = self._get_onedrive_client()
if client and await client.logout():
await query.edit_message_text("✅ 已登出 OneDrive")
else:
await query.edit_message_text("❌ 登出失败")
elif action == "toggle":
if len(parts) < 2:
return
setting = parts[1]
if self._onedrive_config:
if setting == "auto_upload":
self._onedrive_config.auto_upload = not self._onedrive_config.auto_upload
elif setting == "delete_after":
self._onedrive_config.delete_after_upload = (
not self._onedrive_config.delete_after_upload
)
# 保存配置
self._save_cloud_config()
auto_upload = (
self._onedrive_config.auto_upload if self._onedrive_config else False
)
delete_after = (
self._onedrive_config.delete_after_upload
if self._onedrive_config
else False
)
keyboard = build_cloud_settings_keyboard(auto_upload, delete_after)
await query.edit_message_text(
"⚙️ *OneDrive 设置*\n\n点击切换设置:",
parse_mode="Markdown",
reply_markup=keyboard,
)
async def _handle_telegram_channel_callback(
self, query, update: Update, context: ContextTypes.DEFAULT_TYPE, parts: list
) -> None:
"""处理 Telegram 频道相关回调"""
action = parts[0] if parts else "menu"
if action == "menu":
enabled = (
self._telegram_channel_config.enabled
if self._telegram_channel_config
else False
)
channel_id = (
self._telegram_channel_config.channel_id
if self._telegram_channel_config
else ""
)
keyboard = build_telegram_channel_menu_keyboard(enabled, channel_id)
await query.edit_message_text(
"📢 *Telegram 频道设置*", parse_mode="Markdown", reply_markup=keyboard
)
elif action == "info":
# 显示频道信息
if not self._telegram_channel_config:
await query.answer("频道未配置")
return
channel_id = self._telegram_channel_config.channel_id
if channel_id:
await query.answer(f"当前频道: {channel_id}")
else:
await query.answer("频道ID未设置请在设置中配置")
elif action == "settings":
auto_upload = (
self._telegram_channel_config.auto_upload
if self._telegram_channel_config
else False
)
delete_after = (
self._telegram_channel_config.delete_after_upload
if self._telegram_channel_config
else False
)
channel_id = (
self._telegram_channel_config.channel_id
if self._telegram_channel_config
else ""
)
keyboard = build_telegram_channel_settings_keyboard(
auto_upload, delete_after, channel_id
)
await query.edit_message_text(
"⚙️ *Telegram 频道设置*\n\n点击切换设置:",
parse_mode="Markdown",
reply_markup=keyboard,
)
elif action == "toggle":
if len(parts) < 2:
return
setting = parts[1]
if self._telegram_channel_config:
if setting == "enabled":
self._telegram_channel_config.enabled = (
not self._telegram_channel_config.enabled
)
# 重新创建客户端
self._recreate_telegram_channel_client(context.bot)
elif setting == "auto_upload":
self._telegram_channel_config.auto_upload = (
not self._telegram_channel_config.auto_upload
)
elif setting == "delete_after":
self._telegram_channel_config.delete_after_upload = (
not self._telegram_channel_config.delete_after_upload
)
# 保存配置
self._save_cloud_config()
# 根据来源返回不同页面
if setting == "enabled":
enabled = (
self._telegram_channel_config.enabled
if self._telegram_channel_config
else False
)
channel_id = (
self._telegram_channel_config.channel_id
if self._telegram_channel_config
else ""
)
keyboard = build_telegram_channel_menu_keyboard(enabled, channel_id)
await query.edit_message_text(
"📢 *Telegram 频道设置*",
parse_mode="Markdown",
reply_markup=keyboard,
)
else:
auto_upload = (
self._telegram_channel_config.auto_upload
if self._telegram_channel_config
else False
)
delete_after = (
self._telegram_channel_config.delete_after_upload
if self._telegram_channel_config
else False
)
channel_id = (
self._telegram_channel_config.channel_id
if self._telegram_channel_config
else ""
)
keyboard = build_telegram_channel_settings_keyboard(
auto_upload, delete_after, channel_id
)
await query.edit_message_text(
"⚙️ *Telegram 频道设置*\n\n点击切换设置:",
parse_mode="Markdown",
reply_markup=keyboard,
)
elif action == "set_channel":
# 提示用户输入频道ID
user_id = update.effective_user.id if update.effective_user else None
if user_id:
self._pending_channel_input = {user_id: True}
await query.edit_message_text(
"📝 *设置频道ID*\n\n"
"请发送频道ID或频道用户名\n"
"• 频道ID格式: `-100xxxxxxxxxx`\n"
"• 用户名格式: `@channel_name`\n\n"
"注意Bot 必须是频道管理员才能发送消息",
parse_mode="Markdown",
)
async def _handle_upload_callback(
self, query, update: Update, context: ContextTypes.DEFAULT_TYPE, parts: list
) -> None:
"""处理上传回调"""
if len(parts) < 3:
await query.edit_message_text("❌ 无效操作")
return
provider = parts[1] # onedrive / telegram
gid = parts[2]
if provider == "onedrive":
await self.upload_to_cloud(update, context, gid)
elif provider == "telegram":
await self._upload_to_channel_manual(query, update, context, gid)

View File

@@ -0,0 +1,212 @@
"""Telegram 频道存储功能处理。"""
from __future__ import annotations
import asyncio
from pathlib import Path
from telegram import Update
from telegram.ext import ContextTypes
from src.utils.logger import get_logger
from src.core import RpcError
from .base import _get_user_info
logger = get_logger("handlers.cloud_channel")
class TelegramChannelHandlersMixin:
"""Telegram 频道存储功能 Mixin"""
async def _trigger_channel_auto_upload(self, chat_id: int, gid: str, bot) -> None:
"""触发频道自动上传"""
logger.info(f"触发频道自动上传 GID={gid}")
client = self._get_telegram_channel_client(bot)
if not client:
logger.warning(f"频道上传跳过:频道未配置 GID={gid}")
return
rpc = self._get_rpc_client()
try:
task = await rpc.get_status(gid)
except RpcError as e:
logger.error(f"频道上传失败:获取任务信息失败 GID={gid}: {e}")
return
if task.status != "complete":
return
local_path = Path(task.dir) / task.name
if not local_path.exists():
logger.error(
f"频道上传失败:本地文件不存在 GID={gid}, dir={task.dir}, name={task.name}, path={local_path}"
)
return
# 检查文件大小
file_size = local_path.stat().st_size
if file_size > client.get_max_size():
limit_mb = client.get_max_size_mb()
await bot.send_message(
chat_id=chat_id, text=f"⚠️ 文件 {task.name} 超过 {limit_mb}MB 限制,跳过频道上传"
)
return
asyncio.create_task(
self._do_channel_upload(client, local_path, task.name, chat_id, gid, bot)
)
async def _do_channel_upload(
self,
client,
local_path,
task_name: str,
chat_id: int,
gid: str,
bot,
skip_delete: bool = False,
) -> bool:
"""执行频道上传
Args:
skip_delete: 是否跳过删除(用于并行上传协调)
Returns:
上传是否成功
"""
try:
msg = await bot.send_message(chat_id=chat_id, text=f"📢 正在发送到频道: {task_name}")
except Exception as e:
logger.error(f"频道上传失败:发送消息失败 GID={gid}: {e}")
return False
try:
success, result = await client.upload_file(local_path)
if success:
result_text = f"✅ 已发送到频道: {task_name}"
# 只有不跳过删除且配置了删除时才删除
if (
not skip_delete
and self._telegram_channel_config
and self._telegram_channel_config.delete_after_upload
):
_, delete_msg = await self._delete_local_file(local_path, gid)
result_text += f"\n{delete_msg}"
await msg.edit_text(result_text)
logger.info(f"频道上传成功 GID={gid}")
return True
else:
await msg.edit_text(f"❌ 发送到频道失败: {task_name}\n原因: {result}")
logger.error(f"频道上传失败 GID={gid}: {result}")
return False
except Exception as e:
logger.error(f"频道上传异常 GID={gid}: {e}")
try:
await msg.edit_text(f"❌ 发送到频道失败: {task_name}\n错误: {e}")
except Exception:
pass
return False
async def handle_channel_id_input(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> bool:
"""处理频道ID输入返回 True 表示已处理"""
user_id = update.effective_user.id if update.effective_user else None
if not user_id or user_id not in self._pending_channel_input:
return False
# 清除等待状态
del self._pending_channel_input[user_id]
text = update.message.text.strip()
if not text:
await self._reply(update, context, "❌ 频道ID不能为空")
return True
# 验证格式
if not (
text.startswith("@") or text.startswith("-100") or text.lstrip("-").isdigit()
):
await self._reply(
update,
context,
"❌ 无效的频道ID格式\n\n"
"请使用以下格式之一:\n"
"• `@channel_name`\n"
"• `-100xxxxxxxxxx`",
parse_mode="Markdown",
)
return True
# 更新配置
if self._telegram_channel_config:
self._telegram_channel_config.channel_id = text
# 重新创建客户端
self._recreate_telegram_channel_client(context.bot)
# 保存配置
self._save_cloud_config()
await self._reply(
update,
context,
f"✅ 频道ID已设置为: `{text}`\n\n" "请确保 Bot 已被添加为频道管理员",
parse_mode="Markdown",
)
else:
await self._reply(update, context, "❌ 频道配置未初始化")
return True
async def _upload_to_channel_manual(
self, query, update: Update, context: ContextTypes.DEFAULT_TYPE, gid: str
) -> None:
"""手动上传到频道"""
import shutil
client = self._get_telegram_channel_client(context.bot)
if not client:
await query.edit_message_text("❌ 频道存储未配置")
return
rpc = self._get_rpc_client()
try:
task = await rpc.get_status(gid)
except RpcError as e:
await query.edit_message_text(f"❌ 获取任务信息失败: {e}")
return
if task.status != "complete":
await query.edit_message_text("❌ 任务未完成,无法上传")
return
local_path = Path(task.dir) / task.name
if not local_path.exists():
await query.edit_message_text("❌ 本地文件不存在")
return
# 检查文件大小
file_size = local_path.stat().st_size
if file_size > client.get_max_size():
limit_mb = client.get_max_size_mb()
await query.edit_message_text(f"❌ 文件超过 {limit_mb}MB 限制")
return
await query.edit_message_text(f"📢 正在发送到频道: {task.name}")
success, result = await client.upload_file(local_path)
if success:
result_text = f"✅ 已发送到频道: {task.name}"
if (
self._telegram_channel_config
and self._telegram_channel_config.delete_after_upload
):
try:
if local_path.is_dir():
shutil.rmtree(local_path)
else:
local_path.unlink()
result_text += "\n🗑️ 本地文件已删除"
except Exception as e:
result_text += f"\n⚠️ 删除本地文件失败: {e}"
await query.edit_message_text(result_text)
else:
await query.edit_message_text(f"❌ 发送失败: {result}")

View File

@@ -0,0 +1,147 @@
"""多云存储协调功能。"""
from __future__ import annotations
import asyncio
from pathlib import Path
from src.utils.logger import get_logger
from src.core import DOWNLOAD_DIR
logger = get_logger("handlers.cloud_coordinator")
class CloudCoordinatorMixin:
"""多云存储协调 Mixin"""
async def _coordinated_auto_upload(self, chat_id: int, gid: str, task, bot) -> None:
"""协调多云存储并行上传
当 OneDrive 和 Telegram 频道都启用自动上传且都启用删除时,
并行执行上传,全部成功后才删除本地文件。
"""
local_path = Path(task.dir) / task.name
if not local_path.exists():
logger.error(f"协调上传失败:本地文件不存在 GID={gid}")
return
# 检测哪些云存储需要上传
need_onedrive = (
self._onedrive_config
and self._onedrive_config.enabled
and self._onedrive_config.auto_upload
)
need_telegram = (
self._telegram_channel_config
and self._telegram_channel_config.enabled
and self._telegram_channel_config.auto_upload
)
# 检测是否需要协调删除(两个都启用删除)
onedrive_delete = need_onedrive and self._onedrive_config.delete_after_upload
telegram_delete = need_telegram and self._telegram_channel_config.delete_after_upload
need_coordinated_delete = onedrive_delete and telegram_delete
if need_coordinated_delete:
# 并行执行,跳过各自的删除,最后统一删除
logger.info(f"启动协调并行上传 GID={gid}")
await self._parallel_upload_with_coordinated_delete(
chat_id, gid, local_path, task.name, bot
)
else:
# 独立执行(保持现有逻辑)
if need_onedrive and gid not in self._auto_uploaded_gids:
self._auto_uploaded_gids.add(gid)
asyncio.create_task(self._trigger_auto_upload(chat_id, gid))
if need_telegram and gid not in self._channel_uploaded_gids:
self._channel_uploaded_gids.add(gid)
asyncio.create_task(self._trigger_channel_auto_upload(chat_id, gid, bot))
async def _parallel_upload_with_coordinated_delete(
self, chat_id: int, gid: str, local_path, task_name: str, bot
) -> None:
"""并行上传到多个云存储,全部成功后才删除文件"""
from .app_ref import get_bot_instance
# 准备 OneDrive 上传参数
onedrive_client = self._get_onedrive_client()
onedrive_authenticated = onedrive_client and await onedrive_client.is_authenticated()
# 计算 OneDrive 远程路径
try:
download_dir = DOWNLOAD_DIR.resolve()
relative_path = local_path.resolve().relative_to(download_dir)
remote_path = f"{self._onedrive_config.remote_path}/{relative_path.parent}"
except ValueError:
remote_path = self._onedrive_config.remote_path
# 准备 Telegram 频道客户端
telegram_client = self._get_telegram_channel_client(bot)
# 检查文件大小是否超过 Telegram 限制
telegram_size_ok = True
if telegram_client:
file_size = local_path.stat().st_size
if file_size > telegram_client.get_max_size():
telegram_size_ok = False
limit_mb = telegram_client.get_max_size_mb()
await bot.send_message(
chat_id=chat_id,
text=f"⚠️ 文件 {task_name} 超过 {limit_mb}MB 限制,跳过频道上传",
)
# 构建上传任务列表
tasks = []
task_names = []
if onedrive_authenticated:
tasks.append(
self._do_auto_upload(
onedrive_client,
local_path,
remote_path,
task_name,
chat_id,
gid,
skip_delete=True,
)
)
task_names.append("onedrive")
if telegram_client and telegram_size_ok:
tasks.append(
self._do_channel_upload(
telegram_client, local_path, task_name, chat_id, gid, bot, skip_delete=True
)
)
task_names.append("telegram")
if not tasks:
logger.warning(f"协调上传跳过:没有可用的上传目标 GID={gid}")
return
# 并行执行上传
results = await asyncio.gather(*tasks, return_exceptions=True)
# 分析结果
all_success = True
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"协调上传异常 ({task_names[i]}) GID={gid}: {result}")
all_success = False
elif result is not True:
all_success = False
# 只有全部成功才删除
_bot_instance = get_bot_instance()
if all_success and len(tasks) > 0:
_, delete_msg = await self._delete_local_file(local_path, gid)
if _bot_instance:
await _bot_instance.send_message(
chat_id=chat_id, text=f"📦 所有上传完成: {task_name}\n{delete_msg}"
)
elif not all_success:
if _bot_instance:
await _bot_instance.send_message(
chat_id=chat_id, text=f"⚠️ 部分上传失败,保留本地文件: {task_name}"
)

View File

@@ -0,0 +1,358 @@
"""OneDrive 云存储功能处理。"""
from __future__ import annotations
import asyncio
from pathlib import Path
from telegram import Update
from telegram.ext import ContextTypes
from src.utils.logger import get_logger
from src.core import RpcError, DOWNLOAD_DIR
from src.cloud.base import UploadProgress, UploadStatus
from src.telegram.keyboards import build_cloud_menu_keyboard
from .base import _get_user_info
logger = get_logger("handlers.cloud_onedrive")
class OneDriveHandlersMixin:
"""OneDrive 云存储功能 Mixin"""
async def cloud_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""云存储管理菜单"""
logger.info(f"收到 /cloud 命令 - {_get_user_info(update)}")
if not self._onedrive_config or not self._onedrive_config.enabled:
await self._reply(
update, context, "❌ 云存储功能未启用,请在配置中设置 ONEDRIVE_ENABLED=true"
)
return
keyboard = build_cloud_menu_keyboard()
await self._reply(
update, context, "☁️ *云存储管理*", parse_mode="Markdown", reply_markup=keyboard
)
async def cloud_auth(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""开始 OneDrive 认证"""
logger.info(f"收到云存储认证请求 - {_get_user_info(update)}")
client = self._get_onedrive_client()
if not client:
await self._reply(update, context, "❌ OneDrive 未配置")
return
if await client.is_authenticated():
await self._reply(update, context, "✅ OneDrive 已认证")
return
url, flow = await client.get_auth_url()
user_id = update.effective_user.id
auth_message = await self._reply(
update,
context,
f"🔐 *OneDrive 认证*\n\n"
f"1\\. 点击下方链接登录 Microsoft 账户\n"
f"2\\. 授权后会跳转到一个空白页面\n"
f"3\\. 复制该页面的完整 URL 发送给我\n\n"
f"[点击认证]({url})",
parse_mode="Markdown",
)
self._pending_auth[user_id] = {"flow": flow, "message": auth_message}
async def handle_auth_callback(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""处理用户发送的认证回调 URL"""
text = update.message.text
if not text or not text.startswith("https://login.microsoftonline.com"):
return
user_id = update.effective_user.id
if user_id not in self._pending_auth:
return
client = self._get_onedrive_client()
if not client:
return
user_message = update.message # 保存用户消息引用
pending = self._pending_auth[user_id]
flow = pending["flow"]
auth_message = pending.get("message") # 认证指引消息
if await client.authenticate_with_code(text, flow=flow):
del self._pending_auth[user_id]
reply_message = await self._reply(update, context, "✅ OneDrive 认证成功!")
logger.info(f"OneDrive 认证成功 - {_get_user_info(update)}")
else:
# 认证失败时清理认证信息
del self._pending_auth[user_id]
await client.logout() # 删除可能存在的旧 token
reply_message = await self._reply(update, context, "❌ 认证失败,请重试")
logger.error(f"OneDrive 认证失败 - {_get_user_info(update)}")
# 延迟 5 秒后删除敏感消息(包括认证指引消息)
messages_to_delete = [msg for msg in [user_message, reply_message, auth_message] if msg]
if messages_to_delete:
asyncio.create_task(self._delayed_delete_messages(messages_to_delete))
async def cloud_logout(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""登出云存储"""
logger.info(f"收到云存储登出请求 - {_get_user_info(update)}")
client = self._get_onedrive_client()
if not client:
await self._reply(update, context, "❌ OneDrive 未配置")
return
if await client.logout():
await self._reply(update, context, "✅ 已登出 OneDrive")
else:
await self._reply(update, context, "❌ 登出失败")
async def cloud_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""查看云存储状态"""
logger.info(f"收到云存储状态查询 - {_get_user_info(update)}")
client = self._get_onedrive_client()
if not client:
await self._reply(update, context, "❌ OneDrive 未配置")
return
is_auth = await client.is_authenticated()
auto_upload = self._onedrive_config.auto_upload if self._onedrive_config else False
delete_after = (
self._onedrive_config.delete_after_upload if self._onedrive_config else False
)
remote_path = self._onedrive_config.remote_path if self._onedrive_config else "/aria2bot"
text = (
"☁️ *OneDrive 状态*\n\n"
f"🔐 认证状态: {'✅ 已认证' if is_auth else '❌ 未认证'}\n"
f"📤 自动上传: {'✅ 开启' if auto_upload else '❌ 关闭'}\n"
f"🗑️ 上传后删除: {'✅ 开启' if delete_after else '❌ 关闭'}\n"
f"📁 远程路径: `{remote_path}`"
)
await self._reply(update, context, text, parse_mode="Markdown")
async def upload_to_cloud(
self, update: Update, context: ContextTypes.DEFAULT_TYPE, gid: str
) -> None:
"""上传文件到云存储(启动后台任务,不阻塞其他命令)"""
logger.info(f"收到上传请求 GID={gid} - {_get_user_info(update)}")
client = self._get_onedrive_client()
if not client or not await client.is_authenticated():
await self._reply(update, context, "❌ OneDrive 未认证,请先使用 /cloud 进行认证")
return
rpc = self._get_rpc_client()
try:
task = await rpc.get_status(gid)
except RpcError as e:
await self._reply(update, context, f"❌ 获取任务信息失败: {e}")
return
if task.status != "complete":
await self._reply(update, context, "❌ 任务未完成,无法上传")
return
local_path = Path(task.dir) / task.name
if not local_path.exists():
await self._reply(update, context, "❌ 本地文件不存在")
return
# 计算远程路径(保持目录结构)
try:
download_dir = DOWNLOAD_DIR.resolve()
relative_path = local_path.resolve().relative_to(download_dir)
remote_path = f"{self._onedrive_config.remote_path}/{relative_path.parent}"
except ValueError:
remote_path = self._onedrive_config.remote_path
msg = await self._reply(update, context, f"☁️ 正在上传: {task.name}\n⏳ 请稍候...")
# 启动后台上传任务,不阻塞其他命令
asyncio.create_task(
self._do_upload_to_cloud(
client, local_path, remote_path, task.name, msg, gid, _get_user_info(update)
)
)
async def _do_upload_to_cloud(
self, client, local_path, remote_path: str, task_name: str, msg, gid: str, user_info: str
) -> None:
"""后台执行上传任务"""
import shutil
loop = asyncio.get_running_loop()
# 进度回调函数
async def update_progress(progress: UploadProgress):
"""更新上传进度消息"""
if progress.status == UploadStatus.UPLOADING and progress.total_size > 0:
percent = progress.progress
uploaded_mb = progress.uploaded_size / (1024 * 1024)
total_mb = progress.total_size / (1024 * 1024)
progress_text = (
f"☁️ 正在上传: {task_name}\n"
f"📤 {percent:.1f}% ({uploaded_mb:.1f}MB / {total_mb:.1f}MB)"
)
try:
await msg.edit_text(progress_text)
except Exception:
pass # 忽略消息更新失败(如内容未变化)
def sync_progress_callback(progress: UploadProgress):
"""同步回调,将异步更新调度到事件循环"""
if progress.status == UploadStatus.UPLOADING:
asyncio.run_coroutine_threadsafe(update_progress(progress), loop)
try:
success = await client.upload_file(
local_path, remote_path, progress_callback=sync_progress_callback
)
if success:
result_text = f"✅ 上传成功: {task_name}"
if self._onedrive_config and self._onedrive_config.delete_after_upload:
try:
if local_path.is_dir():
shutil.rmtree(local_path)
else:
local_path.unlink()
result_text += "\n🗑️ 本地文件已删除"
except Exception as e:
result_text += f"\n⚠️ 删除本地文件失败: {e}"
await msg.edit_text(result_text)
logger.info(f"上传成功 GID={gid} - {user_info}")
else:
await msg.edit_text(f"❌ 上传失败: {task_name}")
logger.error(f"上传失败 GID={gid} - {user_info}")
except Exception as e:
logger.error(f"上传异常 GID={gid}: {e} - {user_info}")
try:
await msg.edit_text(f"❌ 上传失败: {task_name}\n错误: {e}")
except Exception:
pass
async def _trigger_auto_upload(self, chat_id: int, gid: str) -> None:
"""自动上传触发(下载完成后自动调用)"""
logger.info(f"触发自动上传 GID={gid}")
client = self._get_onedrive_client()
if not client or not await client.is_authenticated():
logger.warning(f"自动上传跳过OneDrive 未认证 GID={gid}")
return
rpc = self._get_rpc_client()
try:
task = await rpc.get_status(gid)
except RpcError as e:
logger.error(f"自动上传失败:获取任务信息失败 GID={gid}: {e}")
return
if task.status != "complete":
logger.warning(f"自动上传跳过:任务未完成 GID={gid}")
return
local_path = Path(task.dir) / task.name
if not local_path.exists():
logger.error(f"自动上传失败:本地文件不存在 GID={gid}")
return
# 计算远程路径
try:
download_dir = DOWNLOAD_DIR.resolve()
relative_path = local_path.resolve().relative_to(download_dir)
remote_path = f"{self._onedrive_config.remote_path}/{relative_path.parent}"
except ValueError:
remote_path = self._onedrive_config.remote_path
# 启动后台上传任务
asyncio.create_task(
self._do_auto_upload(client, local_path, remote_path, task.name, chat_id, gid)
)
async def _do_auto_upload(
self,
client,
local_path,
remote_path: str,
task_name: str,
chat_id: int,
gid: str,
skip_delete: bool = False,
) -> bool:
"""后台执行自动上传任务
Args:
skip_delete: 是否跳过删除(用于并行上传协调)
Returns:
上传是否成功
"""
from .app_ref import get_bot_instance
_bot_instance = get_bot_instance()
if _bot_instance is None:
logger.error(f"自动上传失败:无法获取 bot 实例 GID={gid}")
return False
# 发送上传开始通知
try:
msg = await _bot_instance.send_message(
chat_id=chat_id, text=f"☁️ 自动上传开始: {task_name}\n⏳ 请稍候..."
)
except Exception as e:
logger.error(f"自动上传失败:发送消息失败 GID={gid}: {e}")
return False
loop = asyncio.get_running_loop()
# 进度回调函数
async def update_progress(progress):
if progress.status == UploadStatus.UPLOADING and progress.total_size > 0:
percent = progress.progress
uploaded_mb = progress.uploaded_size / (1024 * 1024)
total_mb = progress.total_size / (1024 * 1024)
progress_text = (
f"☁️ 自动上传: {task_name}\n"
f"📤 {percent:.1f}% ({uploaded_mb:.1f}MB / {total_mb:.1f}MB)"
)
try:
await msg.edit_text(progress_text)
except Exception:
pass
def sync_progress_callback(progress):
if progress.status == UploadStatus.UPLOADING:
asyncio.run_coroutine_threadsafe(update_progress(progress), loop)
try:
success = await client.upload_file(
local_path, remote_path, progress_callback=sync_progress_callback
)
if success:
result_text = f"✅ 自动上传成功: {task_name}"
# 只有不跳过删除且配置了删除时才删除
if (
not skip_delete
and self._onedrive_config
and self._onedrive_config.delete_after_upload
):
_, delete_msg = await self._delete_local_file(local_path, gid)
result_text += f"\n{delete_msg}"
await msg.edit_text(result_text)
logger.info(f"自动上传成功 GID={gid}")
return True
else:
await msg.edit_text(f"❌ 自动上传失败: {task_name}")
logger.error(f"自动上传失败 GID={gid}")
return False
except Exception as e:
logger.error(f"自动上传异常 GID={gid}: {e}")
try:
await msg.edit_text(f"❌ 自动上传失败: {task_name}\n错误: {e}")
except Exception:
pass
return False

View File

@@ -0,0 +1,219 @@
"""下载管理命令处理。"""
from __future__ import annotations
import asyncio
import re
from telegram import Update
from telegram.ext import ContextTypes
from src.utils.logger import get_logger
from src.core import RpcError
from src.aria2.rpc import DownloadTask, _format_size
from src.telegram.keyboards import (
build_list_type_keyboard,
build_after_add_keyboard,
)
from .base import _get_user_info, _validate_download_url
# 匹配 HTTP/HTTPS 链接和磁力链接的正则表达式
URL_PATTERN = re.compile(r'(https?://[^\s<>"]+|magnet:\?[^\s<>"]+)')
logger = get_logger("handlers.download")
class DownloadHandlersMixin:
"""下载管理命令 Mixin"""
async def add_download(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""/add <url> - 添加下载任务"""
logger.info(f"收到 /add 命令 - {_get_user_info(update)}")
if not context.args:
await self._reply(update, context, "用法: /add <URL>\n支持 HTTP/HTTPS/磁力链接")
return
url = context.args[0]
# 验证 URL 格式
is_valid, error_msg = _validate_download_url(url)
if not is_valid:
await self._reply(update, context, f"❌ URL 无效: {error_msg}")
return
try:
rpc = self._get_rpc_client()
gid = await rpc.add_uri(url)
task = await rpc.get_status(gid)
# 转义文件名中的 Markdown 特殊字符
safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
text = f"✅ 任务已添加\n📄 {safe_name}\n🆔 GID: `{gid}`"
keyboard = build_after_add_keyboard(gid)
await self._reply(update, context, text, parse_mode="Markdown", reply_markup=keyboard)
logger.info(f"/add 命令执行成功, GID={gid} - {_get_user_info(update)}")
# 启动下载监控,完成或失败时通知用户
chat_id = update.effective_chat.id
asyncio.create_task(self._start_download_monitor(gid, chat_id))
except RpcError as e:
logger.error(f"/add 命令执行失败: {e} - {_get_user_info(update)}")
await self._reply(update, context, f"❌ 添加失败: {e}")
async def handle_torrent(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理用户发送的种子文件"""
logger.info(f"收到种子文件 - {_get_user_info(update)}")
document = update.message.document
if not document or not document.file_name.endswith(".torrent"):
return
try:
file = await context.bot.get_file(document.file_id)
torrent_data = await file.download_as_bytearray()
rpc = self._get_rpc_client()
gid = await rpc.add_torrent(bytes(torrent_data))
task = await rpc.get_status(gid)
# 转义文件名中的 Markdown 特殊字符
safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
text = f"✅ 种子任务已添加\n📄 {safe_name}\n🆔 GID: `{gid}`"
keyboard = build_after_add_keyboard(gid)
await self._reply(update, context, text, parse_mode="Markdown", reply_markup=keyboard)
logger.info(f"种子任务添加成功, GID={gid} - {_get_user_info(update)}")
# 启动下载监控,完成或失败时通知用户
chat_id = update.effective_chat.id
asyncio.create_task(self._start_download_monitor(gid, chat_id))
except RpcError as e:
logger.error(f"种子任务添加失败: {e} - {_get_user_info(update)}")
await self._reply(update, context, f"❌ 添加种子失败: {e}")
async def handle_url_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理用户直接发送的链接消息HTTP/HTTPS/磁力链接)"""
text = update.message.text or ""
urls = URL_PATTERN.findall(text)
if not urls:
return
logger.info(f"收到链接消息,提取到 {len(urls)} 个链接 - {_get_user_info(update)}")
chat_id = update.effective_chat.id
rpc = self._get_rpc_client()
for url in urls:
# 验证 URL 格式
is_valid, error_msg = _validate_download_url(url)
if not is_valid:
await self._reply(update, context, f"❌ URL 无效: {error_msg}\n{url[:50]}...")
continue
try:
gid = await rpc.add_uri(url)
task = await rpc.get_status(gid)
safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
reply_text = f"✅ 任务已添加\n📄 {safe_name}\n🆔 GID: `{gid}`"
keyboard = build_after_add_keyboard(gid)
await self._reply(update, context, reply_text, parse_mode="Markdown", reply_markup=keyboard)
logger.info(f"链接任务添加成功, GID={gid} - {_get_user_info(update)}")
asyncio.create_task(self._start_download_monitor(gid, chat_id))
except RpcError as e:
logger.error(f"链接任务添加失败: {e} - {_get_user_info(update)}")
await self._reply(update, context, f"❌ 添加失败: {e}")
async def list_downloads(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""/list - 查看下载列表"""
logger.info(f"收到 /list 命令 - {_get_user_info(update)}")
try:
rpc = self._get_rpc_client()
stat = await rpc.get_global_stat()
active_count = int(stat.get("numActive", 0))
waiting_count = int(stat.get("numWaiting", 0))
stopped_count = int(stat.get("numStopped", 0))
keyboard = build_list_type_keyboard(active_count, waiting_count, stopped_count)
await self._reply(update, context, "📥 选择查看类型:", reply_markup=keyboard)
except RpcError as e:
logger.error(f"/list 命令执行失败: {e} - {_get_user_info(update)}")
await self._reply(update, context, f"❌ 获取列表失败: {e}")
async def global_stats(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""/stats - 全局下载统计"""
logger.info(f"收到 /stats 命令 - {_get_user_info(update)}")
try:
rpc = self._get_rpc_client()
stat = await rpc.get_global_stat()
text = (
"📊 *全局统计*\n"
f"⬇️ 下载速度: {_format_size(int(stat.get('downloadSpeed', 0)))}/s\n"
f"⬆️ 上传速度: {_format_size(int(stat.get('uploadSpeed', 0)))}/s\n"
f"▶️ 活动任务: {stat.get('numActive', 0)}\n"
f"⏳ 等待任务: {stat.get('numWaiting', 0)}\n"
f"⏹️ 已停止: {stat.get('numStopped', 0)}"
)
await self._reply(update, context, text, parse_mode="Markdown")
except RpcError as e:
logger.error(f"/stats 命令执行失败: {e} - {_get_user_info(update)}")
await self._reply(update, context, f"❌ 获取统计失败: {e}")
# === 下载任务监控和通知 ===
async def _start_download_monitor(self, gid: str, chat_id: int) -> None:
"""启动下载任务监控"""
if gid in self._download_monitors:
return
task = asyncio.create_task(self._monitor_download(gid, chat_id))
self._download_monitors[gid] = task
async def _monitor_download(self, gid: str, chat_id: int) -> None:
"""监控下载任务直到完成或失败"""
from .app_ref import get_bot_instance
try:
rpc = self._get_rpc_client()
for _ in range(17280): # 最长 24 小时 (5秒 * 17280)
try:
task = await rpc.get_status(gid)
except RpcError:
break # 任务可能已被删除
if task.status == "complete":
if gid not in self._notified_gids:
self._notified_gids.add(gid)
await self._send_completion_notification(chat_id, task)
break
elif task.status == "error":
if gid not in self._notified_gids:
self._notified_gids.add(gid)
await self._send_error_notification(chat_id, task)
break
elif task.status == "removed":
break
await asyncio.sleep(5)
finally:
self._download_monitors.pop(gid, None)
async def _send_completion_notification(self, chat_id: int, task: DownloadTask) -> None:
"""发送下载完成通知"""
from .app_ref import get_bot_instance
_bot_instance = get_bot_instance()
if _bot_instance is None:
return
safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
text = f"✅ *下载完成*\n📄 {safe_name}\n📦 大小: {task.size_str}\n🆔 GID: `{task.gid}`"
try:
await _bot_instance.send_message(chat_id=chat_id, text=text, parse_mode="Markdown")
# 注意:自动上传已在 _auto_refresh_task 中通过 _coordinated_auto_upload 处理
# 这里不再单独触发,避免重复上传
except Exception as e:
logger.warning(f"发送完成通知失败 (GID={task.gid}): {e}")
async def _send_error_notification(self, chat_id: int, task: DownloadTask) -> None:
"""发送下载失败通知"""
from .app_ref import get_bot_instance
_bot_instance = get_bot_instance()
if _bot_instance is None:
return
safe_name = task.name.replace("_", "\\_").replace("*", "\\*").replace("`", "\\`")
text = f"❌ *下载失败*\n📄 {safe_name}\n🆔 GID: `{task.gid}`\n⚠️ 原因: {task.error_message or '未知错误'}"
try:
await _bot_instance.send_message(chat_id=chat_id, text=text, parse_mode="Markdown")
except Exception as e:
logger.warning(f"发送失败通知失败 (GID={task.gid}): {e}")

View File

@@ -0,0 +1,287 @@
"""服务管理命令处理。"""
from __future__ import annotations
from telegram import Update
from telegram.ext import ContextTypes
from src.utils.logger import get_logger
from src.core import (
Aria2Error,
NotInstalledError,
ServiceError,
DownloadError,
ConfigError,
is_aria2_installed,
get_aria2_version,
generate_rpc_secret,
)
from src.telegram.keyboards import build_main_reply_keyboard
from .base import _get_user_info
logger = get_logger("handlers.service")
class ServiceHandlersMixin:
"""服务管理命令 Mixin"""
async def install(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"收到 /install 命令 - {_get_user_info(update)}")
if is_aria2_installed():
await self._reply(
update, context, "aria2 已安装,无需重复安装。如需重新安装,请先运行 /uninstall"
)
return
await self._reply(update, context, "正在安装 aria2处理中请稍候...")
try:
result = await self.installer.install()
version = get_aria2_version() or result.get("version") or "未知"
rpc_secret = self._get_rpc_secret() or "未设置"
rpc_port = self._get_rpc_port() or self.config.rpc_port
await self._reply(
update,
context,
"\n".join(
[
"安装完成 ✅",
f"版本:{version}",
f"二进制:{result.get('binary')}",
f"配置目录:{result.get('config_dir')}",
f"配置文件:{result.get('config')}",
f"RPC 端口:{rpc_port}",
f"RPC 密钥:{rpc_secret[:4]}****{rpc_secret[-4:] if len(rpc_secret) > 8 else '****'}",
]
),
)
logger.info(f"/install 命令执行成功 - {_get_user_info(update)}")
except (DownloadError, ConfigError, Aria2Error) as exc:
logger.error(f"/install 命令执行失败: {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"安装失败:{exc}")
except Exception as exc: # noqa: BLE001
logger.error(f"/install 命令执行失败(未知错误): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"安装失败,发生未知错误:{exc}")
async def uninstall(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"收到 /uninstall 命令 - {_get_user_info(update)}")
if not is_aria2_installed():
await self._reply(update, context, "aria2 未安装,无需卸载")
return
await self._reply(update, context, "正在卸载 aria2处理中请稍候...")
try:
try:
self.service.stop()
except ServiceError:
pass
self.installer.uninstall()
await self._reply(update, context, "卸载完成 ✅")
logger.info(f"/uninstall 命令执行成功 - {_get_user_info(update)}")
except Aria2Error as exc:
logger.error(f"/uninstall 命令执行失败: {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"卸载失败:{exc}")
except Exception as exc: # noqa: BLE001
logger.error(f"/uninstall 命令执行失败(未知错误): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"卸载失败,发生未知错误:{exc}")
async def start_service(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"收到 /start 命令 - {_get_user_info(update)}")
try:
if not is_aria2_installed():
logger.info(f"/start 命令: aria2 未安装 - {_get_user_info(update)}")
await self._reply(update, context, "aria2 未安装,请先运行 /install")
return
self.service.start()
await self._reply(update, context, "aria2 服务已启动 ✅")
logger.info(f"/start 命令执行成功 - {_get_user_info(update)}")
except NotInstalledError:
logger.info(f"/start 命令: aria2 未安装 - {_get_user_info(update)}")
await self._reply(update, context, "aria2 未安装,请先运行 /install")
except ServiceError as exc:
logger.error(f"/start 命令执行失败: {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"启动失败:{exc}")
except Exception as exc: # noqa: BLE001
logger.error(f"/start 命令执行失败(未知错误): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"启动失败,发生未知错误:{exc}")
async def stop_service(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"收到 /stop 命令 - {_get_user_info(update)}")
try:
self.service.stop()
await self._reply(update, context, "aria2 服务已停止 ✅")
logger.info(f"/stop 命令执行成功 - {_get_user_info(update)}")
except ServiceError as exc:
logger.error(f"/stop 命令执行失败: {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"停止失败:{exc}")
except Exception as exc: # noqa: BLE001
logger.error(f"/stop 命令执行失败(未知错误): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"停止失败,发生未知错误:{exc}")
async def restart_service(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"收到 /restart 命令 - {_get_user_info(update)}")
try:
self.service.restart()
await self._reply(update, context, "aria2 服务已重启 ✅")
logger.info(f"/restart 命令执行成功 - {_get_user_info(update)}")
except ServiceError as exc:
logger.error(f"/restart 命令执行失败: {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"重启失败:{exc}")
except Exception as exc: # noqa: BLE001
logger.error(f"/restart 命令执行失败(未知错误): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"重启失败,发生未知错误:{exc}")
async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"收到 /status 命令 - {_get_user_info(update)}")
try:
info = self.service.status()
version = get_aria2_version() or "未知"
rpc_secret = self._get_rpc_secret() or "未设置"
rpc_port = self._get_rpc_port() or self.config.rpc_port or "未知"
except ServiceError as exc:
logger.error(f"/status 命令执行失败: {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"获取状态失败:{exc}")
return
except Exception as exc: # noqa: BLE001
logger.error(f"/status 命令执行失败(未知错误): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"获取状态失败,发生未知错误:{exc}")
return
text = (
"*Aria2 状态*\n"
f"- 安装状态:{'已安装 ✅' if info.get('installed') or is_aria2_installed() else '未安装 ❌'}\n"
f"- 运行状态:{'运行中 ✅' if info.get('running') else '未运行 ❌'}\n"
f"- PID`{info.get('pid') or 'N/A'}`\n"
f"- 版本:`{version}`\n"
f"- RPC 端口:`{rpc_port}`\n"
f"- RPC 密钥:`{rpc_secret[:4]}****{rpc_secret[-4:] if len(rpc_secret) > 8 else '****'}`"
)
await self._reply(update, context, text, parse_mode="Markdown")
logger.info(f"/status 命令执行成功 - {_get_user_info(update)}")
async def view_logs(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"收到 /logs 命令 - {_get_user_info(update)}")
try:
logs = self.service.view_log(lines=30)
except ServiceError as exc:
logger.error(f"/logs 命令执行失败: {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"读取日志失败:{exc}")
return
except Exception as exc: # noqa: BLE001
logger.error(f"/logs 命令执行失败(未知错误): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"读取日志失败,发生未知错误:{exc}")
return
if not logs.strip():
await self._reply(update, context, "暂无日志内容。")
logger.info(f"/logs 命令执行成功(无日志) - {_get_user_info(update)}")
return
await self._reply(update, context, f"最近 30 行日志:\n{logs}")
logger.info(f"/logs 命令执行成功 - {_get_user_info(update)}")
async def clear_logs(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"收到 /clear_logs 命令 - {_get_user_info(update)}")
try:
self.service.clear_log()
await self._reply(update, context, "日志已清空 ✅")
logger.info(f"/clear_logs 命令执行成功 - {_get_user_info(update)}")
except ServiceError as exc:
logger.error(f"/clear_logs 命令执行失败: {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"清空日志失败:{exc}")
except Exception as exc: # noqa: BLE001
logger.error(f"/clear_logs 命令执行失败(未知错误): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"清空日志失败,发生未知错误:{exc}")
async def set_secret(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""设置自定义 RPC 密钥"""
logger.info(f"收到 /set_secret 命令 - {_get_user_info(update)}")
if not context.args or len(context.args) != 1:
await self._reply(update, context, "用法: /set_secret <密钥>\n密钥长度需为 16 位")
return
new_secret = context.args[0]
if len(new_secret) != 16:
await self._reply(update, context, "密钥长度需为 16 位")
return
try:
self.service.update_rpc_secret(new_secret)
self.config.rpc_secret = new_secret
self.service.restart()
await self._reply(
update,
context,
f"RPC 密钥已更新并重启服务 ✅\n新密钥: `{new_secret[:4]}****{new_secret[-4:]}`",
parse_mode="Markdown",
)
logger.info(f"/set_secret 命令执行成功 - {_get_user_info(update)}")
except ConfigError as exc:
logger.error(f"/set_secret 命令执行失败: {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"设置密钥失败:{exc}")
except ServiceError as exc:
logger.error(f"/set_secret 命令执行失败(重启服务): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"密钥已更新但重启服务失败:{exc}")
except Exception as exc: # noqa: BLE001
logger.error(f"/set_secret 命令执行失败(未知错误): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"设置密钥失败,发生未知错误:{exc}")
async def reset_secret(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""重新生成随机 RPC 密钥"""
logger.info(f"收到 /reset_secret 命令 - {_get_user_info(update)}")
try:
new_secret = generate_rpc_secret()
self.service.update_rpc_secret(new_secret)
self.config.rpc_secret = new_secret
self.service.restart()
await self._reply(
update,
context,
f"RPC 密钥已重新生成并重启服务 ✅\n新密钥: `{new_secret[:4]}****{new_secret[-4:]}`",
parse_mode="Markdown",
)
logger.info(f"/reset_secret 命令执行成功 - {_get_user_info(update)}")
except ConfigError as exc:
logger.error(f"/reset_secret 命令执行失败: {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"重置密钥失败:{exc}")
except ServiceError as exc:
logger.error(f"/reset_secret 命令执行失败(重启服务): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"密钥已更新但重启服务失败:{exc}")
except Exception as exc: # noqa: BLE001
logger.error(f"/reset_secret 命令执行失败(未知错误): {exc} - {_get_user_info(update)}")
await self._reply(update, context, f"重置密钥失败,发生未知错误:{exc}")
async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"收到 /help 命令 - {_get_user_info(update)}")
commands = [
"*服务管理*",
"/install - 安装 aria2",
"/uninstall - 卸载 aria2",
"/start - 启动 aria2 服务",
"/stop - 停止 aria2 服务",
"/restart - 重启 aria2 服务",
"/status - 查看 aria2 状态",
"/logs - 查看最近日志",
"/clear\\_logs - 清空日志",
"/set\\_secret <密钥> - 设置 RPC 密钥",
"/reset\\_secret - 重新生成 RPC 密钥",
"",
"*下载管理*",
"/add <URL> - 添加下载任务",
"/list - 查看下载列表",
"/stats - 全局下载统计",
"",
"*云存储*",
"/cloud - 云存储管理菜单",
"",
"/menu - 显示快捷菜单",
"/help - 显示此帮助",
]
await self._reply(update, context, "可用命令:\n" + "\n".join(commands), parse_mode="Markdown")
async def menu_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 /menu 命令,显示 Reply Keyboard 主菜单"""
logger.info(f"收到 /menu 命令 - {_get_user_info(update)}")
keyboard = build_main_reply_keyboard()
await self._reply(
update,
context,
"📋 *快捷菜单*\n\n使用下方按钮快速操作,或输入命令:\n/add <URL> - 添加下载任务",
parse_mode="Markdown",
reply_markup=keyboard,
)

View File

@@ -121,14 +121,10 @@ def build_main_reply_keyboard() -> ReplyKeyboardMarkup:
def build_cloud_menu_keyboard() -> InlineKeyboardMarkup: def build_cloud_menu_keyboard() -> InlineKeyboardMarkup:
"""构建云存储管理菜单""" """构建云存储主菜单 - 选择配置哪个云存储"""
return InlineKeyboardMarkup([ return InlineKeyboardMarkup([
[InlineKeyboardButton("🔐 OneDrive 认证", callback_data="cloud:auth:onedrive")], [InlineKeyboardButton("☁️ OneDrive 设置", callback_data="cloud:onedrive:menu")],
[ [InlineKeyboardButton("📢 Telegram 频道设置", callback_data="cloud:telegram:menu")],
InlineKeyboardButton("📊 状态", callback_data="cloud:status"),
InlineKeyboardButton("⚙️ 设置", callback_data="cloud:settings"),
],
[InlineKeyboardButton("🚪 登出", callback_data="cloud:logout")],
]) ])
@@ -141,16 +137,54 @@ def build_upload_choice_keyboard(gid: str) -> InlineKeyboardMarkup:
def build_cloud_settings_keyboard(auto_upload: bool, delete_after: bool) -> InlineKeyboardMarkup: def build_cloud_settings_keyboard(auto_upload: bool, delete_after: bool) -> InlineKeyboardMarkup:
"""构建云存储设置键盘""" """构建 OneDrive 设置键盘"""
auto_text = "✅ 自动上传" if auto_upload else "❌ 自动上传" auto_text = "✅ 自动上传" if auto_upload else "❌ 自动上传"
delete_text = "✅ 上传后删除" if delete_after else "❌ 上传后删除" delete_text = "✅ 上传后删除" if delete_after else "❌ 上传后删除"
return InlineKeyboardMarkup([ return InlineKeyboardMarkup([
[InlineKeyboardButton(auto_text, callback_data="cloud:toggle:auto_upload")], [InlineKeyboardButton(auto_text, callback_data="cloud:onedrive:toggle:auto_upload")],
[InlineKeyboardButton(delete_text, callback_data="cloud:toggle:delete_after")], [InlineKeyboardButton(delete_text, callback_data="cloud:onedrive:toggle:delete_after")],
[InlineKeyboardButton("🔙 返回", callback_data="cloud:menu")], [InlineKeyboardButton("🔙 返回", callback_data="cloud:menu")],
]) ])
def build_onedrive_menu_keyboard() -> InlineKeyboardMarkup:
"""构建 OneDrive 菜单键盘"""
return InlineKeyboardMarkup([
[InlineKeyboardButton("🔐 认证", callback_data="cloud:onedrive:auth")],
[
InlineKeyboardButton("📊 状态", callback_data="cloud:onedrive:status"),
InlineKeyboardButton("⚙️ 设置", callback_data="cloud:onedrive:settings"),
],
[InlineKeyboardButton("🚪 登出", callback_data="cloud:onedrive:logout")],
[InlineKeyboardButton("🔙 返回", callback_data="cloud:menu")],
])
def build_telegram_channel_menu_keyboard(config_enabled: bool, channel_id: str) -> InlineKeyboardMarkup:
"""构建 Telegram 频道菜单键盘"""
status_text = f"📢 频道: {channel_id}" if channel_id else "📢 频道: 未设置"
enabled_text = "✅ 已启用" if config_enabled else "❌ 未启用"
return InlineKeyboardMarkup([
[InlineKeyboardButton(status_text, callback_data="cloud:telegram:info")],
[InlineKeyboardButton(enabled_text, callback_data="cloud:telegram:toggle:enabled")],
[InlineKeyboardButton("⚙️ 设置", callback_data="cloud:telegram:settings")],
[InlineKeyboardButton("🔙 返回", callback_data="cloud:menu")],
])
def build_telegram_channel_settings_keyboard(auto_upload: bool, delete_after: bool, channel_id: str) -> InlineKeyboardMarkup:
"""构建 Telegram 频道设置键盘"""
auto_text = "✅ 自动上传" if auto_upload else "❌ 自动上传"
delete_text = "✅ 上传后删除" if delete_after else "❌ 上传后删除"
channel_text = f"📝 频道ID: {channel_id}" if channel_id else "📝 设置频道ID"
return InlineKeyboardMarkup([
[InlineKeyboardButton(channel_text, callback_data="cloud:telegram:set_channel")],
[InlineKeyboardButton(auto_text, callback_data="cloud:telegram:toggle:auto_upload")],
[InlineKeyboardButton(delete_text, callback_data="cloud:telegram:toggle:delete_after")],
[InlineKeyboardButton("🔙 返回", callback_data="cloud:telegram:menu")],
])
def build_detail_keyboard_with_upload(gid: str, status: str, show_onedrive: bool = False, show_channel: bool = False) -> InlineKeyboardMarkup: def build_detail_keyboard_with_upload(gid: str, status: str, show_onedrive: bool = False, show_channel: bool = False) -> InlineKeyboardMarkup:
"""构建详情页面的操作按钮(含上传选项)""" """构建详情页面的操作按钮(含上传选项)"""
buttons = [] buttons = []