Cursor免费版,无限续杯满血复活

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import time
import logging
import sys
import argparse
from typing import Optional
import random
from enum import Enum
from dotenv import load_dotenv
import datetime

# 设置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# 尝试加载.env文件中的环境变量
load_dotenv()

# 导入必要的模块
try:
    from DrissionPage import Chromium, ChromiumOptions
    import requests
    import sqlalchemy
    from sqlalchemy import create_engine, Column, Integer, String, BigInteger, ForeignKey, Boolean
    from sqlalchemy.orm import sessionmaker, relationship, declarative_base
    import pymysql
except ImportError:
    logger.error("缺少必要的依赖,请安装: pip install DrissionPage requests python-dotenv sqlalchemy pymysql")
    sys.exit(1)

# 数据库配置
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', 3306))
DB_USER = os.getenv('DB_USER', 'root')
DB_PASSWORD = os.getenv('DB_PASSWORD', '')
DB_NAME = os.getenv('DB_NAME', 'cursor_accounts')

# 创建数据库引擎和会话
database_url = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(database_url)
Session = sessionmaker(bind=engine)
Base = declarative_base()

# 定义账号模型
class Account(Base):
    __tablename__ = 'accounts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    email = Column(String(255), unique=True, nullable=False)
    password = Column(String(255), nullable=False)
    first_name = Column(String(100), nullable=True)
    last_name = Column(String(100), nullable=True)
    create_time = Column(BigInteger, nullable=False)
    expire_time = Column(BigInteger, nullable=False)
    is_used = Column(Integer, default=0)  # 0: 未使用, 1: 已使用
    is_deleted = Column(Integer, default=0)  # 0: 未删除, 1: 已删除
    user_id = Column(Integer, nullable=True)  # 关联到用户ID

    def to_dict(self):
        data = {
            'id': self.id,
            'email': self.email,
            'password': self.password,
            'first_name': self.first_name,
            'last_name': self.last_name,
            'create_time': self.create_time,
            'expire_time': self.expire_time,
            'is_used': self.is_used,
            'is_deleted': self.is_deleted,
            'expire_time_fmt': datetime.datetime.fromtimestamp(self.expire_time).strftime('%Y-%m-%d %H:%M:%S')
        }
        data['user_id'] = self.user_id
        return data

# 确保表存在
def ensure_tables_exist():
    try:
        Base.metadata.create_all(engine)
        logger.info("数据库表已创建/确认存在")
    except Exception as e:
        logger.error(f"创建数据库表失败: {str(e)}")
        raise

# 保存账号到数据库
def save_account_to_db(account_data, user_id=None):
    session = Session()
    try:
        # 检查邮箱是否已存在
        existing = session.query(Account).filter_by(email=account_data['email']).first()
        if existing:
            logger.warning(f"邮箱 {account_data['email']} 已存在于数据库中")
            return False
        
        # 创建新账号记录
        account = Account(
            email=account_data['email'],
            password=account_data['password'],
            first_name=account_data['first_name'],
            last_name=account_data['last_name'],
            create_time=account_data['create_time'],
            expire_time=account_data['expire_time'],
            is_used=0,
            is_deleted=0,
            user_id=user_id
        )
        
        # 添加到数据库并提交
        session.add(account)
        session.commit()
        logger.info(f"账号 {account_data['email']} 已保存到数据库")
        return True
    except Exception as e:
        session.rollback()
        logger.error(f"保存账号到数据库失败: {str(e)}")
        return False
    finally:
        session.close()

