Platform engineering is about building internal tools and workflows that enable product teams to ship faster with less friction. After building platforms serving 12+ engineering teams and 60+ microservices, I’ll share what works and what doesn’t when creating developer-centric infrastructure.

What is Platform Engineering?

Platform engineering creates a curated set of tools, APIs, and workflows that abstract away infrastructure complexity while providing flexibility where needed.

Before Platform:
Team → Learn Kubernetes → Configure networking → Set up monitoring →
Configure logging → Set up CI/CD → Configure database → Deploy

After Platform:
Team → `platform deploy my-service` → Done

The Self-Service Platform

Service Scaffolding

# CLI tool for creating new services
import click
import yaml
from jinja2 import Template

@click.group()
def cli():
    """Internal developer platform CLI"""
    # Root command group: it performs no work itself. Subcommands register
    # against it with @cli.command().

@cli.command()
@click.argument('service_name')
@click.option('--language', required=True,
              type=click.Choice(['python', 'java', 'go', 'node']))
@click.option('--type', required=True,
              type=click.Choice(['api', 'worker', 'cron']))
def create(service_name, language, type):
    """Create a new service with best practices baked in"""
    # The docstring above is what click prints as the command's help text, so
    # the maintainer notes live in comments instead.
    #
    # Scaffolds a service end to end: application code, Dockerfile, Kubernetes
    # manifests, CI/CD pipeline, monitoring and logging configuration, and
    # (after an interactive prompt) an optional database.
    #
    # --language and --type are required: without them both arrive as None and
    # the template directory would silently become "templates/None/None".
    # The parameter is named `type` (shadowing the builtin) because click maps
    # the `--type` option onto it.
    import re  # local import: only this command needs it

    # service_name is reused as a directory name and as Kubernetes object and
    # container names, so reject anything that is not a DNS-1123 label before
    # any file is generated.
    if not re.fullmatch(r'[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?', service_name):
        raise click.BadParameter(
            'must be 1-63 lowercase alphanumeric characters or "-", '
            'starting and ending with an alphanumeric character',
            param_hint='service_name',
        )

    click.echo(f"Creating {language} {type} service: {service_name}")

    # Generate from templates
    template_dir = f"templates/{language}/{type}"

    # 1. Generate application code
    generate_app_scaffold(service_name, template_dir)

    # 2. Generate Dockerfile with security best practices
    generate_dockerfile(service_name, language)

    # 3. Generate Kubernetes manifests
    generate_k8s_manifests(service_name, type)

    # 4. Generate CI/CD pipeline
    generate_cicd(service_name, language)

    # 5. Set up monitoring
    generate_monitoring_config(service_name)

    # 6. Set up logging
    generate_logging_config(service_name)

    # 7. Create database if needed
    if click.confirm('Does this service need a database?'):
        db_type = click.prompt('Database type',
                               type=click.Choice(['postgres', 'mysql', 'mongo']))
        provision_database(service_name, db_type)

    click.echo(f"✓ Service {service_name} created successfully!")
    click.echo(f"  Repository: https://github.com/myorg/{service_name}")
    click.echo("  Next steps:")
    click.echo(f"    cd {service_name}")
    click.echo("    platform deploy --env dev")

