import asyncio
import os
import random
import logging
import hashlib
from typing import List, Dict
import aiosqlite
from telethon import TelegramClient
from telethon.tl.types import MessageMediaDocument, DocumentAttributeVideo, Message
from telethon.errors import (
FloodWaitError,
SecurityError,
FileReferenceExpiredError,
rpcerrorlist
)
from dotenv import load_dotenv
# Load environment variables (via python-dotenv) before the configuration
# helpers in this file read them with os.getenv.
load_dotenv()
# ==================== 📁 文件夹初始化 ====================
# Working directories: DB_FOLDER receives the per-task SQLite files,
# SESSION_FOLDER receives the Telethon session file.
DB_FOLDER = "database"
SESSION_FOLDER = "session"
for folder in (DB_FOLDER, SESSION_FOLDER):
    # exist_ok=True removes the check-then-create race of a separate
    # os.path.exists() test and fails fast (FileExistsError) when the name is
    # already taken by a regular file instead of silently skipping it.
    os.makedirs(folder, exist_ok=True)
# ==================== 🛠️ 配置读取帮助函数 ====================
def get_env_float(key, default=0.0):
    """Read environment variable *key* as a finite float.

    Args:
        key: Name of the environment variable.
        default: Value returned (unchanged) when the variable is unset,
            empty, or invalid.

    Returns:
        The parsed float, or *default*.

    A value that cannot be parsed, or that parses to NaN / +-infinity, is
    treated as a configuration error: a warning is printed and *default* is
    returned. ``float()`` itself accepts "nan" and "inf", which would
    otherwise slip through as a "number".
    """
    import math  # local import keeps this helper self-contained

    val = os.getenv(key, "")
    if not val:
        return default
    try:
        number = float(val)
    except ValueError:
        number = None
    if number is None or not math.isfinite(number):
        print(f"⚠️ 配置错误: {key} 必须是数字,当前为 '{val}',已重置为 {default}")
        return default
    return number
def get_env_int(key, default=0):
    """Read environment variable *key* as an integer.

    Args:
        key: Name of the environment variable.
        default: Value returned (unchanged) when the variable is unset,
            empty, or not a valid integer literal.

    Returns:
        The parsed int, or *default*.

    An unparsable value prints a warning before falling back, so a typo in
    the configuration is no longer swallowed silently.
    """
    val = os.getenv(key, "")
    if not val:
        return default
    try:
        return int(val)
    except ValueError:
        print(f"⚠️ 配置错误: {key} 必须是整数,当前为 '{val}',已重置为 {default}")
        return default
