短信接口防盗刷:技术方案与实战实现 2026-03-13 16:42:43 分类:热点新闻
相关标签:         短信接口         群发短信         106短信平台         手机短信验证码接口         国际短信         网站在线客服系统
,实现一个具备完整防盗刷能力的短信接口,核心包含:接口鉴权、频率限制、验证码生成 / 校验、异常监控等功能。

短信接口防盗刷:技术方案与实战实现


短信验证码是用户身份验证、交易确认的核心手段,一旦短信接口被恶意利用(盗刷),不仅会造成企业短信费用的巨额损失,还可能导致用户信息泄露、资金被盗等严重后果。本文将从盗刷场景、防护原理、技术实现三个维度,详解如何为短信接口构建全方位的防盗刷体系,并提供可直接落地的代码示例。
一、短信接口盗刷的常见场景
批量恶意调用:攻击者通过脚本循环调用接口,批量获取验证码,消耗企业短信额度;
验证码爆破:结合手机号枚举 + 验证码猜测,尝试登录 / 支付等敏感操作;
接口参数篡改:伪造请求参数(如手机号、签名),绕过基础校验;
重放攻击:截取合法请求包,重复提交消耗短信资源。
二、短信接口防盗刷核心防护策略
针对上述场景,需从「请求层 - 校验层 - 风控层」三层构建防护体系:
请求层:接口鉴权、频率限制、IP / 设备黑白名单;
校验层:验证码有效期、唯一性、手机号合法性校验;
风控层:行为分析(如同一 IP / 设备绑定多手机号)、异常行为拦截。
三、技术实现(Python + Flask 示例)
以下以 Python Flask 框架为例,实现一个具备完整防盗刷能力的短信接口,核心包含:接口鉴权、频率限制、验证码生成 / 校验、异常监控等功能。
前置依赖
先安装所需依赖:
bash
运行
pip install flask redis pycryptodome python-dotenv

(Redis 用于存储验证码、频率限制计数;pycryptodome 用于接口签名加密;dotenv 管理配置)
完整代码实现
python
运行
import os
import time
import random
import hashlib from datetime import datetime, timedelta from dotenv import load_dotenv
from flask import Flask, request, jsonify
import redis

# 加载环境变量
load_dotenv()
app = Flask(__name__)

# 初始化 Redis 连接(用于存储验证码、频率限制)
redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", 6379)),
    password=os.getenv("REDIS_PASSWORD", ""),
    db=0,
    decode_responses=True
)

# 配置项(可放入 .env 文件)
SMS_EXPIRE_SECONDS = 300  # 验证码有效期 5 分钟
REQUEST_LIMIT = 5  # 单个IP/手机号 1分钟内最多请求5次
API_SECRET = os.getenv("API_SECRET", "your_secret_key")  # 接口鉴权密钥
WHITELIST_IPS = ["127.0.0.1"]  # IP白名单(可扩展为数据库存储)

# ---------------------- 核心工具函数 ----------------------
def generate_verification_code(length=6):
    """生成6位数字验证码"""
    return ''.join([str(random.randint(0, 9)) for _ in range(length)])

def check_request_frequency(key, limit=REQUEST_LIMIT, expire=60):
    """
    检查请求频率(防批量调用)
    :param key: 限流标识(IP/手机号)
    :param limit: 限制次数
    :param expire: 时间窗口(秒)
    :return: True-超出限制,False-未超出
    """
    current = redis_client.incr(key)
    if current == 1:
        redis_client.expire(key, expire)
    return current > limit

def verify_api_signature(params, signature):
    """
    验证接口签名(防参数篡改)
    :param params: 请求参数(字典)
    :param signature: 客户端传入的签名
    :return: True-签名合法,False-非法
    """
    # 1. 按参数名排序
    sorted_keys = sorted(params.keys())
    # 2. 拼接参数(key=value)
    sign_str = ''.join([f"{k}={params[k]}" for k in sorted_keys])
    # 3. 拼接密钥并加密
    sign_str += API_SECRET
    actual_sign = hashlib.md5(sign_str.encode()).hexdigest()
    # 4. 对比签名
    return actual_sign == signature

# ---------------------- 接口实现 ----------------------
@app.route("/send-sms", methods=["POST"])
def send_sms():
    """发送短信验证码接口(核心防刷逻辑)"""
    try:
        # 1. 获取请求参数
        req_data = request.json or {}
        mobile = req_data.get("mobile")
        sign = req_data.get("sign")
        timestamp = req_data.get("timestamp")
        client_ip = request.remote_addr

        # 2. 基础参数校验
        if not all([mobile, sign, timestamp]):
            return jsonify({"code": 400, "msg": "参数缺失"}), 400
       
        # 校验手机号格式(简单版,可扩展正则)
        if not (len(mobile) == 11 and mobile.isdigit()):
            return jsonify({"code": 400, "msg": "手机号格式错误"}), 400
       
        # 校验时间戳(防重放攻击,请求有效期5分钟)
        if abs(int(time.time()) - int(timestamp)) > 300:
            return jsonify({"code": 400, "msg": "请求已过期"}), 400

        # 3. IP白名单校验(白名单跳过后续限流,可选)
        if client_ip not in WHITELIST_IPS:
            # 4. 频率限制(IP+手机号双重限流)
            ip_limit_key = f"sms:limit:ip:{client_ip}"
            mobile_limit_key = f"sms:limit:mobile:{mobile}"
            if check_request_frequency(ip_limit_key):
                return jsonify({"code": 429, "msg": "IP请求过于频繁,请稍后再试"}), 429
            if check_request_frequency(mobile_limit_key):
                return jsonify({"code": 429, "msg": "手机号请求过于频繁,请稍后再试"}), 429

        # 5. 接口签名校验(防参数篡改)
        sign_params = {"mobile": mobile, "timestamp": timestamp}
        if not verify_api_signature(sign_params, sign):
            return jsonify({"code": 403, "msg": "签名验证失败"}), 403

        # 6. 生成并存储验证码(Redis)
        code = generate_verification_code()
        redis_key = f"sms:code:{mobile}"
        # 存储时拼接时间戳,便于校验有效期
        redis_client.set(
            redis_key,
            f"{code}|{int(time.time())}",
            ex=SMS_EXPIRE_SECONDS
        )

        # 7. 模拟发送短信(实际项目替换为短信服务商API)
        # sms_provider.send(mobile, f"您的验证码是:{code},有效期5分钟")
        print(f"【模拟发送】手机号:{mobile},验证码:{code}")

        return jsonify({"code": 200, "msg": "验证码发送成功"}), 200

    except Exception as e:
        app.logger.error(f"发送短信异常:{str(e)}")
        return jsonify({"code": 500, "msg": "服务器内部错误"}), 500