def generate_k8s_manifests(service_name, type):
    """Render the Deployment manifest and write it to <service_name>/k8s/deployment.yaml.

    Args:
        service_name: Used as the Deployment name, the ``app`` label, the
            container name, the service account name and the prefix of the
            referenced Secret/ConfigMap.
        type: Service type passed by the caller. It is currently not used by
            the template; every service gets the same Deployment shape.

    The image tag and the ENVIRONMENT value are emitted as literal
    ``{{ .Values.* }}`` expressions (escaped in the Jinja template), so they
    are left for a later templating step to fill in.
    """
    from pathlib import Path  # local import: keeps the block self-contained

    deployment_template = Template("""
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ service_name }}
  labels:
    app: {{ service_name }}
    team: {{ team }}
spec:
  replicas: 3
  selector:
    matchLabels:
      app: {{ service_name }}
  template:
    metadata:
      labels:
        app: {{ service_name }}
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "{{ metrics_port }}"
    spec:
      serviceAccountName: {{ service_name }}
      containers:
      - name: {{ service_name }}
        image: {{ registry }}/{{ service_name }}:{{ "{{" }} .Values.version {{ "}}" }}
        ports:
        - containerPort: {{ app_port }}
        - containerPort: {{ metrics_port }}
          name: metrics
        env:
        - name: LOG_LEVEL
          value: info
        - name: ENVIRONMENT
          value: {{ "{{" }} .Values.environment {{ "}}" }}
        envFrom:
        - secretRef:
            name: {{ service_name }}-secrets
        - configMapRef:
            name: {{ service_name }}-config
        resources:
          requests:
            memory: "{{ memory_request }}"
            cpu: "{{ cpu_request }}"
          limits:
            memory: "{{ memory_limit }}"
            cpu: "{{ cpu_limit }}"
        livenessProbe:
          httpGet:
            path: /health
            port: {{ app_port }}
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: {{ app_port }}
          initialDelaySeconds: 5
          periodSeconds: 5
        securityContext:
          runAsNonRoot: true
          runAsUser: 1000
          readOnlyRootFilesystem: true
          allowPrivilegeEscalation: false
    """)

    manifest = deployment_template.render(
        service_name=service_name,
        team=get_team_from_context(),
        registry="myregistry.com",
        app_port=8080,
        metrics_port=9090,
        memory_request="256Mi",
        memory_limit="512Mi",
        cpu_request="200m",
        cpu_limit="500m"
    )

    # Create the k8s/ directory if an earlier step has not, so the write
    # cannot fail with FileNotFoundError, and pin the encoding so the output
    # does not depend on the platform default.
    manifest_path = Path(service_name) / "k8s" / "deployment.yaml"
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    manifest_path.write_text(manifest, encoding="utf-8")

Deployment Automation

// Platform deployment service
package main

import (
    "context"
    "fmt"
    "time"
)

// DeploymentRequest identifies one deployment: which service and version to
// roll out, where, and on behalf of which team.
type DeploymentRequest struct {
    // ServiceName is the service to deploy; validateDeployment rejects
    // unknown names.
    ServiceName string
    // Version is the version to deploy; its format and presence in the
    // registry are checked during validation.
    Version     string
    // Environment is the deployment target; validation accepts "dev",
    // "staging" and "production".
    Environment string
    // Team is the team whose permission to deploy to Environment is checked.
    Team        string
}

// PlatformDeployer runs the deployment workflow (see Deploy) using the
// collaborators held in its fields.
type PlatformDeployer struct {
    // NOTE(review): k8sClient and gitopsRepo are not referenced directly by
    // the methods shown here; presumably the unexported helpers use them.
    k8sClient       *kubernetes.Clientset
    gitopsRepo      *git.Repository
    // notifier is used to announce a successful deployment.
    notifier        *Notifier
    // healthChecker is polled by healthCheckDeployment after a rollout.
    healthChecker   *HealthChecker
}