class BrowserManager:
    def __init__(self):
        self.browser = None

    def init_browser(self, user_agent=None):
        """初始化浏览器"""
        co = self._get_browser_options(user_agent)
        self.browser = Chromium(co)
        return self.browser

    def _get_browser_options(self, user_agent=None):
        """获取浏览器配置"""
        co = ChromiumOptions()
        try:
            extension_path = self._get_extension_path("turnstilePatch")
            co.add_extension(extension_path)
        except FileNotFoundError as e:
            logging.warning(f"警告: {e}")

        co.set_pref("credentials_enable_service", False)
        co.set_argument("--hide-crash-restore-bubble")
        co.set_argument("--remote-debugging-port=9222")
        co.set_argument("--remote-debugging-address=0.0.0.0")
        co.set_argument("--no-sandbox")
        co.set_argument("--disable-gpu")
        if user_agent:
            co.set_user_agent(user_agent)

        co.headless()  # 生产环境使用无头模式

        return co

    def _get_extension_path(self, exname='turnstilePatch'):
        """获取插件路径"""
        root_dir = os.getcwd()
        extension_path = os.path.join(root_dir, exname)

        if hasattr(sys, "_MEIPASS"):
            extension_path = os.path.join(sys._MEIPASS, exname)

        if not os.path.exists(extension_path):
            raise FileNotFoundError(f"插件不存在: {extension_path}")

        return extension_path

    def quit(self):
        """关闭浏览器"""
        if self.browser:
            try:
                self.browser.quit()
            except:
                pass

class EmailVerificationHandler:
    def __init__(self, email: str, temp_email_address: str = None):
        self.email = email
        self.temp_email_address = temp_email_address or 'zoowayss@mailto.plus'
        self.session = requests.Session()

    def get_verification_code(self, max_retries=5, retry_interval=60):
        """
        获取验证码,带有重试机制。
        """
        for attempt in range(max_retries):
            try:
                logging.info(f"尝试获取验证码 (第 {attempt + 1}/{max_retries} 次)...")
                verify_code, first_id = self._get_latest_mail_code()
                if verify_code is not None and first_id is not None:
                    self._cleanup_mail(first_id)
                return verify_code
            except Exception as e:
                logging.error(f"获取验证码失败: {e}")
                if attempt < max_retries - 1:
                    logging.error(f"发生错误,{retry_interval} 秒后重试...")
                    time.sleep(retry_interval)
                else:
                    raise Exception(f"获取验证码失败且已达最大重试次数: {e}") from e
        raise Exception(f"经过 {max_retries} 次尝试后仍未获取到验证码。")

    def _get_latest_mail_code(self):
        # 获取邮件列表
        mail_list_url = f"https://tempmail.plus/api/mails?email={self.temp_email_address}&limit=20&epin={self.email}"
        logging.info(f"获取邮件列表: {mail_list_url}")
        mail_list_response = self.session.get(mail_list_url)
        mail_list_data = mail_list_response.json()
        logging.info(f"邮件列表: {mail_list_data}")
        time.sleep(0.5)
        if not mail_list_data.get("result"):
            return None, None

        # 获取最新邮件的ID
        first_id = mail_list_data.get("first_id")
        if not first_id:
            return None, None

        # 获取具体邮件内容
        mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.temp_email_address}&epin={self.email}"
        mail_detail_response = self.session.get(mail_detail_url)
        mail_detail_data = mail_detail_response.json()
        time.sleep(0.5)
        if not mail_detail_data.get("result"):
            return None, None

        # 从邮件文本中提取6位数字验证码
        mail_text = mail_detail_data.get("text", "")
        mail_subject = mail_detail_data.get("subject", "")
        logging.info(f"找到邮件主题: {mail_subject}")
        
        # 尝试匹配连续的6位数字
        import re
        code_match = re.search(r"(?<![a-zA-Z@.])\b\d{6}\b", mail_text)
        
        # 如果没有找到连续6位数字,尝试匹配带空格的6位数字
        if not code_match:
            # 匹配像 "9 7 7 1 8 2" 这样的格式
            space_code_match = re.search(r"(\d\s){5}\d", mail_text)
            if space_code_match:
                # 删除空格得到验证码
                return re.sub(r'\s', '', space_code_match.group()), first_id
        else:
            return code_match.group(), first_id
        
        return None, None
        
    def _cleanup_mail(self, first_id):
        # 构造删除请求的URL和数据
        delete_url = "https://tempmail.plus/api/mails/"
        payload = {
            "email": self.temp_email_address,
            "first_id": first_id,
            "epin": f"{self.email}",
        }

        # 最多尝试5次
        for _ in range(5):
            response = self.session.delete(delete_url, data=payload)
            try:
                result = response.json().get("result")
                if result is True:
                    return True
            except:
                pass
            # 如果失败,等待0.5秒后重试
            time.sleep(0.5)
        return False

