Event-driven architectures enable scalable, loosely-coupled systems. After building event-driven platforms processing millions of events daily, I’ve learned patterns that scale and anti-patterns to avoid.

Message Queue Patterns

Use queues for async communication:

type MessageQueue struct {
    client *sqs.SQS
}

func (mq *MessageQueue) Publish(event *Event) error {
    body, err := json.Marshal(event)
    if err != nil {
        return err
    }
    
    _, err = mq.client.SendMessage(&sqs.SendMessageInput{
        QueueUrl:    aws.String(queueURL),
        MessageBody: aws.String(string(body)),
        MessageAttributes: map[string]*sqs.MessageAttributeValue{
            "EventType": {
                DataType:    aws.String("String"),
                StringValue: aws.String(event.Type),
            },
        },
    })
    
    return err
}

Event Sourcing

Store events as the source of truth:

type EventStore struct {
    db *sql.DB
}

func (es *EventStore) Append(streamID string, event *Event) error {
    _, err := es.db.Exec(`
        INSERT INTO events (stream_id, event_type, data, version)
        VALUES ($1, $2, $3, (
            SELECT COALESCE(MAX(version), 0) + 1
            FROM events WHERE stream_id = $1
        ))
    `, streamID, event.Type, event.Data)
    
    return err
}

CQRS Pattern

Separate read and write models:

// Write model - handles commands
type CommandHandler struct {
    eventStore *EventStore
}

func (ch *CommandHandler) CreateOrder(cmd *CreateOrderCommand) error {
    event := &OrderCreatedEvent{
        OrderID:  generateID(),
        UserID:   cmd.UserID,
        Items:    cmd.Items,
    }
    
    return ch.eventStore.Append(event.OrderID, event)
}

// Read model - optimized for queries
type QueryHandler struct {
    db *sql.DB
}

func (qh *QueryHandler) GetOrder(id string) (*Order, error) {
    var order Order
    err := qh.db.QueryRow(`
        SELECT id, user_id, status, total
        FROM orders WHERE id = $1
    `, id).Scan(&order.ID, &order.UserID, &order.Status, &order.Total)
    
    return &order, err
}

At-Least-Once Delivery

Handle idempotency:

func (h *Handler) ProcessEvent(event *Event) error {
    // Check if already processed
    if h.isDuplicate(event.ID) {
        return nil  // Idempotent - skip
    }
    
    // Process event
    if err := h.doWork(event); err != nil {
        return err  // Will retry
    }
    
    // Mark as processed
    return h.markProcessed(event.ID)
}

Dead Letter Queues

Handle failed events systematically:

type DLQHandler struct {
    mainQueue string
    dlq       string
    maxRetries int
}

func (h *DLQHandler) ProcessEvent(event *Event) error {
    retryCount := event.RetryCount

    err := h.handleEvent(event)
    if err != nil {
        if retryCount >= h.maxRetries {
            // Move to DLQ
            return h.sendToDLQ(event, err)
        }

        // Retry with backoff
        event.RetryCount++
        backoff := time.Duration(math.Pow(2, float64(retryCount))) * time.Second
        time.Sleep(backoff)

        return h.requeue(event)
    }

    return nil
}

func (h *DLQHandler) sendToDLQ(event *Event, err error) error {
    dlqEvent := &DLQEvent{
        OriginalEvent: event,
        FailureReason: err.Error(),
        FailedAt:      time.Now(),
        RetryCount:    event.RetryCount,
    }

    return h.publishToDLQ(dlqEvent)
}

Event Schema Evolution

Version events for backward compatibility:

type EventEnvelope struct {
    Version    int             `json:"version"`
    EventType  string          `json:"event_type"`
    EventID    string          `json:"event_id"`
    Timestamp  time.Time       `json:"timestamp"`
    Payload    json.RawMessage `json:"payload"`
}

type EventHandler struct {
    handlers map[int]EventProcessor
}

func (eh *EventHandler) Process(envelope *EventEnvelope) error {
    handler, exists := eh.handlers[envelope.Version]
    if !exists {
        return fmt.Errorf("unsupported event version: %d", envelope.Version)
    }

    return handler.Process(envelope.Payload)
}

// V1 handler
type OrderCreatedV1 struct {
    OrderID  string  `json:"order_id"`
    UserID   string  `json:"user_id"`
    Total    float64 `json:"total"`
}

// V2 handler (added currency field)
type OrderCreatedV2 struct {
    OrderID  string  `json:"order_id"`
    UserID   string  `json:"user_id"`
    Total    float64 `json:"total"`
    Currency string  `json:"currency"`  // New field
}

func (eh *EventHandler) init() {
    eh.handlers = map[int]EventProcessor{
        1: &OrderCreatedV1Processor{},
        2: &OrderCreatedV2Processor{},
    }
}

Saga Pattern

Coordinate distributed transactions:

type OrderSaga struct {
    sagaID string
    state  *SagaState
    store  SagaStore
}

