TG-getlist

import os

import csv

import datetime

from telethon import TelegramClient

from dotenv import load_dotenv

# === 1. 读取配置 ===

load_dotenv()

API_ID = os.getenv(“API_ID”)

API_HASH = os.getenv(“API_HASH”)

PHONE_NUMBER = os.getenv(“PHONE_NUMBER”)

TWO_STEP_PASSWORD = os.getenv(“TWO_STEP_PASSWORD”)

if not API_ID or not API_HASH:

    print(“❌ 错误: 请确保 .env 文件中配置了 API_ID 和 API_HASH”)

    exit(1)

client = TelegramClient(‘user_session’, int(API_ID), API_HASH)

def get_bot_api_id(entity, entity_type):

    “””

    根据实体类型将 Telethon ID 转换为 Bot API ID

    Bot API 规则:

    – 频道/超级群: -100 + ID

    – 普通小群: – + ID

    – 用户: ID (不变)

    “””

    raw_id = entity.id

    if entity_type in [“频道”, “超级群”, “频道(未知)”]:

        return int(f”-100{raw_id}”)

    elif entity_type == “普通群”:

        return int(f”-{raw_id}”)

    else:

        return raw_id

async def list_and_export_chats():

    “””列出并导出账号加入的所有频道和群组”””

    print(“📃 正在获取对话列表,请稍候…”)

    chat_data_list = []

    async for dialog in client.iter_dialogs():

        entity = dialog.entity

        entity_type = “未知”

        # — 分类逻辑 —

        if dialog.is_user:

            entity_type = “私聊”

        elif dialog.is_channel:

            if getattr(entity, ‘broadcast’, False):

                entity_type = “频道”

            elif getattr(entity, ‘megagroup’, False):

                entity_type = “超级群”

            else:

                entity_type = “频道(未知)”

        elif dialog.is_group:

            entity_type = “普通群”

        # 过滤掉私聊 (如果需要私聊ID,注释掉下面这行)

        if entity_type != “私聊”:

            # 获取两种格式的 ID

            raw_id = entity.id

            bot_api_id = get_bot_api_id(entity, entity_type)

            chat_info = {

                “类型”: entity_type,

                “名称”: dialog.name,

                “Bot_API_ID”: bot_api_id,  # ✅ 新增:可以直接给 Bot 用的 ID

                “原始_ID”: raw_id,         # Telethon 用的原始 ID

                “用户名”: getattr(entity, ‘username’, ‘无’) or ‘无’,

                “成员数”: getattr(entity, ‘participants_count’, ‘未知’)

            }

            chat_data_list.append(chat_info)

            print(f”[{entity_type}] {dialog.name} | Bot_ID: {bot_api_id}”)

    # === 导出到 CSV 文件 ===

    if chat_data_list:

        timestamp = datetime.datetime.now().strftime(“%Y%m%d_%H%M%S”)

        filename = f”telegram_chats_{timestamp}.csv”

        # 更新表头

        headers = [“类型”, “名称”, “Bot_API_ID”, “原始_ID”, “用户名”, “成员数”]

        try:

            with open(filename, mode=’w’, encoding=’utf-8-sig’, newline=”) as f:

                writer = csv.DictWriter(f, fieldnames=headers)

                writer.writeheader()

                writer.writerows(chat_data_list)

            print(“-” * 50)

            print(f”✅ 成功!共导出 {len(chat_data_list)} 个群组/频道。”)

            print(f”📁 文件已保存为: {os.path.abspath(filename)}”)

        except Exception as e:

            print(f”❌ 导出文件失败: {e}”)

    else:

        print(“⚠️ 未找到任何群组或频道。”)

async def main():

    await client.start(phone=PHONE_NUMBER, password=TWO_STEP_PASSWORD)

    print(“✅ 登录成功”)

    await list_and_export_chats()

if __name__ == “__main__”:

    with client:

        client.loop.run_until_complete(main())

TG-forward-videos-enterprise

import asyncio

import os

import random

import sqlite3

import re

import logging

from logging.handlers import RotatingFileHandler

from datetime import datetime, timezone

from telethon import TelegramClient, events

