# TG-forward-videos

import asyncio
import os
import random
import re
import logging
import signal
from logging.handlers import RotatingFileHandler
from typing import List, Dict, Optional
import aiosqlite
from telethon import TelegramClient, events
from telethon.tl.types import (
    MessageMediaDocument, 
    DocumentAttributeVideo, 
    Message
)
from telethon.errors import (
    FloodWaitError, 
    ChatForwardsRestrictedError, 
    SecurityError, 
    rpcerrorlist
)
from dotenv import load_dotenv

# ==================== 日志设置 ====================
# Log to both a size-rotated file and the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    handlers=[
        # bot.log rolls over at 5 MiB; up to 5 rotated files are kept.
        RotatingFileHandler("bot.log", maxBytes=5*1024*1024, backupCount=5, encoding="utf-8"),
        logging.StreamHandler()
    ]
)
# Module-wide logger.
logger = logging.getLogger("TGForwardBot")

# ==================== 配置加载 ====================
# Load settings via python-dotenv before the environment is read below.
load_dotenv()

# Telegram credentials. Missing variables fall back to 0 / empty string;
# a non-numeric API_ID raises ValueError at import time.
API_ID = int(os.getenv("API_ID", 0))
API_HASH = os.getenv("API_HASH", "")
PHONE_NUMBER = os.getenv("PHONE_NUMBER", "")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD", "")

# Destination chat id and the comma-separated list of source chat ids.
# Blank entries in SOURCE_CHANNELS are skipped.
TARGET_CHANNEL = int(os.getenv("TARGET_CHANNEL", 0))
SOURCE_CHANNELS = [
    int(x.strip()) for x in os.getenv("SOURCE_CHANNELS", "").split(",") if x.strip()
]

# Forwarding rate control: the worker sleeps a random time between
# MIN_INTERVAL and MAX_INTERVAL seconds after each send; each scanner
# sleeps SCAN_INTERVAL seconds between passes.
MIN_INTERVAL = float(os.getenv("MIN_INTERVAL", 2))
MAX_INTERVAL = float(os.getenv("MAX_INTERVAL", 5))
SCAN_INTERVAL = int(os.getenv("SCAN_INTERVAL", 60))

# Album buffering time (seconds) before a grouped set of messages is queued
ALBUM_WAIT_TIME = 4.0

# Caption length limit (characters kept after cleaning)
MAX_CAPTION_LENGTH = int(os.getenv("MAX_CAPTION_LENGTH", 1024))

# Filter settings; is_video() only applies a bound when its value is > 0.
MIN_SIZE_MB = float(os.getenv("MIN_SIZE_MB", 0))
MAX_SIZE_MB = float(os.getenv("MAX_SIZE_MB", 0))
MIN_DURATION = int(os.getenv("MIN_DURATION", 0))
MAX_DURATION = int(os.getenv("MAX_DURATION", 0))

# ==================== 客户端初始化 ====================
# Telethon client for a user account; "user_session" is the session name.
client = TelegramClient(
    "user_session",
    API_ID,
    API_HASH,
    device_model="TGForwardBot",
    system_version="Pro",
    app_version="2.1"
)

# Forward queue: every item is a List[Message] that is sent as one batch.
# NOTE(review): created at import time, before asyncio.run() starts the loop;
# on Python < 3.10 asyncio.Queue binds to a loop at construction -- confirm
# the minimum supported Python version.
forward_queue = asyncio.Queue(maxsize=200)

# Album buffer: grouped_id -> messages collected so far for that album.
pending_albums: Dict[int, List[Message]] = {}

