feat: 增加tg文件上传

This commit is contained in:
dnslin
2025-12-13 12:03:53 +08:00
parent debce7c476
commit f223083998
7 changed files with 274 additions and 12 deletions

View File

@@ -34,3 +34,17 @@ ONEDRIVE_DELETE_AFTER_UPLOAD=false
# OneDrive 远程存储路径 # OneDrive 远程存储路径
ONEDRIVE_REMOTE_PATH=/aria2bot ONEDRIVE_REMOTE_PATH=/aria2bot
# ==================== Telegram 频道存储配置 ====================
# 启用 Telegram 频道存储功能
# Bot 必须是频道管理员且有发送消息权限
TELEGRAM_CHANNEL_ENABLED=false
# 频道 ID数字 ID 如 -1001234567890或 @username 格式)
TELEGRAM_CHANNEL_ID=
# 下载完成后自动发送到频道
TELEGRAM_CHANNEL_AUTO_UPLOAD=false
# 发送后删除本地文件
TELEGRAM_CHANNEL_DELETE_AFTER_UPLOAD=false

View File

@@ -8,10 +8,10 @@ You must communicate in Chinese and logs and comments must also be in Chinese, i
```bash ```bash
# Install dependencies # Install dependencies
pip install -e . uv pip install -e .
# Run the bot # Run the bot
python main.py uv run main.py
# or after install: # or after install:
aria2bot aria2bot
``` ```

View File

@@ -0,0 +1,82 @@
"""Telegram 频道存储客户端"""
from __future__ import annotations
import asyncio
from pathlib import Path
from telegram import Bot
from src.core.config import TelegramChannelConfig
from src.utils.logger import get_logger
logger = get_logger("telegram_channel")
# 文件大小限制
STANDARD_LIMIT = 50 * 1024 * 1024 # 50MB
LOCAL_API_LIMIT = 2 * 1024 * 1024 * 1024 # 2GB
# 重试配置
MAX_RETRIES = 3
RETRY_DELAY = 5 # 秒
class TelegramChannelClient:
"""Telegram 频道上传客户端"""
def __init__(self, config: TelegramChannelConfig, bot: Bot, is_local_api: bool = False):
self.config = config
self.bot = bot
self.max_size = LOCAL_API_LIMIT if is_local_api else STANDARD_LIMIT
def get_max_size(self) -> int:
"""获取最大文件大小限制"""
return self.max_size
def get_max_size_mb(self) -> int:
"""获取最大文件大小限制MB"""
return self.max_size // (1024 * 1024)
async def upload_file(self, local_path: Path) -> tuple[bool, str]:
"""上传文件到频道
Args:
local_path: 本地文件路径
Returns:
tuple[bool, str]: (成功与否, file_id 或错误信息)
"""
if not local_path.exists():
return False, "文件不存在"
file_size = local_path.stat().st_size
if file_size > self.max_size:
limit_mb = self.get_max_size_mb()
return False, f"文件超过 {limit_mb}MB 限制"
if not self.config.channel_id:
return False, "频道 ID 未配置"
last_error = None
for attempt in range(MAX_RETRIES):
try:
with open(local_path, "rb") as f:
message = await self.bot.send_document(
chat_id=self.config.channel_id,
document=f,
filename=local_path.name,
caption=f"📁 {local_path.name}",
read_timeout=300,
write_timeout=300,
connect_timeout=30,
)
file_id = message.document.file_id
logger.info(f"文件上传成功: {local_path.name}, file_id={file_id}")
return True, file_id
except Exception as e:
last_error = e
logger.warning(f"上传失败 (尝试 {attempt + 1}/{MAX_RETRIES}): {e}")
if attempt < MAX_RETRIES - 1:
await asyncio.sleep(RETRY_DELAY)
logger.error(f"上传到频道失败: {last_error}")
return False, str(last_error)

View File