from telethon.tl.types import MessageMediaDocument, DocumentAttributeFilename, DocumentAttributeVideo

from telethon.errors import FloodWaitError, ChatForwardsRestrictedError, SecurityError

from dotenv import load_dotenv

# ==================== 0. 日志配置 ====================

logging.basicConfig(

    level=logging.INFO,

    format=’%(asctime)s – %(levelname)s – %(message)s’,

    handlers=[

        RotatingFileHandler(“bot.log”, maxBytes=5*1024*1024, backupCount=5, encoding=’utf-8′),

        logging.StreamHandler()

    ]

)

logger = logging.getLogger(“Bot”)

# ==================== 配置与数据库 ====================

load_dotenv()

START_TIME = datetime.now(timezone.utc)

class DBManager:

    def __init__(self, db_path=”bot_data.db”):

        self.conn = sqlite3.connect(db_path, check_same_thread=False)

        self.cursor = self.conn.cursor()

        self._create_tables()

    def _create_tables(self):

        # 记录已发送的视频Key (去重)

        self.cursor.execute(”’CREATE TABLE IF NOT EXISTS video_keys

                            (video_key TEXT PRIMARY KEY, target_msg_id INTEGER, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)”’)

        # 记录每个频道的扫描进度

        self.cursor.execute(”’CREATE TABLE IF NOT EXISTS channel_progress

                            (channel_id TEXT PRIMARY KEY, last_msg_id INTEGER)”’)

        self.conn.commit()

    def is_video_exists(self, video_key):

        self.cursor.execute(“SELECT 1 FROM video_keys WHERE video_key = ?”, (video_key,))

        return self.cursor.fetchone() is not None

    def add_video_key(self, video_key, target_msg_id):

        self.cursor.execute(“INSERT OR REPLACE INTO video_keys (video_key, target_msg_id) VALUES (?, ?)”, (video_key, target_msg_id))

        self.conn.commit()

    def get_progress(self, channel_id):

        self.cursor.execute(“SELECT last_msg_id FROM channel_progress WHERE channel_id = ?”, (str(channel_id),))

        res = self.cursor.fetchone()

        return res[0] if res else 0

    def update_progress(self, channel_id, last_msg_id):

        self.cursor.execute(“INSERT OR REPLACE INTO channel_progress (channel_id, last_msg_id) VALUES (?, ?)”, (str(channel_id), last_msg_id))

        self.conn.commit()

    def get_today_count(self):

        self.cursor.execute(“SELECT count(*) FROM video_keys WHERE timestamp >= date(‘now’)”)

        return self.cursor.fetchone()[0]

db = DBManager()

# ==================== 环境变量处理 ====================

api_id = int(os.getenv(“API_ID”))

api_hash = os.getenv(“API_HASH”)

PHONE_NUMBER = os.getenv(“PHONE_NUMBER”)

TWO_STEP_PASSWORD = os.getenv(“TWO_STEP_PASSWORD”)

TARGET_CHANNEL = os.getenv(“TARGET_CHANNEL”) # 必须是单个 ID

ADMIN_ID = int(os.getenv(“ADMIN_ID”, 0))

def parse_channel(ch):

    ch = ch.strip()

    if ch.lstrip(“-“).isdigit():

        return int(ch)

    return ch

# 解析多个源频道

SOURCE_CHANNELS_PARSED = [

    parse_channel(ch)

    for ch in os.getenv(“SOURCE_CHANNELS”, “”).split(“,”)

    if ch.strip()

]

SCAN_LIMIT = int(os.getenv(“SCAN_LIMIT”, 50))

MAX_CAPTION_LENGTH = int(os.getenv(“MAX_CAPTION_LENGTH”, 1024))

# ========== 🛡️ 全能过滤配置 (0为不限制) ==========

MIN_FILE_SIZE = int(os.getenv(“MIN_FILE_SIZE”, 0))  

MAX_FILE_SIZE = int(os.getenv(“MAX_FILE_SIZE”, 0))

MIN_DURATION = int(os.getenv(“MIN_DURATION”, 0))

MAX_DURATION = int(os.getenv(“MAX_DURATION”, 0))

MIN_WIDTH = int(os.getenv(“MIN_WIDTH”, 0))  

