Introduction
I have been maintaining our Pulsar components on the middleware team for close to a year. Recently an engineer from another business team asked me: for a Pulsar consumer client using the Failover subscription mode, if the client disconnects and reconnects, is the partition assignment still load-balanced, and if so, how is that achieved? Let's walk through the client and broker code to find out.
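For reference, this is roughly the kind of setup in question; a minimal sketch (the service URL, topic, and subscription name are placeholders):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class FailoverConsumerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder broker address
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/my-partitioned-topic")
                .subscriptionName("my-sub")
                // Failover: each partition has one active consumer, the others stand by
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

        // receive/acknowledge loop omitted
        consumer.close();
        client.close();
    }
}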
Client
The consumer is constructed with the builder pattern. subscribe() simply calls subscribeAsync(), which returns a CompletableFuture; the entry point is org.apache.pulsar.client.impl.ConsumerBuilderImpl#subscribe.
@Override
public Consumer<T> subscribe() throws PulsarClientException {
    try {
        return subscribeAsync().get();
    } catch (Exception e) {
        throw PulsarClientException.unwrap(e);
    }
}

@Override
public CompletableFuture<Consumer<T>> subscribeAsync() {
    //...
}
This subscribeAsync first applies the dead-letter and retry queue configuration, then delegates to the client's subscribeAsync:
@Override
public CompletableFuture<Consumer<T>> subscribeAsync() {
    // ...
    return applyDLQConfig.thenCompose(__ -> {
        if (interceptorList == null || interceptorList.size() == 0) {
            return client.subscribeAsync(conf, schema, null);
        } else {
            return client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
        }
    });
}
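For context, the DLQ and retry settings that block applies are plain consumer-builder options; a sketch reusing the client from the first example (topic names are hypothetical):

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("persistent://public/default/my-topic")
        .subscriptionName("my-sub")
        .subscriptionType(SubscriptionType.Failover)
        // route messages that keep failing through a retry topic
        .enableRetry(true)
        // after maxRedeliverCount redeliveries, messages land in the dead letter topic
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .maxRedeliverCount(3)
                .retryLetterTopic("persistent://public/default/my-topic-retry")
                .deadLetterTopic("persistent://public/default/my-topic-dlq")
                .build())
        .subscribe();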
Establishing the consumer's connection depends on the client configuration. When subscribeAsync is invoked, the client first validates that configuration, e.g. whether the subscription name and topic names are legal. After validation, the client calls its own singleTopicSubscribeAsync or multiTopicSubscribeAsync to set up the subscription.
public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema,
                                                         ConsumerInterceptors<T> interceptors) {
    // ...
    if (conf.getTopicNames().size() == 1) {
        return singleTopicSubscribeAsync(conf, schema, interceptors);
    } else {
        return multiTopicSubscribeAsync(conf, schema, interceptors);
    }
}
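Subscribing to more than one topic is what takes the multiTopicSubscribeAsync branch directly; a sketch reusing the client from the first example (topic names are placeholders):

// requires java.util.Arrays
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topics(Arrays.asList(
                "persistent://public/default/topic-a",
                "persistent://public/default/topic-b"))
        .subscriptionName("my-sub")
        .subscriptionType(SubscriptionType.Failover)
        .subscribe();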
Since partition load balancing is what we care about, the subscription ends up being handled by a MultiTopicsConsumerImpl, so let's follow multiTopicSubscribeAsync.
private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf,
                                                                    Schema<T> schema,
                                                                    ConsumerInterceptors<T> interceptors) {
    CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();

    ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
            externalExecutorProvider, consumerSubscribedFuture, schema, interceptors,
            true /* createTopicIfDoesNotExist */);

    consumers.add(consumer);

    return consumerSubscribedFuture;
}
multiTopicSubscribeAsync is quite simple: it instantiates the consumer and adds it to the consumers container, so the actual connection setup happens inside the consumer itself. Next, let's see what new MultiTopicsConsumerImpl does.
In org.apache.pulsar.client.impl.MultiTopicsConsumerImpl#MultiTopicsConsumerImpl the constructor initializes the consumer's resources, eventually calls the core startReceivingMessages, and then completes the subscribe future.
this.partitionedTopics = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.allTopicPartitionsNumber = new AtomicInteger(0);
this.startMessageId = (MessageIdAdv) startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
this.paused = conf.isStartPaused();
// ...
if (allTopicPartitionsNumber.get() > getCurrentReceiverQueueSize()) {
    setCurrentReceiverQueueSize(allTopicPartitionsNumber.get());
}
setState(State.Ready);
// We have successfully created N consumers, so we can start receiving messages now
startReceivingMessages(new ArrayList<>(consumers.values()));
log.info("[{}] [{}] Created topics consumer with {} sub-consumers",
        topic, subscription, allTopicPartitionsNumber.get());
subscribeFuture().complete(MultiTopicsConsumerImpl.this);
The core of pulling messages is startReceivingMessages, implemented as follows:
private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}",
                topic, newConsumers.size(), getState());
    }
    if (getState() == State.Ready) {
        newConsumers.forEach(consumer -> {
            consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(),
                    consumer.getCurrentReceiverQueueSize());
            internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true));
        });
    }
}
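The permits granted here come from the consumer's receiver queue size, which is just another builder setting; for instance (reusing the client from the first sketch; 1000 is the default):

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("persistent://public/default/my-partitioned-topic")
        .subscriptionName("my-sub")
        .subscriptionType(SubscriptionType.Failover)
        // this is what getCurrentReceiverQueueSize() above reflects
        .receiverQueueSize(1000)
        .subscribe();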
Broker
On the broker side, the request entry point is org.apache.pulsar.common.protocol.PulsarDecoder, where every incoming command is deserialized from protobuf. Consuming maps to the SUBSCRIBE command, so let's locate that case.
case SUBSCRIBE:
    checkArgument(cmd.hasSubscribe());
    try {
        interceptCommand(cmd);
        handleSubscribe(cmd.getSubscribe());
    } catch (InterceptException e) {
        writeAndFlush(ctx, Commands.newError(cmd.getSubscribe().getRequestId(),
                getServerError(e.getErrorCode()), e.getMessage()));
    }
    break;
This lands in ServerCnx#handleSubscribe. After a series of parameter checks, configuration initialization, and topic metadata handling, execution reaches the following block:
SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
        .subscriptionName(subscriptionName)
        .consumerId(consumerId).subType(subType)
        .priorityLevel(priorityLevel)
        .consumerName(consumerName).isDurable(isDurable)
        .startMessageId(startMessageId).metadata(metadata)
        .readCompacted(readCompacted)
        .initialPosition(initialPosition)
        .startMessageRollbackDurationSec(startMessageRollbackDurationSec)
        .replicatedSubscriptionStateArg(isReplicated)
        .keySharedMeta(keySharedMeta)
        .subscriptionProperties(subscriptionProperties)
        .consumerEpoch(consumerEpoch)
        .schemaType(schema == null ? null : schema.getType())
        .build();
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
    return topic.addSchemaIfIdleOrCheckCompatible(schema)
            .thenCompose(v -> topic.subscribe(option));
} else {
    return topic.subscribe(option);
}
Following topic.subscribe, the code path is PersistentTopic#subscribe -> PersistentTopic#internalSubscribe. Inside internalSubscribe the consumer is added to the subscription:
return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
    if (subscription instanceof PersistentSubscription persistentSubscription) {
        checkBackloggedCursor(persistentSubscription);
    }
    // ...
});
Digging into addConsumerToSubscription, it registers the consumer with the subscription after two limit checks:
protected CompletableFuture<Void> addConsumerToSubscription(Subscription subscription, Consumer consumer) {
    if (isConsumersExceededOnTopic()) {
        log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", topic);
        return FutureUtil.failedFuture(new ConsumerBusyException("Topic reached max consumers limit"));
    }
    if (isSameAddressConsumersExceededOnTopic(consumer)) {
        log.warn("[{}] Attempting to add consumer to topic which reached max same address consumers limit", topic);
        return FutureUtil.failedFuture(new ConsumerBusyException("Topic reached max same address consumers limit"));
    }
    return subscription.addConsumer(consumer);
}
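Both limit checks are driven by broker-side configuration; as a rough sketch of the corresponding broker.conf entries (setting names may vary by version, 0 means unlimited):

# Maximum number of consumers allowed on a single topic (0 = unlimited)
maxConsumersPerTopic=0
# Maximum number of consumers from the same client address on a single topic (0 = unlimited)
maxSameAddressConsumersPerTopic=0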
Continuing into subscription.addConsumer, we can see that addConsumer switches on the subscription type and handles each one. For Failover:
case Failover:
    int partitionIndex = TopicName.getPartitionIndex(topicName);
    if (partitionIndex < 0) {
        // For non partition topics, use a negative index so
        // dispatcher won't sort consumers before picking
        // an active consumer for the topic.
        partitionIndex = -1;
    }

    if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
        previousDispatcher = dispatcher;
        dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
                partitionIndex, topic, this);
    }
    break;
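To make partitionIndex concrete: each partition of a partitioned topic is itself a topic whose name ends in -partition-N, and that suffix is what getPartitionIndex extracts; a quick sketch:

// requires org.apache.pulsar.common.naming.TopicName
// "persistent://public/default/my-topic-partition-3" -> 3
int idx = TopicName.getPartitionIndex("persistent://public/default/my-topic-partition-3");
// a topic without the "-partition-N" suffix -> -1
int none = TopicName.getPartitionIndex("persistent://public/default/plain-topic");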
PersistentDispatcherSingleActiveConsumer implements the Dispatcher interface; its base class AbstractDispatcherSingleActiveConsumer has a method pickAndScheduleActiveConsumer that spells out the Failover balancing. I've annotated the basic idea with comments:
protected boolean pickAndScheduleActiveConsumer() {
    // there must be at least one consumer in the subscription
    checkArgument(!consumers.isEmpty());

    // records whether any consumer has a configured priority level
    AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false);
    // sort consumers by priority level first, then by consumer name
    consumers.sort((c1, c2) -> {
        int priority = c1.getPriorityLevel() - c2.getPriorityLevel();
        if (priority != 0) {
            hasPriorityConsumer.set(true);
            return priority;
        }
        return c1.consumerName().compareTo(c2.consumerName());
    });

    // number of consumers in the current subscription
    int consumersSize = consumers.size();
    // if priority levels are set, only the highest-priority group is considered
    if (hasPriorityConsumer.get()) {
        int highestPriorityLevel = consumers.get(0).getPriorityLevel();
        for (int i = 0; i < consumers.size(); i++) {
            if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) {
                consumersSize = i;
                break;
            }
        }
    }

    // partitionIndex is this topic partition's index; taking it modulo the number
    // of consumers spreads partitions evenly across the consumers
    int index = partitionIndex >= 0
            ? partitionIndex % consumersSize
            : peekConsumerIndexFromHashRing(makeHashRing(consumersSize));

    // pick the consumer at that index from the sorted list and make it the
    // active consumer for this topic partition
    Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));

    Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
    if (prevConsumer == activeConsumer) {
        // Active consumer did not change. Do nothing at this point
        return false;
    } else {
        // If the active consumer is changed, send notification.
        scheduleReadOnActiveConsumer();
        return true;
    }
}
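The priority level used in the sort above is set on the client side; a sketch reusing the client from the first example:

Consumer<String> preferred = client.newConsumer(Schema.STRING)
        .topic("persistent://public/default/my-partitioned-topic")
        .subscriptionName("my-sub")
        .subscriptionType(SubscriptionType.Failover)
        // lower values sort first in pickAndScheduleActiveConsumer, so this
        // consumer is preferred when the active consumer is picked
        .priorityLevel(0)
        .subscribe();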
Summary
Whenever a new consumer joins the subscription or an existing one leaves, pickAndScheduleActiveConsumer is triggered. Its core logic: sort the consumers in the subscription, take the current topic's partition index modulo the number of consumers, and the consumer at that index in the sorted list becomes the active consumer for the partition. Because the rule is deterministic and applied per partition, partitions stay spread roughly evenly across the consumers, including after a client disconnects and resubscribes.
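To see the balancing in action, here is a standalone sketch of the sort-then-modulo rule with three consumers (already in sorted order) and six partitions:

// Illustration only, not Pulsar code: each partition index maps to one consumer.
String[] sortedConsumers = {"c1", "c2", "c3"};
for (int partitionIndex = 0; partitionIndex < 6; partitionIndex++) {
    String active = sortedConsumers[partitionIndex % sortedConsumers.length];
    System.out.println("partition-" + partitionIndex + " -> " + active);
}
// partition-0 -> c1, partition-1 -> c2, partition-2 -> c3,
// partition-3 -> c1, partition-4 -> c2, partition-5 -> c3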