你的位置:首页 > 软件开发 > 数据库 > KafkaSpout的核心逻辑PartitionManager

KafkaSpout的核心逻辑PartitionManager

发布时间:2016-10-13 22:00:07
新浪微博:intsmaze刘洋洋哥 KafkaSpout的核心逻辑都是由PartitionManager来实现的。但是这个类实现时候需要考虑的东西有些多,0.92至0.93,至当前(2015.3.14)的master一直在变化。在这里,先分析一下最近的发布版0.93里的逻辑。 ...

新浪微博:intsmaze刘洋洋哥

KafkaSpout的核心逻辑PartitionManager

 

KafkaSpout的核心逻辑都是由PartitionManager来实现的。但是这个类实现时候需要考虑的东西有些多,0.92至0.93,至当前(2015.3.14)的master一直在变化。在这里,先分析一下最近的发布版0.93里的逻辑。也提出一些问题,希望以后Apache Storm会把这个类实现地更完美一些。PartitionManager的主要功能PartitionManager用来管理单个Partition。提供持久化读取进度、读取消息功能,并提供Storm的spout需要实现的nextTuple, fail, ack等功能。实现PartitionManager需要考虑的问题有一些问题是设计PartitionManager时必须考虑的,先把他们提一下,然后看下0.93版PartitionManager的实现。关于批量读取消息以及缓存消息由于Kafka的实现细节(为了高吞吐而设计的log格式,通讯协议),Kafka的SimpleConsumer每次读取消息是会去读取一批,而不能指定响应想要包含的具体的offset,并且由于消息按批压缩,使得每次读取的响应包含的offset可能比想要的最小的offset还要小(但不会更大)。所以,对于PartitoinManager来说,在内部构造一个缓存,保存每次读取得到的一批message是一种自然而且高效的方式。允许有超过一个message处于pendding(已发送但没有ack)状态?如果在发射一个message的tuple之后,就开始等待。那么ack、fail、commit的逻辑就会很简单。但这样消息的处理效率会被极大的降低,且不说还可能使得下游bolt的一些task没事可做。所以一定得允许多个message正在被blot处理,也就是需要有pendding messages的集合。有了pendding的messages集合,ack, fail, commit的逻辑就变得比较复杂,且需要做出一些折衷。    当有message对应的tuple失败时,如何在处理其它正常的消息时,特殊处理失败的这些message?PartitionManager的具体实现下面是PartitionManager的commit方法的主要部分:

Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()          .put("topology", ImmutableMap.of("id", _topologyInstanceId,              "name", _stormConf.get(Config.TOPOLOGY_NAME)))          .put("offset", lastCompletedOffset)          .put("partition", _partition.partition)          .put("broker", ImmutableMap.of("host", _partition.host.host,              "port", _partition.host.port))          .put("topic", _spoutConfig.topic).build(); _state.writeJSON(committedPath(), data);_committedTo = lastCompletedOffset;
另外,这个JSON数据写的路径committedPath也是很重要的。PartitionManager初始化时,会从这个committedPath路径读取信息。

  private String committedPath() {    return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();  }
另外,在所有记录里,最重要的就是offset这个记录。它的意义,使得PartitionManager不得不做出很多权衡。

  Long _emittedToOffset;  SortedSet<Long> _pending = new TreeSet<Long>();  SortedSet<Long> failed = new TreeSet<Long>();  Long _committedTo;  LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();  long numberFailed, numberAcked;
当PartitionManager的next方法被调用以emit新tuple时,它只会从_waitingToEmit取消息。那么failed里的消息如何被再重试呢?原因在于_waitingToEmit为空时,next方**调用fill方法,而fill方**考虑到failed集合内的元素,不过是一种比较粗放的做法。fill方法的主要逻辑依次分为三个部分:    判断该从哪个offset开始,从Kafka抓取消息第一部分:

    final boolean had_failed = !failed.isEmpty();    // Are there failed tuples? If so, fetch those first.    if (had_failed) {      offset = failed.first();    } else {      offset = _emittedToOffset;    }