class EmailGenerator:
    def __init__(
        self,
        domain: str = None
    ):
        self.domain = domain or os.getenv('EMAIL_DOMAIN', '你的邮箱')
        self.names = self.load_names()
        self.default_first_name = self.generate_random_name()
        self.default_last_name = self.generate_random_name()

    def load_names(self):
        try:
            with open("names-dataset.txt", "r") as file:
                return file.read().split()
        except FileNotFoundError:
            logging.warning("names_file_not_found")
            # 默认名字集合
            return ["John", "Jane", "Alex", "Emma", "Michael", "Olivia", "William", "Sophia", 
                    "James", "Isabella", "Robert", "Mia", "David", "Charlotte", "Joseph", "Amelia"]

    def generate_random_name(self):
        """生成随机用户名"""
        return random.choice(self.names)

    def generate_email(self, length=4):
        """生成随机邮箱地址"""
        length = random.randint(0, length)  # 生成0到length之间的随机整数
        timestamp = str(int(time.time()))[-length:]  # 使用时间戳的最后length位数字
        return f"{self.default_first_name}{timestamp}@{self.domain}"
        
    def generate_random_password(self, length=12):
        """生成随机密码"""
        chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*"
        return "".join(random.choices(chars, k=length))

    def get_account_info(self):
        """获取完整的账号信息"""
        return {
            "email": self.generate_email(),
            "password": self.generate_random_password(),
            "first_name": self.default_first_name,
            "last_name": self.default_last_name,
        }

class TurnstileError(Exception):
    """Turnstile验证相关异常"""
    pass

class VerificationStatus(Enum):
    """验证状态枚举"""
    
    PASSWORD_PAGE = "@name=password"
    CAPTCHA_PAGE = "@data-index=0"
    ACCOUNT_SETTINGS = "Account Settings"

def check_verification_success(tab) -> Optional[VerificationStatus]:
    """
    检查验证是否成功
    """
    for status in VerificationStatus:
        if tab.ele(status.value):
            logging.info(f"验证成功: {status.name}")
            return status
    return None

def handle_turnstile(tab, max_retries: int = 2, retry_interval: tuple = (1, 2)) -> bool:
    """
    处理Turnstile验证
    """
    logging.info("检测Turnstile")
    retry_count = 0

    try:
        while retry_count < max_retries:
            retry_count += 1
            logging.debug(f"重试验证计数: {retry_count}")

            try:
                # 定位验证框架元素
                challenge_check = (
                    tab.ele("@id=cf-turnstile", timeout=2)
                    .child()
                    .shadow_root.ele("tag:iframe")
                    .ele("tag:body")
                    .sr("tag:input")
                )

                if challenge_check:
                    logging.info("检测到Turnstile")
                    # 点击验证前随机延迟
                    time.sleep(random.uniform(1, 3))
                    challenge_check.click()
                    time.sleep(2)

                    # 检查验证结果
                    if check_verification_success(tab):
                        logging.info("Turnstile验证通过")
                        return True

            except Exception as e:
                logging.debug(f"当前尝试失败: {str(e)}")

            # 检查是否已验证
            if check_verification_success(tab):
                return True

            # 下一次尝试前随机延迟
            time.sleep(random.uniform(*retry_interval))

        # 超过最大重试次数
        logging.error(f"经过{max_retries}次尝试后验证失败")
        logging.error(
            "更多信息请访问开源项目: https://github.com/chengazhen/cursor-auto-free"
        )
        return False

    except Exception as e:
        error_msg = f"Turnstile验证过程中出错: {str(e)}"
        logging.error(error_msg)
        raise TurnstileError(error_msg)

def save_screenshot(tab, stage: str, timestamp: bool = True) -> None:
    """
    保存页面截图
    """
    try:
        # 创建截图目录
        screenshot_dir = "screenshots"
        if not os.path.exists(screenshot_dir):
            os.makedirs(screenshot_dir)

        # 生成文件名
        if timestamp:
            filename = f"cursor_{stage}_{int(time.time())}.png"
        else:
            filename = f"cursor_{stage}.png"

        filepath = os.path.join(screenshot_dir, filename)

        # 保存截图
        tab.get_screenshot(filepath)
        logging.debug(f"截图已保存: {filepath}")
    except Exception as e:
        logging.warning(f"保存截图失败: {str(e)}")

