OAuth 2.0与身份认证

OAuth 2.0是现代授权框架,让用户无需向第三方应用透露密码即可授权访问其数据。

OAuth 2.0核心概念

角色

角色说明
Resource Owner资源所有者(用户)
Client请求访问的第三方应用
Authorization Server授权服务器
Resource Server托管受保护资源的服务器

授权许可类型

类型适用场景
Authorization Code有后端服务器的Web应用
PKCE移动应用、单页应用
Client Credentials服务间通信
Device Code设备限制输入场景
Refresh Token访问令牌续期

Authorization Code 授权流程

     ┌──────────┐                           ┌──────────┐
     │   User   │                           │    App   │
     └────┬─────┘                           └────┬─────┘
          │                                      │
          │  1. 点击"使用Google登录"               │
          │─────────────────────────────────────►│
          │                                      │
          │  2. 重定向到授权服务器                 │
          │◄─────────────────────────────────────│
          │                                      │
          │  3. 输入凭证并同意授权                │
          │─────────────────────────────────────►│
          │       (授权服务器)                   │
          │                                      │
          │  4. 重定向回App with code            │
          │◄─────────────────────────────────────│
          │                                      │
          │  5. 用code交换access_token           │
          │─────────────────────────────────────►│
          │       (授权服务器)                   │
          │                                      │
          │  6. 返回access_token                 │
          │◄─────────────────────────────────────│
          │                                      │
          │  7. 用token访问资源                  │
          │─────────────────────────────────────►│
          │       (Resource Server)             │
          │                                      │

授权请求

GET /authorize?
    response_type=code&
    client_id=YOUR_CLIENT_ID&
    redirect_uri=https%3A%2F%2Fyour-app.com%2Fcallback&
    scope=openid%20profile%20email&
    state=random_state_string&
    code_challenge=S256_code_challenge&
    code_challenge_method=S256
Host: authorization-server.com

令牌交换

import requests
import base64
import hashlib
 
def exchange_code_for_token(auth_code, code_verifier):
    """用授权码交换访问令牌"""
    
    client_id = "your-client-id"
    client_secret = "your-client-secret"  # 后端保密
    redirect_uri = "https://your-app.com/callback"
    
    # code_verifier是PKCE的一部分
    code_challenge = base64.urlsafe_b64encode(
        hashlib.sha256(code_verifier.encode()).digest()
    ).decode().rstrip('=')
    
    response = requests.post("https://auth-server.com/token", data={
        "grant_type": "authorization_code",
        "code": auth_code,
        "redirect_uri": redirect_uri,
        "client_id": client_id,
        "code_verifier": code_verifier,  # PKCE验证
    })
    
    return response.json()

PKCE(Proof Key for Code Exchange)

PKCE防止授权码拦截攻击,移动端和SPA必须使用。

流程

// 1. 生成code_verifier(随机字符串)
function generateCodeVerifier() {
    const array = new Uint8Array(32);
    crypto.getRandomValues(array);
    return base64urlEncode(array);
}
 
// 2. 生成code_challenge
async function generateCodeChallenge(verifier) {
    const encoder = new TextEncoder();
    const data = encoder.encode(verifier);
    const digest = await crypto.subtle.digest('SHA-256', data);
    return base64urlEncode(new Uint8Array(digest));
}
 
// 3. 登录流程
async function login() {
    const codeVerifier = generateCodeVerifier();
    const codeChallenge = await generateCodeChallenge(codeVerifier);
    
    // 保存到sessionStorage
    sessionStorage.setItem('codeVerifier', codeVerifier);
    
    // 构建授权URL
    const authUrl = new URL('https://auth-server.com/authorize');
    authUrl.searchParams.set('client_id', 'your-client-id');
    authUrl.searchParams.set('response_type', 'code');
    authUrl.searchParams.set('redirect_uri', 'https://your-app.com/callback');
    authUrl.searchParams.set('scope', 'openid profile');
    authUrl.searchParams.set('code_challenge', codeChallenge);
    authUrl.searchParams.set('code_challenge_method', 'S256');
    authUrl.searchParams.set('state', generateRandomState());
    
    window.location.href = authUrl.toString();
}
 