MIN_HEIGHT = int(os.getenv(“MIN_HEIGHT”, 0))

client = TelegramClient(“user_session”, api_id, api_hash)

forward_queue = asyncio.Queue(maxsize=100)

CHANNEL_NAME_CACHE = {} # 频道名称缓存池

# ==================== 广告清洗配置 ====================

AD_PATTERNS = [

    r”@\w+”, r”https?://\S+”, r”t\.me/\S+”,

    r”加入频道”, r”点击关注”, r”Поддержать проект.*”,

    r”本台独家”, r”加微信”, r”招募”, r”更多资源”

]

def clean_caption(text):

    if not text: return “”

    for pattern in AD_PATTERNS:

        text = re.sub(pattern, “”, text, flags=re.IGNORECASE)

    text = re.sub(r’\n\s*\n‘, ‘\n’, text)

    return text.strip()

# ==================== 🛠️ 核心:全能过滤函数 ====================

def is_video_eligible(message):

    # 1. 基础格式检查

    if not message.media or not isinstance(message.media, MessageMediaDocument): return False

    doc = message.media.document

    if not doc.mime_type or not doc.mime_type.startswith(“video”): return False

    # 2. 文件大小检查

    if MIN_FILE_SIZE > 0 and doc.size < MIN_FILE_SIZE: return False

    if MAX_FILE_SIZE > 0 and doc.size > MAX_FILE_SIZE: return False

    # 3. 提取视频属性 (时长、分辨率)

    video_attr = next((a for a in doc.attributes if isinstance(a, DocumentAttributeVideo)), None)

    if video_attr:

        # 圆形视频过滤 (Video Notes)

        if getattr(video_attr, ’round_message’, False): return False

        # 时长检查

        duration = video_attr.duration

        if MIN_DURATION > 0 and duration < MIN_DURATION: return False

        if MAX_DURATION > 0 and duration > MAX_DURATION: return False

        # 画质/分辨率检查

        width = video_attr.w

        height = video_attr.h

        if MIN_WIDTH > 0 and width < MIN_WIDTH: return False

        if MIN_HEIGHT > 0 and height < MIN_HEIGHT: return False

    return True

def get_video_key(message):

    doc = message.media.document

    filename = next((a.file_name for a in doc.attributes if isinstance(a, DocumentAttributeFilename)), “v.mp4”)

    # 唯一ID: 文件名 + 大小 + 原始MessageID (简单且有效)

    return f”{filename}_{doc.size}_{doc.id}”

# ==================== 消费者:转发出口 (优化版) ====================

async def worker():

    logger.info(“👷 转发消费者已启动,等待任务…”)

    while True:

        msg, video_key, source_ch_id = await forward_queue.get()

        try:

            # 1. 尝试刷新消息对象(防止链接失效)

            refreshed_msgs = await client.get_messages(int(source_ch_id), ids=[msg.id])

            if refreshed_msgs and refreshed_msgs[0]:

                msg = refreshed_msgs[0]

                if not is_video_eligible(msg):

                    logger.warning(f”⚠️ 视频 {msg.id} 已不符合条件或失效,跳过”)

                    forward_queue.task_done()

                    continue

            else:

                logger.warning(f”⚠️ 无法刷新消息 {msg.id},尝试使用缓存旧引用”)

        except Exception as e:

            logger.warning(f”⚠️ 刷新消息失败: {e},尝试强行转发”)

        # 2. 获取频道名称 (带缓存)

        source_id_int = int(source_ch_id)

        source_name = str(source_id_int) # 默认用ID

        if source_id_int in CHANNEL_NAME_CACHE:

             source_name = CHANNEL_NAME_CACHE[source_id_int]

        else:

            try:

                # 仅第一次请求API

                entity = await client.get_entity(source_id_int)

                title = getattr(entity, ‘title’, str(source_id_int))

                CHANNEL_NAME_CACHE[source_id_int] = title

                source_name = title

            except:

                pass # 获取失败保持默认ID

        caption = clean_caption(msg.message or “”)[:MAX_CAPTION_LENGTH]

        while True:

            try:

                # 3. 发送文件

                sent_msg = await client.send_file(TARGET_CHANNEL, file=msg.media, caption=caption, silent=True)

                # 4. 记录数据库

                db.add_video_key(video_key, sent_msg.id)

                db.update_progress(source_ch_id, msg.id)

                logger.info(f”✅ 转发成功 | 来源: {source_name} | ID: {msg.id}”)

                # 5. 防风控延迟 (2-5秒)

                await asyncio.sleep(random.uniform(1, 3))

                break

            except (ChatForwardsRestrictedError, SecurityError):

                logger.error(f”⛔ 无法转发 (ID: {msg.id}): 源频道开启了内容保护,跳过”)

                break

            except FloodWaitError as e:

                logger.warning(f”⏳ 触发风控 FloodWait,暂停 {e.seconds} 秒…”)

                await asyncio.sleep(e.seconds + 5)

            except Exception as e:

                logger.error(f”❌ 转发异常: {e}”, exc_info=True)

                break

        forward_queue.task_done()

