fix: make category callbacks id-based and tighten ownership checks

This commit is contained in:
Xiaolan Bot
2026-02-22 01:26:24 +08:00
parent db8257fdde
commit 97bcee7258

View File

@@ -241,25 +241,17 @@ def format_frequency(unit, value) -> str:
return f"{value} {unit_map.get(unit, unit)}" return f"{value} {unit_map.get(unit, unit)}"
CATEGORY_CB_PREFIX = "list_subs_in_category_" CATEGORY_CB_PREFIX = "list_subs_in_category_id_"
EDITABLE_SUB_FIELDS = {'name', 'cost', 'currency', 'category', 'next_due', 'renewal_type', 'notes'} EDITABLE_SUB_FIELDS = {'name', 'cost', 'currency', 'category', 'next_due', 'renewal_type', 'notes'}
def _build_category_callback_data(category_name: str) -> str: def _build_category_callback_data(category_id: int) -> str:
"""Build callback_data within Telegram's 64-byte limit by falling back to a hash token.""" return f"{CATEGORY_CB_PREFIX}{category_id}"
candidate = f"{CATEGORY_CB_PREFIX}{category_name}"
if len(candidate.encode('utf-8')) <= 64:
return candidate
token = abs(hash(category_name)) % 100000000
return f"{CATEGORY_CB_PREFIX}h{token}"
def _parse_category_from_callback(data: str, context: CallbackContext) -> str | None: def _parse_category_id_from_callback(data: str) -> int | None:
payload = data.replace(CATEGORY_CB_PREFIX, '', 1) payload = data.replace(CATEGORY_CB_PREFIX, '', 1)
if payload.startswith('h') and payload[1:].isdigit(): return int(payload) if payload.isdigit() else None
mapping = context.user_data.get('category_cb_map', {})
return mapping.get(payload)
return payload
async def get_subs_list_keyboard(user_id: int, category_filter: str = None) -> InlineKeyboardMarkup: async def get_subs_list_keyboard(user_id: int, category_filter: str = None) -> InlineKeyboardMarkup:
@@ -727,7 +719,7 @@ async def list_categories(update: Update, context: CallbackContext):
user_id = update.effective_user.id user_id = update.effective_user.id
with get_db_connection() as conn: with get_db_connection() as conn:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT name FROM categories WHERE user_id = ? ORDER BY name", (user_id,)) cursor.execute("SELECT id, name FROM categories WHERE user_id = ? ORDER BY name", (user_id,))
categories = cursor.fetchall() categories = cursor.fetchall()
if not categories: if not categories:
if update.callback_query: if update.callback_query:
@@ -736,15 +728,10 @@ async def list_categories(update: Update, context: CallbackContext):
await update.message.reply_text("您还没有任何分类。") await update.message.reply_text("您还没有任何分类。")
return return
context.user_data['category_cb_map'] = {}
buttons = [] buttons = []
for cat in categories: for cat in categories:
cat_name = cat[0] cat_id, cat_name = cat[0], cat[1]
cb_data = _build_category_callback_data(cat_name) buttons.append(InlineKeyboardButton(cat_name, callback_data=_build_category_callback_data(cat_id)))
payload = cb_data.replace(CATEGORY_CB_PREFIX, '', 1)
if payload.startswith('h') and payload[1:].isdigit():
context.user_data['category_cb_map'][payload] = cat_name
buttons.append(InlineKeyboardButton(cat_name, callback_data=cb_data))
keyboard = [buttons[i:i + 2] for i in range(0, len(buttons), 2)] keyboard = [buttons[i:i + 2] for i in range(0, len(buttons), 2)]
keyboard.append([InlineKeyboardButton("查看全部订阅", callback_data="list_all_subs")]) keyboard.append([InlineKeyboardButton("查看全部订阅", callback_data="list_all_subs")])
@@ -795,11 +782,11 @@ async def show_subscription_view(update: Update, context: CallbackContext, sub_i
keyboard_buttons.insert(0, [InlineKeyboardButton("✅ 续费", callback_data=f'renewmanual_{sub_id}')]) keyboard_buttons.insert(0, [InlineKeyboardButton("✅ 续费", callback_data=f'renewmanual_{sub_id}')])
if 'list_subs_in_category' in context.user_data: if 'list_subs_in_category' in context.user_data:
cat_filter = context.user_data['list_subs_in_category'] cat_filter = context.user_data['list_subs_in_category']
back_cb = _build_category_callback_data(cat_filter) category_id = context.user_data.get('list_subs_in_category_id')
payload = back_cb.replace(CATEGORY_CB_PREFIX, '', 1) if category_id:
if payload.startswith('h') and payload[1:].isdigit(): back_cb = _build_category_callback_data(category_id)
category_cb_map = context.user_data.setdefault('category_cb_map', {}) else:
category_cb_map[payload] = cat_filter back_cb = 'list_categories'
keyboard_buttons.append([InlineKeyboardButton("« 返回分类订阅", callback_data=back_cb)]) keyboard_buttons.append([InlineKeyboardButton("« 返回分类订阅", callback_data=back_cb)])
else: else:
keyboard_buttons.append([InlineKeyboardButton("« 返回全部订阅", callback_data='list_all_subs')]) keyboard_buttons.append([InlineKeyboardButton("« 返回全部订阅", callback_data='list_all_subs')])
@@ -820,11 +807,23 @@ async def button_callback_handler(update: Update, context: CallbackContext):
logger.debug(f"Received callback query: {data} from user {user_id}") logger.debug(f"Received callback query: {data} from user {user_id}")
if data.startswith(CATEGORY_CB_PREFIX): if data.startswith(CATEGORY_CB_PREFIX):
category = _parse_category_from_callback(data, context) category_id = _parse_category_id_from_callback(data)
if not category: if not category_id:
await query.edit_message_text("错误:无效或已过期的分类,请重新选择。") await query.edit_message_text("错误:无效或已过期的分类,请重新选择。")
return return
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT name FROM categories WHERE id = ? AND user_id = ?", (category_id, user_id))
row = cursor.fetchone()
if not row:
await query.edit_message_text("错误:分类不存在或无权限。")
return
category = row['name']
context.user_data['list_subs_in_category'] = category context.user_data['list_subs_in_category'] = category
context.user_data['list_subs_in_category_id'] = category_id
keyboard = await get_subs_list_keyboard(user_id, category_filter=category) keyboard = await get_subs_list_keyboard(user_id, category_filter=category)
msg_text = f"分类“{escape_markdown(category, version=2)}”下的订阅:" msg_text = f"分类“{escape_markdown(category, version=2)}”下的订阅:"
if not keyboard: if not keyboard:
@@ -834,10 +833,12 @@ async def button_callback_handler(update: Update, context: CallbackContext):
return return
if data == 'list_categories': if data == 'list_categories':
context.user_data.pop('list_subs_in_category', None) context.user_data.pop('list_subs_in_category', None)
context.user_data.pop('list_subs_in_category_id', None)
await list_categories(update, context) await list_categories(update, context)
return return
if data == 'list_all_subs': if data == 'list_all_subs':
context.user_data.pop('list_subs_in_category', None) context.user_data.pop('list_subs_in_category', None)
context.user_data.pop('list_subs_in_category_id', None)
keyboard = await get_subs_list_keyboard(user_id) keyboard = await get_subs_list_keyboard(user_id)
if not keyboard: if not keyboard:
await query.edit_message_text("您还没有任何订阅。") await query.edit_message_text("您还没有任何订阅。")