class Register(object):
    def __init__(self, first_name, last_name, email, password, temp_email_address):
        self.first_name = first_name
        self.last_name = last_name
        self.email = email
        self.password = password
        self.temp_email_address = temp_email_address
        self.tab = BrowserManager().init_browser(get_user_agent()).latest_tab
        self.sign_up_url = 'https://authenticator.cursor.sh/sign-up'
        self.settings_url = 'https://www.cursor.com/settings'
        self.login_url = 'https://authenticator.cursor.sh'
        
    def register(self):
        print(f"注册账号: {self.first_name} {self.last_name}, 邮箱: {self.email}, 密码: {self.password}")
        success = self.sign_up_account(self.tab)
        return success

    def sign_up_account(self, tab):
        logging.info("开始注册账号")
        logging.info(f"访问注册页面: {self.sign_up_url}")
        tab.get(self.sign_up_url)
        email_handler = EmailVerificationHandler(self.email, self.temp_email_address)
        time.sleep(5)
        save_screenshot(tab, "registration_page")
        try:
            if tab.ele("@name=first_name"):
                logging.info("填写个人信息")
                tab.actions.click("@name=first_name").input(self.first_name)
                logging.info(f"输入名字: {self.first_name}")
                time.sleep(random.uniform(1, 2))

                tab.actions.click("@name=last_name").input(self.last_name)
                logging.info(f"输入姓氏: {self.last_name}")
                time.sleep(random.uniform(1, 2))

                tab.actions.click("@name=email").input(self.email)
                logging.info(f"输入邮箱: {self.email}")
                time.sleep(random.uniform(1, 2))

                logging.info("提交个人信息")
                tab.actions.click("@type=submit")

        except Exception as e:
            logging.error(f"注册页面访问失败: {str(e)}")
            return False

        handle_turnstile(tab)

        try:
            if tab.ele("@name=password"):
                logging.info("设置密码")
                tab.ele("@name=password").input(self.password)
                time.sleep(random.uniform(1, 3))

                logging.info("提交密码")
                tab.ele("@type=submit").click()
                logging.info("密码设置完成")

        except Exception as e:
            logging.error(f"密码设置失败: {str(e)}")
            return False

        if tab.ele("This email is not available."):
            logging.error("注册失败: 邮箱已被使用")
            return False

        handle_turnstile(tab)

        while True:
            try:
                if tab.ele("Account Settings"):
                    logging.info("注册成功")
                    break
                if tab.ele("@data-index=0"):
                    logging.info("获取邮箱验证码")
                    code = email_handler.get_verification_code()
                    if not code:
                        logging.error("无法获取验证码")
                        return False

                    logging.info(f"收到验证码: {code}")
                    logging.info("输入验证码")
                    i = 0
                    for digit in code:
                        tab.ele(f"@data-index={i}").input(digit)
                        time.sleep(random.uniform(0.1, 0.3))
                        i += 1
                    logging.info("验证码输入完成")
                    break
            except Exception as e:
                logging.error(f"验证码处理过程错误: {str(e)}")
                return False

        handle_turnstile(tab)
        wait_time = random.randint(1, 2)
        for i in range(wait_time):
            logging.info(f"等待系统处理: 剩余{wait_time-i}秒")
            time.sleep(1)

        logging.info("获取账号信息")
        tab.get(self.settings_url)
        try:
            usage_selector = (
                "css:div.col-span-2 > div > div > div > div > "
                "div:nth-child(1) > div.flex.items-center.justify-between.gap-2 > "
                "span.font-mono.text-sm\\/\\[0\\.875rem\\]"
            )
            usage_ele = tab.ele(usage_selector)
            if usage_ele:
                usage_info = usage_ele.text
                total_usage = usage_info.split("/")[-1].strip()
                logging.info(f"账号使用限制: {total_usage}")
        except Exception as e:
            logging.error(f"获取账号使用信息失败: {str(e)}")

        logging.info("注册完成")
        account_info = f"Cursor账号信息 - 邮箱: {self.email}, 密码: {self.password}"
        logging.info(account_info)
        time.sleep(5)
        return True

