import asyncio
import os
import random
import logging
import hashlib
from typing import List, Dict
import aiosqlite
from telethon import TelegramClient
from telethon.tl.types import MessageMediaDocument, DocumentAttributeVideo, Message
from telethon.errors import (
FloodWaitError,
SecurityError,
FileReferenceExpiredError,
rpcerrorlist
)
from dotenv import load_dotenv
# Load environment variables (python-dotenv) before the configuration
# below reads them through os.getenv.
load_dotenv()
# ==================== 🛠️ Configuration helpers ====================
def get_env_float(key, default=0.0):
    """Read environment variable *key* as a float.

    Returns *default* when the variable is unset or empty. When the value
    cannot be parsed as a float, a warning is printed and *default* is
    returned.
    """
    raw = os.getenv(key, "")
    if raw:
        try:
            parsed = float(raw)
        except ValueError:
            print(f"⚠️ 配置错误: {key} 必须是数字,当前为 '{raw}',已重置为 {default}")
        else:
            return parsed
    return default
def get_env_int(key, default=0):
    """Read environment variable *key* as an integer.

    Returns *default* when the variable is unset or empty. When the value
    cannot be parsed as an integer, a warning is printed (mirroring
    get_env_float) and *default* is returned, so a typo in the
    configuration no longer disappears silently.
    """
    val = os.getenv(key, "")
    if not val:
        return default
    try:
        return int(val)
    except ValueError:
        print(f"⚠️ 配置错误: {key} 必须是整数,当前为 '{val}',已重置为 {default}")
        return default
# ==================== ⚙️ Basic configuration ====================
# Telegram credentials, read from the environment.
API_ID = get_env_int("API_ID")
API_HASH = os.getenv("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
# 🔍 Filter settings (read directly here and printed at startup so the
# effective values can be verified). A value of 0 disables that bound
# in is_video.
MIN_SIZE_MB = get_env_float("MIN_SIZE_MB", 0)
MAX_SIZE_MB = get_env_float("MAX_SIZE_MB", 0)
MIN_DURATION = get_env_int("MIN_DURATION", 0)
MAX_DURATION = get_env_int("MAX_DURATION", 0)
# 🛡️ Scan safety limits
STOP_AFTER_DUPLICATES = 100  # stop scanning a channel after this many consecutive already-seen videos
MAX_SCAN_COUNT = 10000  # maximum number of history messages scanned per channel (prevents unbounded scans)
# ⏱️ Rate control
# MIN_INTERVAL / MAX_INTERVAL bound the random pause (seconds) the worker
# takes after each successful send.
MIN_INTERVAL = 2
MAX_INTERVAL = 5
# Seconds to wait for further album members before an album is queued.
ALBUM_WAIT_TIME = 3.0
# ==================== 📋 Task list ====================
# Each task is a dict read by run_single_task:
#   "sources": list of source channel ids to scan
#   "target":  channel id the videos are sent to
#   "limit":   cap on newly collected videos per source channel
#              (defaults to 200 when omitted)
TASK_CONFIG = [
    # Example task
    # {
    #     "sources": [-1002101568388],
    #     "target": -1002976532877,
    #     "limit": 100
    # },
    {
        "sources": [-1001532416024],
        "target": -1003613293007,
        "limit": 4000
    },
]
# ==================== Logging setup ====================
# INFO-level logging with a time-only timestamp in front of each message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(message)s",
    datefmt="%H:%M:%S"
)
logger = logging.getLogger("AutoBot")
# Single Telethon client shared by every task; "user_session" is the
# session name handed to TelegramClient.
client = TelegramClient("user_session", API_ID, API_HASH)
# Global state, re-initialised for every task by run_single_task.
forward_queue = None  # asyncio.Queue of message batches (lists); a None item tells the worker to stop
pending_albums = {}  # grouped_id -> messages collected so far for that album
album_timers = {}  # grouped_id -> asyncio task that queues the album after ALBUM_WAIT_TIME
db = None  # AsyncDB instance recording the keys of already sent videos
# ==================== 🗄️ Database class ====================
class AsyncDB:
    """Small aiosqlite wrapper that remembers which video keys were handled.

    The database holds a single table, ``videos``, whose only column
    ``video_key`` is the primary key. Reads and writes are serialised
    through one asyncio lock.
    """

    def __init__(self, path):
        self.path = path
        self.conn = None  # set by connect()
        self.lock = asyncio.Lock()

    async def connect(self):
        """Open the database at ``self.path`` and create the table if missing."""
        self.conn = await aiosqlite.connect(self.path)
        await self.conn.execute(
            "CREATE TABLE IF NOT EXISTS videos (video_key TEXT PRIMARY KEY)"
        )
        await self.conn.commit()

    async def close(self):
        """Close the connection if one was opened."""
        if not self.conn:
            return
        await self.conn.close()

    async def seen(self, key):
        """Return True when *key* is already stored in the ``videos`` table."""
        query = "SELECT 1 FROM videos WHERE video_key=?"
        async with self.lock:
            async with self.conn.execute(query, (key,)) as cursor:
                row = await cursor.fetchone()
                return row is not None

    async def mark(self, key):
        """Store *key*; inserting an existing key is ignored."""
        statement = "INSERT OR IGNORE INTO videos (video_key) VALUES (?)"
        async with self.lock:
            await self.conn.execute(statement, (key,))
            await self.conn.commit()
