我大约是把kafka消费不到数据的特殊情况都经历了一遍了吧= =、

kafka消费不到数据的原因,首先检查配置之类的,如是否设置了group.id,对应的topic是否正确等等,这些不多说。

下面是我遇到的几种kafka消费不到数据的情况:

1.多分区,单例消费者的情况,只消费到一个分区,应多加几个消费者,不能用单例,直接subscribe的话,rebalance机制启动,手动的话如下

consumer.Assign(new List<TopicPartitionOffset>(){ new TopicPartitionOffset(new TopicPartition("topic", 1), Offset.Stored) });

 

2.长时间不消费导致 log.retention.hours或者 log.retention.minutes超时,清除log,Offset.Stored失效

解决办法一:

consumer.Assign(new List<TopicPartitionOffset>(){ new TopicPartitionOffset(new TopicPartition("topic", 1), new Offset(index)) });

此处的index为该分区当前的offset,要自己做存储然后手动配置,可测试用。

解决办法二:见问题三,同样解决方式 但是会从头开始消费新进来的数据

 

3.我一次加数据太多导致磁盘耗尽,kafka管理员帮我改到20G内存,但是仍然有一部分数据超出,分区offset靠前的数据被清除,导致再次消费不到。清除掉的数据无法再次被消费,但是还保存的数据可以消费到

解决办法:

consumer.Assign(new List<TopicPartitionOffset>(){ new TopicPartitionOffset(new TopicPartition("topic", 1), Offset.Beginning) });

或者在配置中加

auto.offset.reset=smallest //.NET 默认是largest
auto.offset.reset=earliest//Java 默认是latest

 

关于该配置的测试,请看下面的链接

http://blog.csdn.net/lishuangzhe7047/article/details/74530417

版权声明:本文为sylvialucy原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/sylvialucy/p/7827044.html