Kafka เป็น Platform ที่พัฒนาขึ้นมาเพื่อการจัดการ event streaming ที่มีประสิทธิภาพสูง นิยมใช้กันมาก ในระบบขนาดใหญ่ แนวทางการ deploy platform มีหลากหลายรูปแบบ บทความนี้เรามาลองสร้าง Kafka platform บน Kubernetes ในรูปแบบ HA ( High Availability)
เราจะใช้ kind ในการจำลอง Kubernetes บน Docker รายละเอียดตามนี้เลย platform ได้เลย https://kind.sigs.k8s.io/docs/user/quick-start/#installation
kind create cluster
จากนั้นสร้างไฟล์ maifest.yaml ที่ใช้เก็บ service และ statefulset
apiVersion: v1
kind: Service
metadata:
name: kafka-svc
labels:
app: kafka-app
spec:
clusterIP: None
ports:
- name: "9092"
port: 9092
protocol: TCP
targetPort: 9092
selector:
app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
labels:
app: kafka-app
spec:
serviceName: kafka-svc
replicas: 3
selector:
matchLabels:
app: kafka-app
template:
metadata:
labels:
app: kafka-app
spec:
containers:
- name: kafka-container
image: doughgle/kafka-kraft
ports:
- containerPort: 9092
- containerPort: 9093
env:
- name: REPLICAS
value: "3"
- name: SERVICE
value: kafka-svc
- name: NAMESPACE
value: default
- name: SHARE_DIR
value: /mnt/kafka
- name: CLUSTER_ID
value: RyVvduGDm95gSjxNq
- name: DEFAULT_REPLICATION_FACTOR
value: "3"
- name: DEFAULT_MIN_INSYNC_REPLICAS
value: "2"
volumeMounts:
- name: data
mountPath: /mnt/kafka
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes:
- "ReadWriteOnce"
resources:
requests:
storage: "300Mi"
เสร็จ apply สร้าง object ตามที่เราต้องการ
❯ kubectl apply -f manifest.yaml && kubectl get pods
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 2m7s
kafka-1 1/1 Running 0 105s
kafka-2 1/1 Running 0 97s
statefulset kafka มีอะไรบ้าง
- replicas: 3 ควรเป็นจำนวนคี่ สามารถ support เรื่อง HA
- DEFAULT_REPLICATION_FACTOR เท่ากับ 3 คือค่าที่กำหนดการทำ replication ออกเป็น 3 ชุด คือ 3 broker
- DEFAULT_MIN_INSYNC_REPLICAS เท่ากับ 2 ต้องมี broker อย่างน้อย 2 ตัวที่ sync กันอยู่
- 9092 เปิดสำหรับ client ที่เป็น producers และ consumers
- 9093 สำหรับการติดต่อภายในระหว่าง Broker
ลองสร้าง event เพื่อทดสอบการทำงาน
เราต้องมี kafka client เพื่อเชื่อมต่อกัย broker ทั้ง 3 ตัว ของเราก่อน โดยสั่งรัน
kubectl run kafka-client --rm -ti --image bitnami/kafka:3.1.0 -- bash
ls เพื่อดู script ที่เราสามารถใช้งานได้
ls /opt/bitnami/kafka/bin
connect-distributed.sh
connect-mirror-maker.sh
connect-standalone.sh
kafka-acls.sh
kafka-broker-api-versions.sh
kafka-cluster.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
ลองสร้าง topic ทดสอบ
ใช้ kafka-console-producer.sh เพื่อสร้าง topic ชื่อ test เมื่อ prompt แสดงขึ้นมาให้พิมพ์คำว่า kafka HA ส่งเป็น event
- –request-required-acks all หมายถึงต้องมี commited จากทุก broker
kafka-console-producer.sh \
--topic test \
--request-required-acks all \
--bootstrap-server kafka-0.kafka-svc.default.svc.cluster.local:9092, \
kafka-1.kafka-svc.default.svc.cluster.local:9092, \
kafka-2.kafka-svc.default.svc.cluster.local:9092 <กด Enter>
> kafka HA <กด Enter>
สร้าง consumer รอรับ event ที่เราเพิ่งสร้างไป
เราจะใช้ kafka-console-consumer.sh โดยเลือก topic เป็น test จะได้ event message ตามที่เราสร้างไว้ก่อนหน้านี้
/$ kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server kafka-0.kafka-svc.default.svc.cluster.local:9092,\
kafka-1.kafka-svc.default.svc.cluster.local:9092,\
kafka-2.kafka-svc.default.svc.cluster.local:9092 <กด Enter>
kafka HA [กด Ctrl+C]
Processed a total of 1 messages
ดูรายละเอียด topic test กันหน่อย
/$ kafka-topics.sh --describe \
--topic test \
--bootstrap-server kafka-0.kafka-svc.default.svc.cluster.local:9092,\
kafka-1.kafka-svc.default.svc.cluster.local:9092,\
kafka-2.kafka-svc.default.svc.cluster.local:9092
Topic: test TopicId: Zd46NMLkR8OlfLFPmIUOYQ PartitionCount: 1
ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
ทดสอบ HA โดยการลบบาง broker ออกจาก cluster
แก้ไข statefulset ของเราให้เลือกแค่ 2 replicas ( kafka-0, kafka-1)
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
labels:
app: kafka-app
spec:
serviceName: kafka-svc
replicas: 2 # use only kafka-0 and kafa-1 terminate kafka-2
selector:
matchLabels:
app: kafka-app
แล้วสั่งเปลี่ยน config
kubectl apply -f manifest.yaml
แล้วใช้ get pod เพื่อความชัวร์ว่า เหลือ broker แค่ 2 ตัว
> kubectl get pods
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 30m7s
kafka-1 1/1 Running 0 30m7s
เราจะสร้าง event อีกหนึ่ง message ไปให้ broker ทั้งสองตัว
/$ kafka-console-producer.sh \
--topic test \
--request-required-acks all \
--bootstrap-server kafka-0.kafka-svc.default.svc.cluster.local:9092,\
kafka-1.kafka-svc.default.svc.cluster.local:9092 <กด Enter>
> kafka after 1 unavailable broker <กด Enter>
ทดสอบว่า event เราส่งได้เรียบร้อยไหม
/$ kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server kafka-0.kafka-svc.default.svc.cluster.local:9092,\
kafka-1.kafka-svc.default.svc.cluster.local:9092 <กด Enter>
kafka HA
kafka after 1 unavailable broker [กด Ctrl+C]
Processed a total of 2 messages
kafka-1 1/1 Running 0 30m7s
เอา kafka-2 ของเรากลับมาอีกครั้ง
kind: StatefulSet
metadata:
name: kafka
labels:
app: kafka-app
spec:
serviceName: kafka-svc
replicas: 3 # เพิ่ม replicas เป็น 3 เหมือนเดิม
matchLabels:
app: kafka-app
kubectl apply -f manifest.yaml
รอจนกว่า kafka-2 ของเรากลับมา Running อีกครั้ง
> kubectl get pods
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 1 35m7s
kafka-1 1/1 Running 1 35m7s
kafka-2 1/1 Running 0 7s
ทดสอบการ sync event ระหว่าง broker
โดยใช้ kafka-console-consumer.sh แล้วกำหนด bootstrap-server เป็น kafka-2 broker แค่ตัวเดียว
/$ kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server kafka-2.kafka-svc.default.svc.cluster.local:9092
kafka HA
kafka after 1 unavailable broker [กด Ctrl+C]
Processed a total of 2 messages
จะเห็นว่า event ได้มีการ sync มาที่ kafka-2 เรียบร้อยแล้ว