import asyncio
import os
import random
import sqlite3
import re
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timezone
from telethon import TelegramClient, events
from telethon.tl.types import MessageMediaDocument, DocumentAttributeFilename, DocumentAttributeVideo
from telethon.errors import FloodWaitError, ChatForwardsRestrictedError, SecurityError
from dotenv import load_dotenv
# ==================== 0. Logging configuration ====================
# Log to a size-rotated file (5 MB x 5 backups, UTF-8) and to the console.
_log_handlers = [
    RotatingFileHandler("bot.log", maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8"),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s – %(levelname)s – %(message)s",
    handlers=_log_handlers,
)
logger = logging.getLogger("Bot")
# ==================== Configuration & database ====================
# Load variables from a .env file into os.environ before any os.getenv() below.
load_dotenv()
# Process start time (timezone-aware UTC); /status subtracts it to report uptime.
START_TIME = datetime.now(tz=timezone.utc)
class DBManager:
    """SQLite store for dedup keys of forwarded videos and per-channel scan progress.

    Every statement runs on one connection through one shared cursor, and
    each write is committed immediately.
    """

    def __init__(self, db_path="bot_data.db"):
        # check_same_thread=False allows use from threads other than the
        # creating one; this class does not add any locking itself.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.cursor = self.conn.cursor()
        self._create_tables()

    def _write(self, sql, params):
        """Execute one data-modifying statement and commit it."""
        self.cursor.execute(sql, params)
        self.conn.commit()

    def _fetch_first(self, sql, params=()):
        """Execute a query and return its first row, or None if there is none."""
        self.cursor.execute(sql, params)
        return self.cursor.fetchone()

    def _create_tables(self):
        """Create both tables if they do not exist yet."""
        schema = (
            # video_keys: one row per forwarded video (dedup) plus the message
            # id of the copy sent to the target channel.
            '''CREATE TABLE IF NOT EXISTS video_keys
            (video_key TEXT PRIMARY KEY, target_msg_id INTEGER, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''',
            # channel_progress: last message id recorded per source channel.
            '''CREATE TABLE IF NOT EXISTS channel_progress
            (channel_id TEXT PRIMARY KEY, last_msg_id INTEGER)''',
        )
        for statement in schema:
            self.cursor.execute(statement)
        self.conn.commit()

    def is_video_exists(self, video_key):
        """Return True if *video_key* has already been recorded."""
        row = self._fetch_first("SELECT 1 FROM video_keys WHERE video_key = ?", (video_key,))
        return row is not None

    def add_video_key(self, video_key, target_msg_id):
        """Record *video_key* with the sent message id, replacing any existing row."""
        self._write(
            "INSERT OR REPLACE INTO video_keys (video_key, target_msg_id) VALUES (?, ?)",
            (video_key, target_msg_id),
        )

    def get_progress(self, channel_id):
        """Return the stored last message id for *channel_id* (keyed as str), or 0."""
        row = self._fetch_first(
            "SELECT last_msg_id FROM channel_progress WHERE channel_id = ?",
            (str(channel_id),),
        )
        return row[0] if row else 0

    def update_progress(self, channel_id, last_msg_id):
        """Overwrite the stored last message id for *channel_id* (keyed as str)."""
        self._write(
            "INSERT OR REPLACE INTO channel_progress (channel_id, last_msg_id) VALUES (?, ?)",
            (str(channel_id), last_msg_id),
        )

    def get_today_count(self):
        """Count video_keys rows whose timestamp is on or after SQLite's date('now')."""
        (count,) = self._fetch_first("SELECT count(*) FROM video_keys WHERE timestamp >= date('now')")
        return count
# Single shared database handle used by the producers, the worker and /status.
db = DBManager()


# ==================== Environment variables ====================
def _require_env(name):
    """Return the value of required environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty. The error names the
            variable, instead of failing later with an opaque TypeError
            from ``int(None)`` or a client that cannot authenticate.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


api_id = int(_require_env("API_ID"))
api_hash = _require_env("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD")
TARGET_CHANNEL = os.getenv("TARGET_CHANNEL")  # must be a single channel ID
# `or 0` also covers "ADMIN_ID=" (present but empty in .env), which int() rejects.
# 0 disables the admin commands.
ADMIN_ID = int(os.getenv("ADMIN_ID") or 0)
def parse_channel(ch):
    """Normalize one channel reference taken from configuration.

    Surrounding whitespace is stripped. A run of decimal digits with at most
    one leading '-' (e.g. ``"-1001234567890"``) becomes an int channel ID.
    Anything else (e.g. ``"@name"``) is returned as the stripped string.
    """
    token = ch.strip()
    digits = token[1:] if token.startswith("-") else token
    # isdecimal() admits only characters int() can parse (isdigit() also
    # admits e.g. "²"). Allowing a single '-' keeps "--123" from reaching
    # int(), so int() cannot raise here.
    if digits.isdecimal():
        return int(token)
    return token
# Parse the comma-separated SOURCE_CHANNELS list (numeric IDs or usernames).
SOURCE_CHANNELS_PARSED = [
    parse_channel(ch)
    for ch in os.getenv("SOURCE_CHANNELS", "").split(",")
    if ch.strip()
]
# TARGET_CHANNEL is read as a string. A numeric ID must become an int, the
# same way as the sources. Telethon resolves plain strings as usernames or
# phone numbers, so "-100…" would not resolve to the channel.
if TARGET_CHANNEL:
    TARGET_CHANNEL = parse_channel(TARGET_CHANNEL)
# `or <default>` also covers variables that are present but empty in .env.
SCAN_LIMIT = int(os.getenv("SCAN_LIMIT") or 50)  # <= 0 scans without a limit
MAX_CAPTION_LENGTH = int(os.getenv("MAX_CAPTION_LENGTH") or 1024)
# ========== 🛡️ Video filter configuration (0 = no limit) ==========
# `or 0` also treats a present-but-empty variable (e.g. "MIN_FILE_SIZE=" in
# .env) as "no limit"; int("") would otherwise abort startup.
MIN_FILE_SIZE = int(os.getenv("MIN_FILE_SIZE") or 0)
MAX_FILE_SIZE = int(os.getenv("MAX_FILE_SIZE") or 0)
MIN_DURATION = int(os.getenv("MIN_DURATION") or 0)
MAX_DURATION = int(os.getenv("MAX_DURATION") or 0)
MIN_WIDTH = int(os.getenv("MIN_WIDTH") or 0)
MIN_HEIGHT = int(os.getenv("MIN_HEIGHT") or 0)

# Telethon client using the session name "user_session".
client = TelegramClient("user_session", api_id, api_hash)
# Bounded hand-off from the producers (history scan, live handler) to the
# worker. Items are (message, video_key, source_channel_id_str); put() waits
# while the queue is full.
forward_queue = asyncio.Queue(maxsize=100)
CHANNEL_NAME_CACHE = {}  # channel id (int) -> channel title, for logs and /status
# ==================== Ad / promo caption cleanup ====================
# Regexes removed from captions, applied in this order and case-insensitively:
# @mentions, URLs, t.me links, and common promotional phrases.
AD_PATTERNS = [
    r"@\w+", r"https?://\S+", r"t\.me/\S+",
    r"加入频道", r"点击关注", r"Поддержать проект.*",
    r"本台独家", r"加微信", r"招募", r"更多资源"
]


def clean_caption(text):
    """Return *text* with every AD_PATTERNS match removed and blank lines collapsed.

    Falsy input (None or "") yields "". The result is stripped of leading and
    trailing whitespace.
    """
    if not text:
        return ""
    for regex in (re.compile(pattern, re.IGNORECASE) for pattern in AD_PATTERNS):
        text = regex.sub("", text)
    # A newline, optional whitespace (including further newlines), then a
    # newline: collapse each such run to a single "\n".
    return re.sub(r"\n\s*\n", "\n", text).strip()
# ==================== 🛠️ Core: all-in-one video filter ====================
def is_video_eligible(message):
    """Return True if *message* carries a video that passes every configured filter.

    Checks, in order:
      1. the media is a MessageMediaDocument whose MIME type starts with "video";
      2. the document size is within MIN_FILE_SIZE / MAX_FILE_SIZE;
      3. if the document has a DocumentAttributeVideo: it is not a round video
         note, its duration is within MIN_DURATION / MAX_DURATION, and its
         width/height meet MIN_WIDTH / MIN_HEIGHT.
    A limit of 0 disables that check. Documents without a video attribute
    skip step 3 entirely.
    """
    media = message.media
    if not media or not isinstance(media, MessageMediaDocument):
        return False
    # `document` is optional on MessageMediaDocument and may be a DocumentEmpty
    # with no mime_type/size. Probe with getattr so such messages are rejected
    # instead of raising AttributeError, which would abort a channel scan.
    doc = media.document
    mime_type = getattr(doc, "mime_type", None)
    if not mime_type or not mime_type.startswith("video"):
        return False

    # File size limits.
    if MIN_FILE_SIZE > 0 and doc.size < MIN_FILE_SIZE:
        return False
    if MAX_FILE_SIZE > 0 and doc.size > MAX_FILE_SIZE:
        return False

    # Duration / resolution, only when the document carries video attributes.
    video_attr = next((a for a in doc.attributes if isinstance(a, DocumentAttributeVideo)), None)
    if video_attr:
        # Round "video notes" are never forwarded.
        if getattr(video_attr, "round_message", False):
            return False
        duration = video_attr.duration
        if MIN_DURATION > 0 and duration < MIN_DURATION:
            return False
        if MAX_DURATION > 0 and duration > MAX_DURATION:
            return False
        if MIN_WIDTH > 0 and video_attr.w < MIN_WIDTH:
            return False
        if MIN_HEIGHT > 0 and video_attr.h < MIN_HEIGHT:
            return False
    return True
def get_video_key(message):
    """Build the dedup key ``"<file name>_<size>_<document id>"`` for a video message.

    The file name comes from the first DocumentAttributeFilename, or "v.mp4"
    when there is none. The last component is the Telegram document id
    (``doc.id``), not the message id.
    """
    document = message.media.document
    file_name = "v.mp4"
    for attribute in document.attributes:
        if isinstance(attribute, DocumentAttributeFilename):
            file_name = attribute.file_name
            break
    return f"{file_name}_{document.size}_{document.id}"
# ==================== Consumer: forwarding sink ====================
async def _refresh_message(msg, source_id):
    """Re-fetch *msg* from channel *source_id* so its media reference is current.

    Best effort:
      * returns the refreshed message when it still passes is_video_eligible;
      * returns None when it no longer does (the caller should skip it);
      * returns the original *msg* when the refresh returned nothing or failed.
    """
    try:
        refreshed_msgs = await client.get_messages(source_id, ids=[msg.id])
        if refreshed_msgs and refreshed_msgs[0]:
            fresh = refreshed_msgs[0]
            if not is_video_eligible(fresh):
                logger.warning(f"⚠️ 视频 {fresh.id} 已不符合条件或失效,跳过")
                return None
            return fresh
        logger.warning(f"⚠️ 无法刷新消息 {msg.id},尝试使用缓存旧引用")
    except Exception as e:
        logger.warning(f"⚠️ 刷新消息失败: {e},尝试强行转发")
    return msg


async def _resolve_source_name(source_id):
    """Return a display title for *source_id*, falling back to its id as a string.

    Successful lookups are cached in CHANNEL_NAME_CACHE, so the API is queried
    at most once per channel. Failures are not cached.
    """
    if source_id in CHANNEL_NAME_CACHE:
        return CHANNEL_NAME_CACHE[source_id]
    try:
        entity = await client.get_entity(source_id)
    except Exception:
        # The name only appears in log lines, so keep the numeric id on failure.
        return str(source_id)
    title = getattr(entity, "title", str(source_id))
    CHANNEL_NAME_CACHE[source_id] = title
    return title


async def _forward_one(msg, video_key, source_ch_id):
    """Send one queued video to TARGET_CHANNEL and record the outcome in the DB."""
    source_id = int(source_ch_id)

    # Dedup is checked at enqueue time, but the same video can be queued twice
    # before either copy is sent: reposted in several source channels, or seen
    # by both the history scan and the live handler. Re-check before sending.
    if db.is_video_exists(video_key):
        logger.info(f"⏭️ 重复视频已转发过,跳过 | ID: {msg.id}")
        db.update_progress(source_ch_id, msg.id)
        return

    fresh = await _refresh_message(msg, source_id)
    if fresh is None:
        # No longer eligible: record progress so the history scan moves past it.
        db.update_progress(source_ch_id, msg.id)
        return
    msg = fresh

    source_name = await _resolve_source_name(source_id)
    caption = clean_caption(msg.message or "")[:MAX_CAPTION_LENGTH]

    while True:
        try:
            sent_msg = await client.send_file(TARGET_CHANNEL, file=msg.media, caption=caption, silent=True)
        except (ChatForwardsRestrictedError, SecurityError) as e:
            logger.error(f"⛔ 无法转发 (ID: {msg.id}): 源频道开启了内容保护,跳过")
            if isinstance(e, ChatForwardsRestrictedError):
                # Content protection will not go away on retry. Advance progress
                # so the next history scan does not keep revisiting this message.
                db.update_progress(source_ch_id, msg.id)
            return
        except FloodWaitError as e:
            logger.warning(f"⏳ 触发风控 FloodWait,暂停 {e.seconds} 秒…")
            await asyncio.sleep(e.seconds + 5)
            continue  # retry the same send after the wait
        except Exception as e:
            logger.error(f"❌ 转发异常: {e}", exc_info=True)
            return
        break

    db.add_video_key(video_key, sent_msg.id)
    db.update_progress(source_ch_id, msg.id)
    logger.info(f"✅ 转发成功 | 来源: {source_name} | ID: {msg.id}")
    # Anti-flood pacing between successful sends (1-3 s).
    await asyncio.sleep(random.uniform(1, 3))


async def worker():
    """Consume forward_queue forever, forwarding one video at a time.

    Each item is a ``(message, video_key, source_channel_id_str)`` tuple. A
    failure on one item is logged and never ends the loop. task_done() is
    always called, so the queue's unfinished-task count stays balanced.
    """
    logger.info("👷 转发消费者已启动,等待任务…")
    while True:
        msg, video_key, source_ch_id = await forward_queue.get()
        try:
            await _forward_one(msg, video_key, source_ch_id)
        except Exception as e:
            logger.error(f"❌ 转发异常: {e}", exc_info=True)
        finally:
            forward_queue.task_done()
# ==================== Admin commands ====================
@client.on(events.NewMessage(pattern='/status'))
async def status_handler(event):
    """Reply to /status with a status summary.

    The summary covers uptime, today's forwarded count, queue depth, log file
    size and the number of cached channel names. The command is ignored
    unless ADMIN_ID is non-zero and equals the sender's id.
    """
    is_admin = ADMIN_ID != 0 and event.sender_id == ADMIN_ID
    if not is_admin:
        return

    # Drop the fractional seconds from the timedelta's string form.
    uptime_text = str(datetime.now(timezone.utc) - START_TIME).partition(".")[0]
    today_total = db.get_today_count()
    pending = forward_queue.qsize()
    log_file = "bot.log"
    log_mb = f"{os.path.getsize(log_file) / 1024 / 1024:.2f}" if os.path.exists(log_file) else "0"

    lines = [
        "🤖 **机器人运行状态**",
        "━━━━━━━━━━━━━━",
        f"⏱️ **运行**: {uptime_text}",
        f"📊 **今日**: {today_total}",
        f"📥 **队列**: {pending}",
        f"📝 **日志**: {log_mb} MB",
        f"📏 **缓存频道**: {len(CHANNEL_NAME_CACHE)} 个",
    ]
    await event.reply("\n".join(lines))
# ==================== Producer: history scan ====================
async def scan_channel(channel_input, semaphore):
    """Queue eligible, not-yet-forwarded videos from one source channel's history.

    Reads messages newer than the stored progress id, oldest first. At most
    SCAN_LIMIT messages are read (all of them when SCAN_LIMIT <= 0). The scan
    holds *semaphore* for its whole duration. Errors are logged, not raised.
    """
    async with semaphore:
        try:
            entity = await client.get_entity(channel_input)
            channel_id = str(entity.id)
            title = getattr(entity, "title", str(entity.id))
            CHANNEL_NAME_CACHE[entity.id] = title  # pre-seed the name cache
            last_id = db.get_progress(channel_id)
            logger.info(f"🔍 开始扫描: {title} (起点ID: {last_id})")

            enqueued = 0
            newest_seen = last_id
            async for msg in client.iter_messages(
                entity,
                min_id=last_id,
                limit=SCAN_LIMIT if SCAN_LIMIT > 0 else None,
                reverse=True,
            ):
                newest_seen = max(newest_seen, msg.id)
                if not is_video_eligible(msg):
                    continue
                key = get_video_key(msg)
                if db.is_video_exists(key):
                    continue
                await forward_queue.put((msg, key, channel_id))
                enqueued += 1
                logger.info(f"📥 [历史] 入队: {msg.id}")

            # Progress normally advances only when the worker forwards a video.
            # If this window queued nothing (no eligible videos, or only
            # already-forwarded ones), nothing is pending from it, so advance
            # past it here. Otherwise every run rescans the same SCAN_LIMIT
            # messages. Re-read progress first in case the worker already
            # moved it further.
            if not enqueued and newest_seen > db.get_progress(channel_id):
                db.update_progress(channel_id, newest_seen)

            logger.info(f"🏁 扫描完成: {title}")
        except Exception as e:
            logger.error(f"❌ 扫描失败 {channel_input}: {e}")
@client.on(events.NewMessage(chats=SOURCE_CHANNELS_PARSED))
async def handler(event):
    """Queue a newly posted video from a source channel if it is eligible and not yet forwarded."""
    # Local import: only this handler needs it (telethon is already a dependency).
    from telethon.utils import get_peer_id

    message = event.message
    if not is_video_eligible(message):
        return
    key = get_video_key(message)
    if db.is_video_exists(key):
        return

    # Use the unmarked channel id, the form scan_channel stores progress under
    # (str(entity.id)). event.chat_id is the marked "-100…" id. Using it made
    # the worker write progress under a key the history scan never reads.
    source_id = get_peer_id(message.peer_id, add_mark=False)
    await forward_queue.put((message, key, str(source_id)))

    # Best effort: remember the channel title for log lines.
    chat = await event.get_chat()
    if chat:
        CHANNEL_NAME_CACHE[chat.id] = getattr(chat, "title", str(chat.id))
    logger.info(f"⚡ [实时] 入队: {message.id}")
async def main():
    """Log in, start the worker, scan each source channel's history, then serve live updates.

    Runs until the client disconnects. The worker task is cancelled on the
    way out.
    """
    await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)
    logger.info(f"🚀 启动成功 | 监听频道: {len(SOURCE_CHANNELS_PARSED)} 个 | 过滤生效中")

    # Keep a strong reference. The event loop only holds weak references to
    # tasks, so an unreferenced task can be garbage-collected mid-execution.
    worker_task = asyncio.create_task(worker())
    try:
        # Semaphore(1) serializes the history scans: one channel at a time.
        scan_semaphore = asyncio.Semaphore(1)
        await asyncio.gather(*(scan_channel(ch, scan_semaphore) for ch in SOURCE_CHANNELS_PARSED))
        await client.run_until_disconnected()
    finally:
        worker_task.cancel()
if __name__ == "__main__":
    # Do not use `with client:`. Entering the client calls client.start() with
    # no arguments, which on a fresh session prompts interactively for the
    # phone number and password. That would ignore PHONE_NUMBER /
    # TWO_STEP_PASSWORD, which main() passes to client.start().
    try:
        client.loop.run_until_complete(main())
    finally:
        # With the loop not running, Telethon's disconnect() runs the disconnect
        # to completion itself. This also covers Ctrl+C.
        client.disconnect()