// Deploy runs the full deployment workflow for req: validation,
// pre-deployment checks, GitOps repository update, sync wait, health check
// (up to 5 minutes), smoke tests, and a success notification.
//
// A failure in any step aborts the workflow and is returned wrapped with the
// name of the failing step. A failed health check or smoke test triggers a
// rollback before the error is returned.
//
// ctx is consulted before the workflow starts and again immediately before
// the GitOps repository is modified, which is the last point at which
// aborting leaves nothing to undo. The later steps do not take a context, so
// cancellation is not observed once the GitOps update has begun.
func (p *PlatformDeployer) Deploy(ctx context.Context, req DeploymentRequest) error {
    // cancelled reports the context's error, tolerating a nil context so
    // existing callers that passed nil keep working.
    cancelled := func() error {
        if ctx == nil {
            return nil
        }
        return ctx.Err()
    }

    if err := cancelled(); err != nil {
        return fmt.Errorf("deployment cancelled before start: %w", err)
    }

    fmt.Printf("Deploying %s version %s to %s\n",
        req.ServiceName, req.Version, req.Environment)

    // 1. Validate deployment request
    if err := p.validateDeployment(req); err != nil {
        return fmt.Errorf("validation failed: %w", err)
    }

    // 2. Run pre-deployment checks
    if err := p.preDeploymentChecks(req); err != nil {
        return fmt.Errorf("pre-deployment checks failed: %w", err)
    }

    // Last safe point to abort: nothing has been changed yet.
    if err := cancelled(); err != nil {
        return fmt.Errorf("deployment cancelled before GitOps update: %w", err)
    }

    // 3. Update GitOps repository
    if err := p.updateGitOpsRepo(req); err != nil {
        return fmt.Errorf("GitOps update failed: %w", err)
    }

    // 4. Wait for ArgoCD to sync
    if err := p.waitForSync(req); err != nil {
        return fmt.Errorf("sync failed: %w", err)
    }

    // 5. Health check new deployment
    if err := p.healthCheckDeployment(req, 5*time.Minute); err != nil {
        // Automatic rollback on health check failure.
        // NOTE(review): the outcome of rollback is not inspected here, so the
        // "rolled back" wording assumes it succeeded; confirm against
        // rollback's signature.
        p.rollback(req)
        return fmt.Errorf("health check failed, rolled back: %w", err)
    }

    // 6. Run smoke tests
    if err := p.runSmokeTests(req); err != nil {
        p.rollback(req)
        return fmt.Errorf("smoke tests failed, rolled back: %w", err)
    }

    // 7. Notify team
    p.notifier.NotifySuccess(req)

    fmt.Printf("✓ Deployment successful: %s@%s in %s\n",
        req.ServiceName, req.Version, req.Environment)

    return nil
}

// validateDeployment rejects a request that cannot be deployed. The checks
// run in a fixed order and the first failing one determines the returned
// error: service existence, version format, image presence in the registry,
// environment name, and finally the team's permission for that environment.
// It returns nil when every check passes.
func (p *PlatformDeployer) validateDeployment(req DeploymentRequest) error {
    allowedEnvs := []string{"dev", "staging", "production"}

    // A tagless switch evaluates its cases top to bottom and stops at the
    // first match, so later (possibly remote) checks are skipped once an
    // earlier one has failed.
    switch {
    case !p.serviceExists(req.ServiceName):
        return fmt.Errorf("service %s not found", req.ServiceName)

    case !isValidVersion(req.Version):
        // Format check is delegated to isValidVersion.
        return fmt.Errorf("invalid version format: %s", req.Version)

    case !p.imageExists(req.ServiceName, req.Version):
        return fmt.Errorf("image not found: %s:%s",
            req.ServiceName, req.Version)

    case !contains(allowedEnvs, req.Environment):
        return fmt.Errorf("invalid environment: %s", req.Environment)

    case !p.hasDeployPermission(req.Team, req.Environment):
        return fmt.Errorf("team %s cannot deploy to %s",
            req.Team, req.Environment)
    }

    return nil
}

