Microservices dengan Python
Dahulu, aplikasi dibangun sebagai satu kesatuan raksasa (Monolith). Jika satu bagian rusak, seluruh aplikasi bisa mati. Jika ingin update satu fitur kecil, seluruh aplikasi harus di-deploy ulang.
Microservices memecahkan masalah ini dengan memecah aplikasi menjadi layanan-layanan kecil yang independen. Setiap layanan menjalankan prosesnya sendiri dan berkomunikasi melalui mekanisme ringan, biasanya HTTP API atau Message Queue.
ποΈ Monolith vs Microservices
| Aspek | Monolith | Microservices |
|---|---|---|
| Deployment | Semua sekaligus | Deploy per service |
| Scaling | Scale seluruh app | Scale service tertentu saja |
| Teknologi | Satu stack | Bisa beda-beda (Polyglot) |
| Kegagalan | Satu error = semua down | Service lain tetap jalan |
| Kompleksitas | Mudah di awal | Kompleks tapi terstruktur |
| Cocok untuk | MVP, startup awal | Sistem besar, tim banyak |
β¨ Karakteristik Microservices
- Single Responsibility: Setiap service hanya fokus pada satu domain bisnis
- Independen: Bisa di-deploy, di-update, dan di-scale secara terpisah
- Terdesentralisasi: Setiap service boleh menggunakan teknologi/database yang berbeda
- Resilien: Kegagalan satu service tidak mematikan sistem secara keseluruhan
- Observable: Mudah di-monitor dan di-debug dengan logging terpusat
π Python untuk Microservices
Python sangat cocok untuk microservices karena:
| Framework | Keunggulan | Use Case |
|---|---|---|
| FastAPI | Async, auto-docs, type hints | REST API, high-performance |
| Flask | Minimalis, fleksibel | Simple services, internal tools |
| Nameko | RPC, message queues | Event-driven architecture |
| gRPC | Binary protocol, super cepat | Inter-service communication |
π Contoh Arsitektur: Toko Online
Alih-alih satu kode besar, kita membagi aplikasi menjadi beberapa service:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β API Gateway β
β (Kong / Nginx / Traefik) β
ββββββββββββ¬βββββββββββββββββββ¬βββββββββββββββββββ¬βββββββββββββ
β β β
βΌ βΌ βΌ
ββββββββββββββββββββ ββββββββββββββββββββ ββββββββββββββββββββ
β User Service β β Product Service β β Order Service β
β FastAPI :8001 β β FastAPI :8002 β β FastAPI :8003 β
β β β β β β
β βββ PostgreSQL β β βββ MongoDB β β βββ PostgreSQL β
ββββββββββ¬ββββββββββ ββββββββββ¬ββββββββββ ββββββββββ¬ββββββββββ
β β β
ββββββββββββββββββββββΌβββββββββββββββββββββ
β
ββββββββββββΌβββββββββββ
β Message Queue β
β (RabbitMQ) β
ββββββββββββ¬βββββββββββ
β
ββββββββββββΌβββββββββββ
β Notification Serviceβ
β (Email, SMS) β
βββββββββββββββββββββββ
π¦ Implementasi: User Service
Mari buat User Service menggunakan FastAPI:
Struktur Folder
user-service/
βββ app/
β βββ __init__.py
β βββ main.py # Entry point FastAPI
β βββ config.py # Konfigurasi
β βββ models.py # Database models
β βββ schemas.py # Pydantic schemas
β βββ crud.py # Database operations
β βββ routers/
β βββ __init__.py
β βββ users.py # User endpoints
βββ requirements.txt
βββ Dockerfile
βββ docker-compose.yml
Main Application
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routers import users
from .config import settings
app = FastAPI(
title="User Service",
description="Microservice untuk manajemen user",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(users.router, prefix="/users", tags=["Users"])
@app.get("/health")
async def health_check():
"""Health check endpoint untuk Kubernetes/Docker"""
return {"status": "healthy", "service": "user-service"}
User Models & Schemas
# app/models.py
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(255), unique=True, index=True)
username = Column(String(100), unique=True, index=True)
hashed_password = Column(String(255))
full_name = Column(String(200))
is_active = Column(Boolean, default=True)
is_verified = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# app/schemas.py
from pydantic import BaseModel, EmailStr
from datetime import datetime
from typing import Optional
class UserCreate(BaseModel):
email: EmailStr
username: str
password: str
full_name: str
class UserResponse(BaseModel):
id: int
email: str
username: str
full_name: str
is_active: bool
is_verified: bool
created_at: datetime
class Config:
from_attributes = True
class UserLogin(BaseModel):
email: EmailStr
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
User Router/Endpoints
# app/routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from .. import crud, schemas
from ..database import get_db
router = APIRouter()
@router.post("/register", response_model=schemas.UserResponse, status_code=201)
async def register_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
"""Registrasi user baru"""
# Cek apakah email sudah terdaftar
existing_user = crud.get_user_by_email(db, user.email)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email sudah terdaftar"
)
# Buat user baru
return crud.create_user(db, user)
@router.get("/{user_id}", response_model=schemas.UserResponse)
async def get_user(user_id: int, db: Session = Depends(get_db)):
"""Ambil data user berdasarkan ID"""
user = crud.get_user(db, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User tidak ditemukan"
)
return user
@router.get("/", response_model=List[schemas.UserResponse])
async def list_users(skip: int = 0, limit: int = 50, db: Session = Depends(get_db)):
"""Daftar semua user dengan pagination"""
return crud.get_users(db, skip=skip, limit=limit)
@router.post("/login", response_model=schemas.TokenResponse)
async def login(credentials: schemas.UserLogin, db: Session = Depends(get_db)):
"""Login dan dapatkan JWT token"""
user = crud.authenticate_user(db, credentials.email, credentials.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email atau password salah"
)
access_token = crud.create_access_token(data={"sub": str(user.id)})
return {"access_token": access_token}
π¦ Implementasi: Product Service
Contoh service yang berbeda - menggunakan MongoDB:
# product-service/app/main.py
from fastapi import FastAPI, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from typing import List, Optional
from bson import ObjectId
app = FastAPI(title="Product Service")
# MongoDB connection
client = AsyncIOMotorClient("mongodb://localhost:27017")
db = client.tokoonline
products_collection = db.products
class Product(BaseModel):
name: str
description: str
price: float
stock: int
category: str
image_url: Optional[str] = None
class ProductResponse(Product):
id: str
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "product-service"}
@app.get("/products", response_model=List[ProductResponse])
async def get_products(
skip: int = 0,
limit: int = 20,
category: Optional[str] = None
):
"""Ambil daftar produk dengan filter opsional"""
query = {}
if category:
query["category"] = category
cursor = products_collection.find(query).skip(skip).limit(limit)
products = []
async for product in cursor:
product["id"] = str(product["_id"])
del product["_id"]
products.append(ProductResponse(**product))
return products
@app.get("/products/{product_id}", response_model=ProductResponse)
async def get_product(product_id: str):
"""Ambil detail produk"""
product = await products_collection.find_one({"_id": ObjectId(product_id)})
if not product:
raise HTTPException(status_code=404, detail="Produk tidak ditemukan")
product["id"] = str(product["_id"])
del product["_id"]
return ProductResponse(**product)
@app.post("/products", response_model=ProductResponse, status_code=201)
async def create_product(product: Product):
"""Buat produk baru"""
result = await products_collection.insert_one(product.dict())
created = await products_collection.find_one({"_id": result.inserted_id})
created["id"] = str(created["_id"])
del created["_id"]
return ProductResponse(**created)
@app.put("/products/{product_id}/stock")
async def update_stock(product_id: str, quantity: int):
"""Update stok produk (dipanggil oleh Order Service)"""
result = await products_collection.update_one(
{"_id": ObjectId(product_id)},
{"$inc": {"stock": quantity}}
)
if result.modified_count == 0:
raise HTTPException(status_code=404, detail="Produk tidak ditemukan")
return {"message": "Stok berhasil diupdate"}
π Komunikasi Antar Service
1. Synchronous (HTTP Request)
Order Service perlu data user? Ia akan "menelpon" User Service:
# order-service/app/services/user_client.py
import httpx
from typing import Optional
from ..config import settings
class UserServiceClient:
"""Client untuk komunikasi dengan User Service"""
def __init__(self):
self.base_url = settings.USER_SERVICE_URL # http://user-service:8001
async def get_user(self, user_id: int) -> Optional[dict]:
"""Ambil data user dari User Service"""
async with httpx.AsyncClient() as client:
try:
response = await client.get(
f"{self.base_url}/users/{user_id}",
timeout=5.0 # Timeout 5 detik
)
response.raise_for_status()
return response.json()
except httpx.RequestError as e:
# Log error dan return None (graceful degradation)
print(f"Error calling User Service: {e}")
return None
async def verify_token(self, token: str) -> Optional[dict]:
"""Verifikasi JWT token"""
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{self.base_url}/users/verify",
headers={"Authorization": f"Bearer {token}"}
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError:
return None
# Penggunaan di Order Service
user_client = UserServiceClient()
@app.post("/orders")
async def create_order(order: OrderCreate, token: str = Depends(oauth2_scheme)):
# Verifikasi user
user = await user_client.verify_token(token)
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
# Proses order...
2. Asynchronous (Message Queue)
Untuk operasi yang tidak perlu response langsung (seperti kirim email), gunakan Message Queue:
# notification-service/app/main.py
import pika
import json
from fastapi import FastAPI, BackgroundTasks
app = FastAPI(title="Notification Service")
# RabbitMQ connection
connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq')
)
channel = connection.channel()
channel.queue_declare(queue='notifications', durable=True)
def send_email(to: str, subject: str, body: str):
"""Fungsi untuk kirim email (bisa pakai SendGrid, Mailgun, dll)"""
print(f"Sending email to {to}: {subject}")
# Implementasi kirim email sebenarnya
pass
def callback(ch, method, properties, body):
"""Callback saat ada message baru di queue"""
message = json.loads(body)
if message["type"] == "welcome_email":
send_email(
to=message["email"],
subject="Selamat Datang di Toko Online!",
body=f"Halo {message['name']}, terima kasih telah bergabung!"
)
elif message["type"] == "order_confirmation":
send_email(
to=message["email"],
subject=f"Konfirmasi Order #{message['order_id']}",
body=f"Order Anda sedang diproses. Total: Rp {message['total']:,}"
)
ch.basic_ack(delivery_tag=method.delivery_tag)
# Start consuming messages
channel.basic_consume(queue='notifications', on_message_callback=callback)
@app.on_event("startup")
async def startup():
import threading
thread = threading.Thread(target=channel.start_consuming)
thread.daemon = True
thread.start()
# user-service: Kirim message ke queue setelah registrasi
import pika
import json
def publish_notification(message: dict):
"""Publish message ke RabbitMQ"""
connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq')
)
channel = connection.channel()
channel.basic_publish(
exchange='',
routing_key='notifications',
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2) # Persistent
)
connection.close()
# Di endpoint register
@router.post("/register")
async def register_user(user: UserCreate, db: Session = Depends(get_db)):
new_user = crud.create_user(db, user)
# Kirim welcome email via message queue (async)
publish_notification({
"type": "welcome_email",
"email": new_user.email,
"name": new_user.full_name
})
return new_user
π³ Multi-Service dengan Docker Compose
Jalankan semua service sekaligus:
# docker-compose.yml
version: '3.8'
services:
# API Gateway
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on:
- user-service
- product-service
- order-service
# User Service
user-service:
build: ./user-service
ports:
- "8001:8000"
environment:
- DATABASE_URL=postgresql://user:password@postgres-user:5432/userdb
- JWT_SECRET=your-secret-key
depends_on:
- postgres-user
- rabbitmq
# Product Service
product-service:
build: ./product-service
ports:
- "8002:8000"
environment:
- MONGODB_URL=mongodb://mongodb:27017/products
depends_on:
- mongodb
# Order Service
order-service:
build: ./order-service
ports:
- "8003:8000"
environment:
- DATABASE_URL=postgresql://user:password@postgres-order:5432/orderdb
- USER_SERVICE_URL=http://user-service:8000
- PRODUCT_SERVICE_URL=http://product-service:8000
depends_on:
- postgres-order
- rabbitmq
# Notification Service
notification-service:
build: ./notification-service
environment:
- RABBITMQ_URL=amqp://rabbitmq:5672
depends_on:
- rabbitmq
# Databases
postgres-user:
image: postgres:15
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=userdb
volumes:
- postgres-user-data:/var/lib/postgresql/data
postgres-order:
image: postgres:15
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=orderdb
volumes:
- postgres-order-data:/var/lib/postgresql/data
mongodb:
image: mongo:6
volumes:
- mongodb-data:/data/db
# Message Queue
rabbitmq:
image: rabbitmq:3-management
ports:
- "15672:15672" # Management UI
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
volumes:
postgres-user-data:
postgres-order-data:
mongodb-data:
Jalankan dengan:
docker-compose up -d
π API Gateway dengan Nginx
# nginx.conf
events {
worker_connections 1024;
}
http {
upstream user_service {
server user-service:8000;
}
upstream product_service {
server product-service:8000;
}
upstream order_service {
server order-service:8000;
}
server {
listen 80;
# User Service
location /api/users {
proxy_pass http://user_service/users;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
# Product Service
location /api/products {
proxy_pass http://product_service/products;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
# Order Service
location /api/orders {
proxy_pass http://order_service/orders;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
# Health checks
location /health/user {
proxy_pass http://user_service/health;
}
location /health/product {
proxy_pass http://product_service/health;
}
location /health/order {
proxy_pass http://order_service/health;
}
}
---
## β οΈ Tantangan Microservices
> *"With great power comes great responsibility."*
| Challenge | Solusi |
|-----------|--------|
| **Distributed Tracing** | Jaeger, Zipkin, atau OpenTelemetry |
| **Logging Terpusat** | ELK Stack (Elasticsearch, Logstash, Kibana) |
| **Service Discovery** | Consul, Eureka, atau Kubernetes DNS |
| **Circuit Breaker** | Library seperti `tenacity` atau `pybreaker` |
| **Data Consistency** | Saga Pattern, Event Sourcing |
| **API Versioning** | URL versioning (`/v1/users`) atau header |
### Contoh Circuit Breaker
```python
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def call_user_service(user_id: int):
"""Retry otomatis jika User Service error"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://user-service:8000/users/{user_id}")
response.raise_for_status()
return response.json()
π Monitoring dengan Prometheus & Grafana
Tambahkan metrics endpoint di setiap service:
# app/main.py
from prometheus_fastapi_instrumentator import Instrumentator
app = FastAPI(title="User Service")
# Auto-instrument dengan Prometheus metrics
Instrumentator().instrument(app).expose(app)
π Kesimpulan
Microservices adalah arsitektur pilihan untuk sistem skala besar seperti Netflix, Uber, atau Tokopedia. Namun untuk startup tahap awal, Monolith seringkali lebih efisien.
Kapan Harus Pakai Microservices?
β Gunakan Microservices jika:
- Tim development > 10 orang
- Butuh scale bagian tertentu saja
- Services perlu teknologi berbeda
- Downtime satu fitur tidak boleh mempengaruhi fitur lain
β Tetap Pakai Monolith jika:
- Tim kecil (< 5 developer)
- Produk masih MVP/eksperimen
- Tidak ada kebutuhan scale berbeda
- Ingin development lebih cepat
Tips Migrasi
- Mulai dengan Monolith β kemudian pecah sesuai kebutuhan
- Strangler Fig Pattern β migrasi bertahap, bukan big bang
- Start with Database per Service β ini yang paling penting
- Invest in DevOps β CI/CD, monitoring, logging harus matang dulu
Edit tutorial ini
Gabung Komunitas Developer & Kreator Digital
Dapatkan teman coding, sharing project, networking dengan expert, dan update teknologi terbaru.
Selamat! Anda telah sukses mendaftar di newsletter.