type SagaState struct {
    OrderID        string
    PaymentID      string
    InventoryReserved bool
    PaymentProcessed  bool
    Status         string // Pending, Completed, Failed, Compensating
}

func (s *OrderSaga) Execute() error {
    // Step 1: Reserve inventory
    if err := s.reserveInventory(); err != nil {
        return s.compensate(err)
    }
    s.state.InventoryReserved = true
    s.store.Save(s.state)

    // Step 2: Process payment
    if err := s.processPayment(); err != nil {
        return s.compensate(err)
    }
    s.state.PaymentProcessed = true
    s.store.Save(s.state)

    // Step 3: Confirm order
    if err := s.confirmOrder(); err != nil {
        return s.compensate(err)
    }

    s.state.Status = "Completed"
    s.store.Save(s.state)
    return nil
}

func (s *OrderSaga) compensate(err error) error {
    s.state.Status = "Compensating"
    s.store.Save(s.state)

    // Compensate in reverse order
    if s.state.PaymentProcessed {
        s.refundPayment()
    }

    if s.state.InventoryReserved {
        s.releaseInventory()
    }

    s.state.Status = "Failed"
    s.store.Save(s.state)

    return err
}

Event Replay

Rebuild state from events:

type EventReplayer struct {
    eventStore EventStore
    projections map[string]Projection
}

func (er *EventReplayer) Rebuild(streamID string, projection Projection) error {
    // Fetch all events for stream
    events, err := er.eventStore.GetEvents(streamID, 0, -1)
    if err != nil {
        return err
    }

    // Reset projection state
    projection.Reset()

    // Replay events in order
    for _, event := range events {
        if err := projection.Apply(event); err != nil {
            return fmt.Errorf("failed to apply event %s: %w", event.ID, err)
        }
    }

    return nil
}

// Example projection
type OrderProjection struct {
    OrderID    string
    Status     string
    Items      []OrderItem
    TotalAmount int64
}

func (op *OrderProjection) Apply(event *Event) error {
    switch event.Type {
    case "OrderCreated":
        data := event.Data.(*OrderCreatedEvent)
        op.OrderID = data.OrderID
        op.Status = "Created"
        op.Items = data.Items

    case "PaymentProcessed":
        data := event.Data.(*PaymentProcessedEvent)
        op.TotalAmount = data.Amount
        op.Status = "Paid"

    case "OrderShipped":
        op.Status = "Shipped"

    case "OrderCancelled":
        op.Status = "Cancelled"
    }

    return nil
}

Stream Processing

Process events in real-time:

type StreamProcessor struct {
    consumer StreamConsumer
    processors []EventProcessor
}

func (sp *StreamProcessor) Start(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()

        default:
            event, err := sp.consumer.Poll(100 * time.Millisecond)
            if err != nil {
                continue
            }

            // Process event through pipeline
            for _, processor := range sp.processors {
                if err := processor.Process(event); err != nil {
                    // Handle error, potentially send to DLQ
                    sp.handleError(event, err)
                    break
                }
            }

            // Commit offset
            sp.consumer.Commit(event.Offset)
        }
    }
}

// Example processors
type FilterProcessor struct {
    predicate func(*Event) bool
}

func (fp *FilterProcessor) Process(event *Event) error {
    if !fp.predicate(event) {
        return ErrFiltered
    }
    return nil
}

type TransformProcessor struct {
    transformer func(*Event) (*Event, error)
}

func (tp *TransformProcessor) Process(event *Event) error {
    transformed, err := tp.transformer(event)
    if err != nil {
        return err
    }
    *event = *transformed
    return nil
}

type AggregateProcessor struct {
    window time.Duration
    buffer []*Event
    flush  func([]*Event) error
}

func (ap *AggregateProcessor) Process(event *Event) error {
    ap.buffer = append(ap.buffer, event)

    if len(ap.buffer) >= 100 || time.Since(ap.buffer[0].Timestamp) > ap.window {
        if err := ap.flush(ap.buffer); err != nil {
            return err
        }
        ap.buffer = ap.buffer[:0]
    }

    return nil
}

Monitoring Event Flows

Track event processing metrics:

type EventMetrics struct {
    published    prometheus.Counter
    processed    prometheus.Counter
    failed       prometheus.Counter
    duration     prometheus.Histogram
    queueDepth   prometheus.Gauge
}

func NewEventMetrics() *EventMetrics {
    return &EventMetrics{
        published: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "events_published_total",
            Help: "Total number of events published",
        }),
        processed: prometheus.NewCounterVec(prometheus.CounterOpts{
            Name: "events_processed_total",
            Help: "Total number of events processed",
        }, []string{"event_type", "status"}),
        failed: prometheus.NewCounterVec(prometheus.CounterOpts{
            Name: "events_failed_total",
            Help: "Total number of failed events",
        }, []string{"event_type", "error_type"}),
        duration: prometheus.NewHistogram(prometheus.HistogramOpts{
            Name:    "event_processing_duration_seconds",
            Help:    "Event processing duration",
            Buckets: prometheus.ExponentialBuckets(0.001, 2, 10),
        }),
        queueDepth: prometheus.NewGaugeVec(prometheus.GaugeOpts{
            Name: "event_queue_depth",
            Help: "Number of events waiting in queue",
        }, []string{"queue_name"}),
    }
}

