TG-forward-videos-plus3

import asyncio
import os
import random
import logging
import hashlib
from typing import List, Dict
import aiosqlite
from telethon import TelegramClient
from telethon.tl.types import MessageMediaDocument, DocumentAttributeVideo, Message
from telethon.errors import (
    FloodWaitError, 
    SecurityError, 
    FileReferenceExpiredError,
    rpcerrorlist
)
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# ==================== 📁 文件夹初始化 ====================
DB_FOLDER = "database"
SESSION_FOLDER = "session"

for folder in [DB_FOLDER, SESSION_FOLDER]:
    if not os.path.exists(folder):
        os.makedirs(folder)

# ==================== 🛠️ 配置读取帮助函数 ====================
def get_env_float(key, default=0.0):
    val = os.getenv(key, "")
    if not val: return default
    try:
        return float(val)
    except ValueError:
        print(f"⚠️ 配置错误: {key} 必须是数字,当前为 '{val}',已重置为 {default}")
        return default

def get_env_int(key, default=0):
    val = os.getenv(key, "")
    if not val: return default
    try:
        return int(val)
    except ValueError:
        return default

# ==================== ⚙️ 基础配置 ====================
API_ID = get_env_int("API_ID")
API_HASH = os.getenv("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")

MIN_SIZE_MB = get_env_float("MIN_SIZE_MB", 0)
MAX_SIZE_MB = get_env_float("MAX_SIZE_MB", 0)
MIN_DURATION = get_env_int("MIN_DURATION", 0)
MAX_DURATION = get_env_int("MAX_DURATION", 0)

STOP_AFTER_DUPLICATES = 100 
MAX_SCAN_COUNT = 22000 

MIN_INTERVAL = 2
MAX_INTERVAL = 5
ALBUM_WAIT_TIME = 3.0

# ==================== 📋 任务清单 ====================
TASK_CONFIG = [
    # ADULT INDUSTRY 到 town1
    {
        "sources": [-1001245945922],
        "target": -xxx,
        "limit": 300
    },
   
]

# ==================== 日志初始化 ====================
logging.basicConfig(
    level=logging.INFO, 
    format="%(asctime)s - %(message)s",
    datefmt="%H:%M:%S"
)
logger = logging.getLogger("AutoBot")

# --- 修改:Session 路径移动到 session 文件夹 ---
session_path = os.path.join(SESSION_FOLDER, "user_session")
client = TelegramClient(session_path, API_ID, API_HASH)

# 全局状态变量
forward_queue = None
pending_albums = {}
album_timers = {}
db = None

# ==================== 🗄️ 数据库类 ====================
class AsyncDB:
    def __init__(self, path):
        self.path = path
        self.conn = None
        self.lock = asyncio.Lock()

    async def connect(self):
        self.conn = await aiosqlite.connect(self.path)
        await self.conn.execute("CREATE TABLE IF NOT EXISTS videos (video_key TEXT PRIMARY KEY)")
        await self.conn.commit()

    async def close(self):
        if self.conn: await self.conn.close()

    async def seen(self, key):
        async with self.lock:
            async with self.conn.execute("SELECT 1 FROM videos WHERE video_key=?", (key,)) as cursor:
                return await cursor.fetchone() is not None

    async def mark(self, key):
        async with self.lock:
            await self.conn.execute("INSERT OR IGNORE INTO videos (video_key) VALUES (?)", (key,))
            await self.conn.commit()

# ==================== 🛠️ 核心工具函数 ====================

def get_unique_key(msg: Message) -> str:
    return f"{msg.chat_id}_{msg.media.document.id}"

def generate_safe_filename(sources: List[int], target: int) -> str:
    """修改:确保文件名包含文件夹路径"""
    sources_str = "_".join([str(abs(s)) for s in sources])
    target_str = str(abs(target))
    
    if len(sources_str) > 50:
        hash_object = hashlib.md5(sources_str.encode())
        short_hash = hash_object.hexdigest()[:8]
        preview = "_".join([str(abs(s)) for s in sources[:2]])
        fname = f"{preview}_etc_{short_hash}_to_{target_str}.db"
    else:
        fname = f"{sources_str}to{target_str}.db"
    
    # 拼接数据库文件夹路径
    return os.path.join(DB_FOLDER, fname)

def is_video(msg: Message) -> bool:
    if not msg.media or not isinstance(msg.media, MessageMediaDocument): 
        return False
    doc = msg.media.document
    if not doc.mime_type.startswith("video"): 
        return False
    video_attr = next((a for a in doc.attributes if isinstance(a, DocumentAttributeVideo)), None)
    if getattr(video_attr, "round_message", False): 
        return False
    size_mb = doc.size / (1024.0 * 1024.0)
    if MIN_SIZE_MB > 0 and size_mb < MIN_SIZE_MB: return False
    if MAX_SIZE_MB > 0 and size_mb > MAX_SIZE_MB: return False
    if video_attr:
        duration = video_attr.duration
        if MIN_DURATION > 0 and duration < MIN_DURATION: return False
        if MAX_DURATION > 0 and duration > MAX_DURATION: return False
    return True

# ==================== 🔄 异步逻辑 ====================

async def process_album_later(grouped_id):
    try:
        await asyncio.sleep(ALBUM_WAIT_TIME)
        if grouped_id in pending_albums:
            messages = pending_albums.pop(grouped_id)
            album_timers.pop(grouped_id, None)
            if messages:
                messages.sort(key=lambda x: x.id)
                await forward_queue.put(messages)
    except asyncio.CancelledError:
        pass

