Event-driven architectures enable scalable, loosely-coupled systems. After building event-driven platforms processing millions of events daily, I’ve learned patterns that scale and anti-patterns to avoid.
Message Queue Patterns
Use queues for async communication:
// MessageQueue publishes events to an AWS SQS queue.
type MessageQueue struct {
client *sqs.SQS // AWS SDK v1 SQS client
}
// Publish serializes event to JSON and sends it to the configured SQS
// queue, tagging the message with an "EventType" attribute so consumers
// can filter by type without decoding the body.
func (mq *MessageQueue) Publish(event *Event) error {
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}
	input := &sqs.SendMessageInput{
		QueueUrl:    aws.String(queueURL),
		MessageBody: aws.String(string(payload)),
		MessageAttributes: map[string]*sqs.MessageAttributeValue{
			"EventType": {
				DataType:    aws.String("String"),
				StringValue: aws.String(event.Type),
			},
		},
	}
	_, err = mq.client.SendMessage(input)
	return err
}
Event Sourcing
Store events as the source of truth:
// EventStore persists events to a relational database as the system's
// source of truth.
type EventStore struct {
db *sql.DB // connection pool; schema must include an events table
}
// Append inserts event into streamID's event log, assigning the next
// sequential version number via a subquery over the stream's current
// maximum version.
//
// Fix: the raw driver error was returned with no context; wrap it so
// callers can tell which stream the failure concerns.
//
// NOTE(review): the version subquery is not safe under concurrent
// appends to the same stream unless the events table has a UNIQUE
// (stream_id, version) constraint — confirm the schema enforces one.
func (es *EventStore) Append(streamID string, event *Event) error {
	_, err := es.db.Exec(`
INSERT INTO events (stream_id, event_type, data, version)
VALUES ($1, $2, $3, (
SELECT COALESCE(MAX(version), 0) + 1
FROM events WHERE stream_id = $1
))
`, streamID, event.Type, event.Data)
	if err != nil {
		return fmt.Errorf("appending event to stream %q: %w", streamID, err)
	}
	return nil
}
CQRS Pattern
Separate read and write models:
// Write model - handles commands
// CommandHandler is the CQRS write side: it handles commands and
// appends the resulting events to the event store.
type CommandHandler struct {
eventStore *EventStore
}
// CreateOrder handles the CreateOrder command by emitting an
// OrderCreated event to the new order's stream.
//
// Fix: EventStore.Append takes a *Event, but the original passed the
// domain payload (*OrderCreatedEvent) directly; wrap it in an Event
// envelope (Type + Data), matching how projections read events.
func (ch *CommandHandler) CreateOrder(cmd *CreateOrderCommand) error {
	payload := &OrderCreatedEvent{
		OrderID: generateID(),
		UserID:  cmd.UserID,
		Items:   cmd.Items,
	}
	event := &Event{
		Type: "OrderCreated",
		Data: payload,
	}
	return ch.eventStore.Append(payload.OrderID, event)
}
// Read model - optimized for queries
// QueryHandler is the CQRS read side: it serves queries from a
// denormalized read model, separate from the event store.
type QueryHandler struct {
db *sql.DB // read-model database (orders table)
}
// GetOrder loads the order read model by primary key.
//
// Fix: the original returned a partially populated *Order alongside a
// non-nil error; callers should never receive a value when err != nil.
func (qh *QueryHandler) GetOrder(id string) (*Order, error) {
	var order Order
	err := qh.db.QueryRow(`
SELECT id, user_id, status, total
FROM orders WHERE id = $1
`, id).Scan(&order.ID, &order.UserID, &order.Status, &order.Total)
	if err != nil {
		// Includes sql.ErrNoRows when the order does not exist.
		return nil, err
	}
	return &order, nil
}
At-Least-Once Delivery
Handle idempotency:
// ProcessEvent makes at-least-once delivery safe by skipping events
// that were already processed and only marking an event processed
// after the work succeeds.
func (h *Handler) ProcessEvent(event *Event) error {
// Check if already processed
if h.isDuplicate(event.ID) {
return nil // Idempotent - skip
}
// Process event
if err := h.doWork(event); err != nil {
return err // Will retry
}
// Mark as processed
// NOTE(review): if markProcessed fails after doWork succeeded, the
// event will be redelivered and doWork re-executed — doWork itself must
// be idempotent for this to be safe. Also isDuplicate/markProcessed are
// not atomic here; concurrent deliveries of the same event can both
// pass the duplicate check — confirm the store provides the guarantee.
return h.markProcessed(event.ID)
}
Dead Letter Queues
Handle failed events systematically:
// DLQHandler retries failed events with exponential backoff and moves
// them to a dead-letter queue once maxRetries is exhausted.
type DLQHandler struct {
mainQueue string // source queue (not referenced in the visible methods)
dlq string // destination for events that exhausted their retries
maxRetries int // retry budget before an event is dead-lettered
}
// ProcessEvent runs the handler and, on failure, either requeues the
// event with exponential backoff or routes it to the dead-letter queue
// once maxRetries is exhausted.
//
// Fix: compute the power-of-two backoff with an integer shift instead
// of math.Pow — no float round-trip is needed for integer exponents.
func (h *DLQHandler) ProcessEvent(event *Event) error {
	retryCount := event.RetryCount
	if err := h.handleEvent(event); err != nil {
		if retryCount >= h.maxRetries {
			// Retries exhausted: capture the failure in the DLQ.
			return h.sendToDLQ(event, err)
		}
		event.RetryCount++
		// Exponential backoff: 1s, 2s, 4s, ... (bounded by maxRetries).
		// NOTE(review): time.Sleep blocks this worker for the whole
		// backoff; consider delayed redelivery on the queue instead.
		backoff := time.Duration(1<<uint(retryCount)) * time.Second
		time.Sleep(backoff)
		return h.requeue(event)
	}
	return nil
}
// sendToDLQ wraps the failed event with failure metadata (reason,
// timestamp, retry count) and publishes it to the dead-letter queue for
// later inspection or replay.
func (h *DLQHandler) sendToDLQ(event *Event, err error) error {
	wrapped := &DLQEvent{
		OriginalEvent: event,
		FailureReason: err.Error(),
		FailedAt:      time.Now(),
		RetryCount:    event.RetryCount,
	}
	return h.publishToDLQ(wrapped)
}
Event Schema Evolution
Version events for backward compatibility:
// EventEnvelope wraps every event with versioning metadata so payload
// schemas can evolve; Payload stays raw JSON until a version-specific
// processor decodes it.
type EventEnvelope struct {
Version int `json:"version"` // schema version of Payload
EventType string `json:"event_type"`
EventID string `json:"event_id"`
Timestamp time.Time `json:"timestamp"`
Payload json.RawMessage `json:"payload"` // decoded lazily by the versioned processor
}
// EventHandler dispatches envelopes to the processor registered for
// their schema version.
type EventHandler struct {
handlers map[int]EventProcessor // keyed by envelope Version
}
// Process dispatches the envelope's payload to the processor registered
// for its schema version, rejecting versions with no registered handler.
func (eh *EventHandler) Process(envelope *EventEnvelope) error {
	if handler, ok := eh.handlers[envelope.Version]; ok {
		return handler.Process(envelope.Payload)
	}
	return fmt.Errorf("unsupported event version: %d", envelope.Version)
}
// OrderCreatedV1 is schema version 1 of the OrderCreated event payload.
type OrderCreatedV1 struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Total float64 `json:"total"`
}
// OrderCreatedV2 is schema version 2 of the OrderCreated payload:
// identical to V1 plus an explicit Currency field, a purely additive
// change so V1 consumers remain unaffected.
type OrderCreatedV2 struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Total float64 `json:"total"`
Currency string `json:"currency"` // new in V2
}
// init registers one processor per supported schema version of the
// OrderCreated event.
func (eh *EventHandler) init() {
	registry := make(map[int]EventProcessor, 2)
	registry[1] = &OrderCreatedV1Processor{}
	registry[2] = &OrderCreatedV2Processor{}
	eh.handlers = registry
}
Saga Pattern
Coordinate distributed transactions:
// OrderSaga coordinates the distributed order transaction (inventory,
// payment, confirmation), persisting its progress after every step.
type OrderSaga struct {
sagaID string
state *SagaState // progress checkpoint, persisted via store
store SagaStore
}
// SagaState is the durable checkpoint of an order saga; the boolean
// step flags tell compensate which completed steps need undoing.
type SagaState struct {
OrderID string
PaymentID string
InventoryReserved bool // step 1 (reserve inventory) completed
PaymentProcessed bool // step 2 (process payment) completed
Status string // Pending, Completed, Failed, Compensating
}
// Execute runs the saga steps in order — reserve inventory, process
// payment, confirm order — checkpointing state after each success and
// compensating already-completed steps when any step fails.
func (s *OrderSaga) Execute() error {
// Step 1: Reserve inventory
if err := s.reserveInventory(); err != nil {
return s.compensate(err)
}
s.state.InventoryReserved = true
// NOTE(review): the result of store.Save is not checked here (or in the
// steps below); if Save can fail, a crash after this point loses the
// checkpoint and leaves the reservation unaccounted for — confirm
// SagaStore's contract.
s.store.Save(s.state)
// Step 2: Process payment
if err := s.processPayment(); err != nil {
return s.compensate(err)
}
s.state.PaymentProcessed = true
s.store.Save(s.state)
// Step 3: Confirm order
if err := s.confirmOrder(); err != nil {
return s.compensate(err)
}
s.state.Status = "Completed"
s.store.Save(s.state)
return nil
}
// compensate undoes completed saga steps in reverse order after a step
// fails, then records the saga as Failed and returns the original error
// unchanged so the caller sees the root cause.
//
// Fix: clear each step flag once its compensation has run, so the
// persisted state reflects that the payment/reservation no longer hold.
func (s *OrderSaga) compensate(err error) error {
	s.state.Status = "Compensating"
	s.store.Save(s.state)
	// Undo steps in reverse order of execution.
	if s.state.PaymentProcessed {
		s.refundPayment()
		s.state.PaymentProcessed = false
	}
	if s.state.InventoryReserved {
		s.releaseInventory()
		s.state.InventoryReserved = false
	}
	s.state.Status = "Failed"
	s.store.Save(s.state)
	return err
}
Event Replay
Rebuild state from events:
// EventReplayer rebuilds projection state by replaying stored events.
// NOTE(review): the projections map is not referenced by Rebuild in the
// visible code — confirm it is used elsewhere or remove it.
type EventReplayer struct {
eventStore EventStore
projections map[string]Projection
}
// Rebuild resets projection and replays every event in streamID against
// it, reconstructing the projection's current state from history.
func (er *EventReplayer) Rebuild(streamID string, projection Projection) error {
	// GetEvents(stream, 0, -1) fetches the full stream.
	events, err := er.eventStore.GetEvents(streamID, 0, -1)
	if err != nil {
		return err
	}
	// Start from a clean slate before replaying.
	projection.Reset()
	for _, ev := range events {
		if applyErr := projection.Apply(ev); applyErr != nil {
			return fmt.Errorf("failed to apply event %s: %w", ev.ID, applyErr)
		}
	}
	return nil
}
// OrderProjection is a read model for a single order, rebuilt by
// applying the order's events in sequence.
type OrderProjection struct {
OrderID string
Status string // Created, Paid, Shipped, Cancelled
Items []OrderItem
TotalAmount int64 // set from PaymentProcessed; presumably minor currency units — confirm
}
// Apply folds a single event into the projection's state. Unknown event
// types are ignored so new event kinds don't break replay.
//
// Fix: the original used bare type assertions on event.Data, which
// panic if a stream contains a malformed event; use the comma-ok form
// and return an error instead.
func (op *OrderProjection) Apply(event *Event) error {
	switch event.Type {
	case "OrderCreated":
		data, ok := event.Data.(*OrderCreatedEvent)
		if !ok {
			return fmt.Errorf("unexpected payload type %T for OrderCreated", event.Data)
		}
		op.OrderID = data.OrderID
		op.Status = "Created"
		op.Items = data.Items
	case "PaymentProcessed":
		data, ok := event.Data.(*PaymentProcessedEvent)
		if !ok {
			return fmt.Errorf("unexpected payload type %T for PaymentProcessed", event.Data)
		}
		op.TotalAmount = data.Amount
		op.Status = "Paid"
	case "OrderShipped":
		op.Status = "Shipped"
	case "OrderCancelled":
		op.Status = "Cancelled"
	}
	return nil
}
Stream Processing
Process events in real-time:
// StreamProcessor pulls events from a stream consumer and pushes each
// one through an ordered pipeline of processors.
type StreamProcessor struct {
consumer StreamConsumer
processors []EventProcessor // applied in order; first error stops the pipeline
}
// Start polls the consumer until ctx is cancelled, running each event
// through the processor pipeline in order and committing its offset.
func (sp *StreamProcessor) Start(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
// Poll blocks for up to 100ms, so this loop does not spin hot.
event, err := sp.consumer.Poll(100 * time.Millisecond)
if err != nil {
continue
}
// Process event through pipeline
for _, processor := range sp.processors {
if err := processor.Process(event); err != nil {
// Handle error, potentially send to DLQ
sp.handleError(event, err)
break
}
}
// Commit offset
// NOTE(review): the offset is committed even when a processor failed
// and the event only went through handleError — this trades possible
// data loss for forward progress. Confirm handleError reliably captures
// the event (e.g. via DLQ) before relying on this.
sp.consumer.Commit(event.Offset)
}
}
}
// FilterProcessor drops events that do not satisfy its predicate.
type FilterProcessor struct {
predicate func(*Event) bool // true = keep the event
}
// Process passes the event through unchanged when the predicate accepts
// it, and returns ErrFiltered otherwise to drop it from the pipeline.
func (fp *FilterProcessor) Process(event *Event) error {
	if fp.predicate(event) {
		return nil
	}
	return ErrFiltered
}
// TransformProcessor rewrites events in place using its transformer.
type TransformProcessor struct {
transformer func(*Event) (*Event, error)
}
// Process applies the transformer and overwrites the event in place
// with the result, so downstream processors see the transformed value.
func (tp *TransformProcessor) Process(event *Event) error {
	next, err := tp.transformer(event)
	if err != nil {
		return err
	}
	*event = *next
	return nil
}
// AggregateProcessor batches events and flushes the batch when it is
// full or the oldest buffered event exceeds the time window.
type AggregateProcessor struct {
window time.Duration // max age of the oldest buffered event before flush
buffer []*Event
flush func([]*Event) error // receives the batch; must not retain the slice, which is reused
}
// Process buffers events and flushes the batch once it holds 100 events
// or the oldest buffered event is older than the window. On flush
// failure the buffer is kept so the batch can be retried.
//
// Fix: after a successful flush, nil out the flushed entries before
// truncating so the reused backing array does not keep the old *Event
// values alive.
//
// NOTE(review): the time window is only evaluated when a new event
// arrives; a quiet stream can hold a stale batch indefinitely.
func (ap *AggregateProcessor) Process(event *Event) error {
	ap.buffer = append(ap.buffer, event)
	if len(ap.buffer) >= 100 || time.Since(ap.buffer[0].Timestamp) > ap.window {
		if err := ap.flush(ap.buffer); err != nil {
			return err
		}
		for i := range ap.buffer {
			ap.buffer[i] = nil // release references for the GC
		}
		ap.buffer = ap.buffer[:0]
	}
	return nil
}
Monitoring Event Flows
Track event processing metrics:
// EventMetrics groups the Prometheus instruments used to observe event
// flow: publish/process/failure counters, processing latency, and
// per-queue depth.
//
// Fix: processed, failed, and queueDepth were declared as the plain
// Counter/Gauge interfaces, but NewEventMetrics constructs *CounterVec
// and *GaugeVec, and the Record* methods call WithLabelValues — which
// only exists on the Vec types. Declare the Vec types so the assignments
// and method calls type-check.
type EventMetrics struct {
	published  prometheus.Counter
	processed  *prometheus.CounterVec // labels: event_type, status
	failed     *prometheus.CounterVec // labels: event_type, error_type
	duration   prometheus.Histogram
	queueDepth *prometheus.GaugeVec // label: queue_name
}
// NewEventMetrics constructs the event-flow instruments: a plain
// publish counter, labeled processed/failed counters, an exponential-
// bucket latency histogram (1ms .. ~512ms), and a per-queue depth gauge.
//
// NOTE(review): none of these collectors are registered here (no
// prometheus.MustRegister or custom registry) — confirm registration
// happens at the call site or these metrics will never be scraped.
func NewEventMetrics() *EventMetrics {
return &EventMetrics{
published: prometheus.NewCounter(prometheus.CounterOpts{
Name: "events_published_total",
Help: "Total number of events published",
}),
processed: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "events_processed_total",
Help: "Total number of events processed",
}, []string{"event_type", "status"}),
failed: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "events_failed_total",
Help: "Total number of failed events",
}, []string{"event_type", "error_type"}),
duration: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "event_processing_duration_seconds",
Help: "Event processing duration",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 10),
}),
queueDepth: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "event_queue_depth",
Help: "Number of events waiting in queue",
}, []string{"queue_name"}),
}
}
// RecordPublished increments the total count of published events.
func (em *EventMetrics) RecordPublished() {
em.published.Inc()
}
// RecordProcessed counts one processed event by type and outcome and
// records how long processing took.
func (em *EventMetrics) RecordProcessed(eventType, status string, duration time.Duration) {
em.processed.WithLabelValues(eventType, status).Inc()
em.duration.Observe(duration.Seconds())
}
// RecordFailed counts one failed event by type and error category.
func (em *EventMetrics) RecordFailed(eventType, errorType string) {
em.failed.WithLabelValues(eventType, errorType).Inc()
}
Query event metrics:
# Event processing rate
rate(events_processed_total[5m])
# Error rate by event type
sum(rate(events_failed_total[5m])) / rate(events_published_total[5m])
# Processing latency p99
histogram_quantile(0.99, rate(event_processing_duration_seconds_bucket[5m]))
# Queue backlog
event_queue_depth
Event-Driven Security
Secure event flows:
// SecureEventPublisher encrypts sensitive payloads and signs every
// event before publishing.
// NOTE(review): Publish references sep.publisher, but no publisher
// field is declared here — the underlying publisher dependency is
// missing from this struct.
type SecureEventPublisher struct {
signer EventSigner
encryptor EventEncryptor
}
// Publish signs every outgoing event, first encrypting its payload when
// the event carries sensitive data, then hands it to the underlying
// publisher.
func (sep *SecureEventPublisher) Publish(event *Event) error {
	if event.ContainsSensitiveData() {
		ciphertext, err := sep.encryptor.Encrypt(event.Payload)
		if err != nil {
			return err
		}
		event.Payload = ciphertext
		event.Encrypted = true
	}
	// Sign after (possible) encryption so the signature covers the bytes
	// that actually travel on the wire.
	signature, err := sep.signer.Sign(event)
	if err != nil {
		return err
	}
	event.Signature = signature
	return sep.publisher.Publish(event)
}
// SecureEventConsumer verifies event signatures and decrypts encrypted
// payloads before handing events to processing.
type SecureEventConsumer struct {
verifier EventVerifier
decryptor EventDecryptor
}
// Consume verifies the event's signature, transparently decrypts the
// payload when it was encrypted by the publisher, and then processes
// the event.
func (sec *SecureEventConsumer) Consume(event *Event) error {
	// Reject anything whose signature does not check out.
	if err := sec.verifier.Verify(event); err != nil {
		return fmt.Errorf("invalid event signature: %w", err)
	}
	if event.Encrypted {
		plaintext, err := sec.decryptor.Decrypt(event.Payload)
		if err != nil {
			return err
		}
		event.Payload = plaintext
		event.Encrypted = false
	}
	return sec.processEvent(event)
}
Testing Event-Driven Systems
Test event flows comprehensively:
// TestEventSourcing drives the command handler against an in-memory
// store, then verifies both the recorded event stream and the state
// rebuilt by replaying it through the projection.
//
// Fix: the original discarded the errors from CreateOrder,
// ProcessPayment, GetEvents, and Apply, so a failing command would
// surface as a confusing assertion failure later instead of at the
// point of error.
func TestEventSourcing(t *testing.T) {
	// Setup in-memory event store
	store := NewInMemoryEventStore()
	handler := NewOrderCommandHandler(store)
	// Execute commands, failing fast if any is rejected.
	orderID := "order-123"
	assert.NoError(t, handler.CreateOrder(&CreateOrderCommand{
		OrderID: orderID,
		UserID:  "user-456",
		Items:   []OrderItem{{ProductID: "prod-1", Quantity: 2}},
	}))
	assert.NoError(t, handler.ProcessPayment(&ProcessPaymentCommand{
		OrderID: orderID,
		Amount:  5000,
	}))
	// Verify the recorded event stream.
	events, err := store.GetEvents(orderID, 0, -1)
	assert.NoError(t, err)
	assert.Len(t, events, 2)
	assert.Equal(t, "OrderCreated", events[0].Type)
	assert.Equal(t, "PaymentProcessed", events[1].Type)
	// Rebuild state from events
	projection := &OrderProjection{}
	for _, event := range events {
		assert.NoError(t, projection.Apply(event))
	}
	assert.Equal(t, "Paid", projection.Status)
	assert.Equal(t, int64(5000), projection.TotalAmount)
}
// TestSagaCompensation forces a payment failure after a successful
// inventory reservation and verifies the saga compensates: the
// reservation is rolled back and the saga ends in the Failed state.
func TestSagaCompensation(t *testing.T) {
saga := NewOrderSaga("saga-123")
// Simulate payment failure
saga.inventoryService = &MockInventoryService{
reserveErr: nil, // inventory step succeeds
}
saga.paymentService = &MockPaymentService{
processErr: errors.New("payment declined"),
}
err := saga.Execute()
assert.Error(t, err)
// Verify compensation occurred
// NOTE(review): this assertion requires compensate to clear
// InventoryReserved after releasing inventory — verify the saga's
// compensation code actually resets the flag.
assert.False(t, saga.state.InventoryReserved) // Should be released
assert.Equal(t, "Failed", saga.state.Status)
}
Conclusion
Event-driven architectures enable scalable, decoupled systems through:
- Message queues for asynchronous communication
- Event sourcing for auditability and replay
- CQRS for read/write optimization
- Idempotency to handle at-least-once delivery
- Saga pattern for distributed transactions
- Dead letter queues for failed event handling
- Schema evolution for backward compatibility
- Stream processing for real-time analytics
- Comprehensive monitoring of event flows
- Security through encryption and signing
Design event-driven systems for at-least-once delivery, handle duplicates gracefully through idempotency, version your events for evolution, implement sagas for distributed transactions, and monitor event flows comprehensively.
The power of event-driven architectures lies in their ability to scale independently, evolve gracefully, and provide complete audit trails. However, they introduce complexity in debugging, data consistency, and operational management. Invest in observability, testing, and documentation to make event-driven systems maintainable and reliable.