@app.route("/verify-sms", methods=["POST"])
def verify_sms():
    """验证码校验接口(防爆破)"""
    try:
        req_data = request.json or {}
        mobile = req_data.get("mobile")
        code = req_data.get("code")

        if not all([mobile, code]):
            return jsonify({"code": 400, "msg": "参数缺失"}), 400

        # 1. 获取存储的验证码
        redis_key = f"sms:code:{mobile}"
        stored_data = redis_client.get(redis_key)
        if not stored_data:
            return jsonify({"code": 400, "msg": "验证码不存在或已过期"}), 400

        # 2. 解析验证码和生成时间
        stored_code, create_time = stored_data.split("|")
        create_time = int(create_time)

        # 3. 校验验证码和有效期
        if code != stored_code:
            # 错误次数计数(防爆破,累计5次锁定10分钟)
            error_key = f"sms:error:{mobile}"
            error_count = redis_client.incr(error_key)
            if error_count == 1:
                redis_client.expire(error_key, 600)
            if error_count >= 5:
                return jsonify({"code": 403, "msg": "验证码错误次数过多,10分钟后重试"}), 403
            return jsonify({"code": 400, "msg": f"验证码错误,剩余{5-error_count}次机会"}), 400

        # 4. 校验通过,删除验证码(防重复使用)
        redis_client.delete(redis_key)
        return jsonify({"code": 200, "msg": "验证码校验成功"}), 200

    except Exception as e:
        app.logger.error(f"校验验证码异常:{str(e)}")
        return jsonify({"code": 500, "msg": "服务器内部错误"}), 500

if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5000)

代码关键逻辑说明
频率限制:通过 Redis 的 incr 计数,限制单个 IP / 手机号 1 分钟内最多请求 5 次,避免批量调用;
接口签名:客户端请求时需按规则生成签名,服务端校验签名合法性,防止参数被篡改;
时间戳校验:限制请求有效期 5 分钟,防止重放攻击;
验证码防护:
验证码存储在 Redis 并设置有效期,避免永久有效;
校验错误次数累计,超过 5 次锁定手机号 10 分钟,防爆破;
校验成功后立即删除验证码,防止重复使用;
基础校验:手机号格式、参数完整性、IP 白名单(可扩展为动态管理)。
客户端调用示例(Python)
python
运行
import requests
import time
import hashlib

API_SECRET = "your_secret_key"
URL = "http://127.0.0.1:5000/send-sms"

def generate_sign(params, secret):
    sorted_keys = sorted(params.keys())
    sign_str = ''.join([f"{k}={params[k]}" for k in sorted_keys]) + secret
    return hashlib.md5(sign_str.encode()).hexdigest()

# 构造请求参数
mobile = "13800138000"
timestamp = str(int(time.time()))
sign_params = {"mobile": mobile, "timestamp": timestamp}
sign = generate_sign(sign_params, API_SECRET)

# 发送请求
response = requests.post(
    URL,
    json={
        "mobile": mobile,
        "timestamp": timestamp,
        "sign": sign
    }
)
print(response.json())

四、进阶防护建议
接入验证码滑块 / 图形验证:对新设备 / 新 IP,在调用短信接口前增加人机验证,过滤脚本攻击;
风控系统对接:结合用户行为(如注册时长、交易记录)、设备指纹(IMEI、设备号)、地域信息,识别异常请求;
短信内容管控:统一短信签名和模板,禁止自定义内容,防止利用接口发送违规信息;
监控告警:设置短信发送量阈值,当某时段发送量突增时,触发告警并自动限流;
接口日志审计:记录所有接口请求(IP、手机号、时间、状态),便于事后追溯盗刷行为。
总结
短信接口防盗刷的核心是「多层防护」:请求层(鉴权、限流)+ 校验层(验证码有效期、唯一性)+ 风控层(行为分析);
技术实现上,Redis 是高频限流、验证码存储的核心工具,接口签名可有效防止参数篡改;
基础防护(限流、签名、有效期)可解决 80% 的盗刷问题,进阶防护(风控、人机验证)可覆盖剩余 20% 的复杂场景。
通过上述方案,既能有效拦截恶意盗刷行为,又能保证合法用户的使用体验,是企业短信接口安全的核心保障。

在线客服系统