async def queue_message(message: Message):
    if message.grouped_id:
        gid = message.grouped_id
        if gid not in pending_albums: pending_albums[gid] = []
        pending_albums[gid].append(message)
        if gid in album_timers: album_timers[gid].cancel()
        album_timers[gid] = asyncio.create_task(process_album_later(gid))
    else:
        await forward_queue.put([message])

async def worker(target_channel_id):
    processed_count = 0
    while True:
        try:
            batch = await forward_queue.get()
            if batch is None: 
                forward_queue.task_done()
                break
            files = [m.media for m in batch]
            caption = batch[0].text or ""
            try:
                await client.send_file(target_channel_id, files, caption=caption)
                processed_count += len(batch)
                logger.info(f"✅ 发送成功 | 本次: {len(batch)} | 总计: {processed_count}")
                for m in batch: 
                    await db.mark(get_unique_key(m))
                await asyncio.sleep(random.uniform(MIN_INTERVAL, MAX_INTERVAL))
            except FloodWaitError as e:
                logger.warning(f"⏳ 触发流控: 暂停 {e.seconds} 秒")
                await asyncio.sleep(e.seconds + 2)
            except FileReferenceExpiredError:
                logger.error("❌ 文件引用过期,跳过")
            except Exception as e:
                logger.error(f"❌ 发送异常: {e}")
            forward_queue.task_done()
        except Exception as e:
            logger.error(f"Worker Error: {e}")

async def scanner(source_channels, forward_limit):
    all_collected_videos = []
    for ch in source_channels:
        channel_collected = 0 
        logger.info(f"🔍 正在扫描: {ch} (目标: {forward_limit})")
        consecutive_duplicates = 0
        scanned_count = 0
        async for msg in client.iter_messages(ch, limit=MAX_SCAN_COUNT):
            scanned_count += 1
            if scanned_count % 200 == 0:
                print(f"\r   ...扫描深度: {scanned_count} 条", end="")
            if not is_video(msg): continue
            if await db.seen(get_unique_key(msg)):
                consecutive_duplicates += 1
                if consecutive_duplicates >= STOP_AFTER_DUPLICATES:
                    print(f"\n🛑 [频道 {ch}] 连续 {STOP_AFTER_DUPLICATES} 条重复,停止扫描。")
                    break
            else:
                consecutive_duplicates = 0
                all_collected_videos.append(msg)
                channel_collected += 1
                print(f"\r📦 [频道 {ch}] 命中: {channel_collected}/{forward_limit}", end="")
            if channel_collected >= forward_limit:
                print(f"\n✅ [频道 {ch}] 额度已满")
                break
        print("")

    if all_collected_videos:
        logger.info(f"📊 找到 {len(all_collected_videos)} 个新视频。排序入队...")
        all_collected_videos.sort(key=lambda x: x.date)
        for msg in all_collected_videos:
            await queue_message(msg)
        if pending_albums:
            logger.info("⏳ 等待相册分组...")
            while pending_albums or album_timers:
                finished = [k for k, t in album_timers.items() if t.done()]
                for k in finished: del album_timers[k]
                if not album_timers and not pending_albums: break
                await asyncio.sleep(0.5)
    else:
        logger.info("⚠️ 没有发现任何新视频。")
    await forward_queue.put(None)

async def run_single_task(task):
    global db, forward_queue, pending_albums, album_timers
    
    sources = task['sources']
    target = task['target']
    limit = task.get('limit', 200)

    # 这里 generate_safe_filename 已经包含 database 路径
    full_db_path = generate_safe_filename(sources, target)
    logger.info(f"\n🚀 >>> 启动任务: {os.path.basename(full_db_path)} <<<")
    
    # 彻底重置状态
    forward_queue = asyncio.Queue()
    pending_albums = {}
    album_timers = {}
    
    db = AsyncDB(full_db_path)
    await db.connect()
    
    worker_task = asyncio.create_task(worker(target))
    
    try:
        await scanner(sources, limit)
        await forward_queue.join()
    except Exception as e:
        logger.error(f"❌ 任务运行异常: {e}")
    finally:
        if not worker_task.done():
            worker_task.cancel()
        await db.close()
        logger.info(f"🏁 >>> 任务结束: {os.path.basename(full_db_path)} <<<\n")

# ==================== 🚀 主程序入口 ====================

async def main():
    print("="*50)
    print("🎥  Auto-Forwarder Pro (Task Sequencer)")
    print(f"📉  最小视频限制: {MIN_SIZE_MB} MB")
    print(f"📈  最大视频限制: {MAX_SIZE_MB if MAX_SIZE_MB > 0 else '不限'} MB")
    print("="*50)
    
    if not TASK_CONFIG:
        logger.error("❌ TASK_CONFIG 为空,请在代码中配置任务列表!")
        return

    await client.start(PHONE_NUMBER)
    
    try:
        total_tasks = len(TASK_CONFIG)
        for index, task in enumerate(TASK_CONFIG):
            await run_single_task(task)
            
            if index < total_tasks - 1:
                wait_sec = 10
                logger.info(f"⏳ 任务 [{index + 1}/{total_tasks}] 已完成。")
                logger.info(f"⏳ 冷却中... {wait_sec} 秒后执行下一个任务...")
                await asyncio.sleep(wait_sec)
            else:
                logger.info("🎉 所有任务已按顺序执行完毕!")

    except KeyboardInterrupt:
        print("\n👋 用户手动停止程序")
    except Exception as e:
        logger.error(f"🔥 系统致命错误: {e}")
    finally:
        await client.disconnect()
        logger.info("🔌 已安全退出并断开连接")

if __name__ == "__main__":
    asyncio.run(main())

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注