def get_user_agent():
    """获取user_agent"""
    try:
        # 使用JavaScript获取user agent
        browser_manager = BrowserManager()
        browser = browser_manager.init_browser()
        user_agent = browser.latest_tab.run_js("return navigator.userAgent")
        browser_manager.quit()
        return user_agent.replace("HeadlessChrome", "Chrome")
    except Exception as e:
        logging.error(f"获取user agent失败: {str(e)}")
        return None

def create_account(domain=None, temp_email_address=None, user_id=None, save_to_db=True):
    """创建新的Cursor账号"""
    try:
        # 确保数据库表存在
        if save_to_db:
            try:
                ensure_tables_exist()
            except Exception as e:
                logger.error(f"数据库表检查/创建失败: {str(e)}")
                # 如果数据库操作失败但仍要继续注册账号,设置为不保存到数据库
                save_to_db = False
                
        # 生成随机名字和账号信息
        email_generator = EmailGenerator(domain=domain)
        account_info = email_generator.get_account_info()
        
        # 从返回的字典中获取信息
        email = account_info["email"]
        password = account_info["password"]
        first_name = account_info["first_name"]
        last_name = account_info["last_name"]
        
        # 如果要保存到数据库,先检查邮箱是否已存在
        if save_to_db:
            session = Session()
            try:
                existing_account = session.query(Account).filter_by(email=email).first()
                if existing_account:
                    session.close()
                    return {'status': 'error', 'message': '该邮箱已被使用,请重试'}
                session.close()
            except Exception as e:
                logger.error(f"检查邮箱是否存在失败: {str(e)}")
                session.close()
        
        # 注册账号
        registration = Register(first_name, last_name, email, password, temp_email_address)
        success = registration.register()
        
        if not success:
            return {'status': 'error', 'message': '注册失败,请稍后再试'}
        
        # 创建账号对象
        create_time = int(time.time())
        expire_time = create_time + (15 * 24 * 60 * 60)  # 15天后过期
        
        account_data = {
            'email': email,
            'password': password,
            'first_name': first_name,
            'last_name': last_name,
            'create_time': create_time,
            'expire_time': expire_time,
            'expire_time_fmt': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(expire_time))
        }
        
        # 保存到数据库
        if save_to_db:
            saved = save_account_to_db(account_data, user_id)
            if not saved:
                logger.warning("账号创建成功但保存到数据库失败")
        
        return {
            'status': 'success',
            'message': '新账号已创建' + (' 并保存到数据库' if save_to_db else ''),
            'account': account_data
        }
    
    except Exception as e:
        return {'status': 'error', 'message': str(e)}

def main():
    parser = argparse.ArgumentParser(description='Cursor账号自动注册工具')
    parser.add_argument('--domain', type=str, help='邮箱域名')
    parser.add_argument('--temp-email', type=str, help='临时邮箱地址')
    parser.add_argument('--user-id', type=int, help='用户ID(保存到数据库时使用)')
    parser.add_argument('--no-save', action='store_true', help='不保存到数据库')
    args = parser.parse_args()
    
    save_to_db = not args.no_save
    if save_to_db:
        print("账号将被保存到数据库")
    else:
        print("账号将不会被保存到数据库")
    
    result = create_account(
        domain=args.domain, 
        temp_email_address=args.temp_email, 
        user_id=args.user_id,
        save_to_db=save_to_db
    )
    
    if result.get('status') == 'success':
        print("\n==== 账号创建成功 ====")
        account = result.get('account')
        print(f"邮箱: {account.get('email')}")
        print(f"密码: {account.get('password')}")
        print(f"创建时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(account.get('create_time')))}")
        print(f"过期时间: {account.get('expire_time_fmt')}")
        print("=====================\n")
        return 0
    else:
        print(f"\n错误: {result.get('message')}")
        return 1

if __name__ == "__main__":
    sys.exit(main())

 

 

发表回复

后才能评论