# ==================== ⚙️ 基础配置 ====================
# Telegram credentials, read from the environment. API_ID and API_HASH are
# passed to TelegramClient below; PHONE_NUMBER is passed to client.start().
API_ID = get_env_int("API_ID")
API_HASH = os.getenv("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
# Video filters used by is_video(): size in MB, duration in the unit of
# DocumentAttributeVideo.duration. A value of 0 (or less) disables that bound.
MIN_SIZE_MB = get_env_float("MIN_SIZE_MB", 0)
MAX_SIZE_MB = get_env_float("MAX_SIZE_MB", 0)
MIN_DURATION = get_env_int("MIN_DURATION", 0)
MAX_DURATION = get_env_int("MAX_DURATION", 0)
# scanner() stops reading a channel after this many already-recorded videos
# in a row.
STOP_AFTER_DUPLICATES = 100
# Upper bound on messages read per channel (iter_messages limit).
MAX_SCAN_COUNT = 22000
# worker() sleeps a random number of seconds in [MIN_INTERVAL, MAX_INTERVAL]
# after every successful send.
MIN_INTERVAL = 2
MAX_INTERVAL = 5
# Seconds process_album_later() waits before flushing a collected album.
ALBUM_WAIT_TIME = 3.0
# ==================== 📋 任务清单 ====================
# Tasks are executed one after another by main(). Each entry provides:
#   "sources": list of source chat ids to scan,
#   "target":  chat id the videos are sent to,
#   "limit":   quota of new videos collected per source channel
#              (run_single_task falls back to 200 when the key is missing).
TASK_CONFIG = [
    # ADULT INDUSTRY to town1
    {
        "sources": [-1001245945922],
        # NOTE(review): `-xxx` looks like a redacted placeholder; no name `xxx`
        # is defined in the visible file, so this must be replaced with the
        # real target chat id before the script can run.
        "target": -xxx,
        "limit": 300
    },
]
# ==================== 日志初始化 ====================
# Root logging configuration: INFO level, "HH:MM:SS - message" lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(message)s",
    datefmt="%H:%M:%S"
)
logger = logging.getLogger("AutoBot")
# The Telethon session name lives inside SESSION_FOLDER rather than the
# working directory.
session_path = os.path.join(SESSION_FOLDER, "user_session")
client = TelegramClient(session_path, API_ID, API_HASH)
# Global per-task state. run_single_task() rebinds all four names at the
# start of every task:
#   forward_queue  - asyncio.Queue of message batches consumed by worker()
#   pending_albums - grouped_id -> messages collected so far for that album
#   album_timers   - grouped_id -> task that flushes the album after a delay
#   db             - AsyncDB holding the keys of already-forwarded videos
forward_queue = None
pending_albums = {}
album_timers = {}
db = None
# ==================== 🗄️ 数据库类 ====================
class AsyncDB:
    """Persistent set of already-forwarded video keys, backed by aiosqlite.

    The database holds a single table ``videos(video_key TEXT PRIMARY KEY)``.
    Every statement runs under ``self.lock`` so that coroutines sharing the
    instance do not interleave on the one connection.
    """

    def __init__(self, path):
        """Remember the database file *path*; no connection is opened yet."""
        self.path = path
        # Open connection, or None while disconnected.
        self.conn = None
        self.lock = asyncio.Lock()

    async def connect(self):
        """Open the database and make sure the ``videos`` table exists.

        ``self.conn`` is only assigned once the schema is in place. If the
        setup fails, the half-open connection is closed before the error is
        re-raised so it cannot leak.
        """
        conn = await aiosqlite.connect(self.path)
        try:
            await conn.execute("CREATE TABLE IF NOT EXISTS videos (video_key TEXT PRIMARY KEY)")
            await conn.commit()
        except BaseException:
            await conn.close()
            raise
        self.conn = conn

    async def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        # Detach first so the instance never keeps a reference to a closed
        # connection, even if closing itself raises.
        conn, self.conn = self.conn, None
        if conn is not None:
            await conn.close()

    async def seen(self, key):
        """Return True when *key* is already recorded in the table."""
        async with self.lock:
            async with self.conn.execute("SELECT 1 FROM videos WHERE video_key=?", (key,)) as cursor:
                return await cursor.fetchone() is not None

    async def mark(self, key):
        """Record *key*; inserting an existing key is a no-op."""
        async with self.lock:
            await self.conn.execute("INSERT OR IGNORE INTO videos (video_key) VALUES (?)", (key,))
            await self.conn.commit()
# ==================== 🛠️ 核心工具函数 ====================
def get_unique_key(msg: Message) -> str:
    """Build the dedup key ``"<chat_id>_<document id>"`` for *msg*.

    The message must carry a document (``msg.media.document``); callers in
    this file only pass messages that already went through ``is_video``.
    """
    chat_id = msg.chat_id
    document_id = msg.media.document.id
    return f"{chat_id}_{document_id}"
def generate_safe_filename(sources: List[int], target: int) -> str:
    """Return the database path (inside DB_FOLDER) for a sources/target pair.

    The file name is built from the absolute values of the ids. When the
    joined source ids exceed 50 characters, only the first two ids are kept
    and an 8-character MD5 digest of the full joined string is appended, so
    the name stays short while remaining distinct per source list.
    """
    source_ids = [str(abs(s)) for s in sources]
    joined_sources = "_".join(source_ids)
    target_id = str(abs(target))
    if len(joined_sources) <= 50:
        fname = f"{joined_sources}to{target_id}.db"
    else:
        digest = hashlib.md5(joined_sources.encode()).hexdigest()[:8]
        head = "_".join(source_ids[:2])
        fname = f"{head}_etc_{digest}_to_{target_id}.db"
    # Prefix the database folder so callers get a ready-to-open path.
    return os.path.join(DB_FOLDER, fname)