// healthCheckDeployment polls the health checker for req's service and
// environment until it reports healthy or timeout elapses.
//
// It returns nil as soon as a probe reports healthy, the probe's error if a
// probe fails outright, and a timeout error if the deployment is still
// unhealthy when the deadline is reached.
//
// The service is always probed at least once, even for a zero or negative
// timeout, and one last probe is made at the deadline. The wait between
// probes is capped at the time remaining, so the function never sleeps past
// the deadline.
func (p *PlatformDeployer) healthCheckDeployment(
    req DeploymentRequest,
    timeout time.Duration) error {

    const pollInterval = 10 * time.Second

    deadline := time.Now().Add(timeout)

    for {
        healthy, err := p.healthChecker.Check(
            req.ServiceName,
            req.Environment,
        )

        if err != nil {
            return err
        }

        if healthy {
            return nil
        }

        remaining := time.Until(deadline)
        if remaining <= 0 {
            break
        }

        // Wake up exactly at the deadline for the final probe instead of
        // overshooting it by up to a full interval.
        if remaining > pollInterval {
            remaining = pollInterval
        }
        time.Sleep(remaining)
    }

    return fmt.Errorf("deployment did not become healthy within %v", timeout)
}

Observability Integration

class ObservabilityPlatform:
    """Set up monitoring, logging, and tracing for services.

    The methods rely on collaborators being present on the instance:
    ``k8s_client``, ``grafana_client`` and ``prometheus_client``, plus the
    helpers ``create_panel``, ``setup_logging`` and ``setup_tracing``, none of
    which are defined in this class body.
    """

    def setup_service_observability(self, service_name: str, team: str) -> None:
        """Configure the complete observability stack for one service.

        Runs, in order: ServiceMonitor creation, dashboard creation, alert
        rule creation, log aggregation setup and tracing setup, then prints
        the URLs where the results can be viewed.

        Args:
            service_name: Service to instrument; used in resource names,
                label selectors and the printed URLs.
            team: Owning team; used as a dashboard tag and alert label.
        """
        print(f"Setting up observability for {service_name}")

        # 1. Create Prometheus ServiceMonitor
        self.create_service_monitor(service_name)

        # 2. Create default Grafana dashboards
        self.create_grafana_dashboards(service_name, team)

        # 3. Create default alerts
        self.create_alerts(service_name, team)

        # 4. Configure log aggregation
        self.setup_logging(service_name)

        # 5. Configure distributed tracing
        self.setup_tracing(service_name)

        print(f"✓ Observability configured for {service_name}")
        print(f"  Metrics: https://grafana.myorg.com/d/{service_name}")
        print(f"  Logs: https://kibana.myorg.com/app/logs?service={service_name}")
        print(f"  Traces: https://jaeger.myorg.com/search?service={service_name}")

    def create_service_monitor(self, service_name: str) -> None:
        """Create a Prometheus ServiceMonitor for the service.

        The monitor selects objects labelled ``app=<service_name>`` and
        scrapes the port named ``metrics`` at ``/metrics`` every 30 seconds.
        """
        service_monitor = {
            'apiVersion': 'monitoring.coreos.com/v1',
            'kind': 'ServiceMonitor',
            'metadata': {
                'name': service_name,
                'labels': {'app': service_name}
            },
            'spec': {
                'selector': {
                    'matchLabels': {'app': service_name}
                },
                'endpoints': [{
                    'port': 'metrics',
                    'interval': '30s',
                    'path': '/metrics'
                }]
            }
        }

        self.k8s_client.create_resource(service_monitor)

    def create_grafana_dashboards(self, service_name: str, team: str) -> None:
        """Create the standard overview dashboard for the service.

        The dashboard has five panels (request rate, error rate, p99 latency,
        CPU and memory), is tagged with the team and service name, and is
        submitted with ``overwrite`` set to True.
        """
        dashboard = {
            'dashboard': {
                'title': f'{service_name} - Overview',
                'tags': [team, service_name],
                'timezone': 'UTC',
                'panels': [
                    self.create_panel(
                        'Request Rate',
                        f'rate(http_requests_total{{service="{service_name}"}}[5m])'
                    ),
                    self.create_panel(
                        'Error Rate',
                        f'rate(http_requests_total{{service="{service_name}",status=~"5.."}}[5m])'
                    ),
                    self.create_panel(
                        'Latency (p99)',
                        # histogram_quantile needs the per-bucket rates
                        # aggregated by `le`; the bare metric name carries no
                        # bucket series to compute a quantile from.
                        f'histogram_quantile(0.99, sum by (le) (rate(http_request_duration_seconds_bucket{{service="{service_name}"}}[5m])))'
                    ),
                    self.create_panel(
                        'CPU Usage',
                        f'rate(container_cpu_usage_seconds_total{{pod=~"{service_name}.*"}}[5m])'
                    ),
                    self.create_panel(
                        'Memory Usage',
                        f'container_memory_working_set_bytes{{pod=~"{service_name}.*"}}'
                    ),
                ]
            },
            'overwrite': True
        }

        self.grafana_client.create_dashboard(dashboard)

    def create_alerts(self, service_name: str, team: str) -> None:
        """Create the default alerting rules for the service.

        Builds one rule group named after the service with four alerts:
        HighErrorRate, HighLatency, ServiceDown and PodCrashLooping, each
        labelled with ``team``. The rules are passed to the Prometheus client
        as a YAML string.

        Doubled and quadrupled braces in the f-string are escapes: ``{{``
        renders a literal ``{`` for PromQL selectors, and ``{{{{ ... }}}}``
        renders the ``{{ ... }}`` alert-template syntax.
        """
        # Both sides of the error ratio are wrapped in sum(): dividing the
        # raw per-series vectors only pairs series with identical label sets,
        # so each 5xx series would be divided by itself and always yield 1.
        alerts = f"""
groups:
- name: {service_name}
  interval: 30s
  rules:
  - alert: HighErrorRate
    expr: |
      sum(rate(http_requests_total{{service="{service_name}",status=~"5.."}}[5m]))
      / sum(rate(http_requests_total{{service="{service_name}"}}[5m])) > 0.05
    for: 5m
    labels:
      severity: critical
      team: {team}
    annotations:
      summary: "High error rate for {service_name}"
      description: "Error rate is {{{{ $value }}}} (>5%)"

  - alert: HighLatency
    expr: |
      histogram_quantile(0.99,
        sum by (le) (rate(http_request_duration_seconds_bucket{{service="{service_name}"}}[5m]))) > 1
    for: 5m
    labels:
      severity: warning
      team: {team}
    annotations:
      summary: "High latency for {service_name}"
      description: "p99 latency is {{{{ $value }}}}s"

  - alert: ServiceDown
    expr: up{{job="{service_name}"}} == 0
    for: 2m
    labels:
      severity: critical
      team: {team}
    annotations:
      summary: "{service_name} is down"
      description: "Service has been down for 2 minutes"

  - alert: PodCrashLooping
    expr: |
      rate(kube_pod_container_status_restarts_total{{pod=~"{service_name}.*"}}[15m]) > 0
    for: 5m
    labels:
      severity: warning
      team: {team}
    annotations:
      summary: "Pod crash looping for {service_name}"
      description: "Pod {{{{ $labels.pod }}}} is crash looping"
        """

        self.prometheus_client.create_alert_rules(alerts)

