1. Serverless 简介

定义与核心概念

Serverless(无服务器架构)是一种云计算执行模型,开发者无需管理服务器即可运行代码。在 Serverless 模式下,云服务商负责服务器的搭建、维护和扩展工作,开发者只需关注业务逻辑的实现。

核心特征

  • 无需预配置或管理基础设施
  • 按实际执行时间计费(按需付费)
  • 自动扩缩容,零运维成本
  • 事件驱动触发执行

FaaS(函数即服务)

FaaS(Function as a Service)是 Serverless 的核心实现形式,允许开发者将单个函数部署到云端,通过事件触发执行。

主流 FaaS 平台

平台代表产品
AWSLambda
Google CloudCloud Functions
AzureAzure Functions
阿里云函数计算 FC

FaaS 特点

  • 函数粒度的资源分配
  • 毫秒级计费精度
  • 支持多种编程语言
  • 内置高可用和容错机制

BaaS(后端即服务)

BaaS(Backend as a Service)提供预构建的后端服务,开发者通过 API 调用即可使用数据库、认证、存储等功能。

常见 BaaS 服务

  • 数据库:Firebase Firestore、AWS DynamoDB、阿里云 TableStore
  • 身份认证:Auth0、AWS Cognito、阿里云 RAM
  • 文件存储:AWS S3、阿里云 OSS、Cloudflare R2
  • 消息队列:AWS SQS、Google Pub/Sub、阿里云 MNS

与传统架构对比

维度传统架构Serverless 架构
服务器管理需自行维护云服务商负责
扩缩容手动配置自动弹性伸缩
计费方式包月/包年按调用次数/执行时间
冷启动存在(首次调用较慢)
适用场景长期运行服务间歇性、事件驱动任务
运维成本
性能上限可预估平台限制

2. 主流平台

AWS Lambda

AWS Lambda 是亚马逊云服务的函数计算服务,支持多种运行时环境。

核心配置

# Python 示例:Lambda 处理器
import json
 
def lambda_handler(event, context):
    # 获取请求参数
    name = event.get('name', 'World')
    
    # 返回响应
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': f'Hello, {name}!'
        })
    }

配置参数

参数默认值可调范围
内存128 MB128 ~ 10240 MB
超时时间3 秒1 秒 ~ 15 分钟
并发限制1000可申请提升

Google Cloud Functions

Google Cloud Functions 是谷歌云的 FaaS 服务,支持第一代和第二代(Cloud Functions v2)两个版本。

HTTP 触发器示例

// Node.js 示例
exports.helloHttp = (req, res) => {
  const name = req.query.name || req.body.name || 'World';
  res.send(`Hello, ${name}!`);
};

第二代特性

  • 基于 Cloud Run,支持更长时间运行
  • 支持直接调用,无需通过 HTTP
  • 可配置最小实例数,避免冷启动

Azure Functions

Azure Functions 是微软 Azure 的函数计算服务,提供多种编程模型和触发器。

依赖注入示例

public class MyFunction
{
    private readonly IMyService _myService;
    
    // 通过构造函数注入依赖
    public MyFunction(IMyService myService)
    {
        _myService = myService;
    }
    
    [FunctionName("MyFunction")]
    public async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "get", "post")] HttpRequest req)
    {
        var result = await _myService.ProcessAsync();
        return new OkObjectResult(result);
    }
}

阿里云函数计算

阿里云函数计算(FC)是国内领先的 FaaS 平台,提供更符合国内开发者的生态支持。

快速部署示例

# serverless.yaml
service: my-app
provider:
  name: aliyun
  runtime: python3.9
functions:
  hello:
    handler: handler.hello
    events:
      - http:
          path: /hello
          method: get

Cloudflare Workers(边缘计算)

Cloudflare Workers 是基于 V8 引擎的边缘函数服务,将计算能力部署到全球 300+ 边缘节点。