def is_video(msg: Message) -> bool:
    """Return True when *msg* carries a video that passes the size/duration filters.

    A message qualifies when its media is a ``MessageMediaDocument`` whose
    document has a ``video*`` MIME type, is not a round (video-note) message,
    and lies inside the MIN/MAX_SIZE_MB and MIN/MAX_DURATION bounds. A bound
    of 0 or less is ignored. The duration bounds are only checked when the
    document has a ``DocumentAttributeVideo``.
    """
    media = msg.media
    if not media or not isinstance(media, MessageMediaDocument):
        return False
    doc = media.document
    # `document` is optional on MessageMediaDocument and may also be an empty
    # stub without mime_type/size/attributes; treat both as "not a video"
    # instead of raising AttributeError in the middle of a scan.
    mime_type = getattr(doc, "mime_type", None)
    if not mime_type or not mime_type.startswith("video"):
        return False
    video_attr = next((a for a in doc.attributes if isinstance(a, DocumentAttributeVideo)), None)
    # getattr tolerates video_attr being None (no video attribute present).
    if getattr(video_attr, "round_message", False):
        return False
    size_mb = doc.size / (1024.0 * 1024.0)
    if MIN_SIZE_MB > 0 and size_mb < MIN_SIZE_MB:
        return False
    if MAX_SIZE_MB > 0 and size_mb > MAX_SIZE_MB:
        return False
    if video_attr:
        duration = video_attr.duration
        if MIN_DURATION > 0 and duration < MIN_DURATION:
            return False
        if MAX_DURATION > 0 and duration > MAX_DURATION:
            return False
    return True
# ==================== 🔄 异步逻辑 ====================
async def process_album_later(grouped_id):
    """Flush the album *grouped_id* to the forward queue after a delay.

    Sleeps ALBUM_WAIT_TIME seconds, then removes the album from
    ``pending_albums`` (and its timer from ``album_timers``), orders the
    messages by id and queues them as one batch. Cancellation is swallowed
    on purpose: ``queue_message`` cancels and restarts this timer whenever
    another message of the same album arrives.
    """
    try:
        await asyncio.sleep(ALBUM_WAIT_TIME)
        if grouped_id not in pending_albums:
            return
        album = pending_albums.pop(grouped_id)
        album_timers.pop(grouped_id, None)
        if not album:
            return
        album = sorted(album, key=lambda item: item.id)
        await forward_queue.put(album)
    except asyncio.CancelledError:
        pass
async def queue_message(message: Message):
    """Queue *message* for forwarding, grouping album members together.

    A message without ``grouped_id`` is queued at once as a one-element
    batch. An album member is appended to ``pending_albums`` and the album's
    flush timer is restarted, so the album is queued only after no further
    member has arrived for ALBUM_WAIT_TIME seconds.
    """
    gid = message.grouped_id
    if not gid:
        await forward_queue.put([message])
        return
    pending_albums.setdefault(gid, []).append(message)
    running_timer = album_timers.get(gid)
    if running_timer is not None:
        running_timer.cancel()
    album_timers[gid] = asyncio.create_task(process_album_later(gid))
async def _send_with_flood_retry(target_channel_id, files, caption, max_attempts=3):
    """Send *files*, waiting and retrying when a FloodWaitError is raised.

    Returns True once the send succeeded and False when every one of the
    *max_attempts* tries ended in a flood wait. Other exceptions propagate to
    the caller.
    """
    for _ in range(max_attempts):
        try:
            await client.send_file(target_channel_id, files, caption=caption)
            return True
        except FloodWaitError as e:
            logger.warning(f"⏳ 触发流控: 暂停 {e.seconds} 秒")
            # Honour the server-imposed delay (plus a small margin) before
            # the next attempt or the next batch.
            await asyncio.sleep(e.seconds + 2)
    logger.error(f"❌ 连续 {max_attempts} 次触发流控,跳过本批次")
    return False