// 4. 回调处理
async function handleCallback() {
    const urlParams = new URLSearchParams(window.location.search);
    const code = urlParams.get('code');
    const state = urlParams.get('state');
    const codeVerifier = sessionStorage.getItem('codeVerifier');
    
    // 用code和code_verifier交换token
    const tokenResponse = await fetch('https://auth-server.com/token', {
        method: 'POST',
        headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
        body: new URLSearchParams({
            grant_type: 'authorization_code',
            code: code,
            redirect_uri: 'https://your-app.com/callback',
            client_id: 'your-client-id',
            code_verifier: codeVerifier
        })
    });
    
    const tokens = await tokenResponse.json();
    // tokens.access_token, tokens.id_token, tokens.refresh_token
}

OIDC(OpenID Connect)

OIDC在OAuth 2.0基础上提供身份认证,返回ID Token。

ID Token结构

ID Token是JWT格式:

{
  "iss": "https://auth-server.com",
  "sub": "user123",           // 用户唯一标识
  "aud": "your-client-id",    // 接收方
  "exp": 1699999999,          // 过期时间
  "iat": 1699996399,          // 签发时间
  "nonce": "random-nonce",
  "name": "John Doe",
  "email": "john@example.com",
  "picture": "https://..."
}

验证ID Token

import jwt
from jwt import PyJWKClient
 
def verify_id_token(id_token, client_id):
    # 获取公钥
    jwks_url = "https://auth-server.com/.well-known/jwks.json"
    jwks_client = PyJWKClient(jwks_url)
    signing_key = jwks_client.get_signing_key_from_jwt(id_token)
    
    # 验证并解码
    claims = jwt.decode(
        id_token,
        signing_key.key,
        algorithms=["RS256"],
        audience=client_id,
        options={"verify_exp": True}
    )
    
    return claims

Client Credentials(客户端凭证)

适用于服务间通信,无需用户交互:

def get_service_token():
    """获取服务到服务访问令牌"""
    
    response = requests.post("https://auth-server.com/token", data={
        "grant_type": "client_credentials",
        "client_id": "service-client-id",
        "client_secret": "service-client-secret",
        "scope": "api://backend-api/.default"
    })
    
    return response.json()["access_token"]
 
# 使用令牌调用API
def call_backend_api(token):
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get("https://api.backend.com/data", headers=headers)
    return response.json()

Refresh Token刷新

def refresh_access_token(refresh_token):
    """使用刷新令牌获取新的访问令牌"""
    
    response = requests.post("https://auth-server.com/token", data={
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": "your-client-id",
        # client_secret如果客户端类型需要
    })
    
    return response.json()
    # {
    #     "access_token": "new-access-token",
    #     "refresh_token": "new-refresh-token",  # 可能轮转
    #     "expires_in": 3600
    # }

Token安全最佳实践

实践说明
HTTPS传输所有Token传输必须加密
短期Access TokenAccess Token有效期15分钟-1小时
Refresh Token轮转每次刷新生成新Refresh Token
Token存储Access Token存内存,Refresh Token存安全存储
PKCE必用移动端和SPA必须启用PKCE
令牌轮换检测到旧Token使用立即失效

JWT安全

import jwt
import datetime
 
# 签发Token
def create_tokens(user_id):
    now = datetime.datetime.utcnow()
    
    access_token = jwt.encode({
        "sub": user_id,
        "iat": now,
        "exp": now + datetime.timedelta(hours=1),
        "type": "access"
    }, SECRET_KEY, algorithm="HS256")
    
    refresh_token = jwt.encode({
        "sub": user_id,
        "iat": now,
        "exp": now + datetime.timedelta(days=7),
        "type": "refresh",
        "jti": generate_jti()  # 唯一标识,用于撤销
    }, SECRET_KEY, algorithm="HS256")
    
    return access_token, refresh_token
 
# 验证Token
def verify_token(token, expected_type="access"):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        
        if payload["type"] != expected_type:
            raise ValueError("Invalid token type")
        
        # 检查是否在撤销列表
        if is_token_revoked(payload["jti"]):
            raise ValueError("Token has been revoked")
        
        return payload
    except jwt.ExpiredSignatureError:
        raise ValueError("Token expired")
    except jwt.InvalidTokenError:
        raise ValueError("Invalid token")

参考