
Kafka MirrorMaker in Kafka 0.10.0.1+

See MirrorMaker.scala in the Kafka source for implementation details.

Target cluster setup

  1. Download and install Kafka (target cluster): select the appropriate version and download its tgz from the Kafka Downloads page.
    tar -zxf kafka_2.11-0.10.0.1.tgz
    cd kafka_2.11-0.10.0.1
    
  2. Configure Target Kafka cluster's ZooKeeper
    vim ./config/zookeeper.properties
    
    # the directory where the snapshot is stored.
    dataDir=/work/kafka_2.11-0.10.0.1/zookeeper-data
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    
  3. Start Target Kafka cluster's ZooKeeper
    ./bin/zookeeper-server-start.sh config/zookeeper.properties
    
  4. Configure Target Kafka cluster's Server
    vim ./config/server.properties
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    # The number of threads handling network requests
    num.network.threads=3
    # The number of threads doing disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    # A comma separated list of directories under which to store log files
    log.dirs=/work/kafka_2.11-0.10.0.1/kafka-logs
    # The default number of log partitions per topic
    num.partitions=1
    # The number of threads per data dir used for log recovery at startup and flushing at shutdown
    num.recovery.threads.per.data.dir=1
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=2
    # A size-based retention policy: segments are pruned once a partition's log exceeds this size (1 GB)
    log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according to the retention policies
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    zookeeper.connect=localhost:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    # Enable the log cleaner (required for log-compacted topics)
    log.cleaner.enable=true
    # Allow topics to be deleted via the admin tools
    delete.topic.enable=true
    # Move partition leadership off a broker before it shuts down
    controlled.shutdown.enable=true
    # Allow messages up to 20 MB; the fetch, replication, and broker limits must agree
    fetch.message.max.bytes=20000000
    replica.fetch.max.bytes=20000000
    message.max.bytes=20000000
    # Let an out-of-sync replica become leader; risks data loss, acceptable for a non-production setup
    unclean.leader.election.enable=true
    
    ########################## SSL settings (optional) ###############################
    # The keystore and truststore referenced below must already exist; see the sketches after this list.
    listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/server.keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=changeit
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/server.truststore.jks
    ssl.truststore.password=changeit
    ssl.client.auth=required
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    ssl.secure.random.implementation=SHA1PRNG
    ssl.enabled.protocols=TLSv1.2
    security.inter.broker.protocol=SSL
    
  5. Start Target Kafka cluster's Server
    ./bin/kafka-server-start.sh ./config/server.properties
    
  6. Create topic in target cluster
    ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
    ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --config 'retention.ms=3600000' --topic mirrored_topic
    
  7. Start listening to the topic in the target cluster using the Console Consumer
    ./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic mirrored_topic
    
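The SSL settings in server.properties above assume that the keystore and truststore files already exist. A minimal sketch for creating self-signed stores with keytool and openssl, along the lines of the standard Kafka SSL quickstart (the alias, the CN prompted for, and the 365-day validity are placeholders to adapt):

    mkdir -p /work/kafka_2.11-0.10.0.1/ssl && cd /work/kafka_2.11-0.10.0.1/ssl
    # 1. Generate the broker key pair in a new keystore (use the broker hostname as CN)
    keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
    # 2. Create your own certificate authority (CA)
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    # 3. Import the CA certificate into the truststore
    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
    # 4. Export a signing request, sign it with the CA, then import the CA cert and the signed cert into the keystore
    keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
    keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
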
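Because the broker's SSL listener sets ssl.client.auth=required, trying it out from the console consumer needs a client properties file with both a truststore and a keystore. A sketch that simply reuses the server's stores, which is acceptable for this test setup (config/client-ssl.properties is an assumed file name):

    security.protocol=SSL
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/server.truststore.jks
    ssl.truststore.password=changeit
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/server.keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=changeit

Then point the console consumer at the SSL port:

    ./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9093 --topic mirrored_topic --consumer.config config/client-ssl.properties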

Kafka MirrorMaker

Configure Kafka MirrorMaker

  1. Source cluster consumer config
    # Use PLAINTEXT and the non-SSL port if the source Kafka cluster is not using SSL
    bootstrap.servers=sourceKafka01:9093,sourceKafka02:9093
    client.id=mirror-maker-client-01
    group.id=dp-mirror-maker-group-01
    exclude.internal.topics=true
    auto.offset.reset=earliest
    enable.auto.commit=true
    # The SSL settings below apply only if the source Kafka cluster uses SSL; omit them otherwise
    security.protocol=SSL
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterTrustStore.jks
    ssl.truststore.password=changeit
    # The key store is optional for a client; it enables two-way (mutual) authentication
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterKeyStore.jks
    # The key store password is only needed if ssl.keystore.location is configured
    ssl.keystore.password=changeit
    # The password of the private key in the key store file
    ssl.key.password=changeit
    
  2. Target cluster producer config
    bootstrap.servers=localhost:9092
    # Wait for all in-sync replicas to acknowledge each write, so mirrored messages are not lost
    acks=all
    # A small batch size and short linger keep mirroring latency low at the cost of throughput
    batch.size=1024
    linger.ms=500
    client.id=mirror_maker_producer
    
  3. Run Kafka MirrorMaker
    ./bin/kafka-run-class.sh kafka.tools.MirrorMaker --new.consumer --consumer.config config/sourceClusterConsumer.conf --num.streams 2 --producer.config config/targetClusterProducer.conf --whitelist="mirrored_topic"
    
  4. Check progress (an end-to-end test is also sketched after this list)
    # List all consumer groups
    ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --list
    # Describe the consumer group and show its offset lag (at the source Kafka cluster):
    ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --describe --group dp-mirror-maker-group-01
    
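To confirm mirroring end to end, a simple check is to produce a test message to the source cluster and watch it arrive in the console consumer attached to the target cluster (step 7 of the target cluster setup). A sketch, assuming the source cluster also exposes a PLAINTEXT listener on port 9092 (for an SSL-only source, pass a producer config with the SSL settings shown earlier via --producer.config):

    echo "hello via MirrorMaker" | ./bin/kafka-console-producer.sh --broker-list sourceKafka01:9092 --topic mirrored_topic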