# ==================== 管理员指令 ====================

@client.on(events.NewMessage(pattern=’/status’))

async def status_handler(event):

    if ADMIN_ID == 0 or event.sender_id != ADMIN_ID: return

    uptime = datetime.now(timezone.utc) – START_TIME

    today_count = db.get_today_count()

    queue_size = forward_queue.qsize()

    log_size = “0”

    if os.path.exists(“bot.log”):

        log_size = f”{os.path.getsize(‘bot.log’) / 1024 / 1024:.2f}”

    msg = (

        f”🤖 **机器人运行状态**\n”

        f”━━━━━━━━━━━━━━\n”

        f”⏱️ **运行**: {str(uptime).split(‘.’)[0]}\n”

        f”📊 **今日**: {today_count}\n”

        f”📥 **队列**: {queue_size}\n”

        f”📝 **日志**: {log_size} MB\n”

        f”📏 **缓存频道**: {len(CHANNEL_NAME_CACHE)} 个”

    )

    await event.reply(msg)

# ==================== 生产者 ====================

async def scan_channel(channel_input, semaphore):

    async with semaphore:

        try:

            entity = await client.get_entity(channel_input)

            channel_id = str(entity.id)

            # 预存一下名字进缓存

            CHANNEL_NAME_CACHE[entity.id] = getattr(entity, ‘title’, str(entity.id))

            last_id = db.get_progress(channel_id)

            logger.info(f”🔍 开始扫描: {CHANNEL_NAME_CACHE[entity.id]} (起点ID: {last_id})”)

            # 扫描历史 (SCAN_LIMIT 控制条数)

            async for msg in client.iter_messages(entity, min_id=last_id, limit=SCAN_LIMIT if SCAN_LIMIT > 0 else None, reverse=True):

                if not is_video_eligible(msg): continue

                key = get_video_key(msg)

                if db.is_video_exists(key): continue

                await forward_queue.put((msg, key, channel_id))

                logger.info(f”📥 [历史] 入队: {msg.id}”)

            logger.info(f”🏁 扫描完成: {CHANNEL_NAME_CACHE[entity.id]}”)

        except Exception as e:

            logger.error(f”❌ 扫描失败 {channel_input}: {e}”)

@client.on(events.NewMessage(chats=SOURCE_CHANNELS_PARSED))

async def handler(event):

    if is_video_eligible(event.message):

        key = get_video_key(event.message)

        if not db.is_video_exists(key):

            # 将消息推入队列

            await forward_queue.put((event.message, key, str(event.chat_id)))

            # 尝试简易记录名称 (如果是Entity)

            chat = await event.get_chat()

            if chat:

                CHANNEL_NAME_CACHE[chat.id] = getattr(chat, ‘title’, str(chat.id))

            logger.info(f”⚡ [实时] 入队: {event.message.id}”)

async def main():

    await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)

    logger.info(f”🚀 启动成功 | 监听频道: {len(SOURCE_CHANNELS_PARSED)} 个 | 过滤生效中”)

    # 启动消费者 (Worker)

    asyncio.create_task(worker())

    # 启动历史扫描 (Semaphore 限制并发数为 1,稳如老狗)

    scan_semaphore = asyncio.Semaphore(1)

    scan_tasks = [scan_channel(ch, scan_semaphore) for ch in SOURCE_CHANNELS_PARSED]

    await asyncio.gather(*scan_tasks)

    await client.run_until_disconnected()

