import asyncio
import os
import random
import re
import logging
from logging.handlers import RotatingFileHandler
from typing import List, Dict, Optional
import aiosqlite
from telethon import TelegramClient, events
from telethon.tl.types import MessageMediaDocument, MessageMediaPhoto, Message
from telethon.errors import FloodWaitError
from dotenv import load_dotenv
# ==================== 1. Environment configuration & initialization ====================
DB_FOLDER = "database"
SESSION_FOLDER = "session"
# Make sure the working directories exist before logging/DB setup touches them.
for folder in (DB_FOLDER, SESSION_FOLDER):
    os.makedirs(folder, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - [%(name)s] - %(message)s",
    handlers=[
        RotatingFileHandler(
            os.path.join(DB_FOLDER, "bot_work.log"),
            maxBytes=10 * 1024 * 1024,
            backupCount=5,
            encoding="utf-8",
        ),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger("SuperForwarder")
load_dotenv()
# Telegram credentials and routing, all supplied via .env.
API_ID = int(os.getenv("API_ID", 0))
API_HASH = os.getenv("API_HASH", "")
PHONE_NUMBER = os.getenv("PHONE_NUMBER", "")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD", "")
TARGET_CHANNEL = int(os.getenv("TARGET_CHANNEL", 0))
SOURCE_CHANNELS = [
    int(chan.strip())
    for chan in os.getenv("SOURCE_CHANNELS", "").split(",")
    if chan.strip()
]
# Tunable behaviour parameters.
MAX_WORKERS = int(os.getenv("MAX_WORKERS", 3))
MIN_INTERVAL = float(os.getenv("MIN_INTERVAL", 2.0))
MAX_INTERVAL = float(os.getenv("MAX_INTERVAL", 5.0))
MIN_SIZE_MB = float(os.getenv("MIN_SIZE_MB", 0))
ALBUM_WAIT_TIME = 4.0  # raised to 4s so all parts of an album have time to arrive
# Ad-filtering regular expressions applied to captions.
AD_PATTERNS = [
    r"https?://\S+",
    r"t\.me/\S+",
    r"@\w+",
    r"Via .*",
    r"\[.*?\]\(https?://.*?\)",
]
# ==================== 2. Database management ====================
class AsyncDB:
    """Thin async wrapper around the SQLite state database.

    Tracks per-channel sync progress (newest and oldest message id seen)
    and the set of media keys that were already forwarded (dedup history).
    """

    def __init__(self, path: str):
        self.path = path
        self.conn: Optional[aiosqlite.Connection] = None

    async def connect(self):
        """Open the connection and create the schema if it does not exist."""
        self.conn = await aiosqlite.connect(self.path)
        # WAL lets readers proceed while a write transaction is active.
        await self.conn.execute("PRAGMA journal_mode=WAL;")
        await self.conn.execute("""
            CREATE TABLE IF NOT EXISTS progress (
                channel_id TEXT PRIMARY KEY,
                last_msg_id INTEGER DEFAULT 0,
                min_msg_id INTEGER DEFAULT 0
            )""")
        # media_key is the PRIMARY KEY, which already provides a unique index
        # for the dedup lookups -- the old extra CREATE INDEX was redundant.
        await self.conn.execute(
            "CREATE TABLE IF NOT EXISTS media_history (media_key TEXT PRIMARY KEY)")
        await self.conn.commit()

    async def is_seen(self, key) -> bool:
        """Return True when *key* was already forwarded."""
        async with self.conn.execute(
                "SELECT 1 FROM media_history WHERE media_key=?", (key,)) as cursor:
            return await cursor.fetchone() is not None

    async def mark_seen(self, key):
        """Record *key* as forwarded (idempotent via INSERT OR IGNORE)."""
        await self.conn.execute(
            "INSERT OR IGNORE INTO media_history (media_key) VALUES (?)", (key,))
        await self.conn.commit()

    async def get_prog(self, cid):
        """Return (last_msg_id, min_msg_id) for channel *cid*, or (0, 0) if unknown."""
        async with self.conn.execute(
                "SELECT last_msg_id, min_msg_id FROM progress WHERE channel_id=?",
                (str(cid),)) as cursor:
            row = await cursor.fetchone()
            return row if row else (0, 0)

    async def update_prog(self, cid, last_id=None, min_id=None):
        """Upsert progress markers for channel *cid*.

        Bug fix: the previous truthiness checks (``if last_id:``) silently
        dropped legitimate updates to 0; compare against None instead.
        """
        if last_id is not None:
            await self.conn.execute(
                "INSERT INTO progress (channel_id, last_msg_id) VALUES (?, ?) "
                "ON CONFLICT(channel_id) DO UPDATE SET last_msg_id=excluded.last_msg_id",
                (str(cid), last_id))
        if min_id is not None:
            await self.conn.execute(
                "INSERT INTO progress (channel_id, min_msg_id) VALUES (?, ?) "
                "ON CONFLICT(channel_id) DO UPDATE SET min_msg_id=excluded.min_msg_id",
                (str(cid), min_id))
        await self.conn.commit()
# ==================== 3. Utility logic ====================
def clean_caption(text: str, patterns: Optional[List[str]] = None) -> str:
    """Strip advertising fragments (links, t.me refs, @mentions, ...) from a caption.

    Args:
        text: the raw caption; falsy values (None, "") yield "".
        patterns: optional override of the regex list; defaults to the
            module-wide AD_PATTERNS (backward-compatible generalization).

    Returns:
        The cleaned, whitespace-trimmed text.
    """
    if not text:
        return ""
    if patterns is None:
        patterns = AD_PATTERNS
    for pattern in patterns:
        # Case-insensitive so variants like "VIA ..." are caught too.
        text = re.sub(pattern, "", text, flags=re.I)
    return text.strip()
def is_media(msg: Message) -> bool:
    """Return True when *msg* carries media we want to forward (video or photo)."""
    media = msg.media
    if not media:
        return False
    # Photos are always accepted.
    if isinstance(media, MessageMediaPhoto):
        return True
    # Documents qualify only when they are videos above the size floor.
    if isinstance(media, MessageMediaDocument):
        if not media.document.mime_type.startswith("video"):
            return False
        if MIN_SIZE_MB > 0 and (media.document.size / 1048576) < MIN_SIZE_MB:
            return False
        return True
    return False
def get_media_key(msg: Message) -> Optional[str]:
    """Build a stable dedup key for the message's media, or None if unsupported."""
    media = msg.media
    if isinstance(media, MessageMediaDocument):
        return f"doc_{media.document.id}"
    if isinstance(media, MessageMediaPhoto):
        return f"pho_{media.photo.id}"
    return None
# ==================== 4. Album aggregation logic ====================
# Bounded queue of batches waiting to be forwarded; each item is a list of
# Message objects (a complete album or a single standalone message).
forward_queue = asyncio.Queue(maxsize=500)
# Album parts arrive as separate messages sharing a grouped_id; they are
# buffered here until the flush timer fires.
pending_albums: Dict[int, List[Message]] = {}
async def push_to_queue_later(grouped_id):
    """After the album grace period, flush the collected group to the queue."""
    await asyncio.sleep(ALBUM_WAIT_TIME)
    batch = pending_albums.pop(grouped_id, None)
    if batch:
        # Album parts may arrive out of order; restore chronological order.
        batch.sort(key=lambda m: m.id)
        await forward_queue.put(batch)
async def handle_incoming(msg: Message):
    """Route an incoming media message: buffer album parts, queue singles directly."""
    gid = msg.grouped_id
    if not gid:
        # Standalone message: enqueue it as a one-element batch.
        await forward_queue.put([msg])
        return
    if gid not in pending_albums:
        # First part of a new album: start its flush timer.
        pending_albums[gid] = []
        asyncio.create_task(push_to_queue_later(gid))
    pending_albums[gid].append(msg)
# ==================== 5. Forwarding worker ====================
async def worker(wid):
    """Consume batches from forward_queue and forward them to TARGET_CHANNEL.

    Each batch is a list of messages (an album or a single message).  Groups
    without any video are skipped; already-forwarded media are filtered out.
    """
    while True:
        batch = await forward_queue.get()
        try:
            # 1. Only forward groups that contain at least one video
            #    (pure photo groups are skipped).
            has_video = any(
                isinstance(m.media, MessageMediaDocument) and
                m.media.document.mime_type.startswith("video")
                for m in batch
            )
            if not has_video:
                # Bug fix: do NOT call task_done() here. `continue` still runs
                # the `finally` block below, so the old explicit call made
                # task_done() fire twice and raised ValueError, killing the
                # worker task.
                continue
            # 2. Drop media we have already forwarded.
            to_send = []
            for m in batch:
                m_key = get_media_key(m)
                if m_key and not await db.is_seen(m_key):
                    to_send.append(m)
            if to_send:
                # 3. Use the first non-empty caption in the group, cleaned of ads.
                caption = ""
                for m in batch:
                    if m.text:
                        caption = clean_caption(m.text)
                        break
                # 4. Send all media as a single grouped upload.
                files = [m.media for m in to_send]
                await client.send_file(TARGET_CHANNEL, file=files, caption=caption, supports_streaming=True)
                # 5. Mark media as seen only after the send succeeded, so a
                #    failed send can be retried on a later occurrence.
                for m in to_send:
                    m_key = get_media_key(m)
                    if m_key:
                        await db.mark_seen(m_key)
                logger.info(f"Worker-{wid} | 成功转发 {len(to_send)} 个媒体文件 (包含视频及其关联图片)")
            # Randomized pacing between sends to stay under rate limits.
            await asyncio.sleep(random.uniform(MIN_INTERVAL, MAX_INTERVAL))
        except FloodWaitError as e:
            logger.warning(f"Worker-{wid} | 触发限制,休眠 {e.seconds}s")
            await asyncio.sleep(e.seconds + 5)
        except Exception as e:
            logger.error(f"Worker-{wid} 错误: {e}")
        finally:
            forward_queue.task_done()
# ==================== 6. Scanning tasks ====================
async def scan_latest_task(cid):
    """Keep channel *cid* in sync: bootstrap on first run, then poll for new messages."""
    last_id, _ = await db.get_prog(cid)
    if last_id == 0:
        # First run for this channel: grab a backlog of the newest 5000 messages.
        logger.info(f"频道 {cid} 初次运行,抓取最新 5000 条...")
        async for message in client.iter_messages(cid, limit=5000):
            if is_media(message):
                await handle_incoming(message)
            last_id = max(last_id, message.id)
        await db.update_prog(cid, last_id=last_id)
    while True:
        try:
            # reverse=True walks oldest -> newest, so last_id ends on the newest id.
            async for message in client.iter_messages(cid, min_id=last_id, reverse=True):
                if is_media(message):
                    await handle_incoming(message)
                last_id = message.id
            await db.update_prog(cid, last_id=last_id)
            await asyncio.sleep(60)
        except Exception as e:
            logger.error(f"实时同步出错: {e}")
            await asyncio.sleep(30)
async def backfill_history_task(cid):
    """Walk channel *cid* backwards in pages of 100 until the oldest message is reached.

    A stored min_msg_id of 0 means the backfill has never run (the DB default);
    in that state offset_id=0 makes iter_messages start from the newest message.

    Bug fix: the old ``min_id <= 1`` check treated the fresh-database default 0
    as "already finished" and aborted immediately, so history was never
    backfilled on a new database.
    """
    logger.info(f"历史补全启动: {cid}")
    while True:
        try:
            _, min_id = await db.get_prog(cid)
            # Only a real message id of 1 (the channel's first message) means done.
            if 0 < min_id <= 1:
                logger.info(f"频道 {cid} 历史已全部同步完成。")
                break
            # Throttle: only fetch more history while the queue has headroom.
            if forward_queue.qsize() < 100:
                fetched_any = False
                async for msg in client.iter_messages(cid, offset_id=min_id, limit=100):
                    fetched_any = True
                    if is_media(msg):
                        await handle_incoming(msg)
                    # Iteration is newest -> oldest, so the last id is the new floor.
                    min_id = msg.id
                if not fetched_any:
                    # Nothing older exists (empty channel or head messages deleted).
                    logger.info(f"频道 {cid} 历史已全部同步完成。")
                    break
                await db.update_prog(cid, min_id=min_id)
                await asyncio.sleep(random.randint(20, 40))
            else:
                await asyncio.sleep(30)
        except Exception as e:
            logger.error(f"历史补全错误: {e}")
            await asyncio.sleep(60)
# ==================== 7. Startup entry point ====================
# The database file is named after the first source channel and the target,
# so each channel pair keeps its own progress/dedup state.
s_tag = str(SOURCE_CHANNELS[0]) if SOURCE_CHANNELS else "unknown"
t_tag = str(TARGET_CHANNEL)
db_filename = f"{s_tag}to{t_tag}.db"
db_path = os.path.join(DB_FOLDER, db_filename)
session_path = os.path.join(SESSION_FOLDER, "forwarder_session")
client = TelegramClient(session_path, API_ID, API_HASH)
db = AsyncDB(db_path)
async def main():
    """Connect, spawn workers and per-channel sync tasks, then run until disconnect."""
    await db.connect()
    await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)
    logger.info(f"--- 登录成功 | 数据库: {db_filename} ---")
    # Forwarding workers drain forward_queue concurrently.
    for worker_id in range(1, MAX_WORKERS + 1):
        asyncio.create_task(worker(worker_id))
    for cid in SOURCE_CHANNELS:
        asyncio.create_task(scan_latest_task(cid))

        async def backfill_wrapper(channel):
            # Stagger the history backfill so live sync gets a head start.
            await asyncio.sleep(10)
            await backfill_history_task(channel)

        asyncio.create_task(backfill_wrapper(cid))
    await client.run_until_disconnected()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Graceful shutdown on Ctrl-C; nothing to clean up explicitly.
        pass