TheKoguryo's 기술 블로그

 Version 2025-11-19

17.2.2 Mirror Maker 2로 Active-Passive 구성하기

환경 준비

17.2.1 Kafka Cluster 만들기을 참고하여, Kafka 클러스터를 추가로 생성합니다.

  • Kafka Cluster
    • source-kafka-cluster
    • dest-kafka-cluster
  • Secret
    • source-kafka-cluster-secret
    • dest-kafka-cluster-secret

AKHQ 설정

  1. 설정파일 application.yml을 업데이트 합니다. dest-kafka-cluster도 추가합니다.

    akhq:
      ...
      connections:
        source-kafka-cluster:
          properties:
            bootstrap.servers: "<bootstrap url:9092>"
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";
        dest-kafka-cluster:
          properties:
            bootstrap.servers: "<bootstrap url:9092>"
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";        
      ...
    
Mirror Maker 2 설정하기
  1. Source -> Dest로 미러링하는 설정 파일(mm2-source2dest.properties)을 생성합니다.

    1. bootstrap url과 superuser 유저명과 패스워드는 각 클러스터에 맞게 변경합니다.
    # specify any number of cluster aliases
    clusters = source, destination
    
    # connection information for each cluster
    # This is a comma separated host:port pairs for each cluster
    # for example. "A_host1:9092, A_host2:9092, A_host3:9092"  and you can see the exact host name on Ambari > Hosts
    source.bootstrap.servers = <bootstrap url:9092>
    destination.bootstrap.servers = <bootstrap url:9092>
    
    #security setup
    source.security.protocol=SASL_SSL
    source.sasl.mechanism=SCRAM-SHA-512
    source.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="<username>" \
      password="<password>";
    
    destination.security.protocol=SASL_SSL
    destination.sasl.mechanism=SCRAM-SHA-512
    destination.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="<username>" \
      password="<password>";
    
    # enable and configure individual replication flows
    source->destination.enabled = true
    
    # regex which defines which topics gets replicated. For eg "foo-.*"
    source->destination.topics = foo-.*
    source->destination.groups = .*
    
    # Replication factor settings
    replication.factor = 3
    checkpoints.topic.replication.factor = 3
    heartbeats.topic.replication.factor = 3
    offset-syncs.topic.replication.factor = 3
    
    replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    
    refresh.topics.interval.seconds=60
    refresh.groups.interval.seconds=60
    sync.group.offsets.enabled=true
    emit.checkpoints.enabled=true
    sync.group.offsets.interval.seconds=10
    emit.checkpoints.interval.seconds=10
    
    acks=all
    
  2. 시작 스크립트를 생성합니다. start-mm2-source2dest.sh

    #!/bin/bash
    
    CONFIG_FILE="mm2-source2dest.properties"
    
    # Set JVM options for stability
    export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
    export KAFKA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/"
    
    # Start MM2
    connect-mirror-maker.sh $CONFIG_FILE
    
  3. MirrorMaker2를 시작합니다.

    ./start-mm2-source2dest.sh
    
  4. source.properties, destination.properties 파일을 각각 만듭니다.

    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<user name from OCI Secret>" password="<password from OCI Secret>";
    
  5. 환경 변수를 설정합니다.

    export SOURCE_BOOTSTRAP_URL=<bootstrap url:9092>
    export DEST_BOOTSTRAP_URL=<bootstrap url:9092>
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    
  6. Topic 복제 테스트를 위한 Source 클러스터에 Topic 생성하는 스크립트를 생성합니다.

    # create-topic-source.sh
    TOPIC_NAME=$1
    
    kafka-topics.sh --create --bootstrap-server $SOURCE_BOOTSTRAP_URL --partitions 1 --replication-factor 3 --topic $TOPIC_NAME --command-config source.properties
    
  7. Topic을 생성합니다.

    $ ./create-topic-source.sh foo-topic-1
    Created topic foo-topic-1.
    $ ./create-topic-source.sh foo-topic-2
    Created topic foo-topic-2.
    
  8. dest 클러스터에서 Topic 목록을 조회합니다.

    • 설정한 refresh.topics.interval.seconds 값에 따라 해당 시간만큼 기다립니다.
    • foo-topic-1, foo-topic-2가 출처 alias가 추가되어 source.foo-topic-1, source.foo-topic-2로 복제된 것을 볼수 있습니다.
    • 나머지 Topic은 MirrorMaker2를 실행함에 따라 생성되었습니다.
    $ kafka-topics.sh --list --bootstrap-server $DEST_BOOTSTRAP_URL --command-config destination.properties
    __consumer_offsets
    heartbeats
    mm2-configs.source.internal
    mm2-offset-syncs.source.internal
    mm2-offsets.source.internal
    mm2-status.source.internal
    source.checkpoints.internal
    source.foo-topic-1
    source.foo-topic-2
    source.heartbeats
    
  9. 메세지 복제 테스트를 위한 스크립트를 생성합니다.

    1. produce-msg-to-source.sh를 생성합니다.

      TOPIC_NAME=$1
      
      kafka-console-producer.sh --bootstrap-server $SOURCE_BOOTSTRAP_URL --topic $TOPIC_NAME --producer.config source.properties --property parse.key=true --property key.separator=:
      
    2. consume-from-source.sh를 생성합니다.

      TOPIC_NAME=$1
      
      kafka-console-consumer.sh --bootstrap-server $SOURCE_BOOTSTRAP_URL --topic $TOPIC_NAME --consumer.config source.properties --group my-consumer-group-1
      
    3. consume-from-destination.sh를 생성합니다.

      TOPIC_NAME=$1
      
      kafka-console-consumer.sh --bootstrap-server $DEST_BOOTSTRAP_URL --topic $TOPIC_NAME --consumer.config destination.properties --group my-consumer-group-1
      
  10. [Source] foo-topic-1 - Consumer를 시작합니다.

    $ ./consume-from-source.sh foo-topic-1
    
  11. [Source] foo-topic-1 - Producer를 시작하고 메세지를 추가합니다.

    $ ./produce-msg-to-source.sh foo-topic-1
    >1:1
    >2:2
    >
    
  12. [Source] foo-topic-1 - Consumer를 확인합니다.

    $ ./consume-from-source.sh foo-topic-1
    1
    2
    
  13. Dest 클러스터에 my-consumer-group-1 생성될때까지 기다립니다.

    kafka-consumer-groups.sh --bootstrap-server $DEST_BOOTSTRAP_URL --command-config destination.properties --list | grep my-consumer-group-1
    
  14. 오프셋 동기화 및 LAG을 확인하기 위해, 일단 Consumer를 종료합니다.

  15. [Source] foo-topic-1 - Producer에서 메세지 3, 4를 추가합니다.

    $ ./produce-msg-to-source.sh foo-topic-1
    >1:1
    >2:2
    >3:3
    >4:4
    >
    
  16. AKHQ에서 Source 클러스터를 확인합니다. 총 4건 메시지에, Consumer 종료로 인해 Lag: 2로 표시됩니다.

    image-20251119145844829

  17. Dest 클러스터를 확인합니다. 총 4건 메시지에, Consumer 종료로 인해 Lag: 2로 표시됩니다. 메시지, 오프셋 모두 동기화 되었습니다. 다만, 설정한 Interval 값이 따른 실 동기화까지 약간의 시간이 걸립니다.

    image-20251119145905157

  18. 장애 상황을 가정하여, Consumer, Producer, MirrorMaker2 모두 종료합니다.

Mirror Maker 2 설정하기 - Fail Over
  1. [Dest] source.foo-topic-1 - Consumer를 시작합니다. Source Consumer가 읽은 이후 메시지 2건이 수신됩니다.

    $ ./consume-from-destination.sh source.foo-topic-1
    3
    4
    
  2. AKHQ에서 Dest 클러스터를 확인합니다. 나머지 2건을 이어서 수신한 이후 현재는 Lag:0 표시되어 수신된 메시지를 모두 읽은 상태입니다.

    image-20251119145939481



이 글은 개인으로서, 개인의 시간을 할애하여 작성된 글입니다. 글의 내용에 오류가 있을 수 있으며, 글 속의 의견은 개인적인 의견입니다.

Last updated on 19 Nov 2025