# ==================== 🛠️ Core helper functions ====================
def get_unique_key(msg: Message) -> str:
    """Build the de-duplication key ``"<chat id>_<document id>"`` for *msg*."""
    chat_id = msg.chat_id
    document_id = msg.media.document.id
    return f"{chat_id}_{document_id}"
def generate_safe_filename(sources: List[int], target: int) -> str:
    """Derive the per-task database filename from the channel ids.

    The absolute values of the source ids are joined with ``_``. When that
    string is longer than 50 characters, only the first two ids are kept
    and an 8-character MD5 prefix of the full string stands in for the
    rest, which keeps the filename short.
    """
    source_ids = [str(abs(s)) for s in sources]
    joined = "_".join(source_ids)
    target_id = str(abs(target))
    if len(joined) <= 50:
        return f"{joined}to{target_id}.db"
    # Too long for a filename: abbreviate with a hash of the full id list.
    digest = hashlib.md5(joined.encode()).hexdigest()[:8]
    preview = "_".join(source_ids[:2])
    return f"{preview}_etc_{digest}_to_{target_id}.db"
def is_video(msg: Message) -> bool:
    """Return True when *msg* carries a video that passes every configured filter.

    Checks, in order:
      1. the media is a document whose MIME type starts with ``video``;
      2. it is not a round video message (video note);
      3. its size lies within MIN_SIZE_MB / MAX_SIZE_MB;
      4. when a video attribute is present, its duration lies within
         MIN_DURATION / MAX_DURATION.
    A limit of 0 disables the corresponding bound.
    """
    media = msg.media
    if not isinstance(media, MessageMediaDocument):
        return False
    doc = media.document
    # The document of a MessageMediaDocument is optional and may be an
    # empty stub without mime_type/size/attributes (e.g. expired media).
    # Treat such messages as "not a video" instead of raising
    # AttributeError and aborting the whole scan.
    mime_type = getattr(doc, "mime_type", None)
    # 1. Format check
    if not mime_type or not mime_type.startswith("video"):
        return False
    # 2. Exclude round videos (video notes)
    video_attr = next(
        (a for a in doc.attributes if isinstance(a, DocumentAttributeVideo)),
        None,
    )
    if getattr(video_attr, "round_message", False):
        return False
    # 3. Size check
    size_mb = doc.size / (1024.0 * 1024.0)
    if MIN_SIZE_MB > 0 and size_mb < MIN_SIZE_MB:
        return False
    if MAX_SIZE_MB > 0 and size_mb > MAX_SIZE_MB:
        return False
    # 4. Duration check (only possible when the video attribute exists)
    if video_attr:
        duration = video_attr.duration
        if MIN_DURATION > 0 and duration < MIN_DURATION:
            return False
        if MAX_DURATION > 0 and duration > MAX_DURATION:
            return False
    return True
# ==================== 🔄 Async logic ====================
async def process_album_later(grouped_id):
    """Queue an album after a delay, giving its remaining messages time to arrive.

    Sleeps for ALBUM_WAIT_TIME, then removes the album from pending_albums
    and album_timers, sorts its messages by id and puts the whole list on
    forward_queue as one batch. Cancellation (the timer being reset by
    queue_message) is absorbed silently.
    """
    try:
        await asyncio.sleep(ALBUM_WAIT_TIME)
        if grouped_id not in pending_albums:
            return
        album = pending_albums.pop(grouped_id)
        album_timers.pop(grouped_id, None)
        if not album:
            return
        album.sort(key=lambda item: item.id)
        # The complete list travels through the queue as a single album.
        await forward_queue.put(album)
    except asyncio.CancelledError:
        pass