优势

  • 超低延迟(全球平均 < 50ms)
  • 强隔离的 V8 沙箱环境
  • 免费的每日 100,000 次调用
  • 支持 WebAssembly 扩展

示例

export default {
  async fetch(request, env) {
    return new Response('Hello from the edge!');
  }
};

3. 核心概念

函数触发器

触发器是启动函数执行的事件源。常见的触发器类型:

触发器类型说明典型场景
HTTP 触发器通过 HTTP 请求调用API、Webhook
定时触发器Cron 表达式定时执行定时任务、数据清理
对象存储触发器文件上传/删除时触发图片处理、文件转换
消息队列触发器队列消息到来时触发异步处理、事件驱动
数据库触发器数据变更时触发实时同步、审计日志
API 网关触发器RESTful API 调用微服务、后端接口

事件驱动架构

事件驱动是 Serverless 的核心编程范式,函数响应特定事件而执行。

事件流架构

事件源 → 事件通道 → 函数处理 → 结果输出
   ↓
[OSS上传] → [MQTT] → [Lambda] → [DynamoDB写入]
   ↓
[用户上传图片]              [生成缩略图存储]

设计原则

  • 函数间通过事件通信,降低耦合
  • 使用消息队列确保事件可靠传递
  • 实现幂等性应对重复事件处理

冷启动优化

冷启动是指函数从零开始初始化实例的过程,会带来额外延迟。

影响冷启动时间的因素

因素影响程度优化方法
运行时环境选择轻量级运行时
代码包大小减少依赖,精简代码
初始化逻辑延迟加载,复用连接
内存配置适当增加内存

优化策略

  1. 预置并发:提前启动实例,保持 warm 状态
  2. 轻量化依赖:使用较小的运行时镜像
  3. 连接复用:在函数外初始化数据库/网络连接
  4. 代码分层:将初始化逻辑与处理逻辑分离
# 示例:连接复用
import os
 
# 全局变量,实例级别复用
db_connection = None
 
def init_db():
    global db_connection
    if db_connection is None:
        db_connection = create_db_connection(os.environ['DB_URL'])
    return db_connection
 
def handler(event, context):
    db = init_db()  # 复用已有连接
    result = db.query(event['sql'])
    return result

超时与内存配置

超时和内存是函数的核心配置参数,直接影响函数执行能力和费用。

配置建议

场景推荐超时推荐内存
快速 API 响应3~10 秒128~256 MB
文件处理30~300 秒512~1024 MB
数据批量处理5~15 分钟1024~2048 MB
AI 推理30~300 秒1536~3008 MB

计费公式


4. 开发实战

函数编写规范

标准函数签名

# Python
def handler(event, context):
    # event: 触发事件的数据
    # context: 运行环境信息
    return {'statusCode': 200, 'body': 'OK'}
// Node.js
exports.handler = async (event, context) => {
  return { statusCode: 200, body: 'OK' };
};

输入输出规范

  • 输入事件(event):包含触发源提供的所有数据
  • 上下文(context):包含函数运行时的环境信息
  • 返回值:可返回简单对象或响应数据

本地调试

使用 SAM Local(AWS)

# 启动本地 API
sam local start-api
 
# 调用本地函数
sam local invoke HelloWorldFunction -e event.json

使用 Functions Framework(Google Cloud)

# 安装框架
npm install @google-cloud/functions-framework
 
# 本地运行
functions-framework --target=helloHttp --port=8080

使用 Azure Functions Core Tools

# 启动本地运行时
func start
 
# 触发函数测试
func invoke hello --get

部署与版本管理

AWS Lambda 部署

# 打包函数代码
zip -r function.zip index.js node_modules/
 
# 更新函数
aws lambda update-function-code \
    --function-name my-function \
    --zip-file fileb://function.zip
 
# 发布版本
aws lambda publish-version \
    --function-name my-function

使用 Serverless Framework 部署