Developer Portal

// Developer portal - central hub for all platform services
import express from 'express';

// Express application instance; every route handler in this file is
// registered on it.
const app = express();

// Service catalog endpoint
// GET /api/services
// Responds with the service catalog as JSON: one row per service with its
// metadata, the number of recorded deployments and the most recent
// deployment time, ordered by service name.
app.get('/api/services', async (req, res, next) => {
  try {
    const services = await db.query(`
      SELECT
        s.name,
        s.description,
        s.team,
        s.repository_url,
        s.documentation_url,
        COUNT(DISTINCT d.id) as deployment_count,
        MAX(d.deployed_at) as last_deployed
      FROM services s
      LEFT JOIN deployments d ON s.id = d.service_id
      GROUP BY s.id
      ORDER BY s.name
    `);

    res.json(services);
  } catch (err) {
    // Hand query failures to Express's error middleware. A rejected promise
    // from an async handler is not caught automatically on Express 4 and
    // would leave the request hanging.
    next(err);
  }
});

// Service health status
// GET /api/services/:name/health
// Runs the Kubernetes, metrics and database health probes concurrently and
// responds with the overall status ('healthy' only if every probe reports
// 'healthy') together with the individual probe results.
app.get('/api/services/:name/health', async (req, res, next) => {
  const { name } = req.params;

  try {
    // allSettled rather than all: a probe that throws is itself evidence of
    // an unhealthy service and should be reported, not turned into a failed
    // request that hides the other probes' results.
    const settled = await Promise.allSettled([
      checkKubernetesHealth(name),
      checkMetricsHealth(name),
      checkDatabaseHealth(name),
    ]);

    const health = settled.map((outcome) => {
      if (outcome.status === 'fulfilled') {
        return outcome.value;
      }
      const reason = outcome.reason;
      return {
        status: 'unhealthy',
        error: reason instanceof Error ? reason.message : String(reason),
      };
    });

    const overall = health.every(h => h.status === 'healthy')
      ? 'healthy'
      : 'unhealthy';

    res.json({
      service: name,
      status: overall,
      checks: health,
      lastChecked: new Date(),
    });
  } catch (err) {
    // Anything unexpected (e.g. a probe throwing synchronously) goes to the
    // Express error middleware instead of leaving the request hanging.
    next(err);
  }
});