如果没有failed消息,fill方法就会从之前读取过的最大的offset继续抓取。在知道了从何处抓取之后,开始真正的抓取过程:

     try {      msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);    } catch (UpdateOffsetException e) { //如果是offset "out of range", 并且设置了useStartOffsetTimeIfOffsetOutOfRange      _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);      LOG.warn("Using new offset: {}", _emittedToOffset);      // fetch failed, so don't update the metrics      return;    }

  public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();    if ( config.forceFromStart ) {      startOffsetTime = config.startOffsetTime;    }    return getOffset(consumer, topic, partition, startOffsetTime);  }
在获取fetch到消息以后,获取的消息集可能会包含了各种例外情况,需要细致处理:

  for (MessageAndOffset msg : msgs) {        final Long cur_offset = msg.offset();        if (cur_offset < offset) {          // Skip any old offsets.          continue;        }        if (!had_failed failed.contains(cur_offset)) {          numMessages += 1;          _pending.add(cur_offset);//_pending表示已经读取而未被ack的消息          _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));          _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);          if (had_failed) {//说明此消息在failed集合里。从failed集合里remove掉它,因为它被重新加入了_waitingToEmit集合,将要被重新处理。            failed.remove(cur_offset);//          }        }
通过对fill方法的分析可以看到,如果一个消息始终fail,除非在PartitionManager的其它方法中把它移除,否则它会使PartitionManager的处理进度停止。下面将要看到,在fail和ack方法中,这样一直fail的消息还是有机会被丢弃的,但这取决于你的配置,而这些配置是很微妙的。ack方法的主要功能是把消息从_pending集合中去掉,表示这个消息处理完成。从_pending集合去除,PartitionManager才能获取正确的处理进度的信息,以更新Zookeeper里的记录。但是,它还有别的作用。

  public void ack(Long offset) {    if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {      // Too many things pending! 已读取但未确认的消息太多了,就把此次确认的offset - maxOffsetBehind之前的清除了      _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();    }    _pending.remove(offset);//从_pending集合中移除它,表示这个消息已被处理    numberAcked++;  }
commit方法被调用时,会调用lastCompletedOffset方法获取当前PartitionManager处理的进度,并且将这个进度持久化。这个“进度”,是说在此之前的所有消息都已被ack,或“不需要ack”, 总之,是说这些消息已处理完毕。

  public long lastCompletedOffset() {    if (_pending.isEmpty()) {      return _emittedToOffset;    } else {      return _pending.first();    }  }
陷阱failed方法,使得PartitonManager的有些行为非常隐晦。结合ack、fill和commit方法,可能会出现以下特殊情况,这些情况和KafkaConfig.maxOffBehind配置,及KafkaConfig.useStartOffsetTimeIfOffsetOutOfRange配置、KafkaConfig.fetchSizeBytes配置相关。    maxOffsetBehind设置得较小,而fetchSizeBytes相对较大,使得maxOffsetBehind小于一次fetch得到的消息总数。设这批fetch得到的消息的offset范围为[a, b],那么所有小于(b - maxOffsetBehind)的offset的消息,即使处理失败,也不会被重试。设这样失败,但不会被重试的消息中的某个的offset为X, 那么如果某个大于( X + maxOffsetBehind)的消息被ack时,offset为X的这个消息会被从_pending集合中移除。但是如果所有大于(X + maxOffsetBehind)的消息都被fail了,而在(_emmittedToOffset与_emittedToOffset - maxOffsetBehind之间) 有消息failed了,那么failed集合中不会包括X,但会包括比X的offset大的元素,X不会被重试,但X会一直停留在_pending集合,造成commit无法更新实际进度,并且带来内存泄漏。总之,当前PartitionManager的实现还有很多需要改进之处,而且有些情况容易给用户带来困扰。


原标题:KafkaSpout的核心逻辑PartitionManager

关键词:ASP

ASP
*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: admin#shaoqun.com (#换成@)。