你的位置:首页 > 数据库

[数据库]kafkaspot在ack机制下如何保证内存不溢


新浪微博:intsmaze刘洋洋哥。

 
storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送;如果不实现ack机制,那么kafkaspout就无法得到消息的处理响应,就会在超时以后再次发送消息,导致消息的重复发送。
 
但是回想一下我们自己写一个spout类实现BaseRichSpout并让他具备消息重发,那么我们是会在我们的spout类里面定义一个map集合,并以msgId作为key。
public class MySpout extends BaseRichSpout {  private static final long serialVersionUID = 5028304756439810609L;  // key:messageId,Data  private HashMap<String, String> waitAck = new HashMap<String, String>();  private SpoutOutputCollector collector;  public void declareOutputFields(OutputFieldsDeclarer declarer) {    declarer.declare(new Fields("sentence"));  }  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {    this.collector = collector;  }  public void nextTuple() {    String sentence = "the cow jumped over the moon";    String messageId = UUID.randomUUID().toString().replaceAll("-", "");    waitAck.put(messageId, sentence);    //指定messageId,开启ackfail机制    collector.emit(new Values(sentence), messageId);  }  @Override  public void ack(Object msgId) {    System.out.println("消息处理成功:" + msgId);    System.out.println("删除缓存中的数据...");    waitAck.remove(msgId);  }  @Override  public void fail(Object msgId) {    System.out.println("消息处理失败:" + msgId);    System.out.println("重新发送失败的信息...");    //重发如果不开启ackfail机制,那么spout的map对象中的该数据不会被删除的,而且下游    collector.emit(new Values(waitAck.get(msgId)),msgId);  }}

 
那么kafkaspout会不会也是这样还保存这已发送未收到bolt响应的消息呢?如果这样,如果消息处理不断失败,不断重发,消息不断积累在kafkaspout节点上,kafkaspout端会不就会出现内存溢出?
 
其实并没有,回想kafka的原理,Kafka会为每一个consumergroup保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。也就是说,kafkaspot在消费kafka的数据是,通过offset读取到消息并发送给bolt后,kafkaspot只是保存者当前的offset值。
当失败或成功根据msgId查询offset值,然后再去kafka消费该数据来确保消息的重新发送。
 
那么虽然offset数据小,但是当offset的数据量上去了还是会内存溢出的?
其实并没有,kafkaspout发现缓存的数据超过限制了,会把某端的数据清理掉的。
 
 
kafkaspot中发送数据的代码
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));


可以看到msgID里面包装了offset参数。
它不缓存已经发送出去的数据信息。
 
当他接收到来至bolt的响应后,会从接收到的msgId中得到offset。以下是从源码中折取的关键代码:
public void ack(Object msgId) {   KafkaMessageId id = (KafkaMessageId) msgId;   PartitionManager m = _coordinator.getManager(id.partition);   if (m != null) {     m.ack(id.offset);   } } m.ack(id.offset); public void ack(Long offset) {   _pending.remove(offset);//处理成功移除offset   numberAcked++; }

public void fail(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.fail(id.offset); } } m.fail(id.offset); public void fail(Long offset) {     failed.add(offset);//处理失败添加offset numberFailed++; } SortedSet<Long> _pending = new TreeSet<Long>(); SortedSet<Long> failed = new TreeSet<Long>();

 

关于kafkaspot的源码解析大家可以看这边博客:http://www.cnblogs.com/cruze/p/4241181.html

源码解析中涉及了很多kafka的概念,所以仅仅理解kafka的概念想完全理解kafkaspot源码是很难的,如果不理解kafka概念,那么就只需要在理解storm的ack机制上明白kafkaspot做了上面的两件事就可以了。






深圳免费旅游景点大全深圳市周边旅游景点深圳旅游哪家旅行社好深圳一日游攻略大全深圳游玩攻略大全2015马降龙碉楼三八妇女节门票免费吗?开平马降龙碉楼3.8女士优惠价格? 2015赤坎古镇三八妇女节女士免费吗?开平赤坎影视城3.8女士门票优惠活动? 2015赤坎古镇三八妇女节门票优惠活动?开平赤坎影视城3.8女士免费吗? 福州桃花朵朵开 依傍枝头等君来 西安十大“约会胜地”推荐 电影院 有“沟”必火 看完会哭的最美秋色 西安十大“约会胜地”推荐 西安高校 西安十大“约会胜地”推荐 大唐芙蓉园 韶关市动物园地址?韶关动物园门票多少钱? 韶关有动物园吗?韶关华南虎动物园地址? 韶关动物园地址?韶关华南虎去哪看? 韶关市华南虎动物园在哪里?华南虎动物园旅游攻略? 自由行需要准备什么? 2015香山红叶节是什么时候?香山红叶节攻略 川洞一日游攻略?天峨县川洞门票优惠吗? 川洞有什么好玩的?天峨县川洞在哪里? SFH3211FA Datasheet SFH3211FA Datasheet SFH3211FA-3/4 Datasheet SFH3211FA-3/4 Datasheet SFH3219 Datasheet SFH3219 Datasheet 福田汽车站订票电话 福田汽车站订票电话 福田汽车站订票电话 流溪河国家森林公园好玩吗 流溪河国家森林公园好玩吗 流溪河国家森林公园好玩吗 iphone5香港价格 iphone5香港价格 iphone5香港价格