Kafka MirrorMaker in Kafka 0.10.0.1+

See MirrorMaker.scala in the Kafka source for implementation details.

Target cluster setup

  1. Download and install Kafka (target cluster). Select the appropriate version and download its tgz from the Kafka Downloads page.
  2. tar -zxf kafka_2.11-0.10.0.1.tgz
    cd kafka_2.11-0.10.0.1
    
  3. Configure Target Kafka cluster's ZooKeeper
  4. vim ./config/zookeeper.properties
    
    # the directory where the snapshot is stored.
    dataDir=/work/kafka_2.11-0.10.0.1/zookeeper-data
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    
  5. Start Target Kafka cluster's ZooKeeper
  6. ./bin/zookeeper-server-start.sh config/zookeeper.properties
    
  7. Configure Target Kafka cluster's Server (for the optional SSL listener, see the keystore sketch after this list)
  8. vim ./config/server.properties
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    # The number of threads handling network requests
    num.network.threads=3
    # The number of threads doing disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    # A comma separated list of directories under which to store log files
    log.dirs=/work/kafka_2.11-0.10.0.1/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=2
    log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according to the retention policies
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    zookeeper.connect=localhost:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    log.cleaner.enable=true
    delete.topic.enable=true
    controlled.shutdown.enable=true
    fetch.message.max.bytes=20000000
    replica.fetch.max.bytes=20000000
    message.max.bytes=20000000
    unclean.leader.election.enable=true
    
    ########################## SSL settings (optional) ###############################
    listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/server.keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=changeit
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/server.truststore.jks
    ssl.truststore.password=changeit
    ssl.client.auth=required
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    ssl.secure.random.implementation=SHA1PRNG
    ssl.enabled.protocols=TLSv1.2
    security.inter.broker.protocol=SSL
    
  9. Start Target Kafka cluster's Server
  10. ./bin/kafka-server-start.sh ./config/server.properties
    
  11. Create the mirrored topic in the target cluster
  12. ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
    ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --config 'retention.ms=3600000' --topic mirrored_topic
    
  13. Start listening to the topic in the target cluster using the Console Consumer
  14. ./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic mirrored_topic
    
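A note about the optional SSL listener: server.properties above references keystore and truststore files that the steps never create. The sketch below shows one way to generate them with a throwaway self-signed CA, following the standard keytool/openssl workflow from the Kafka security docs; the file names ca-cert, ca-key, cert-file and cert-signed are placeholders, and the paths and changeit passwords simply match the config above.

    mkdir -p /work/kafka_2.11-0.10.0.1/ssl && cd /work/kafka_2.11-0.10.0.1/ssl
    # Broker key pair in the keystore referenced by ssl.keystore.location
    keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA \
            -dname "CN=localhost" -storepass changeit -keypass changeit
    # Throwaway CA (ca-cert / ca-key are placeholder file names)
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -subj "/CN=mirror-maker-ca" -passout pass:changeit
    # Trust the CA in the broker truststore referenced by ssl.truststore.location
    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert -storepass changeit -noprompt
    # Sign the broker certificate with the CA, then import the CA cert and the signed cert into the keystore
    keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file -storepass changeit
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:changeit
    keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert -storepass changeit -noprompt
    keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed -storepass changeit -noprompt

Because ssl.client.auth=required, every client connecting on 9093 needs its own keystore signed by a CA present in the broker truststore; the truststore and keystore referenced in the MirrorMaker consumer config below would be built the same way against the source cluster's CA.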

Kafka MirrorMaker

Configure Kafka MirrorMaker

  1. Source cluster consumer config (config/sourceClusterConsumer.conf). Note that Java properties files do not support trailing comments, so the notes below are kept on their own lines.
  2. # if the source Kafka cluster is not using SSL, point this at its plaintext listener
    bootstrap.servers=sourceKafka01:9093,sourceKafka02:9093
    client.id=mirror-maker-client-01
    group.id=dp-mirror-maker-group-01
    exclude.internal.topics=true
    auto.offset.reset=earliest
    enable.auto.commit=true
    # SSL settings: omit everything from here down if the source Kafka cluster is not using SSL
    security.protocol=SSL
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterTrustStore.jks
    ssl.truststore.password=changeit
    # The key store is optional for clients and only needed for two-way (client) authentication
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterKeyStore.jks
    # The store password for the key store file; only needed if ssl.keystore.location is configured
    ssl.keystore.password=changeit
    # The password of the private key in the key store file; optional for clients
    ssl.key.password=changeit
    
  3. Target cluster producer config (config/targetClusterProducer.conf)
  4. bootstrap.servers=localhost:9092
    acks=all
    batch.size=1024
    linger.ms=500
    client.id=mirror_maker_producer
    
  5. Run Kafka MirrorMaker
  6. ./bin/kafka-run-class.sh kafka.tools.MirrorMaker --new.consumer --consumer.config config/sourceClusterConsumer.conf --num.streams 2 --producer.config config/targetClusterProducer.conf --whitelist="mirrored_topic"
    
  7. Check progress (a quick end-to-end check follows this list)
  8. # List all consumer groups
    ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --list
    # Describe consumer group and list offset lag related to given group (at source Kafka cluster):
    ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --describe --group dp-mirror-maker-group-01
    
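Once MirrorMaker is running, a quick end-to-end check is to push a test message through the pipeline. This is only a sketch: it assumes mirrored_topic already exists on the source cluster and that the source brokers expose a plaintext listener on port 9092, as in the consumer-group commands above; for an SSL-only source, point the producer at the SSL listener and supply the SSL settings via --producer.config.

    # Produce a test message to the source cluster
    echo "mirror-test-$(date +%s)" | ./bin/kafka-console-producer.sh --broker-list sourceKafka01:9092 --topic mirrored_topic

The console consumer started against the target cluster in step 13 of the target cluster setup should print the message within a few seconds, and the lag reported by the describe command above should drop back to zero.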