# serverless.yml
service: my-service
provider:
  name: aws
  runtime: nodejs18.x
  stage: prod
  region: us-east-1
 
functions:
  api:
    handler: handler.api
    events:
      - http:
          path: /api/{id}
          method: get
    layers:
      - arn:aws:lambda:us-east-1:123456789:layer:common-lib:1
# 部署
serverless deploy
 
# 部署特定函数(快速迭代)
serverless deploy function -f api

版本管理策略

版本策略说明适用场景
基于时间的版本定期发布新版本常规迭代
语义化版本主版本.次版本.修订号需要严格兼容性管理
别名路由使用别名指向特定版本A/B 测试、灰度发布
蓝绿部署双版本切换需要快速回滚

5. 应用场景

API 网关

Serverless 适合构建轻量级 API 服务,按需扩展,无需预置服务器。

架构示例

客户端 → API Gateway → Lambda 函数 → DynamoDB
                          ↓
                    业务逻辑处理

RESTful API 实现

def handler(event, context):
    method = event['httpMethod']
    path = event['path']
    params = event.get('queryStringParameters', {})
    
    if method == 'GET' and path == '/users':
        return get_users(params)
    elif method == 'POST' and path == '/users':
        return create_user(json.loads(event['body']))
    elif method == 'DELETE' and path.startswith('/users/'):
        user_id = path.split('/')[-1]
        return delete_user(user_id)
    
    return {'statusCode': 404, 'body': 'Not Found'}

文件处理

文件上传触发函数自动处理,如图片压缩、格式转换、视频转码。

图片处理流程

用户上传图片 → OSS 触发器 → Lambda 触发处理
                              ↓
                        1. 下载原图
                        2. 压缩/裁剪/水印
                        3. 上传处理结果
                        4. 更新数据库

示例:图片缩略图生成

from PIL import Image
import boto3
 
s3 = boto3.client('s3')
 
def handler(event, context):
    # 获取触发事件的信息
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    
    # 下载原图
    download_path = '/tmp/original.jpg'
    s3.download_file(bucket, key, download_path)
    
    # 生成缩略图
    with Image.open(download_path) as img:
        img.thumbnail((200, 200))
        img.save('/tmp/thumbnail.jpg')
    
    # 上传到目标桶
    target_key = f'thumbnails/{key.split("/")[-1]}'
    s3.upload_file('/tmp/thumbnail.jpg', 'my-bucket', target_key)
    
    return {'statusCode': 200, 'thumbnail': target_key}

定时任务

替代传统 Cron job,无需维护定时服务,按执行时间付费。

定时触发器配置

functions:
  daily-report:
    handler: tasks.generate_report
    events:
      - schedule:
          rate: cron(0 2 * * *)  # 每天凌晨2点
          timezone: Asia/Shanghai

示例:数据清理任务

import boto3
from datetime import datetime, timedelta
 
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('UserSessions')
 
def handler(event, context):
    # 清理7天前的会话数据
    cutoff_date = (datetime.now() - timedelta(days=7)).isoformat()
    
    response = table.scan(
        FilterExpression='lastActivity < :cutoff',
        ExpressionAttributeValues={':cutoff': cutoff_date}
    )
    
    deleted_count = 0
    with table.batch_writer() as batch:
        for item in response['Items']:
            batch.delete_item(Key={'sessionId': item['sessionId']})
            deleted_count += 1
    
    return {'deleted': deleted_count, 'cutoff': cutoff_date}

实时数据处理

处理来自 IoT 设备、用户行为追踪、金融交易等实时数据流。

流处理架构

数据源 → Kinesis/DataFirehose → Lambda → S3/DynamoDB/ES
         (数据流)                    (逐条处理)

示例:日志实时分析

import json
import base64
from datetime import datetime
 