async def worker(target_channel_id):
    """Consume batches from ``forward_queue`` and send them to the target chat.

    Each batch is a list of messages (a single message or one album) and is
    sent with one ``client.send_file`` call, using the first message's text
    as caption. After a successful send every message key is recorded in
    ``db`` and the worker pauses for a random MIN_INTERVAL..MAX_INTERVAL
    seconds. A ``None`` item is the shutdown sentinel.

    A batch hit by a flood wait is retried after the requested delay rather
    than dropped. ``task_done()`` runs in a ``finally`` so that
    ``forward_queue.join()`` cannot hang on a batch that failed unexpectedly.
    """
    processed_count = 0
    while True:
        batch = await forward_queue.get()
        try:
            if batch is None:
                break
            files = [m.media for m in batch]
            caption = batch[0].text or ""
            try:
                if await _send_with_flood_retry(target_channel_id, files, caption):
                    processed_count += len(batch)
                    logger.info(f"✅ 发送成功 | 本次: {len(batch)} | 总计: {processed_count}")
                    for m in batch:
                        await db.mark(get_unique_key(m))
                    await asyncio.sleep(random.uniform(MIN_INTERVAL, MAX_INTERVAL))
            except FileReferenceExpiredError:
                logger.error("❌ 文件引用过期,跳过")
            except Exception as e:
                logger.error(f"❌ 发送异常: {e}")
        except Exception as e:
            logger.error(f"Worker Error: {e}")
        finally:
            # Exactly one task_done() per get(), including for the sentinel.
            forward_queue.task_done()
async def scanner(source_channels, forward_limit):
    """Collect not-yet-forwarded videos from all sources and queue them.

    Every channel is read through ``client.iter_messages`` (at most
    MAX_SCAN_COUNT messages). Reading of a channel ends early after
    STOP_AFTER_DUPLICATES consecutive already-recorded videos (non-video
    messages do not reset the streak), or once *forward_limit* new videos
    were collected from it. The new videos of all channels are then sorted by
    date and handed to ``queue_message``; if albums are pending, the function
    waits until their timers have flushed them. Finally the ``None`` sentinel
    that stops the worker is queued, whether or not anything was found.
    """
    fresh_videos = []
    for channel in source_channels:
        hits = 0
        logger.info(f"🔍 正在扫描: {channel} (目标: {forward_limit})")
        duplicate_streak = 0
        depth = 0
        async for msg in client.iter_messages(channel, limit=MAX_SCAN_COUNT):
            depth += 1
            if depth % 200 == 0:
                print(f"\r ...扫描深度: {depth} 条", end="")
            if not is_video(msg):
                continue
            if await db.seen(get_unique_key(msg)):
                duplicate_streak += 1
                if duplicate_streak >= STOP_AFTER_DUPLICATES:
                    print(f"\n🛑 [频道 {channel}] 连续 {STOP_AFTER_DUPLICATES} 条重复,停止扫描。")
                    break
                continue
            # A new video ends the current run of duplicates.
            duplicate_streak = 0
            fresh_videos.append(msg)
            hits += 1
            print(f"\r📦 [频道 {channel}] 命中: {hits}/{forward_limit}", end="")
            if hits >= forward_limit:
                print(f"\n✅ [频道 {channel}] 额度已满")
                break
        # Terminate the "\r" progress line of this channel.
        print("")
    if not fresh_videos:
        logger.info("⚠️ 没有发现任何新视频。")
    else:
        logger.info(f"📊 找到 {len(fresh_videos)} 个新视频。排序入队...")
        fresh_videos.sort(key=lambda item: item.date)
        for video in fresh_videos:
            await queue_message(video)
        if pending_albums:
            logger.info("⏳ 等待相册分组...")
            while pending_albums or album_timers:
                # Drop timers that already completed, then re-check.
                for gid in [g for g, timer in album_timers.items() if timer.done()]:
                    del album_timers[gid]
                if not (album_timers or pending_albums):
                    break
                await asyncio.sleep(0.5)
    await forward_queue.put(None)