// Service dependencies
// GET /api/services/:name/dependencies
// Responds with the service's upstream and downstream dependencies as
// reported by the service mesh.
app.get('/api/services/:name/dependencies', async (req, res, next) => {
  const { name } = req.params;

  try {
    // Analyze service mesh data to find dependencies
    const dependencies = await serviceMesh.getDependencies(name);

    res.json({
      service: name,
      upstream: dependencies.upstream,   // Services this depends on
      downstream: dependencies.downstream, // Services that depend on this
    });
  } catch (err) {
    // Forward lookup failures to Express's error middleware; an async
    // handler's rejection is not caught automatically on Express 4.
    next(err);
  }
});

// Deployment history
// GET /api/services/:name/deployments
// Responds with up to the 50 most recent deployments of the service, newest
// first. The service name is bound as a query parameter, never interpolated
// into the SQL text.
//
// NOTE(review): this query filters on deployments.service_name, while the
// service catalog query joins on deployments.service_id. Confirm that the
// deployments table really has both columns.
app.get('/api/services/:name/deployments', async (req, res, next) => {
  const { name } = req.params;

  try {
    const deployments = await db.query(`
      SELECT
        version,
        environment,
        deployed_by,
        deployed_at,
        status,
        rollback_at
      FROM deployments
      WHERE service_name = $1
      ORDER BY deployed_at DESC
      LIMIT 50
    `, [name]);

    res.json(deployments);
  } catch (err) {
    // Forward query failures to Express's error middleware; an async
    // handler's rejection is not caught automatically on Express 4.
    next(err);
  }
});

// Cost tracking
// GET /api/services/:name/costs
// Responds with the service's cost breakdown (compute, storage, network,
// total, trend) for the period given by the `period` query parameter,
// defaulting to '30d'.
app.get('/api/services/:name/costs', async (req, res, next) => {
  const { name } = req.params;

  try {
    const costs = await costTracker.getServiceCosts(name, {
      period: req.query.period || '30d'
    });

    res.json({
      service: name,
      compute: costs.compute,
      storage: costs.storage,
      network: costs.network,
      total: costs.total,
      trend: costs.trend, // Increasing/decreasing/stable
    });
  } catch (err) {
    // Forward cost-tracker failures to Express's error middleware; an async
    // handler's rejection is not caught automatically on Express 4.
    next(err);
  }
});

Database Self-Service

