QBoard » Big Data » Big Data - Data Ingestion Tools : Sqoop, Flume, Kafka, Nifi.. » Rebalancing issue while reading messages in Kafka

Rebalancing issue while reading messages in Kafka

  • I am trying to read messages on Kafka topic, but I am unable to read it. The process gets killed after sometime, without reading any messages.

    Here is the rebalancing error which I get:

    [2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer:  (kafka.consumer.ConsoleConsumer$)
    kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
        at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
        at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
        at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
        at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
    Consumed 0 messages

    I tried to run ConsumerOffsetChecker, and this is the error which I get. I have no clue whatsoever, how to resolve this. Anybody, any idea?

    ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group  topic_group
    Group           Topic                          Pid Offset          logSize         Lag             Owner
    Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
            at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
            at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
            at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
            at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
            at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
            at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
            at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
            at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
            at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
            at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
            at scala.collection.immutable.List.foreach(List.scala:45)
            at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
            at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
            at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
            at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
            at scala.collection.immutable.List.foreach(List.scala:45)
            at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
            at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
    Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
            at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
            at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
            at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
            at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
            at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
            at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
            at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
            at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
            ... 16 more

     

      December 30, 2020 1:30 PM IST
    0
  • There is a bug in kafka.tools.ConsumerOffsetChecker. If the a particular Zookeeper node holding consumed offset information doesn't exit, the tool exits throwing the execption.

    For example, suppose you have a consumer group "mygroup" and a topic "topictest". Then the offset for partition 2 is maintained in Znode: /consumers/mygroup/offsets/topictest/2.

    If there is no entry for partition 2 of topic topictest in Znode, then consumer offsetchecker tool will exit while checking offset for partition 2. Basically, it will fail while checking the first partition "n" for which the Znode /consumers/mygroup/offsets/topictest/n is missing on Zookeeper.

      January 6, 2022 1:00 PM IST
    0
  • rebalance.backoff.ms defines the time for which Kafka will wait before rebalancing & zookeeper.session.timeout.ms parameter specifies the time for which Kafka will wait to connect to zookeeper. So, try adjusting these two parameters.

    Hope this helps!

      August 13, 2021 1:02 PM IST
    0