async def queue_message(message: Message):
    """Route *message* to the queue, either on its own or as part of an album.

    Messages without a grouped_id are queued immediately as a one-element
    list. Album messages are collected in pending_albums, and the album's
    timer is restarted so it is only queued once no further member has
    arrived for ALBUM_WAIT_TIME seconds.
    """
    if not message.grouped_id:
        # Wrap in a list so every queue item has the same shape.
        await forward_queue.put([message])
        return

    gid = message.grouped_id
    pending_albums.setdefault(gid, []).append(message)

    # Restart the timer for this album.
    previous_timer = album_timers.get(gid)
    if previous_timer is not None:
        previous_timer.cancel()
    album_timers[gid] = asyncio.create_task(process_album_later(gid))
# Upper bound on send attempts for one batch while Telegram keeps
# answering with FloodWait.
_MAX_SEND_ATTEMPTS = 3


async def _send_with_flood_retry(target_channel_id, batch):
    """Send one batch, sleeping and retrying whenever FloodWait is raised.

    Returns True once the batch was sent and False when every attempt hit
    FloodWait. Any other exception propagates to the caller.
    """
    files = [m.media for m in batch]
    caption = batch[0].text or ""
    for _ in range(_MAX_SEND_ATTEMPTS):
        try:
            await client.send_file(target_channel_id, files, caption=caption)
            return True
        except FloodWaitError as e:
            logger.warning(f"⏳ 触发流控 (FloodWait): 暂停 {e.seconds} 秒")
            # Honour the requested wait (plus a small margin) before the
            # next attempt or the next batch.
            await asyncio.sleep(e.seconds + 2)
    return False


async def worker(target_channel_id):
    """Consumer: take batches from forward_queue and send them to the target.

    Each queue item is a list of messages (one element for a single video,
    several for an album). A None item ends the loop. After a successful
    send every message of the batch is recorded in the database and the
    worker pauses for a random MIN_INTERVAL..MAX_INTERVAL seconds.

    A batch that hits FloodWait is retried after the requested pause
    instead of being dropped, so it is not lost and left unmarked.
    """
    processed_count = 0
    while True:
        try:
            batch = await forward_queue.get()
            if batch is None:
                forward_queue.task_done()
                break
            try:
                if await _send_with_flood_retry(target_channel_id, batch):
                    processed_count += len(batch)
                    logger.info(f"✅ 发送成功 | 本次: {len(batch)} | 总计: {processed_count}")
                    for m in batch:
                        await db.mark(get_unique_key(m))
                    await asyncio.sleep(random.uniform(MIN_INTERVAL, MAX_INTERVAL))
                else:
                    logger.error(f"❌ 连续 {_MAX_SEND_ATTEMPTS} 次触发流控,已放弃本批次")
            except FileReferenceExpiredError:
                logger.error("❌ 文件引用过期,跳过")
            except Exception as e:
                logger.error(f"❌ 发送异常: {e}")
            # Always account for the item so forward_queue.join() can return.
            forward_queue.task_done()
        except Exception as e:
            logger.error(f"Worker Error: {e}")
async def _scan_channel(ch, forward_limit, seen_in_run):
    """Collect not-yet-handled videos from one channel.

    Messages are visited in the order client.iter_messages yields them, at
    most MAX_SCAN_COUNT of them. Scanning stops early after
    STOP_AFTER_DUPLICATES consecutive videos already in the database, or
    once *forward_limit* new videos were collected. Keys of collected
    videos are added to *seen_in_run*.
    """
    collected = []
    logger.info(f"🔍 正在扫描: {ch} (目标: {forward_limit})")
    consecutive_duplicates = 0
    scanned_count = 0
    async for msg in client.iter_messages(ch, limit=MAX_SCAN_COUNT):
        scanned_count += 1
        if scanned_count % 200 == 0:
            print(f"\r ...扫描深度: {scanned_count} 条", end="")
        if not is_video(msg):
            continue
        key = get_unique_key(msg)
        # The database only learns a key after the video was sent, so a
        # document posted twice in the same channel must be caught here.
        if key in seen_in_run:
            continue
        # Check the database
        if await db.seen(key):
            consecutive_duplicates += 1
            if consecutive_duplicates >= STOP_AFTER_DUPLICATES:
                print(f"\n🛑 [频道 {ch}] 连续 {STOP_AFTER_DUPLICATES} 条重复,停止扫描本频道。")
                break
            continue
        consecutive_duplicates = 0
        seen_in_run.add(key)
        collected.append(msg)
        print(f"\r📦 [频道 {ch}] 命中: {len(collected)}/{forward_limit}", end="")
        if len(collected) >= forward_limit:
            print(f"\n✅ [频道 {ch}] 额度已满")
            break
    print("")  # finish the progress line
    return collected


