软件架构模式

软件架构模式是经过验证的解决方案模板,用于解决常见的系统设计问题。

分层架构(Layered Architecture)

最传统的架构模式,将系统分为多个层级:

┌─────────────────────────┐
│     Presentation        │ ← 用户界面、API网关
├─────────────────────────┤
│      Application        │ ← 用例编排、事务管理
├─────────────────────────┤
│        Domain           │ ← 业务逻辑、领域模型
├─────────────────────────┤
│      Infrastructure     │ ← 数据库、外部服务
└─────────────────────────┘

Python实现示例

# Domain层(核心业务逻辑)
class Order:
    def __init__(self, order_id, customer_id):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items = []
        self.status = "pending"
    
    def add_item(self, product_id, quantity, price):
        self.items.append({
            "product_id": product_id,
            "quantity": quantity,
            "price": price
        })
    
    def calculate_total(self):
        return sum(item["quantity"] * item["price"] for item in self.items)
    
    def confirm(self):
        if not self.items:
            raise ValueError("Cannot confirm empty order")
        self.status = "confirmed"
 
# Application层(用例编排)
class OrderService:
    def __init__(self, order_repository, payment_service, notification_service):
        self.order_repository = order_repository
        self.payment_service = payment_service
        self.notification_service = notification_service
    
    def create_order(self, customer_id, items):
        order = Order(order_id=None, customer_id=customer_id)
        
        for item in items:
            order.add_item(item["product_id"], item["quantity"], item["price"])
        
        saved_order = self.order_repository.save(order)
        
        # 触发后续流程
        self._send_order_created_event(saved_order)
        
        return saved_order
    
    def confirm_order(self, order_id):
        order = self.order_repository.find_by_id(order_id)
        order.confirm()
        self.order_repository.save(order)
        
        self.payment_service.request_payment(order)
        
        return order
 
# Infrastructure层
class SQLAlchemyOrderRepository(OrderRepository):
    def __init__(self, session):
        self.session = session
    
    def save(self, order):
        db_order = OrderModel(order)
        self.session.add(db_order)
        self.session.commit()
        return db_order.to_domain()

六边形架构(Hexagonal Architecture)

核心业务逻辑与外部依赖完全解耦:

                    ┌────────────────────────┐
                    │      Application      │
                    │       Core            │
                    │   ┌──────────────┐   │
                    │   │   Domain     │   │
                    │   │   Model      │   │
                    │   └──────────────┘   │
                    │          ▲          │
                    │          │          │
                    │   ┌──────────────┐   │
                    │   │   Ports      │   │ ← 接口定义
                    │   │  (输入/输出)  │   │
                    │   └──────────────┘   │
                    └──────────┬───────────┘
                               │
         ┌─────────────────────┼─────────────────────┐
         │                     │                     │
┌────────┴────────┐   ┌────────┴────────┐   ┌────────┴────────┐
│   Adapters     │   │   Adapters      │   │   Adapters     │
│  (REST API)    │   │  (Database)     │   │  (Message Q)   │
└────────────────┘   └─────────────────┘   └────────────────┘
# Ports(接口定义)
class OrderRepository(Protocol):
    """输出端口:订单仓储接口"""
    def save(self, order: Order) -> Order: ...
    def find_by_id(self, order_id: str) -> Order: ...
 
class PaymentGateway(Protocol):
    """输出端口:支付网关接口"""
    def charge(self, amount: float, payment_method: str) -> PaymentResult: ...
 
class NotificationPort(Protocol):
    """输出端口:通知接口"""
    def send(self, recipient: str, message: str) -> None: ...
 
# Application Service
class OrderApplicationService:
    def __init__(
        self,
        order_repository: OrderRepository,
        payment_gateway: PaymentGateway,
        notification: NotificationPort
    ):
        self.order_repository = order_repository
        self.payment_gateway = payment_gateway
        self.notification = notification
    
    def place_order(self, customer_id: str, items: List[OrderItem]) -> Order:
        order = Order.create(customer_id, items)
        saved_order = self.order_repository.save(order)
        
        # 通过端口与外部交互,完全解耦
        result = self.payment_gateway.charge(
            amount=saved_order.total,
            payment_method="credit_card"
        )
        
        if result.success:
            saved_order.mark_paid()
            self.order_repository.save(saved_order)
            self.notification.send(
                saved_order.customer_id,
                f"Order {saved_order.id} confirmed!"
            )
        
        return saved_order
 
# Adapters(适配器实现)
class DjangoORMOrderRepository:
    """数据库适配器"""
    def __init__(self, model_class):
        self.model_class = model_class
    
    def save(self, order: Order) -> Order:
        obj = self.model_class(
            id=order.id,
            customer_id=order.customer_id,
            status=order.status
        )
        obj.save()
        return Order.from_dict(obj.to_dict())
 
class StripePaymentAdapter:
    """支付网关适配器"""
    def __init__(self, api_key: str):
        self.stripe = Stripe(api_key)
    
    def charge(self, amount: float, payment_method: str) -> PaymentResult:
        try:
            charge = self.stripe.Charge.create(
                amount=int(amount * 100),
                currency="usd",
                source=payment_method
            )
            return PaymentResult(success=True, transaction_id=charge.id)
        except Exception as e:
            return PaymentResult(success=False, error=str(e))

事件驱动架构(Event-Driven Architecture)

