
Kafka performance tuning

Performance tuning of Kafka becomes critical as your cluster grows in size. Below are a few points to consider to improve Kafka performance:
  • Consumer group ID: Never use the exact same consumer group ID for dozens of machines consuming from different topics. All of those offset commits will end up on the same partition of __consumer_offsets, hence on the same broker, which can in turn cause performance problems. Instead, derive the consumer group ID from the topic, e.g. group_id+topic_name (see the first sketch after this list).
  • Skewed: A broker is skewed if it holds more partitions of a given topic than the average number of partitions per broker for that topic. Example: if 2 brokers share 4 partitions and one of them holds 3, that broker is skewed (3 > 2). Try to make sure that none of the brokers is skewed.
  • Spread: Broker spread is the percentage of brokers in the cluster that host partitions for the given topic. Example: if a topic with 2 partitions is spread over 2 out of 3 brokers, the broker spread is 66%. Try to achieve 100% broker spread (a small sketch of the skew and spread checks follows this list).
  • Leader skew: The cluster is in a leader-skewed state when a node is the leader for more partitions than (# of partitions) / (# of brokers). Set the broker config auto.leader.rebalance.enable=true to enable automatic rebalancing of leadership (back to the preferred replica from the ISR) every 5 minutes. In tools such as Kafka Manager you can also click the "Preferred replica election" button to trigger a preferred-replica election for all topics in the cluster (a programmatic sketch follows this list).
  • # of partitions: The first thing to understand is that a topic partition is the unit of parallelism in Kafka. On both the producer and the broker side, writes to different partitions can be done fully in parallel. So expensive operations such as compression can utilize more hardware resources. On the consumer side, Kafka always gives a single partition’s data to one consumer thread. Thus, the degree of parallelism in the consumer (within a consumer group) is bounded by the number of partitions being consumed. Therefore, in general, the more partitions there are in a Kafka cluster, the higher the throughput one can achieve.
    A rough formula for picking the number of partitions is based on throughput. You measure the throughput that you can achieve on a single partition for production (call it p) and consumption (call it c). Let’s say your target throughput is t. Then you need at least max(t/p, t/c) partitions (see the partition-count sketch after this list).
  • More Partitions Require More Open File Handles: Each partition maps to a directory in the file system on the broker. Within that log directory, there will be two files (one for the index and another for the actual data) per log segment. Currently, in Kafka, each broker opens a file handle for both the index and the data file of every log segment. So, the more partitions, the higher one needs to configure the open file handle limit in the underlying operating system. This is mostly just a configuration issue; we have seen production Kafka clusters running with more than 30 thousand open file handles per broker.
  • Limit the number of partitions per broker to 2000-4000 and the total number of partitions in the cluster to the low tens of thousands.
  • Controller: Kafka supports intra-cluster replication, which provides higher availability and durability. A partition can have multiple replicas, each stored on a different broker. One of the replicas is designated as the leader and the rest of the replicas are followers. Internally, Kafka manages all those replicas automatically and makes sure that they are kept in sync. Both the producer and the consumer requests to a partition are served on the leader replica. When a broker fails, partitions with a leader on that broker become temporarily unavailable. Kafka will automatically move the leader of those unavailable partitions to some other replicas to continue serving the client requests. This process is done by one of the Kafka brokers designated as the controller. It involves reading and writing some metadata for each affected partition in ZooKeeper. Currently, operations to ZooKeeper are done serially in the controller.
    The more partitions there are on the unavailable broker, the longer the controller takes to elect new leaders for them.
  • More Partitions May Increase End-to-end Latency: The end-to-end latency in Kafka is defined by the time from when a message is published by the producer to when the message is read by the consumer. Kafka only exposes a message to a consumer after it has been committed, i.e., when the message is replicated to all the in-sync replicas. So, the time to commit a message can be a significant portion of the end-to-end latency. By default, a Kafka broker only uses a single thread to replicate data from another broker, for all partitions that share replicas between the two brokers. Our experiments show that replicating 1000 partitions from one broker to another can add about 20 ms latency, which implies that the end-to-end latency is at least 20 ms. This can be too high for some real-time applications.

    Note that this issue is alleviated on a larger cluster. For example, suppose that there are 1000 partition leaders on a broker and there are 10 other brokers in the same Kafka cluster. Each of the remaining 10 brokers only needs to fetch 100 partitions from the first broker on average. Therefore, the added latency due to committing a message will be just a few ms, instead of tens of ms.

    As a rule of thumb, if you care about latency, it’s probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in a Kafka cluster and r is the replication factor (see the rule-of-thumb sketch after this list).
  • More Partitions May Require More Memory in the Client: If one increases the number of partitions, messages will accumulate in more partitions in the producer. The aggregate amount of memory used may then exceed the configured memory limit. When this happens, the producer has to either block or drop new messages, neither of which is ideal. To prevent this from happening, one will need to reconfigure the producer with a larger memory size.
    As a rule of thumb, to achieve good throughput, one should allocate at least a few tens of KB per partition being produced in the producer and adjust the total amount of memory if the number of partitions increases significantly (see the producer buffer sketch after this list).
  • ZooKeeper: After Kafka 0.8, ZooKeeper is only used for broker management (failure detection, discovery), not for offset management. For legacy consumers, however, Kafka can dual-write the offsets into both ZooKeeper and Kafka’s __consumer_offsets topic (see dual.commit.enabled=true and offsets.storage=kafka; a hedged sketch follows this list).
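
The sketches below illustrate some of the points above; all topic names, group names, broker addresses, and sizes in them are hypothetical. First, deriving the consumer group ID from the topic name so that offset commits for different topics land on different partitions of __consumer_offsets:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PerTopicGroupId {
        public static void main(String[] args) {
            String appId = "billing-app"; // hypothetical application name
            String topic = "payments";    // hypothetical topic

            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // hypothetical broker
            // Derive the group ID from the topic instead of sharing one ID everywhere,
            // so that commits spread across different __consumer_offsets partitions.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, appId + "-" + topic);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(topic));
                // ... poll loop elided ...
            }
        }
    }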
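
Next, the skew and spread definitions reduce to simple arithmetic over the partition-to-broker assignment of a topic. A minimal sketch with a made-up assignment:

    import java.util.Map;

    public class SkewAndSpread {
        public static void main(String[] args) {
            int brokersInCluster = 3;
            // Hypothetical assignment for one topic: brokerId -> number of its partitions on that broker.
            Map<Integer, Integer> partitionsPerBroker = Map.of(1, 3, 2, 1); // broker 3 hosts none

            double avg = partitionsPerBroker.values().stream()
                    .mapToInt(Integer::intValue).sum() / (double) partitionsPerBroker.size();

            // A broker is skewed if it holds more partitions of the topic than the average.
            partitionsPerBroker.forEach((broker, count) -> {
                if (count > avg) {
                    System.out.printf("Broker %d is skewed (%d > %.1f)%n", broker, count, avg);
                }
            });

            // Spread: percentage of brokers in the cluster that host at least one partition of the topic.
            double spread = 100.0 * partitionsPerBroker.size() / brokersInCluster;
            System.out.printf("Broker spread: %.0f%%%n", spread);
        }
    }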
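
For leader skew, besides the auto.leader.rebalance.enable broker setting, newer kafka-clients versions expose preferred-replica election programmatically through the AdminClient. A hedged sketch, assuming a recent client library; passing null elects the preferred leader for all partitions, similar to the "Preferred replica election" button:

    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.ElectionType;

    public class PreferredLeaderElection {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // hypothetical broker

            try (AdminClient admin = AdminClient.create(props)) {
                // Elect the preferred replica as leader for all partitions (null = all partitions).
                admin.electLeaders(ElectionType.PREFERRED, null).partitions().get();
            }
        }
    }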
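
The throughput-based partition-count formula, with made-up measurements:

    public class PartitionCountEstimate {
        public static void main(String[] args) {
            double t = 200.0; // target throughput, MB/s (hypothetical)
            double p = 20.0;  // measured single-partition producer throughput, MB/s (hypothetical)
            double c = 50.0;  // measured single-partition consumer throughput, MB/s (hypothetical)

            // You need at least max(t/p, t/c) partitions to reach the target throughput.
            int partitions = (int) Math.ceil(Math.max(t / p, t / c));
            System.out.println("Minimum partitions: " + partitions); // 10 in this example
        }
    }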
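
The latency rule of thumb (100 x b x r) from the end-to-end latency point, again with made-up cluster numbers:

    public class PartitionLimitForLatency {
        public static void main(String[] args) {
            int b = 10; // number of brokers in the cluster (hypothetical)
            int r = 3;  // replication factor (hypothetical)

            // Keep partitions per broker below roughly 100 * b * r if latency matters.
            int maxPartitionsPerBroker = 100 * b * r;
            System.out.println("Suggested per-broker partition limit: " + maxPartitionsPerBroker); // 3000
        }
    }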
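
For producer memory, the relevant knob is buffer.memory (together with batch.size, which is allocated per partition). A sketch that scales the buffer with the number of partitions being produced to, using hypothetical numbers:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerBufferSizing {
        public static void main(String[] args) {
            int partitionsProducedTo = 2000; // hypothetical
            int batchSizeBytes = 32 * 1024;  // a few tens of KB per partition

            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // hypothetical broker
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSizeBytes);
            // Scale the total buffer with the number of partitions being produced to,
            // so the producer neither blocks nor drops messages when batches pile up.
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (long) partitionsProducedTo * batchSizeBytes); // ~64 MB here
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // ... send() calls elided ...
            }
        }
    }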
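
Finally, for the ZooKeeper/offsets point: dual commit applies to the legacy (pre-0.9, ZooKeeper-based) consumer. A hedged sketch of what the migration-related properties look like; the surrounding legacy consumer code is omitted since that API is long deprecated:

    import java.util.Properties;

    public class LegacyOffsetMigrationProps {
        public static void main(String[] args) {
            // Settings for the legacy ZooKeeper-based consumer while migrating offset
            // storage from ZooKeeper to the __consumer_offsets topic.
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181");    // hypothetical ZooKeeper ensemble
            props.put("group.id", "billing-app-payments"); // hypothetical group
            props.put("offsets.storage", "kafka");         // store offsets in __consumer_offsets
            props.put("dual.commit.enabled", "true");      // also write offsets to ZooKeeper during migration

            props.forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }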