class DatabasePlatform:
    """Self-service database provisioning and management.

    The methods rely on collaborators being present on the instance
    (``secrets_manager``, ``k8s_client``) and on helper methods that are not
    defined in this class body (``create_dedicated_instance``,
    ``get_shared_cluster``, ``create_database``, and so on).
    """

    def provision_database(self, service_name: str, db_type: str,
                          environment: str, team: str) -> dict:
        """Provision a database for a service and publish its credentials.

        Production gets a dedicated instance (medium size, 30-day backup
        retention, multi-AZ); every other environment uses the shared cluster
        for that database type and environment. A database and a user are
        then created, the credentials are stored in the secret manager,
        backups and monitoring are configured, and a Kubernetes secret is
        created for the service.

        Args:
            service_name: Service that owns the database.
            db_type: Database engine, passed through to the instance helpers.
            environment: Target environment; only ``'production'`` is
                special-cased.
            team: Owning team, passed to the monitoring setup.

        Returns:
            A dict with ``host``, ``database`` and ``secret_name``. The
            password is stored only in the secret and is not returned.
        """
        print(f"Provisioning {db_type} database for {service_name}")

        # 1. Create database instance (or use existing shared cluster)
        if environment == 'production':
            # Dedicated instance for production
            db_instance = self.create_dedicated_instance(
                name=f"{service_name}-{environment}",
                db_type=db_type,
                size='medium',
                backup_retention_days=30,
                multi_az=True
            )
        else:
            # Shared cluster for dev/staging
            db_instance = self.get_shared_cluster(db_type, environment)

        # 2. Create database and user
        db_name = f"{service_name}_{environment}"
        db_user = f"{service_name}_user"
        db_password = self.generate_secure_password()
        # Built once so the stored secret, the log line and the return value
        # cannot drift apart.
        secret_name = f"{service_name}-{environment}-db"

        self.create_database(db_instance, db_name)
        self.create_user(db_instance, db_user, db_password, db_name)

        # 3. Store credentials in secret manager
        self.secrets_manager.create_secret(
            name=secret_name,
            data={
                'host': db_instance.endpoint,
                'port': db_instance.port,
                'database': db_name,
                'username': db_user,
                'password': db_password,
                'connection_string': self.build_connection_string(
                    db_type, db_instance, db_name, db_user, db_password
                )
            }
        )

        # 4. Configure backups
        self.configure_backups(db_instance, db_name)

        # 5. Set up monitoring
        self.setup_db_monitoring(db_instance, db_name, team)

        # 6. Grant service access via Kubernetes secret
        self.create_k8s_secret(service_name, environment, db_instance)

        print("✓ Database provisioned successfully")
        print(f"  Host: {db_instance.endpoint}")
        print(f"  Database: {db_name}")
        print(f"  Connection details stored in secret: {secret_name}")

        return {
            'host': db_instance.endpoint,
            'database': db_name,
            'secret_name': secret_name
        }

    def create_migration_job(self, service_name: str, environment: str,
                             version: str = 'latest') -> None:
        """Create a Kubernetes Job that runs the service's database migrations.

        The job runs ``./migrate.sh`` in the service's own image, with the
        ``<service_name>-<environment>-db`` secret injected as environment
        variables. It never restarts its container and is retried up to 3
        times. The job name carries a Unix-timestamp suffix.

        Args:
            service_name: Service whose image and database secret are used.
            environment: Environment whose database secret is mounted.
            version: Image tag to run. Defaults to ``'latest'`` for backward
                compatibility; pass the version being deployed so the
                migrations match the code that is rolling out.
        """
        import time  # local import: nothing else in this class needs it

        job_manifest = {
            'apiVersion': 'batch/v1',
            'kind': 'Job',
            'metadata': {
                'name': f"{service_name}-migration-{int(time.time())}",
                'labels': {'app': service_name, 'type': 'migration'}
            },
            'spec': {
                'template': {
                    'spec': {
                        'restartPolicy': 'Never',
                        'containers': [{
                            'name': 'migration',
                            # Same registry host as the generated Deployment
                            # manifest (myregistry.com), so the job pulls the
                            # image the service itself runs.
                            'image': f'myregistry.com/{service_name}:{version}',
                            'command': ['./migrate.sh'],
                            'envFrom': [{
                                'secretRef': {
                                    'name': f"{service_name}-{environment}-db"
                                }
                            }]
                        }]
                    }
                },
                'backoffLimit': 3
            }
        }

        self.k8s_client.create_job(job_manifest)