async def run_single_task(task):
    """Run one entry of TASK_CONFIG from scan to completed forwarding.

    Args:
        task: dict with "sources" (list of chat ids), "target" (chat id) and
            an optional "limit" (defaults to 200).

    The global queue, album bookkeeping and database are rebound to fresh
    objects, a worker is started, the sources are scanned, and the function
    waits until the queue is drained. Exceptions from scanning or forwarding
    are logged, not raised. On the way out, leftover album timers and the
    worker are cancelled and awaited before the database is closed, so no
    background task can touch a closed connection or outlive the task.
    """
    global db, forward_queue, pending_albums, album_timers
    sources = task['sources']
    target = task['target']
    limit = task.get('limit', 200)
    # generate_safe_filename already returns a path inside the database folder.
    full_db_path = generate_safe_filename(sources, target)
    logger.info(f"\n🚀 >>> 启动任务: {os.path.basename(full_db_path)} <<<")
    # Start from a completely clean state.
    forward_queue = asyncio.Queue()
    pending_albums = {}
    album_timers = {}
    db = AsyncDB(full_db_path)
    await db.connect()
    worker_task = asyncio.create_task(worker(target))
    try:
        await scanner(sources, limit)
        await forward_queue.join()
    except Exception as e:
        logger.error(f"❌ 任务运行异常: {e}")
    finally:
        try:
            leftover_timers = [t for t in album_timers.values() if not t.done()]
            for timer in leftover_timers:
                timer.cancel()
            if not worker_task.done():
                worker_task.cancel()
            # Wait for the cancellations to take effect; return_exceptions
            # keeps a cancelled or failed background task from raising here.
            await asyncio.gather(worker_task, *leftover_timers, return_exceptions=True)
        finally:
            await db.close()
            logger.info(f"🏁 >>> 任务结束: {os.path.basename(full_db_path)} <<<\n")
# ==================== 🚀 主程序入口 ====================
async def main():
    """Print the banner, log in, and run every TASK_CONFIG entry in order.

    Returns early (before connecting) when TASK_CONFIG is empty. Between two
    tasks the loop pauses for 10 seconds. The client is always disconnected
    on the way out.
    """
    separator = "=" * 50
    max_size_label = MAX_SIZE_MB if MAX_SIZE_MB > 0 else '不限'
    print(separator)
    print("🎥 Auto-Forwarder Pro (Task Sequencer)")
    print(f"📉 最小视频限制: {MIN_SIZE_MB} MB")
    print(f"📈 最大视频限制: {max_size_label} MB")
    print(separator)
    if not TASK_CONFIG:
        logger.error("❌ TASK_CONFIG 为空,请在代码中配置任务列表!")
        return
    await client.start(PHONE_NUMBER)
    try:
        total_tasks = len(TASK_CONFIG)
        for index, task in enumerate(TASK_CONFIG):
            await run_single_task(task)
            if index >= total_tasks - 1:
                logger.info("🎉 所有任务已按顺序执行完毕!")
            else:
                # Cool-down between consecutive tasks.
                wait_sec = 10
                logger.info(f"⏳ 任务 [{index + 1}/{total_tasks}] 已完成。")
                logger.info(f"⏳ 冷却中... {wait_sec} 秒后执行下一个任务...")
                await asyncio.sleep(wait_sec)
    except KeyboardInterrupt:
        print("\n👋 用户手动停止程序")
    except Exception as e:
        logger.error(f"🔥 系统致命错误: {e}")
    finally:
        await client.disconnect()
        logger.info("🔌 已安全退出并断开连接")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C normally surfaces here, raised by asyncio.run() after the
        # main task was cancelled and its cleanup ran, not inside main().
        # Catching it turns the traceback into a clean goodbye message.
        print("\n👋 用户手动停止程序")