async def _wait_for_pending_albums():
    """Block until every pending album has been handed to the queue."""
    if not pending_albums:
        return
    logger.info("⏳ 等待相册分组完成...")
    while pending_albums or album_timers:
        # Forget timers that already finished.
        for gid in [k for k, t in album_timers.items() if t.done()]:
            del album_timers[gid]
        if not album_timers and not pending_albums:
            break
        await asyncio.sleep(1)
    await asyncio.sleep(1)


async def scanner(source_channels, forward_limit):
    """Producer: scan channel history and queue the new videos.

    Every channel in *source_channels* is scanned for up to *forward_limit*
    new videos. The combined result is sorted by date (oldest first) and
    passed to queue_message; once all albums are queued, a None item is put
    on forward_queue to tell the worker to stop.

    A video key is collected at most once per run, so a document that
    appears several times in a channel is not sent more than once.
    """
    seen_in_run = set()
    all_collected_videos = []
    for ch in source_channels:
        all_collected_videos.extend(
            await _scan_channel(ch, forward_limit, seen_in_run)
        )

    # Sort and enqueue
    if all_collected_videos:
        logger.info(f"📊 扫描完成,共找到 {len(all_collected_videos)} 个新视频。正在按时间排序...")
        all_collected_videos.sort(key=lambda x: x.date)  # old -> new
        for msg in all_collected_videos:
            await queue_message(msg)
        # Wait for the last album groups
        await _wait_for_pending_albums()
    else:
        logger.info("⚠️ 没有发现任何新视频。")
    await forward_queue.put(None)  # tell the worker to finish
async def run_single_task(task):
    """Run one forwarding task from start to finish.

    *task* is a dict with "sources" (list of channel ids), "target"
    (channel id) and an optional "limit" (default 200). The function
    resets the module-level queue/album state, opens the task's database,
    starts the worker, runs the scanner and waits until the queue is
    drained. Errors raised by the scanner or the queue wait are logged,
    not re-raised.

    On exit, leftover album timers and the worker are cancelled and
    awaited before the database is closed, so no background task can touch
    a closed connection or outlive the task.
    """
    global db, forward_queue, pending_albums, album_timers
    sources = task['sources']
    target = task['target']
    limit = task.get('limit', 200)
    db_name = generate_safe_filename(sources, target)
    logger.info(f"\n🚀 === 启动任务: {db_name} ===")
    # Initialise per-task state
    forward_queue = asyncio.Queue()
    pending_albums = {}
    album_timers = {}
    db = AsyncDB(db_name)
    await db.connect()
    worker_task = asyncio.create_task(worker(target))
    try:
        await scanner(sources, limit)
        await forward_queue.join()
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis.
        logger.exception(f"任务崩溃: {e}")
    finally:
        try:
            # Album timers can only be left over when the scanner crashed.
            leftovers = [t for t in album_timers.values() if not t.done()]
            for timer in leftovers:
                timer.cancel()
            if not worker_task.done():
                worker_task.cancel()
            # Let the cancellations take effect before the DB goes away.
            await asyncio.gather(worker_task, *leftovers, return_exceptions=True)
        finally:
            await db.close()
            logger.info("🏁 === 任务结束 ===\n")
async def main():
    """Print the start-up banner, log in and run every configured task in order.

    The client is always disconnected at the end, including after an
    error or a keyboard interrupt.
    """
    separator = "=" * 40
    print(separator)
    print("🎥 Auto-Forwarder Pro 启动")
    print(f"📉 最小限制: {MIN_SIZE_MB} MB")
    print(f"📈 最大限制: {MAX_SIZE_MB} MB (0 表示不限)")
    print(separator)

    await client.start(PHONE_NUMBER)
    try:
        for current_task in TASK_CONFIG:
            await run_single_task(current_task)
    except KeyboardInterrupt:
        print("\n👋 用户停止")
    finally:
        await client.disconnect()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C can surface here instead of inside main(): asyncio.run may
        # cancel the main task and then raise KeyboardInterrupt itself.
        # Exit quietly instead of printing a traceback.
        print("\n👋 用户停止")