if __name__ == “__main__”:

    with client:

TG-forward-videos

import asyncio

import os

import random

import json

from datetime import datetime, timezone

from telethon import TelegramClient, events

from telethon.tl.types import (

    MessageMediaDocument,

    DocumentAttributeFilename,

    DocumentAttributeVideo

)

from telethon.errors import FloodWaitError

from dotenv import load_dotenv

# ==================== 配置读取 ====================

load_dotenv()

api_id = int(os.getenv(“API_ID”))

api_hash = os.getenv(“API_HASH”)

PHONE_NUMBER = os.getenv(“PHONE_NUMBER”)

TWO_STEP_PASSWORD = os.getenv(“TWO_STEP_PASSWORD”)

SOURCE_CHANNELS = [ch.strip() for ch in os.getenv(“SOURCE_CHANNELS”, “”).split(“,”) if ch.strip()]

TARGET_CHANNEL = os.getenv(“TARGET_CHANNEL”)

MIN_FILE_SIZE = int(os.getenv(“MIN_FILE_SIZE”, 200 * 1024 * 1024))

MAX_FORWARD_COUNT = int(os.getenv(“MAX_FORWARD_COUNT”, 0))

SCAN_LIMIT = int(os.getenv(“SCAN_LIMIT”, 50))   # 0 = 全量历史

MIN_DURATION = int(os.getenv(“MIN_DURATION”, 0))

MAX_DURATION = int(os.getenv(“MAX_DURATION”, 0))

MAX_CAPTION_LENGTH = int(os.getenv(“MAX_CAPTION_LENGTH”, 1024))

START_DATE = os.getenv(“START_DATE”)

END_DATE = os.getenv(“END_DATE”)

start_date = datetime.strptime(START_DATE, “%Y-%m-%d”).replace(tzinfo=timezone.utc) if START_DATE else None

end_date = datetime.strptime(END_DATE, “%Y-%m-%d”).replace(tzinfo=timezone.utc) if END_DATE else None

client = TelegramClient(“user_session”, api_id, api_hash)

VIDEO_KEYS_FILE = “video_keys.json”

PROGRESS_FILE = “channel_progress.json”

# ==================== JSON 持久化 ====================

def load_json(path, default):

    if os.path.exists(path):

        try:

            with open(path, “r”, encoding=”utf-8″) as f:

                return json.load(f)

        except Exception:

            return default

    return default

def save_json(path, data):

    try:

        with open(path, “w”, encoding=”utf-8″) as f:

            json.dump(data, f, ensure_ascii=False, indent=2)

    except Exception as e:

        print(f”❌ 保存 {path} 失败: {e}”)

existing_video_keys = load_json(VIDEO_KEYS_FILE, {})

channel_progress = load_json(PROGRESS_FILE, {})

# ==================== 工具函数 ====================

def get_duration_from_doc(doc):

    for attr in doc.attributes:

        if isinstance(attr, DocumentAttributeVideo):

            return attr.duration

    return 0

def get_video_key(message):

    doc = message.media.document

    filename = next(

        (a.file_name for a in doc.attributes if isinstance(a, DocumentAttributeFilename)),

        “video.mp4”

    )

    return f”{filename}_{doc.size}_{doc.id}”

def is_video_eligible(message):

    if not message.media or not isinstance(message.media, MessageMediaDocument):

        return False

    doc = message.media.document

    if not doc.mime_type or not doc.mime_type.startswith(“video”):

        return False

    if doc.size < MIN_FILE_SIZE:

        return False

    duration = get_duration_from_doc(doc)

    if MIN_DURATION > 0 and duration < MIN_DURATION:

        return False

    if MAX_DURATION > 0 and duration > MAX_DURATION:

        return False

    if start_date and message.date < start_date:

        return False

    if end_date and message.date > end_date:

        return False

    return True

# ==================== 转发函数 ====================