@@ -29,6 +29,15 @@ class OneDriveConfig:
remote_path: str = "/aria2bot" remote_path: str = "/aria2bot"
@dataclass
class TelegramChannelConfig:
"""Telegram 频道存储配置"""
enabled: bool = False
channel_id: str = "" # 频道 ID 或 @username
auto_upload: bool = False
delete_after_upload: bool = False
@dataclass @dataclass
class BotConfig: class BotConfig:
token: str = "" token: str = ""
@@ -36,6 +45,7 @@ class BotConfig:
allowed_users: set[int] = field(default_factory=set) allowed_users: set[int] = field(default_factory=set)
aria2: Aria2Config = field(default_factory=Aria2Config) aria2: Aria2Config = field(default_factory=Aria2Config)
onedrive: OneDriveConfig = field(default_factory=OneDriveConfig) onedrive: OneDriveConfig = field(default_factory=OneDriveConfig)
telegram_channel: TelegramChannelConfig = field(default_factory=TelegramChannelConfig)
@classmethod @classmethod
def from_env(cls) -> "BotConfig": def from_env(cls) -> "BotConfig":
@@ -85,10 +95,19 @@ class BotConfig:
remote_path=os.environ.get("ONEDRIVE_REMOTE_PATH", "/aria2bot"), remote_path=os.environ.get("ONEDRIVE_REMOTE_PATH", "/aria2bot"),
) )
# 解析 Telegram 频道存储配置
telegram_channel = TelegramChannelConfig(
enabled=os.environ.get("TELEGRAM_CHANNEL_ENABLED", "").lower() == "true",
channel_id=os.environ.get("TELEGRAM_CHANNEL_ID", ""),
auto_upload=os.environ.get("TELEGRAM_CHANNEL_AUTO_UPLOAD", "").lower() == "true",
delete_after_upload=os.environ.get("TELEGRAM_CHANNEL_DELETE_AFTER_UPLOAD", "").lower() == "true",
)
return cls( return cls(
token=token, token=token,
api_base_url=os.environ.get("TELEGRAM_API_BASE_URL", ""), api_base_url=os.environ.get("TELEGRAM_API_BASE_URL", ""),
allowed_users=allowed_users, allowed_users=allowed_users,
aria2=aria2, aria2=aria2,
onedrive=onedrive, onedrive=onedrive,
telegram_channel=telegram_channel,
) )

View File

@@ -51,7 +51,7 @@ def create_app(config: BotConfig) -> Application:
builder = builder.base_url(config.api_base_url).base_file_url(config.api_base_url + "/file") builder = builder.base_url(config.api_base_url).base_file_url(config.api_base_url + "/file")
app = builder.build() app = builder.build()
api = Aria2BotAPI(config.aria2, config.allowed_users, config.onedrive) api = Aria2BotAPI(config.aria2, config.allowed_users, config.onedrive, config.telegram_channel, config.api_base_url)
for handler in build_handlers(api): for handler in build_handlers(api):
app.add_handler(handler) app.add_handler(handler)

View File