┌──────────┐    Event    ┌──────────┐    Event    ┌──────────┐
│ Producer │ ──────────► │  Event   │ ──────────► │ Consumer │
│   App    │             │   Bus    │             │   App    │
└──────────┘             └──────────┘             └──────────┘
     │                                               │
     │                                               │
     ▼                                               ▼
┌──────────┐             ┌──────────┐            ┌──────────┐
│  Event   │             │  Event   │            │  Event   │
│  Store   │             │  Broker  │            │ Handler  │
└──────────┘             └──────────┘            └──────────┘

事件溯源(Event Sourcing)

from dataclasses import dataclass
from typing import List
import uuid
from datetime import datetime
 
@dataclass
class Event:
    event_id: str
    event_type: str
    timestamp: datetime
    payload: dict
 
class EventStore:
    def __init__(self):
        self.events: List[Event] = []
    
    def append(self, event_type: str, payload: dict):
        event = Event(
            event_id=str(uuid.uuid4()),
            event_type=event_type,
            timestamp=datetime.now(),
            payload=payload
        )
        self.events.append(event)
        return event
    
    def get_events_for_aggregate(self, aggregate_id: str) -> List[Event]:
        return [e for e in self.events if e.payload.get("id") == aggregate_id]
    
    def replay_events(self, aggregate_id: str, reducer):
        events = self.get_events_for_aggregate(aggregate_id)
        return reduce(reducer, events, None)
 
# 聚合根
class Order:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.status = "created"
        self.items = []
    
    @classmethod
    def from_events(cls, events: List[Event]):
        order = cls(events[0].payload["id"])
        for event in events:
            order._apply(event)
        return order
    
    def _apply(self, event: Event):
        if event.event_type == "OrderItemAdded":
            self.items.append(event.payload["item"])
        elif event.event_type == "OrderConfirmed":
            self.status = "confirmed"
    
    def add_item(self, item: dict, event_store: EventStore):
        event_store.append("OrderItemAdded", {
            "id": self.order_id,
            "item": item
        })
        self.items.append(item)
    
    def confirm(self, event_store: EventStore):
        event_store.append("OrderConfirmed", {"id": self.order_id})
        self.status = "confirmed"

CQRS(命令查询职责分离)

# 命令端(写入)
class OrderCommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def handle_create_order(self, command: CreateOrderCommand):
        order_id = str(uuid.uuid4())
        
        # 创建订单事件
        self.event_store.append("OrderCreated", {
            "id": order_id,
            "customer_id": command.customer_id,
            "items": command.items,
            "timestamp": datetime.now().isoformat()
        })
        
        return order_id
 
# 查询端(读取)
class OrderQueryHandler:
    def __init__(self, read_db: ReadDatabase):
        self.read_db = read_db
    
    def get_order_summary(self, order_id: str) -> OrderSummaryDTO:
        # 从物化视图读取
        return self.read_db.queries.get_order_summary(order_id)
    
    def get_customer_orders(self, customer_id: str) -> List[OrderSummaryDTO]:
        return self.read_db.queries.get_orders_by_customer(customer_id)
 
# 物化视图更新
class ReadModelProjector:
    def __init__(self, read_db: WriteDatabase):
        self.read_db = read_db
    
    def project(self, event: Event):
        if event.event_type == "OrderCreated":
            self.read_db.execute("""
                INSERT INTO order_summaries (id, customer_id, status, total)
                VALUES (?, ?, ?, ?)
            """, [
                event.payload["id"],
                event.payload["customer_id"],
                "created",
                sum(item["price"] * item["quantity"] for item in event.payload["items"])
            ])

微服务架构模式

服务发现

# 服务注册中心
class ServiceRegistry:
    def __init__(self):
        self.services = {}
    
    def register(self, service_name: str, instance_id: str, host: str, port: int):
        key = f"{service_name}/{instance_id}"
        self.services[key] = {
            "service_name": service_name,
            "instance_id": instance_id,
            "host": host,
            "port": port,
            "status": "UP",
            "last_heartbeat": time.time()
        }
    
    def deregister(self, service_name: str, instance_id: str):
        key = f"{service_name}/{instance_id}"
        if key in self.services:
            del self.services[key]
    
    def find_instances(self, service_name: str) -> List[Dict]:
        return [
            inst for key, inst in self.services.items()
            if inst["service_name"] == service_name and inst["status"] == "UP"
        ]
 
# 客户端负载均衡
class ClientSideLoadBalancer:
    def __init__(self, registry: ServiceRegistry):
        self.registry = registry
        self.strategy = RoundRobinStrategy()
    
    def call_service(self, service_name: str, path: str):
        instances = self.registry.find_instances(service_name)
        
        if not instances:
            raise ServiceUnavailable(f"No instances of {service_name}")
        
        selected = self.strategy.select(instances)
        
        url = f"http://{selected['host']}:{selected['port']}{path}"
        return requests.get(url)

熔断器(Circuit Breaker)

import time
from enum import Enum
 
class CircuitState(Enum):
    CLOSED = "closed"      # 正常
    OPEN = "open"          # 熔断
    HALF_OPEN = "half_open"  # 半开
 
class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60, success_threshold=2):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.success_threshold = success_threshold
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
    
    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenException("Circuit is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        self.failure_count = 0
        
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0
    
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

参考