17.2.2 Mirror Maker 2로 Active-Passive 구성하기
환경 준비
17.2.1 Kafka Cluster 만들기을 참고하여, Kafka 클러스터를 추가로 생성합니다.
- Kafka Cluster
- source-kafka-cluster
- dest-kafka-cluster
- Secret
- source-kafka-cluster-secret
- dest-kafka-cluster-secret
AKHQ 설정
-
설정파일
application.yml을 업데이트 합니다. dest-kafka-cluster도 추가합니다.akhq: ... connections: source-kafka-cluster: properties: bootstrap.servers: "<bootstrap url:9092>" security.protocol: SASL_SSL sasl.mechanism: SCRAM-SHA-512 sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>"; dest-kafka-cluster: properties: bootstrap.servers: "<bootstrap url:9092>" security.protocol: SASL_SSL sasl.mechanism: SCRAM-SHA-512 sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>"; ...
Mirror Maker 2 설정하기
-
Source -> Dest로 미러링하는 설정 파일(
mm2-source2dest.properties)을 생성합니다.- bootstrap url과 superuser 유저명과 패스워드는 각 클러스터에 맞게 변경합니다.
# specify any number of cluster aliases clusters = source, destination # connection information for each cluster # This is a comma separated host:port pairs for each cluster # for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari > Hosts source.bootstrap.servers = <bootstrap url:9092> destination.bootstrap.servers = <bootstrap url:9092> #security setup source.security.protocol=SASL_SSL source.sasl.mechanism=SCRAM-SHA-512 source.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="<username>" \ password="<password>"; destination.security.protocol=SASL_SSL destination.sasl.mechanism=SCRAM-SHA-512 destination.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="<username>" \ password="<password>"; # enable and configure individual replication flows source->destination.enabled = true # regex which defines which topics gets replicated. For eg "foo-.*" source->destination.topics = foo-.* source->destination.groups = .* # Replication factor settings replication.factor = 3 checkpoints.topic.replication.factor = 3 heartbeats.topic.replication.factor = 3 offset-syncs.topic.replication.factor = 3 replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy refresh.topics.interval.seconds=60 refresh.groups.interval.seconds=60 sync.group.offsets.enabled=true emit.checkpoints.enabled=true sync.group.offsets.interval.seconds=10 emit.checkpoints.interval.seconds=10 acks=all -
시작 스크립트를 생성합니다.
start-mm2-source2dest.sh#!/bin/bash CONFIG_FILE="mm2-source2dest.properties" # Set JVM options for stability export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" export KAFKA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/" # Start MM2 connect-mirror-maker.sh $CONFIG_FILE -
MirrorMaker2를 시작합니다.
./start-mm2-source2dest.sh -
source.properties,destination.properties파일을 각각 만듭니다.security.protocol=SASL_SSL sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<user name from OCI Secret>" password="<password from OCI Secret>"; -
환경 변수를 설정합니다.
export SOURCE_BOOTSTRAP_URL=<bootstrap url:9092> export DEST_BOOTSTRAP_URL=<bootstrap url:9092> export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" -
Topic 복제 테스트를 위한 Source 클러스터에 Topic 생성하는 스크립트를 생성합니다.
# create-topic-source.sh TOPIC_NAME=$1 kafka-topics.sh --create --bootstrap-server $SOURCE_BOOTSTRAP_URL --partitions 1 --replication-factor 3 --topic $TOPIC_NAME --command-config source.properties -
Topic을 생성합니다.
$ ./create-topic-source.sh foo-topic-1 Created topic foo-topic-1. $ ./create-topic-source.sh foo-topic-2 Created topic foo-topic-2. -
dest 클러스터에서 Topic 목록을 조회합니다.
- 설정한
refresh.topics.interval.seconds값에 따라 해당 시간만큼 기다립니다. - foo-topic-1, foo-topic-2가 출처 alias가 추가되어
source.foo-topic-1,source.foo-topic-2로 복제된 것을 볼수 있습니다. - 나머지 Topic은 MirrorMaker2를 실행함에 따라 생성되었습니다.
$ kafka-topics.sh --list --bootstrap-server $DEST_BOOTSTRAP_URL --command-config destination.properties __consumer_offsets heartbeats mm2-configs.source.internal mm2-offset-syncs.source.internal mm2-offsets.source.internal mm2-status.source.internal source.checkpoints.internal source.foo-topic-1 source.foo-topic-2 source.heartbeats - 설정한
-
메세지 복제 테스트를 위한 스크립트를 생성합니다.
-
produce-msg-to-source.sh를 생성합니다.TOPIC_NAME=$1 kafka-console-producer.sh --bootstrap-server $SOURCE_BOOTSTRAP_URL --topic $TOPIC_NAME --producer.config source.properties --property parse.key=true --property key.separator=: -
consume-from-source.sh를 생성합니다.TOPIC_NAME=$1 kafka-console-consumer.sh --bootstrap-server $SOURCE_BOOTSTRAP_URL --topic $TOPIC_NAME --consumer.config source.properties --group my-consumer-group-1 -
consume-from-destination.sh를 생성합니다.TOPIC_NAME=$1 kafka-console-consumer.sh --bootstrap-server $DEST_BOOTSTRAP_URL --topic $TOPIC_NAME --consumer.config destination.properties --group my-consumer-group-1
-
-
[Source] foo-topic-1 - Consumer를 시작합니다.
$ ./consume-from-source.sh foo-topic-1 -
[Source] foo-topic-1 - Producer를 시작하고 메세지를 추가합니다.
$ ./produce-msg-to-source.sh foo-topic-1 >1:1 >2:2 > -
[Source] foo-topic-1 - Consumer를 확인합니다.
$ ./consume-from-source.sh foo-topic-1 1 2 -
Dest 클러스터에 my-consumer-group-1 생성될때까지 기다립니다.
kafka-consumer-groups.sh --bootstrap-server $DEST_BOOTSTRAP_URL --command-config destination.properties --list | grep my-consumer-group-1 -
오프셋 동기화 및 LAG을 확인하기 위해, 일단 Consumer를 종료합니다.
-
[Source] foo-topic-1 - Producer에서 메세지 3, 4를 추가합니다.
$ ./produce-msg-to-source.sh foo-topic-1 >1:1 >2:2 >3:3 >4:4 > -
AKHQ에서 Source 클러스터를 확인합니다. 총 4건 메시지에, Consumer 종료로 인해 Lag: 2로 표시됩니다.
-
Dest 클러스터를 확인합니다. 총 4건 메시지에, Consumer 종료로 인해 Lag: 2로 표시됩니다. 메시지, 오프셋 모두 동기화 되었습니다. 다만, 설정한 Interval 값이 따른 실 동기화까지 약간의 시간이 걸립니다.
-
장애 상황을 가정하여, Consumer, Producer, MirrorMaker2 모두 종료합니다.
Mirror Maker 2 설정하기 - Fail Over
-
[Dest] source.foo-topic-1 - Consumer를 시작합니다. Source Consumer가 읽은 이후 메시지 2건이 수신됩니다.
$ ./consume-from-destination.sh source.foo-topic-1 3 4 -
AKHQ에서 Dest 클러스터를 확인합니다. 나머지 2건을 이어서 수신한 이후 현재는 Lag:0 표시되어 수신된 메시지를 모두 읽은 상태입니다.
이 글은 개인으로서, 개인의 시간을 할애하여 작성된 글입니다. 글의 내용에 오류가 있을 수 있으며, 글 속의 의견은 개인적인 의견입니다.