Platform Metrics

class PlatformMetrics:
    """Track platform adoption and effectiveness.

    Relies on a ``db`` attribute whose ``query(sql)`` returns rows indexable
    as ``rows[0][0]``, and on several metric helpers that are not defined in
    this class body.
    """

    def collect_metrics(self) -> dict:
        """Collect the key platform metrics into a single dict.

        The keys cover four areas: developer productivity, platform adoption,
        developer satisfaction and cost efficiency.
        """
        return {
            # Developer productivity
            'deployment_frequency': self.get_deployment_frequency(),
            'lead_time_for_changes': self.get_lead_time(),
            'mean_time_to_recovery': self.get_mttr(),
            'change_failure_rate': self.get_change_failure_rate(),

            # Platform adoption
            'services_onboarded': self.count_services(),
            'platform_api_calls': self.count_api_calls(),
            'self_service_usage': self.measure_self_service_adoption(),

            # Developer satisfaction
            'nps_score': self.get_developer_nps(),
            'platform_incidents': self.count_platform_incidents(),

            # Cost efficiency
            'cost_per_service': self.calculate_cost_per_service(),
            'resource_utilization': self.measure_resource_utilization(),
        }

    def get_deployment_frequency(self) -> float:
        """Return the average deployments per active day over the last 30 days.

        "Active day" means a calendar day with at least one deployment. The
        result is 0.0 when there were no deployments in the window.
        """
        # `* 1.0` forces non-integer division (COUNT / COUNT would truncate),
        # and NULLIF turns an empty window into NULL instead of a
        # division-by-zero error.
        deployments_per_day = self.db.query("""
            SELECT COUNT(*) * 1.0 / NULLIF(COUNT(DISTINCT DATE(deployed_at)), 0)
            FROM deployments
            WHERE deployed_at > NOW() - INTERVAL '30 days'
        """)[0][0]

        if deployments_per_day is None:
            return 0.0
        return float(deployments_per_day)

    def get_lead_time(self) -> str:
        """Return the average commit-to-production time over the last 30 days.

        The value is formatted as ``"<hours> hours"`` with one decimal place,
        or ``"N/A"`` when no production deployment falls inside the window.
        """
        avg_lead_time = self.db.query("""
            SELECT AVG(
                EXTRACT(EPOCH FROM (deployed_at - committed_at)) / 3600
            )
            FROM deployments d
            JOIN commits c ON d.commit_sha = c.sha
            WHERE d.environment = 'production'
              AND d.deployed_at > NOW() - INTERVAL '30 days'
        """)[0][0]

        # AVG over zero rows is NULL; formatting None with :.1f would raise.
        if avg_lead_time is None:
            return "N/A"

        return f"{avg_lead_time:.1f} hours"

Key Takeaways

  1. Abstract complexity, not flexibility: Provide sensible defaults but allow customization
  2. Self-service is critical: Teams should deploy without tickets
  3. Bake in best practices: Security, monitoring, and reliability should be automatic
  4. Measure platform value: Track deployment frequency, lead time, and developer satisfaction
  5. Dogfood your platform: The platform team should use its own tools
  6. Documentation is product: Treat docs as first-class deliverables
  7. Iterate based on feedback: Regular retros with platform users

Platform engineering is about reducing cognitive load while maintaining velocity. Start with the biggest pain points, automate incrementally, and always measure impact.