def handler(event, context):
    results = []
    
    for record in event['records']:
        # 解码 Kinesis 数据
        payload = base64.b64decode(record['data']).decode('utf-8')
        log_entry = json.loads(payload)
        
        # 提取关键指标
        analysis = {
            'timestamp': log_entry['time'],
            'user_id': log_entry.get('userId'),
            'action': log_entry['action'],
            'duration_ms': log_entry.get('duration', 0)
        }
        
        # 检测异常行为
        if analysis['duration_ms'] > 5000:
            analysis['alert'] = 'high_latency'
        
        results.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': json.dumps(analysis)
        })
    
    return {'records': results}

6. 最佳实践

幂等性设计

幂等性确保同一请求多次执行结果一致,是分布式系统的关键设计原则。

为什么需要幂等性

  • 云平台可能重试失败请求
  • 客户端超时后可能重复发送
  • 消息队列可能重复投递

实现方法

方法说明适用场景
唯一事务 ID使用 UUID 标记每次操作任何场景
数据库唯一索引约束防止重复插入写入操作
状态机校验检查当前状态再执行状态流转
去重表记录已处理的请求 ID需要精确去重

示例:基于唯一 ID 的幂等处理

import boto3
import uuid
 
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('PaymentTransactions')
 
def handler(event, context):
    transaction_id = event.get('transactionId') or str(uuid.uuid4())
    amount = event['amount']
    user_id = event['userId']
    
    try:
        # 尝试写入,依赖唯一索引防止重复
        table.put_item(
            Item={
                'transactionId': transaction_id,
                'userId': user_id,
                'amount': amount,
                'status': 'COMPLETED'
            },
            ConditionExpression='attribute_not_exists(transactionId)'
        )
        return {'success': True, 'transactionId': transaction_id}
    
    except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
        # 已存在,说明是重复请求
        return {'success': True, 'duplicate': True, 'transactionId': transaction_id}

错误处理

重试策略配置

functions:
  process-order:
    handler: handler.process
    retry:
      maxAttempts: 3
      backoff:
        base: exponential
        initialDelayMs: 1000
        maxDelayMs: 60000

错误分类处理

def handler(event, context):
    try:
        # 业务逻辑
        validate_input(event)
        result = process_order(event)
        return {'success': True, 'data': result}
    
    except ValidationError as e:
        # 参数错误,不重试
        return {
            'statusCode': 400,
            'error': 'VALIDATION_ERROR',
            'message': str(e)
        }
    
    except ExternalServiceError as e:
        # 外部服务错误,可重试
        raise RetryableError(f'External service failed: {e}')
    
    except Exception as e:
        # 未知错误,记录并上报
        log_error(context, event, e)
        return {
            'statusCode': 500,
            'error': 'INTERNAL_ERROR'
        }

性能优化

核心优化策略

  1. 减少代码包体积

    # 使用专业工具分析
    npm install -g size-limit
    npx size-limit
  2. 优化依赖

    # Python:只安装必要依赖
    pip install -r requirements.txt --target ./package
     
    # 避免体积过大的库
    # 不用 pillow,用 Pillow-simd
    # 不用完整版 pandas,用 polars(更轻量)
  3. 利用并发能力

    # 批量处理充分利用每次调用
    def handler(event, context):
        records = event['Records']
        
        # 串行处理(低效)
        # results = [process(r) for r in records]
        
        # 并行处理(高效)
        from concurrent.futures import ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=10) as executor:
            results = list(executor.map(process, records))
        
        return {'processed': len(results)}
  4. 连接复用与资源池化

    # 全局初始化,高效复用
    import boto3
     
    # Lambda 容器复用时,此连接保持有效
    s3_client = None
     
    def get_s3_client():
        global s3_client
        if s3_client is None:
            s3_client = boto3.client('s3')
        return s3_client
  5. 合理使用 Layer

    # 将公共依赖抽取为 Layer
    functions:
      api:
        handler: api.handler
        layers:
          - arn:aws:lambda:us-east-1:123456789:layer:common-utils:3

参考资料