这篇文章主要介绍“PartitionManager分区管理器怎么使用”,在日常操作中,相信很多人在PartitionManager分区管理器怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”PartitionManager分区管理器怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

成都创新互联企业建站,十多年网站建设经验,专注于网站建设技术,精于网页设计,有多年建站和网站代运营经验,设计师为客户打造网络企业风格,提供周到的建站售前咨询和贴心的售后服务。对于成都网站制作、成都做网站中不同领域进行深入了解和探索,创新互联在网站建设中充分了解客户行业的需求,以灵动的思维在网页中充分展现,通过对客户行业精准市场调研,为客户提供的解决方案。
阅读背景:对于java内部类有一个粗浅的认识
阅读目的:了解kafka 分区是如何在Storm接口之中进行管理的
最终主题:详尽的梳理PartitionManager的整个过程
package com.mixbox.storm.kafka;
import backtype.storm.Config;
import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import com.google.common.collect.ImmutableMap;
import com.mixbox.storm.kafka.KafkaSpout.EmitState;
import com.mixbox.storm.kafka.KafkaSpout.MessageAndRealOffset;
import com.mixbox.storm.kafka.trident.MaxMetric;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* 分区的管理器
*
* @author Yin Shuai
*
*/
public class PartitionManager {
public static final Logger LOG = LoggerFactory
.getLogger(PartitionManager.class);
private final CombinedMetric _fetchAPILatencyMax;
private final ReducedMetric _fetchAPILatencyMean;
private final CountMetric _fetchAPICallCount;
private final CountMetric _fetchAPIMessageCount;
/**
* kafka MessageID 封装了 partition 和offset
*
* @author Yin Shuai
*/
static class KafkaMessageId {
public Partition partition;
public long offset;
public KafkaMessageId(Partition partition, long offset) {
this.partition = partition;
this.offset = offset;
}
}
// 被发送的偏移量
Long _emittedToOffset;
SortedSet _pending = new TreeSet();
// 已经提交的
Long _committedTo;
// 等待去发射
LinkedList _waitingToEmit = new LinkedList();
// 分区
Partition _partition;
// Storm Spout的配置文件
SpoutConfig _spoutConfig;
// topology 的实例ID
String _topologyInstanceId;
// kafka 底层的消费者ID
SimpleConsumer _consumer;
// 动态的分区Connection
DynamicPartitionConnections _connections;
//ZKState 状态的维护
ZkState _state;
//Storm的配置文件
Map _stormConf;
//
@SuppressWarnings("unchecked")
public PartitionManager(DynamicPartitionConnections connections,
String topologyInstanceId, ZkState state, Map stormConf,
SpoutConfig spoutConfig, Partition id) {
_partition = id;
_connections = connections;
_spoutConfig = spoutConfig;
_topologyInstanceId = topologyInstanceId;
_consumer = connections.register(id.host, id.partition);
_state = state;
_stormConf = stormConf;
String jsonTopologyId = null;
Long jsonOffset = null;
String path = committedPath();
try {
Map