func (em *EventMetrics) RecordPublished() {
    em.published.Inc()
}

func (em *EventMetrics) RecordProcessed(eventType, status string, duration time.Duration) {
    em.processed.WithLabelValues(eventType, status).Inc()
    em.duration.Observe(duration.Seconds())
}

func (em *EventMetrics) RecordFailed(eventType, errorType string) {
    em.failed.WithLabelValues(eventType, errorType).Inc()
}

Query event metrics:

# Event processing rate
rate(events_processed_total[5m])

# Error rate by event type
rate(events_failed_total[5m]) / rate(events_published_total[5m])

# Processing latency p99
histogram_quantile(0.99, rate(event_processing_duration_seconds_bucket[5m]))

# Queue backlog
event_queue_depth

Event-Driven Security

Secure event flows:

type SecureEventPublisher struct {
    signer    EventSigner
    encryptor EventEncryptor
}

func (sep *SecureEventPublisher) Publish(event *Event) error {
    // Encrypt sensitive data
    if event.ContainsSensitiveData() {
        encrypted, err := sep.encryptor.Encrypt(event.Payload)
        if err != nil {
            return err
        }
        event.Payload = encrypted
        event.Encrypted = true
    }

    // Sign event for integrity
    signature, err := sep.signer.Sign(event)
    if err != nil {
        return err
    }
    event.Signature = signature

    return sep.publisher.Publish(event)
}

type SecureEventConsumer struct {
    verifier  EventVerifier
    decryptor EventDecryptor
}

func (sec *SecureEventConsumer) Consume(event *Event) error {
    // Verify signature
    if err := sec.verifier.Verify(event); err != nil {
        return fmt.Errorf("invalid event signature: %w", err)
    }

    // Decrypt if needed
    if event.Encrypted {
        decrypted, err := sec.decryptor.Decrypt(event.Payload)
        if err != nil {
            return err
        }
        event.Payload = decrypted
        event.Encrypted = false
    }

    return sec.processEvent(event)
}

Testing Event-Driven Systems

Test event flows comprehensively:

func TestEventSourcing(t *testing.T) {
    // Setup in-memory event store
    store := NewInMemoryEventStore()
    handler := NewOrderCommandHandler(store)

    // Execute commands
    orderID := "order-123"
    handler.CreateOrder(&CreateOrderCommand{
        OrderID: orderID,
        UserID:  "user-456",
        Items:   []OrderItem{{ProductID: "prod-1", Quantity: 2}},
    })

    handler.ProcessPayment(&ProcessPaymentCommand{
        OrderID: orderID,
        Amount:  5000,
    })

    // Verify events
    events, _ := store.GetEvents(orderID, 0, -1)
    assert.Len(t, events, 2)
    assert.Equal(t, "OrderCreated", events[0].Type)
    assert.Equal(t, "PaymentProcessed", events[1].Type)

    // Rebuild state from events
    projection := &OrderProjection{}
    for _, event := range events {
        projection.Apply(event)
    }

    assert.Equal(t, "Paid", projection.Status)
    assert.Equal(t, int64(5000), projection.TotalAmount)
}

func TestSagaCompensation(t *testing.T) {
    saga := NewOrderSaga("saga-123")

    // Simulate payment failure
    saga.inventoryService = &MockInventoryService{
        reserveErr: nil,
    }
    saga.paymentService = &MockPaymentService{
        processErr: errors.New("payment declined"),
    }

    err := saga.Execute()
    assert.Error(t, err)

    // Verify compensation occurred
    assert.False(t, saga.state.InventoryReserved) // Should be released
    assert.Equal(t, "Failed", saga.state.Status)
}

Conclusion

Event-driven architectures enable scalable, decoupled systems through:

  1. Message queues for asynchronous communication
  2. Event sourcing for auditability and replay
  3. CQRS for read/write optimization
  4. Idempotency to handle at-least-once delivery
  5. Saga pattern for distributed transactions
  6. Dead letter queues for failed event handling
  7. Schema evolution for backward compatibility
  8. Stream processing for real-time analytics
  9. Comprehensive monitoring of event flows
  10. Security through encryption and signing

Design event-driven systems for at-least-once delivery, handle duplicates gracefully through idempotency, version your events for evolution, implement sagas for distributed transactions, and monitor event flows comprehensively.

The power of event-driven architectures lies in their ability to scale independently, evolve gracefully, and provide complete audit trails. However, they introduce complexity in debugging, data consistency, and operational management. Invest in observability, testing, and documentation to make event-driven systems maintainable and reliable.