async def forward_message(message, video_key):

    caption = message.message or “”

    if len(caption) > MAX_CAPTION_LENGTH:

        caption = caption[:MAX_CAPTION_LENGTH] + “…”

    try:

        await client.send_file(

            TARGET_CHANNEL,

            file=message.media,

            caption=caption,

            silent=True

        )

        print(f”✅ 转发成功 message_id={message.id}”)

        existing_video_keys[video_key] = message.id

        save_json(VIDEO_KEYS_FILE, existing_video_keys)

        await asyncio.sleep(random.uniform(1, 4))

    except FloodWaitError as e:

        print(f”⏳ FloodWait {e.seconds}s”)

        await asyncio.sleep(e.seconds)

        await forward_message(message, video_key)

# ==================== 历史批量转发 ====================

async def batch_forward_latest_videos():

    for source_channel in SOURCE_CHANNELS:

        source = await client.get_entity(source_channel)

        channel_id = str(source.id)

        last_id = channel_progress.get(channel_id, 0)

        limit = SCAN_LIMIT if SCAN_LIMIT > 0 else None

        mode = “全量历史” if limit is None else f”最近 {limit} 条”

        print(f”\n📦 扫描 {source_channel}({mode}),last_message_id={last_id}”)

        count = 0

        async for message in client.iter_messages(

            source,

            min_id=last_id,

            limit=limit,

            reverse=True    # 🔑 旧 → 新

        ):

            if not is_video_eligible(message):

                continue

            video_key = get_video_key(message)

            if video_key in existing_video_keys:

                continue

            try:

                await forward_message(message, video_key)

                # ✅ 成功后推进断点

                channel_progress[channel_id] = message.id

                save_json(PROGRESS_FILE, channel_progress)

                count += 1

                if MAX_FORWARD_COUNT > 0 and count >= MAX_FORWARD_COUNT:

                    print(f”⏹️ 达到最大转发数量 {MAX_FORWARD_COUNT}”)

                    break

            except Exception as e:

                print(f”❌ message_id={message.id} 失败,中断本频道: {e}”)

                break

        print(f”📁 {source_channel} 本轮完成,转发 {count} 个视频”)

# ==================== 实时监听 ====================

@client.on(events.NewMessage(chats=SOURCE_CHANNELS))

async def handler(event):

    source = await event.get_chat()

    channel_id = str(source.id)

    message = event.message

    if not is_video_eligible(message):

        return

    video_key = get_video_key(message)

    if video_key in existing_video_keys:

        return

    await forward_message(message, video_key)

    # ✅ 实时消息同样推进断点

    channel_progress[channel_id] = message.id

    save_json(PROGRESS_FILE, channel_progress)

# ==================== 主程序 ====================

async def main():

    await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)

    print(“✅ Telegram 登录成功”)

    await batch_forward_latest_videos()

    print(“⏳ 开始实时监听新视频…”)

    await client.run_until_disconnected()

if __name__ == “__main__”:

    with client:

        client.loop.run_until_complete(main())

Koyeb成功注册秘诀

在当今的技术世界中,无服务器(serverless)计算正在迅速成为一种流行的选择。今天,我想向大家介绍一个非常有前途的平台——Koyeb,它致力于简化云计算的部署和管理。随着技术的不断进步,无服务器计算将变得越来越重要。Koyeb作为这一领域的新星,凭借其强大的功能和优雅的设计,正在吸引越来越多的开发者和企业加入。如果你还没有尝试过Koyeb,不妨现在就去体验一下,相信你会有意想不到的收获!

介绍

什么是Koyeb?

Koyeb是一家提供无服务器计算平台的公司。简单来说,它让开发者可以专注于编写和部署代码,而无需担心服务器配置、维护和扩展等繁琐事务。

Koyeb的主要特点

  1. 无服务器架构
    • 不用管理底层基础设施,只需专注于代码开发和部署。
  2. 自动扩展
    • 根据流量和负载自动调整计算资源,确保应用在高峰期稳定运行。
  3. 全球分布
    • 在全球多个地区有数据中心,可以选择最接近用户的数据中心,减少延迟。
  4. 多语言支持
    • 支持Python、Node.js、Go等多种编程语言和框架。
  5. 集成和自动化
    • 与GitHub、GitLab等版本控制系统无缝集成,支持CI/CD流水线,实现代码自动化部署。
  6. 高可用性和容错性
    • 基础设施设计考虑了高可用性和容错性,保障应用在故障时仍能运行。
  7. 管理和监控
    • 提供详细的应用监控和日志记录功能,帮助开发者了解应用的运行状况和性能瓶颈。
  8. 安全性
    • 提供数据加密、身份验证和权限管理等多种安全措施,保障数据和应用的安全。

