{"id":12,"date":"2025-12-20T08:28:52","date_gmt":"2025-12-20T08:28:52","guid":{"rendered":"https:\/\/wordpress-fz3fv.wasmer.app\/?p=12"},"modified":"2026-04-15T07:02:27","modified_gmt":"2026-04-15T07:02:27","slug":"tg-forward-videos","status":"publish","type":"post","link":"https:\/\/wp.bao.ksapp.me\/?p=12","title":{"rendered":"TG-forward-videos"},"content":{"rendered":"\n<pre class=\"wp-block-code\"><code>import asyncio\nimport os\nimport random\nimport re\nimport logging\nfrom logging.handlers import RotatingFileHandler\nfrom typing import List, Dict, Optional\nimport aiosqlite\nfrom telethon import TelegramClient, events\nfrom telethon.tl.types import MessageMediaDocument, MessageMediaPhoto, Message\nfrom telethon.errors import FloodWaitError\nfrom dotenv import load_dotenv\n\n# ==================== 1. \u73af\u5883\u914d\u7f6e\u4e0e\u521d\u59cb\u5316 ====================\nDB_FOLDER = \"database\"\nSESSION_FOLDER = \"session\"\nfor folder in &#91;DB_FOLDER, SESSION_FOLDER]:\n    if not os.path.exists(folder):\n        os.makedirs(folder)\n\nlogging.basicConfig(\n    level=logging.INFO,\n    format=\"%(asctime)s - %(levelname)s - &#91;%(name)s] - %(message)s\",\n    handlers=&#91;\n        RotatingFileHandler(os.path.join(DB_FOLDER, \"bot_work.log\"), maxBytes=10*1024*1024, backupCount=5, encoding=\"utf-8\"),\n        logging.StreamHandler()\n    ]\n)\nlogger = logging.getLogger(\"SuperForwarder\")\n\nload_dotenv()\nAPI_ID = int(os.getenv(\"API_ID\", 0))\nAPI_HASH = os.getenv(\"API_HASH\", \"\")\nPHONE_NUMBER = os.getenv(\"PHONE_NUMBER\", \"\")\nTWO_STEP_PASSWORD = os.getenv(\"TWO_STEP_PASSWORD\", \"\")\nTARGET_CHANNEL = int(os.getenv(\"TARGET_CHANNEL\", 0))\nSOURCE_CHANNELS = &#91;int(x.strip()) for x in os.getenv(\"SOURCE_CHANNELS\", \"\").split(\",\") if x.strip()]\n\n# \u914d\u7f6e\u53c2\u6570\nMAX_WORKERS = int(os.getenv(\"MAX_WORKERS\", 3)) \nMIN_INTERVAL = float(os.getenv(\"MIN_INTERVAL\", 2.0))\nMAX_INTERVAL = float(os.getenv(\"MAX_INTERVAL\", 
5.0))\nMIN_SIZE_MB = float(os.getenv(\"MIN_SIZE_MB\", 0))\nALBUM_WAIT_TIME = 4.0  # \u589e\u52a0\u52304\u79d2\u4ee5\u786e\u4fdd\u76f8\u518c\u6536\u96c6\u5b8c\u6574\n\n# \u5e7f\u544a\u8fc7\u6ee4\u6b63\u5219\nAD_PATTERNS = &#91;\n    r\"https?:\/\/\\S+\", \n    r\"t\\.me\/\\S+\",     \n    r\"@\\w+\",          \n    r\"Via .*\",        \n    r\"\\&#91;.*?\\]\\(https?:\/\/.*?\\)\" \n]\n\n# ==================== 2. \u6570\u636e\u5e93\u7ba1\u7406 ====================\nclass AsyncDB:\n    def __init__(self, path):\n        self.path = path\n        self.conn: Optional&#91;aiosqlite.Connection] = None\n\n    async def connect(self):\n        self.conn = await aiosqlite.connect(self.path)\n        await self.conn.execute(\"PRAGMA journal_mode=WAL;\")\n        await self.conn.execute(\"\"\"\n            CREATE TABLE IF NOT EXISTS progress (\n                channel_id TEXT PRIMARY KEY, \n                last_msg_id INTEGER DEFAULT 0, \n                min_msg_id INTEGER DEFAULT 0\n            )\"\"\")\n        await self.conn.execute(\"CREATE TABLE IF NOT EXISTS media_history (media_key TEXT PRIMARY KEY)\")\n        await self.conn.execute(\"CREATE INDEX IF NOT EXISTS idx_mkey ON media_history (media_key);\")\n        await self.conn.commit()\n\n    async def is_seen(self, key):\n        async with self.conn.execute(\"SELECT 1 FROM media_history WHERE media_key=?\", (key,)) as cursor:\n            return await cursor.fetchone() is not None\n\n    async def mark_seen(self, key):\n        await self.conn.execute(\"INSERT OR IGNORE INTO media_history (media_key) VALUES (?)\", (key,))\n        await self.conn.commit()\n\n    async def get_prog(self, cid):\n        async with self.conn.execute(\"SELECT last_msg_id, min_msg_id FROM progress WHERE channel_id=?\", (str(cid),)) as cursor:\n            r = await cursor.fetchone()\n            return r if r else (0, 0)\n\n    async def update_prog(self, cid, last_id=None, min_id=None):\n        if last_id:\n            await 
self.conn.execute(\"INSERT INTO progress (channel_id, last_msg_id) VALUES (?, ?) ON CONFLICT(channel_id) DO UPDATE SET last_msg_id=?\", (str(cid), last_id, last_id))\n        if min_id:\n            await self.conn.execute(\"INSERT INTO progress (channel_id, min_msg_id) VALUES (?, ?) ON CONFLICT(channel_id) DO UPDATE SET min_msg_id=?\", (str(cid), min_id, min_id))\n        await self.conn.commit()\n\n# ==================== 3. \u5de5\u5177\u903b\u8f91 ====================\ndef clean_caption(text: str) -> str:\n    if not text: return \"\"\n    for p in AD_PATTERNS:\n        text = re.sub(p, \"\", text, flags=re.I)\n    return text.strip()\n\ndef is_media(msg: Message) -> bool:\n    \"\"\"\u5224\u65ad\u6d88\u606f\u662f\u5426\u4e3a\u9700\u8981\u8f6c\u53d1\u7684\u5a92\u4f53\uff08\u89c6\u9891\u6216\u56fe\u7247\uff09\"\"\"\n    if not msg.media: return False\n    \n    # 1. \u5904\u7406\u89c6\u9891\n    if isinstance(msg.media, MessageMediaDocument):\n        if msg.media.document.mime_type.startswith(\"video\"):\n            if MIN_SIZE_MB > 0 and (msg.media.document.size \/ 1048576) &lt; MIN_SIZE_MB:\n                return False\n            return True\n    \n    # 2. \u5904\u7406\u56fe\u7247\n    if isinstance(msg.media, MessageMediaPhoto):\n        return True\n        \n    return False\n\ndef get_media_key(msg: Message) -> Optional&#91;str]:\n    \"\"\"\u83b7\u53d6\u5a92\u4f53\u7684\u552f\u4e00\u6807\u8bc6\u7b26\"\"\"\n    if isinstance(msg.media, MessageMediaDocument):\n        return f\"doc_{msg.media.document.id}\"\n    if isinstance(msg.media, MessageMediaPhoto):\n        return f\"pho_{msg.media.photo.id}\"\n    return None\n\n# ==================== 4. 
Album aggregation logic ====================\nforward_queue = asyncio.Queue(maxsize=500)\npending_albums: Dict&#91;int, List&#91;Message]] = {}\n\nasync def push_to_queue_later(grouped_id):\n    \"\"\"Sleep ALBUM_WAIT_TIME seconds, then queue the messages buffered for grouped_id as one batch sorted by id.\"\"\"\n    await asyncio.sleep(ALBUM_WAIT_TIME)\n    if grouped_id in pending_albums:\n        msgs = pending_albums.pop(grouped_id)\n        if msgs:\n            msgs.sort(key=lambda x: x.id)\n            await forward_queue.put(msgs)\n\nasync def handle_incoming(msg: Message):\n    \"\"\"Buffer album members under their grouped_id and schedule one delayed flush per album; queue any other message right away.\"\"\"\n    if msg.grouped_id:\n        if msg.grouped_id not in pending_albums:\n            pending_albums&#91;msg.grouped_id] = &#91;]\n            asyncio.create_task(push_to_queue_later(msg.grouped_id))\n        pending_albums&#91;msg.grouped_id].append(msg)\n    else:\n        await forward_queue.put(&#91;msg])\n\n# ==================== 5. Forwarding worker ====================\nasync def worker(wid):\n    \"\"\"Take batches off forward_queue forever; forward the not-yet-seen media of every batch that contains a video to TARGET_CHANNEL.\"\"\"\n    while True:\n        batch = await forward_queue.get()\n        try:\n            # 1. Only batches holding at least one video are forwarded; photo-only batches are dropped.\n            has_video = any(\n                isinstance(m.media, MessageMediaDocument) and \n                m.media.document.mime_type.startswith(\"video\") \n                for m in batch\n            )\n            \n            if not has_video:\n                # No task_done() here: the finally clause runs on continue as well and already\n                # acknowledges this get(). A second call makes the queue counter drift until\n                # task_done() raises ValueError inside finally, which kills the worker task.\n                continue\n\n            # 2. Keep only media that has not been forwarded before.\n            to_send = &#91;]\n            for m in batch:\n                m_key = get_media_key(m)\n                if m_key and not await db.is_seen(m_key):\n                    to_send.append(m)\n            \n            if to_send:\n                # 3. Use the cleaned text of the first message that has any as the caption.\n                caption = \"\"\n                for m in batch:\n                    if m.text:\n                        caption = clean_caption(m.text)\n                        break\n                \n                # 4. Send all selected media in one send_file call.\n                files = &#91;m.media for m in to_send]\n                await client.send_file(TARGET_CHANNEL, file=files, caption=caption, supports_streaming=True)\n                \n                # 5. Record what was sent so it is not forwarded again.\n                for m in to_send:\n                    m_key = get_media_key(m)\n                    if m_key: await db.mark_seen(m_key)\n                \n                logger.info(f\"Worker-{wid} | \u6210\u529f\u8f6c\u53d1 {len(to_send)} \u4e2a\u5a92\u4f53\u6587\u4ef6 (\u5305\u542b\u89c6\u9891\u53ca\u5176\u5173\u8054\u56fe\u7247)\")\n                await asyncio.sleep(random.uniform(MIN_INTERVAL, MAX_INTERVAL))\n                \n        except FloodWaitError as e:\n            logger.warning(f\"Worker-{wid} | \u89e6\u53d1\u9650\u5236\uff0c\u4f11\u7720 {e.seconds}s\")\n            await asyncio.sleep(e.seconds + 5)\n        except Exception as e:\n            logger.error(f\"Worker-{wid} \u9519\u8bef: {e}\")\n        finally:\n            forward_queue.task_done()\n\n# ==================== 6. 
Scan tasks ====================\nasync def scan_latest_task(cid):\n    \"\"\"Queue media from channel cid: scan the latest 5000 messages when no progress is stored, then poll for newer messages every 60 seconds.\"\"\"\n    last_id, _ = await db.get_prog(cid)\n    if last_id == 0:\n        logger.info(f\"\u9891\u9053 {cid} \u521d\u6b21\u8fd0\u884c\uff0c\u6293\u53d6\u6700\u65b0 5000 \u6761...\")\n        oldest_id = 0\n        async for msg in client.iter_messages(cid, limit=5000):\n            if is_media(msg): await handle_incoming(msg)\n            last_id = max(last_id, msg.id)\n            oldest_id = min(oldest_id or msg.id, msg.id)\n        # Store the oldest scanned id as the backfill cursor (min_msg_id) before last_msg_id, so that\n        # backfill_history_task finds a cursor as soon as last_msg_id is stored.\n        await db.update_prog(cid, min_id=oldest_id)\n        await db.update_prog(cid, last_id=last_id)\n\n    while True:\n        try:\n            # Oldest-first from just after last_id; progress is saved after every message.\n            async for msg in client.iter_messages(cid, min_id=last_id, reverse=True):\n                if is_media(msg): await handle_incoming(msg)\n                last_id = msg.id\n                await db.update_prog(cid, last_id=last_id)\n            await asyncio.sleep(60)\n        except Exception as e:\n            logger.error(f\"\u5b9e\u65f6\u540c\u6b65\u51fa\u9519: {e}\")\n            await asyncio.sleep(30)\n\nasync def backfill_history_task(cid):\n    \"\"\"Walk channel cid backwards from the stored min_msg_id in pages of 100 messages, queueing media, until no older messages remain.\"\"\"\n    logger.info(f\"\u5386\u53f2\u8865\u5168\u542f\u52a8: {cid}\")\n    while True:\n        try:\n            last_id, min_id = await db.get_prog(cid)\n            if min_id == 0:\n                # 0 is the column default, meaning the cursor was never set; it does not mean history is done.\n                if last_id == 0:\n                    # No progress stored yet (initial scan still running): check again later.\n                    await asyncio.sleep(30)\n                    continue\n                # Progress exists but no cursor was ever saved: start just below the newest known id.\n                min_id = last_id\n            if min_id &lt;= 1: \n                logger.info(f\"\u9891\u9053 {cid} \u5386\u53f2\u5df2\u5168\u90e8\u540c\u6b65\u5b8c\u6210\u3002\")\n                break\n            \n            if forward_queue.qsize() &lt; 100:\n                fetched = False\n                async for msg in client.iter_messages(cid, offset_id=min_id, limit=100):\n                    fetched = True\n                    if is_media(msg): await handle_incoming(msg)\n                    min_id = msg.id\n                # An empty page means nothing older exists; store 1 so the check above ends the loop.\n                await db.update_prog(cid, min_id=min_id if fetched else 1)\n                await asyncio.sleep(random.randint(20, 40))\n            else:\n                # Queue is busy: let the workers drain it before fetching more history.\n                await asyncio.sleep(30)\n        except Exception as e:\n            logger.error(f\"\u5386\u53f2\u8865\u5168\u9519\u8bef: {e}\")\n            await asyncio.sleep(60)\n\n# ==================== 7. 
\u542f\u52a8\u5165\u53e3 ====================\ns_tag = str(SOURCE_CHANNELS&#91;0]) if SOURCE_CHANNELS else \"unknown\"\nt_tag = str(TARGET_CHANNEL)\ndb_filename = f\"{s_tag}to{t_tag}.db\"\ndb_path = os.path.join(DB_FOLDER, db_filename)\n\nsession_path = os.path.join(SESSION_FOLDER, \"forwarder_session\")\nclient = TelegramClient(session_path, API_ID, API_HASH)\ndb = AsyncDB(db_path)\n\nasync def main():\n    await db.connect()\n    await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)\n    logger.info(f\"--- \u767b\u5f55\u6210\u529f | \u6570\u636e\u5e93: {db_filename} ---\")\n\n    for i in range(MAX_WORKERS):\n        asyncio.create_task(worker(i+1))\n\n    for cid in SOURCE_CHANNELS:\n        asyncio.create_task(scan_latest_task(cid))\n        async def backfill_wrapper(c):\n            await asyncio.sleep(10) # \u5ef6\u8fdf\u542f\u52a8\u5386\u53f2\u8865\u5168\n            await backfill_history_task(c)\n        asyncio.create_task(backfill_wrapper(cid))\n\n    await client.run_until_disconnected()\n\nif __name__ == \"__main__\":\n    try:\n        asyncio.run(main())\n    except KeyboardInterrupt:\n        
pass<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-12","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/wp.bao.ksapp.me\/index.php?rest_route=\/wp\/v2\/posts\/12","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/wp.bao.ksapp.me\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/wp.bao.ksapp.me\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/wp.bao.ksapp.me\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/wp.bao.ksapp.me\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=12"}],"version-history":[{"count":0,"href":"https:\/\/wp.bao.ksapp.me\/index.php?rest_route=\/wp\/v2\/posts\/12\/revisions"}],"wp:attachment":[{"href":"https:\/\/wp.bao.ksapp.me\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=12"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/wp.bao.ksapp.me\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=12"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/wp.bao.ksapp.me\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=12"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}