# ==================== 异步数据库类 ====================
class AsyncDB:
    """Async wrapper around one aiosqlite connection.

    Two tables are maintained:

    * ``videos``   -- keys of videos that were already forwarded.
    * ``progress`` -- highest scanned message id per source channel.

    Every query runs while holding ``self.lock`` so statements issued by
    different tasks do not interleave on the shared connection.
    """

    def __init__(self, path):
        """Remember the database *path*; no I/O happens until connect()."""
        self.path = path
        self.conn: Optional[aiosqlite.Connection] = None
        self.lock = asyncio.Lock()

    async def connect(self):
        """Open the connection and create both tables if they are missing."""
        self.conn = await aiosqlite.connect(self.path)
        await self.conn.execute("PRAGMA journal_mode=WAL;")
        await self.conn.execute("PRAGMA synchronous=NORMAL;")

        await self.conn.execute("""
            CREATE TABLE IF NOT EXISTS videos (
                video_key TEXT PRIMARY KEY,
                ts DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await self.conn.execute("""
            CREATE TABLE IF NOT EXISTS progress (
                channel_id TEXT PRIMARY KEY,
                last_msg_id INTEGER
            )
        """)
        await self.conn.commit()

    async def close(self):
        """Close the connection if one was opened."""
        if self.conn:
            await self.conn.close()

    async def seen(self, key):
        """Return True when *key* is already recorded in ``videos``."""
        async with self.lock:
            async with self.conn.execute("SELECT 1 FROM videos WHERE video_key=?", (key,)) as cursor:
                return await cursor.fetchone() is not None

    async def mark(self, key):
        """Record *key* in ``videos``; re-marking an existing key is a no-op."""
        async with self.lock:
            await self.conn.execute("INSERT OR IGNORE INTO videos (video_key) VALUES (?)", (key,))
            await self.conn.commit()

    async def get_last(self, cid):
        """Return the stored progress id for channel *cid*, or 0 if none."""
        async with self.lock:
            async with self.conn.execute("SELECT last_msg_id FROM progress WHERE channel_id=?", (str(cid),)) as cursor:
                r = await cursor.fetchone()
                # A missing row and a NULL value both mean "nothing scanned yet".
                return r[0] if r and r[0] is not None else 0

    async def set_last(self, cid, mid):
        """Advance the progress id of channel *cid* to *mid*.

        The stored value only ever moves forward: a *mid* that is not larger
        than the current value leaves the row untouched, so out-of-order
        callers cannot rewind the scan position.
        """
        async with self.lock:
            # Create the row on first use ...
            await self.conn.execute(
                "INSERT OR IGNORE INTO progress (channel_id, last_msg_id) VALUES (?, ?)",
                (str(cid), mid)
            )
            # ... then raise it only when the new id is strictly larger.
            await self.conn.execute(
                "UPDATE progress SET last_msg_id = ? "
                "WHERE channel_id = ? AND (last_msg_id IS NULL OR last_msg_id < ?)",
                (mid, str(cid), mid)
            )
            await self.conn.commit()

# Module-level database handle; main() assigns the real AsyncDB instance.
db: AsyncDB = None # type: ignore

# ==================== 工具函数 ====================

# Regexes removed from captions (links, mentions, "Via ..." attributions).
# Order matters: the Markdown-link pattern has to run before the bare-URL
# pattern, otherwise the URL is stripped first and "[text](" is left behind.
AD_PATTERNS = [
    r"\[.*?\]\(https?://.*?\)",
    r"https?://\S+",
    r"t\.me/\S+",
    r"@\w+",
    # The lookbehind keeps the case-insensitive match from firing inside
    # words that merely end in "via" (e.g. "Bolivia is ...").
    r"(?<![A-Za-z])Via .*",
]

def clean_caption(text: str) -> str:
    """Strip advertising fragments from *text* and cap its length.

    Every pattern in AD_PATTERNS is removed case-insensitively, surrounding
    whitespace is trimmed, and the result is cut to MAX_CAPTION_LENGTH
    characters. Empty or None input yields an empty string.
    """
    if not text:
        return ""
    for pattern in AD_PATTERNS:
        text = re.sub(pattern, "", text, flags=re.I)
    return text.strip()[:MAX_CAPTION_LENGTH]

def get_unique_key(msg: Message) -> str:
    """Return the de-duplication key for *msg*.

    A message that carries a document is keyed by the document id. Any other
    message falls back to ``msg_<chat_id>_<message_id>``.
    """
    # ``media`` may be None and ``media.document`` may exist but be None, so
    # walk the chain with getattr instead of hasattr + attribute access.
    document = getattr(getattr(msg, "media", None), "document", None)
    doc_id = getattr(document, "id", None)
    if doc_id is not None:
        return str(doc_id)
    return f"msg_{msg.chat_id}_{msg.id}"

def is_video(msg: Message) -> bool:
    """Return True when *msg* carries a video that passes every filter.

    Rejected are: messages without document media, documents whose mime type
    does not start with "video", documents without a video attribute, round
    video messages, stickers, and videos outside the configured size or
    duration bounds (a bound of 0 is not applied).
    """
    media = msg.media
    if not media or not isinstance(media, MessageMediaDocument):
        return False

    doc = media.document
    # The document can be absent or lack a mime type; treat both as
    # "not a video" instead of raising AttributeError.
    mime_type = getattr(doc, "mime_type", None)
    if not mime_type or not mime_type.startswith("video"):
        return False

    video_attr = next((a for a in doc.attributes if isinstance(a, DocumentAttributeVideo)), None)
    if not video_attr or getattr(video_attr, "round_message", False):
        return False

    # The sticker attribute is detected by its serialized type name.
    if any(attr.to_dict().get('_') == 'DocumentAttributeSticker' for attr in doc.attributes):
        return False

    size_mb = doc.size / (1024 * 1024)
    if MIN_SIZE_MB > 0 and size_mb < MIN_SIZE_MB:
        return False
    if MAX_SIZE_MB > 0 and size_mb > MAX_SIZE_MB:
        return False

    duration = video_attr.duration
    if MIN_DURATION > 0 and duration < MIN_DURATION:
        return False
    if MAX_DURATION > 0 and duration > MAX_DURATION:
        return False

    return True

# ==================== 相册处理逻辑 ====================

async def process_album_later(grouped_id):
    """Queue the album buffered under *grouped_id* once the wait has elapsed.

    Sleeps ALBUM_WAIT_TIME seconds, removes the buffered messages from
    ``pending_albums``, orders them by message id and puts them on
    ``forward_queue`` as a single batch. Nothing is queued when the buffer
    is missing or empty.
    """
    await asyncio.sleep(ALBUM_WAIT_TIME)

    album = pending_albums.pop(grouped_id, None)
    if not album:
        return

    album.sort(key=lambda item: item.id)
    await forward_queue.put(album)
    logger.info(f"📦 相册入队: {len(album)} 个文件 (Group: {grouped_id})")

# Strong references to in-flight album flush tasks. The event loop itself only
# keeps weak references to tasks, so a task nobody references may be
# garbage-collected before it finishes.
_album_flush_tasks = set()


async def handle_incoming_message(message: Message):
    """Route *message* to the forward queue, buffering album members first.

    A message without ``grouped_id`` is queued immediately as a one-item
    batch. A message with ``grouped_id`` is appended to that album's buffer
    in ``pending_albums``; the first member of an album also schedules
    ``process_album_later`` to flush the buffer.
    """
    gid = message.grouped_id
    if not gid:
        await forward_queue.put([message])
        return

    bucket = pending_albums.get(gid)
    if bucket is None:
        bucket = pending_albums[gid] = []
        flush_task = asyncio.create_task(process_album_later(gid))
        _album_flush_tasks.add(flush_task)
        flush_task.add_done_callback(_album_flush_tasks.discard)

    # The realtime handler and the scanner can both deliver the same message
    # while the album is still buffering; keep only one copy of it.
    identity = (message.chat_id, message.id)
    if all((m.chat_id, m.id) != identity for m in bucket):
        bucket.append(message)

# ==================== 转发 Worker ====================

async def forward_worker():
    """Consume batches from ``forward_queue`` and send them to TARGET_CHANNEL.

    For each batch the worker drops videos that are already recorded in the
    database (and duplicates inside the batch), picks the first non-empty
    cleaned caption, sends the media with ``client.send_file``, records the
    sent keys, and then sleeps a random MIN_INTERVAL..MAX_INTERVAL seconds.

    A FloodWait is honoured and the same batch is retried a bounded number
    of times instead of being dropped. Other errors are logged and the
    worker moves on to the next batch. Runs until cancelled.
    """
    logger.info("🔧 转发 Worker 已启动")

    # Upper bound on send attempts for one batch when Telegram keeps
    # answering with FloodWait.
    max_flood_attempts = 3

    while True:
        # Fetch outside the try block so the task_done() in ``finally`` is
        # paired with exactly one successful get().
        batch = await forward_queue.get()
        try:
            # 1. Drop videos that were already forwarded, plus duplicates
            #    within this batch.
            to_send = []
            batch_keys = set()
            for msg in batch:
                key = get_unique_key(msg)
                if key in batch_keys or await db.seen(key):
                    continue
                batch_keys.add(key)
                to_send.append(msg)

            if not to_send:
                # task_done() is issued by the finally clause; calling it
                # here as well would raise ValueError and kill the worker.
                continue

            # 2. Caption: first message of the batch whose cleaned text is
            #    non-empty.
            caption = ""
            for msg in batch:
                if msg.text:
                    cleaned = clean_caption(msg.text)
                    if cleaned:
                        caption = cleaned
                        break

            # 3. Send. The media list is passed as-is; send_file is relied on
            #    to handle both a single file and an album.
            media_list = [m.media for m in to_send]
            logger.info(f"📤 正在发送 {len(media_list)} 个文件")

            sent = False
            for _ in range(max_flood_attempts):
                try:
                    await client.send_file(
                        TARGET_CHANNEL,
                        file=media_list,
                        caption=caption,
                        supports_streaming=True
                    )
                except FloodWaitError as e:
                    # Wait out the server-imposed pause, then retry the same
                    # batch rather than losing it.
                    logger.warning(f"⏳ FloodWait: 暂停 {e.seconds} 秒")
                    await asyncio.sleep(e.seconds + 2)
                else:
                    sent = True
                    break

            if not sent:
                logger.error(f"❌ FloodWait 重试 {max_flood_attempts} 次后仍未发送成功,已放弃该批次")
                continue

            # 4. Record what was sent so it is never forwarded again.
            for msg in to_send:
                await db.mark(get_unique_key(msg))

            # Random pause between sends.
            await asyncio.sleep(random.uniform(MIN_INTERVAL, MAX_INTERVAL))

        except (ChatForwardsRestrictedError, SecurityError):
            logger.error("⛔ 无法发送:权限受限")
        except rpcerrorlist.MediaEmptyError:
            logger.error("❌ 媒体文件丢失或无法访问")
        except Exception as e:
            logger.error(f"❌ Worker 异常: {e}", exc_info=True)
            await asyncio.sleep(5)
        finally:
            forward_queue.task_done()

# ==================== 扫描逻辑 (已修复进度问题) ====================

async def scan_channel(channel_id):
    """Poll *channel_id* forever and queue videos not seen before.

    Each pass reads up to 50 messages newer than the stored progress id
    (oldest first), hands unseen videos to ``handle_incoming_message`` and
    then stores the highest message id it saw, whether or not any video was
    found. Passes are separated by SCAN_INTERVAL seconds.
    """
    logger.info(f"🔭 开始监控频道: {channel_id}")
    while True:
        try:
            last_id = await db.get_last(channel_id)
            max_scanned_id = last_id

            # reverse=True iterates from the oldest message to the newest.
            async for msg in client.iter_messages(channel_id, min_id=last_id, limit=50, reverse=True):
                max_scanned_id = max(max_scanned_id, msg.id)

                # The seen() check avoids re-queueing a video that was
                # already forwarded.
                if is_video(msg) and not await db.seen(get_unique_key(msg)):
                    await handle_incoming_message(msg)

            # Advance progress whenever messages were scanned, even if none
            # of them was a video, so they are not fetched again.
            if max_scanned_id > last_id:
                await db.set_last(channel_id, max_scanned_id)

        except FloodWaitError as e:
            # Honour the server-imposed pause (as forward_worker does) rather
            # than retrying after a fixed short delay.
            logger.warning(f"⏳ FloodWait ({channel_id}): 暂停 {e.seconds} 秒")
            await asyncio.sleep(e.seconds + 2)
        except Exception as e:
            logger.error(f"⚠️ 扫描频道 {channel_id} 出错: {e}", exc_info=True)
            await asyncio.sleep(10)

        await asyncio.sleep(SCAN_INTERVAL)

# ==================== 实时监听 ====================

@client.on(events.NewMessage(chats=SOURCE_CHANNELS))
async def realtime_handler(event):
    """Queue a video as soon as it is posted in one of SOURCE_CHANNELS.

    Messages that are not videos are ignored. Any exception is logged and
    swallowed so a single bad update does not escape the handler.
    """
    # NOTE: scan progress is deliberately not updated here; only
    # scan_channel() writes it.
    try:
        incoming = event.message
        if not is_video(incoming):
            return

        logger.info(f"⚡ 实时捕获: {event.chat_id} - ID: {event.id}")
        await handle_incoming_message(incoming)
    except Exception as e:
        logger.error(f"实时处理错误: {e}")

# ==================== 主程序 ====================

async def main():
    """Log in, open the database, start the background tasks and run.

    Blocks until the Telegram client disconnects. On the way out the
    background tasks are cancelled and awaited before the database is
    closed, so no task touches a closed connection.
    """
    global db

    # 1. Start the client (logs in with the configured phone / 2FA password).
    await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)

    me = await client.get_me()
    logger.info(f"✅ 登录成功: {me.first_name} (@{me.username})")

    # 2. Initialise the database; one file per target channel.
    db = AsyncDB(f"bot_data_{TARGET_CHANNEL}.db")
    await db.connect()

    # 3. Start background tasks. References are kept so the tasks cannot be
    #    garbage-collected and can be cancelled on shutdown.
    background_tasks = [asyncio.create_task(forward_worker())]
    background_tasks.extend(
        asyncio.create_task(scan_channel(cid)) for cid in SOURCE_CHANNELS
    )

    logger.info("🚀 服务运行中...")

    try:
        await client.run_until_disconnected()
    finally:
        # Stop the worker and scanners first, then close the database.
        for task in background_tasks:
            task.cancel()
        await asyncio.gather(*background_tasks, return_exceptions=True)

        await db.close()
        logger.info("💾 数据库已关闭")

# Script entry point: run main() on a fresh event loop.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is treated as a normal shutdown request; no traceback is shown.
        pass

# 发表回复
#
# 您的邮箱地址不会被公开。 必填项已用 * 标注