为什么选择Koyeb?

Koyeb通过其简便、高效且灵活的云计算解决方案,为开发者提供了从个人项目到大型企业应用的全面支持。如果你希望简化云计算部署和管理过程,Koyeb无疑是一个值得考虑的选择。

注册准备

  • 国外大厂邮箱(OutlookGmail、自定义域名邮箱)
  • 指纹浏览器(Hubstudio
  • 国外网络环境(国外ISP)
  • 纯度检测网站(ISP检测
  • 注册koyeb网站(Koyeb

指纹浏览器

  1. 下载、安装、注册账号Koyeb成功注册秘诀-kejilandKoyeb成功注册秘诀-kejilandKoyeb成功注册秘诀-kejiland
  2. 新建环境Koyeb成功注册秘诀-kejiland
  3. 代理环境Sock5,提供一个Sock5代理协议IP:端口:账号:密码
    PS:时效性不知,请在评论区留言!(Socks5)104.164.11.167:8636:Misaka:Misaka
  4. 代理配置Koyeb成功注册秘诀-kejiland
  5. 打开内置浏览器,检测环境是否为ISP干净环境(Good IP),如果是可以下一步啦Koyeb成功注册秘诀-kejilandKoyeb成功注册秘诀-kejiland

注册koyeb

  • 注册koyeb,建议大厂邮箱注册。以下为注册成功。Koyeb成功注册秘诀-kejilandKoyeb成功注册秘诀-kejilandKoyeb成功注册秘诀-kejilandKoyeb成功注册秘诀-kejiland

常见问题

  1. 注册遇到绑卡,无法通过注册
    • 注册环境不干净,请使用国外ISP环境注册。
  2. 通过注册,但是部署完项目几天后项目被停止
    • koyeb长久不登陆一次会暂停项目服务;一般为7天。
  3. 通过注册,但是几天后被封号
    • 未ISP环境注册。
  4. 部署项目不成功,提示环境不安全和项目违反网站规则
    • 通过ISP环境首次注册成功后,部署任何项目都会成功并不会删号。
  5. 虽然注册成功,但是提示黄色信息认证
    • 代表未通过ISP环境注册,项目可部署,但几天后会封号,再次提交信息认证无效,可通过本教程重新注册。
  6. 免费服务
    • koyeb免费提供两个免费服务:德国和美国;仅支持创建一个免费项目。
  7. 付费服务
    • koyeb提供多个付费服务:新加坡、日本和美国,服务均5$/月,绑卡可每月享受5.5$。

注册成功

  • 用过以上截图注册成功后,网站会进入首页,首页不会出现任何信息认证提交或者绑卡信息,此为注册成功。

温馨提示

  • 请勿在Koyeb平台上部署任何违反其使用规则的内容。禁止大量注册账号或过度使用资源,以免造成资源浪费。请遵守平台的使用规范,合理利用资源,共同维护一个良好的使用环境。

Wasmer 部署 WordPress 并绑定自定义域名教程

随着云计算和容器化技术的普及,开发者和网站管理员们对轻量、高效、灵活的部署方式需求越来越大。Wasmer 作为一款强大的 WebAssembly 运行时,提供了在多种平台上高性能运行 WebAssembly 程序的能力。本文将详细介绍如何在 Wasmer 平台上部署 WordPress,安装并配置 Farallon 主题,同时绑定自定义域名,实现一个独立、个性化的博客网站。

什么是 Wasmer?

Wasmer.io 是一款开源的 WebAssembly 运行时,支持通过 WebAssembly 技术在服务器端运行各种应用。借助 Wasmer,你可以:

  • 运行用多种语言编写的 WebAssembly 模块。
  • 实现跨平台部署,节省资源。
  • 提高安全性,利用 WebAssembly 的沙盒机制。

将 WordPress 这种经典的 PHP + MySQL 网站运行于 Wasmer,既能利用现代技术优势,又能保证熟悉的 WordPress 体验。

网站介绍:WordPress + Farallon 主题

WordPress 作为全球最流行的内容管理系统(CMS),其强大、灵活和完善的生态为搭建博客提供了坚实基础。Farallon 是 WordPress 上一款优雅简洁、响应式设计的免费主题,适合写作和个人博客,具有:

  • 干净简洁的界面。
  • 高度可定制化。
  • 良好的移动端兼容性。
  • 支持多种布局和小工具。

通过此次部署,您将能搭建一个美观、轻便的个人博客。

一、准备工作

  1. 注册 Wasmer 账号:访问 https://wasmer.io 注册并登录。
  2. 购买或准备自定义域名:如从阿里云、华为云处购买,Hidns免费处申请。
  3. 注册Gcore托管平台:添加域名到Gcore,即可使用gcore的CDN和SSL。
  4. 准备 WordPress 镜像及数据库:wasmer自带一体化数据库,不需要额外的配置。

二、在 Wasmer 部署 WordPress

1. 注册并登录Wasmer

Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland
Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland

2. 部署 WordPress

Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland
Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland
Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland

3. 访问 WordPress 安装页面

访问地址:https://wordpress-mumt7.wasmer.app/

Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland
Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland
Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland

三、安装和配置 Farallon 主题

  1. 登录 WordPress 后台,访问 外观 > 主题
    Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland
  2. 点击 添加新主题,上传压缩包 Farallon
    Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland
    Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland
  3. 找到主题后,点击 安装,安装完成后点击 启用
    Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland
  4. 进入 自定义 > 主题选项 配置界面,根据需求调整网站配色、布局、小工具位置等。
  5. 发布首篇博客文章,体验主题效果。访问地址:https://www.2024921.xyz
    Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland

四、托管Gcore平台

1. 注册并托管

Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland

2. 更新名称服务器

复制ns1.gcorelabs.net``ns2.gcdn.services到HiDNS的名称服务器中。

Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland

五、绑定自定义域名

1. 申请域名

HiDNS 永久免费域名计划网址,申请域名时不要只发工单,也要把域名申请一下。
Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland

Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland

目前官方鼓励将域名使用于 Blog / 其他活跃的 Web 服务,并为域名申请 SSL 证书。符合该条件的用户可以直接注册,无须优惠劵,HiDNS 团队人员会在后台激活使用于“ Blog / 其他活跃的 Web 服务”的域名。域名激活后,7 天内完成 Blog 的绑定,会被设置为永久免费域名,只要域名不闲置、不滥用将永远免费。

2. 解析域名

登录域名管理控制台,添加一条 A 记录:

  • 主机记录:@ 或自定义子域名(如 www
  • 记录类型:A
  • 记录值:Wasmer 实例公网 IP 地址
  • TTL:默认即可

等待 DNS 生效(通常几分钟到一小时不等)。

3. 配置 Wasmer 绑定域名

在 Wasmer 平台:

  • 找到你的 WordPress 服务对应的项目设置。
  • 在“域名”或“网络”栏目,绑定你的自定义域名。Wasmer 部署 WordPress 并绑定自定义域名教程-kejilandWasmer 部署 WordPress 并绑定自定义域名教程-kejiland
  • 将Value值添加到Gcore的CDN中,即可绑定成功。Wasmer 部署 WordPress 并绑定自定义域名教程-kejiland
  • 绑定后,Wasmer 会自动配置监听你的域名请求。Wasmer 部署 WordPress 并绑定自定义域名教程-kejilandWasmer 部署 WordPress 并绑定自定义域名教程-kejiland

六、总结

通过 Wasmer,我们实现了现代、轻量级的 WordPress 部署方式,极大提升了网站的安全性和跨平台能力。搭配 Farallon 主题,赋予博客简洁优雅的外观体验。绑定自定义域名后,网站变得更具专业性。无论是个人博客还是小型站点,这套方案都非常适合。

如果你正在寻找一套高性能又易用的部署方案,试试 Wasmer 无疑是一个不错的选择!