@@ -22,7 +22,7 @@ from src.core import (
ARIA2_CONF, ARIA2_CONF,
DOWNLOAD_DIR, DOWNLOAD_DIR,
) )
from src.core.config import OneDriveConfig from src.core.config import OneDriveConfig, TelegramChannelConfig
from src.cloud.base import UploadProgress, UploadStatus from src.cloud.base import UploadProgress, UploadStatus
from src.aria2 import Aria2Installer, Aria2ServiceManager from src.aria2 import Aria2Installer, Aria2ServiceManager
from src.aria2.rpc import Aria2RpcClient, DownloadTask, _format_size from src.aria2.rpc import Aria2RpcClient, DownloadTask, _format_size
@@ -88,7 +88,9 @@ from functools import wraps
class Aria2BotAPI: class Aria2BotAPI:
def __init__(self, config: Aria2Config | None = None, allowed_users: set[int] | None = None, def __init__(self, config: Aria2Config | None = None, allowed_users: set[int] | None = None,
onedrive_config: OneDriveConfig | None = None): onedrive_config: OneDriveConfig | None = None,
telegram_channel_config: TelegramChannelConfig | None = None,
api_base_url: str = ""):
self.config = config or Aria2Config() self.config = config or Aria2Config()
self.allowed_users = allowed_users or set() self.allowed_users = allowed_users or set()
self.installer = Aria2Installer(self.config) self.installer = Aria2Installer(self.config)
@@ -102,6 +104,11 @@ class Aria2BotAPI:
self._onedrive_config = onedrive_config self._onedrive_config = onedrive_config
self._onedrive = None self._onedrive = None
self._pending_auth: dict[int, dict] = {} # user_id -> flow self._pending_auth: dict[int, dict] = {} # user_id -> flow
# Telegram 频道存储
self._telegram_channel_config = telegram_channel_config
self._telegram_channel = None
self._api_base_url = api_base_url
self._channel_uploaded_gids: set[str] = set() # 已上传到频道的 GID
async def _check_permission(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: async def _check_permission(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
"""检查用户权限,返回 True 表示有权限""" """检查用户权限,返回 True 表示有权限"""
@@ -132,6 +139,14 @@ class Aria2BotAPI:
self._onedrive = OneDriveClient(self._onedrive_config) self._onedrive = OneDriveClient(self._onedrive_config)
return self._onedrive return self._onedrive
def _get_telegram_channel_client(self, bot):
"""获取或创建 Telegram 频道客户端"""
if self._telegram_channel is None and self._telegram_channel_config and self._telegram_channel_config.enabled:
from src.cloud.telegram_channel import TelegramChannelClient
is_local_api = bool(self._api_base_url)
self._telegram_channel = TelegramChannelClient(self._telegram_channel_config, bot, is_local_api)
return self._telegram_channel
async def _reply(self, update: Update, context: ContextTypes.DEFAULT_TYPE, text: str, **kwargs): async def _reply(self, update: Update, context: ContextTypes.DEFAULT_TYPE, text: str, **kwargs):
if update.effective_message: if update.effective_message:
return await update.effective_message.reply_text(text, **kwargs) return await update.effective_message.reply_text(text, **kwargs)
@@ -740,6 +755,79 @@ class Aria2BotAPI:
except Exception: except Exception:
pass pass
async def _trigger_channel_auto_upload(self, chat_id: int, gid: str, bot) -> None:
"""触发频道自动上传"""
from pathlib import Path
logger.info(f"触发频道自动上传 GID={gid}")
client = self._get_telegram_channel_client(bot)
if not client:
logger.warning(f"频道上传跳过:频道未配置 GID={gid}")
return
rpc = self._get_rpc_client()
try:
task = await rpc.get_status(gid)
except RpcError as e:
logger.error(f"频道上传失败:获取任务信息失败 GID={gid}: {e}")
return
if task.status != "complete":
return
local_path = Path(task.dir) / task.name
if not local_path.exists():
logger.error(f"频道上传失败:本地文件不存在 GID={gid}")
return
# 检查文件大小
file_size = local_path.stat().st_size
if file_size > client.get_max_size():
limit_mb = client.get_max_size_mb()
await bot.send_message(
chat_id=chat_id,
text=f"⚠️ 文件 {task.name} 超过 {limit_mb}MB 限制,跳过频道上传"
)
return
asyncio.create_task(self._do_channel_upload(client, local_path, task.name, chat_id, gid, bot))
async def _do_channel_upload(self, client, local_path, task_name: str, chat_id: int, gid: str, bot) -> None:
"""执行频道上传"""
import shutil
try:
msg = await bot.send_message(chat_id=chat_id, text=f"📢 正在发送到频道: {task_name}")
except Exception as e:
logger.error(f"频道上传失败:发送消息失败 GID={gid}: {e}")
return
try:
success, result = await client.upload_file(local_path)
if success:
result_text = f"✅ 已发送到频道: {task_name}"
if self._telegram_channel_config and self._telegram_channel_config.delete_after_upload:
try:
if local_path.is_dir():
shutil.rmtree(local_path)
else:
local_path.unlink()
result_text += "\n🗑️ 本地文件已删除"
except Exception as e:
result_text += f"\n⚠️ 删除本地文件失败: {e}"
await msg.edit_text(result_text)
logger.info(f"频道上传成功 GID={gid}")
else:
await msg.edit_text(f"❌ 发送到频道失败: {task_name}\n原因: {result}")
logger.error(f"频道上传失败 GID={gid}: {result}")
except Exception as e:
logger.error(f"频道上传异常 GID={gid}: {e}")
try:
await msg.edit_text(f"❌ 发送到频道失败: {task_name}\n错误: {e}")
except Exception:
pass
async def handle_button_text(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def handle_button_text(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理 Reply Keyboard 按钮点击""" """处理 Reply Keyboard 按钮点击"""
text = update.message.text text = update.message.text
@@ -1097,13 +1185,18 @@ class Aria2BotAPI:
if task.error_message: if task.error_message:
text += f"\n❌ 错误: {task.error_message}" text += f"\n❌ 错误: {task.error_message}"
# 检查是否显示上传按钮(任务完成且云存储已配置) # 检查是否显示上传按钮
show_upload = ( show_onedrive = (
task.status == "complete" and task.status == "complete" and
self._onedrive_config and self._onedrive_config and
self._onedrive_config.enabled self._onedrive_config.enabled
) )
keyboard = build_detail_keyboard_with_upload(gid, task.status, show_upload) show_channel = (
task.status == "complete" and
self._telegram_channel_config and
self._telegram_channel_config.enabled
)
keyboard = build_detail_keyboard_with_upload(gid, task.status, show_onedrive, show_channel)
# 只有内容变化时才更新 # 只有内容变化时才更新
if text != last_text: if text != last_text:
@@ -1176,6 +1269,13 @@ class Aria2BotAPI:
text = f"✅ *下载完成*\n📄 {safe_name}\n📦 大小: {task.size_str}\n🆔 GID: `{task.gid}`" text = f"✅ *下载完成*\n📄 {safe_name}\n📦 大小: {task.size_str}\n🆔 GID: `{task.gid}`"
try: try:
await _bot_instance.send_message(chat_id=chat_id, text=text, parse_mode="Markdown") await _bot_instance.send_message(chat_id=chat_id, text=text, parse_mode="Markdown")
# 触发频道自动上传
if (self._telegram_channel_config and
self._telegram_channel_config.enabled and
self._telegram_channel_config.auto_upload and
task.gid not in self._channel_uploaded_gids):
self._channel_uploaded_gids.add(task.gid)
asyncio.create_task(self._trigger_channel_auto_upload(chat_id, task.gid, _bot_instance))
except Exception as e: except Exception as e:
logger.warning(f"发送完成通知失败 (GID={task.gid}): {e}") logger.warning(f"发送完成通知失败 (GID={task.gid}): {e}")
@@ -1276,11 +1376,52 @@ class Aria2BotAPI:
await query.edit_message_text("❌ 无效操作") await query.edit_message_text("❌ 无效操作")
return return
provider = parts[1] # onedrive provider = parts[1] # onedrive / telegram
gid = parts[2] gid = parts[2]
if provider == "onedrive": if provider == "onedrive":
await self.upload_to_cloud(update, context, gid) await self.upload_to_cloud(update, context, gid)
elif provider == "telegram":
await self._upload_to_channel_manual(query, update, context, gid)
async def _upload_to_channel_manual(self, query, update: Update, context: ContextTypes.DEFAULT_TYPE, gid: str) -> None:
"""手动上传到频道"""
from pathlib import Path
client = self._get_telegram_channel_client(context.bot)
if not client:
await query.edit_message_text("❌ 频道存储未配置")
return
rpc = self._get_rpc_client()
try:
task = await rpc.get_status(gid)
except RpcError as e:
await query.edit_message_text(f"❌ 获取任务信息失败: {e}")
return
if task.status != "complete":
await query.edit_message_text("❌ 任务未完成,无法上传")
return
local_path = Path(task.dir) / task.name
if not local_path.exists():
await query.edit_message_text("❌ 本地文件不存在")
return
# 检查文件大小
file_size = local_path.stat().st_size
if file_size > client.get_max_size():
limit_mb = client.get_max_size_mb()
await query.edit_message_text(f"❌ 文件超过 {limit_mb}MB 限制")
return
await query.edit_message_text(f"📢 正在发送到频道: {task.name}")
success, result = await client.upload_file(local_path)
if success:
await query.edit_message_text(f"✅ 已发送到频道: {task.name}")
else:
await query.edit_message_text(f"❌ 发送失败: {result}")
def build_handlers(api: Aria2BotAPI) -> list: def build_handlers(api: Aria2BotAPI) -> list:

View File

@@ -151,7 +151,7 @@ def build_cloud_settings_keyboard(auto_upload: bool, delete_after: bool) -> Inli
]) ])
def build_detail_keyboard_with_upload(gid: str, status: str, show_upload: bool = False) -> InlineKeyboardMarkup: def build_detail_keyboard_with_upload(gid: str, status: str, show_onedrive: bool = False, show_channel: bool = False) -> InlineKeyboardMarkup:
"""构建详情页面的操作按钮(含上传选项)""" """构建详情页面的操作按钮(含上传选项)"""
buttons = [] buttons = []
@@ -165,8 +165,14 @@ def build_detail_keyboard_with_upload(gid: str, status: str, show_upload: bool =
rows = [buttons] rows = [buttons]
# 任务完成时显示上传按钮 # 任务完成时显示上传按钮
if show_upload and status == "complete": if status == "complete":
rows.append([InlineKeyboardButton("☁️ 上传到云盘", callback_data=f"upload:onedrive:{gid}")]) upload_buttons = []
if show_onedrive:
upload_buttons.append(InlineKeyboardButton("☁️ OneDrive", callback_data=f"upload:onedrive:{gid}"))
if show_channel:
upload_buttons.append(InlineKeyboardButton("📢 频道", callback_data=f"upload:telegram:{gid}"))
if upload_buttons:
rows.append(upload_buttons)
rows.append([ rows.append([
InlineKeyboardButton("🔄 刷新", callback_data=f"refresh:{gid}"), InlineKeyboardButton("🔄 刷新", callback_data=f"refresh:{gid}"),