A Look at Pulsar's Failover Consumption Mode

Introduction

After maintaining the Pulsar component on the middleware team for close to a year, I recently got a question from an engineer on another business team: with a Pulsar consumer client using the Failover subscription type, if the client disconnects and reconnects, is partition load balancing still guaranteed? And if it is, how is that implemented? Let's walk through the client and broker code to find out.

Client Side

The consumer is constructed via the builder pattern. subscribe delegates to subscribeAsync, which returns a CompletableFuture; the entry point is org.apache.pulsar.client.impl.ConsumerBuilderImpl#subscribe:

@Override
public Consumer<T> subscribe() throws PulsarClientException {
    try {
        return subscribeAsync().get();
    } catch (Exception e) {
        throw PulsarClientException.unwrap(e);
    }
}

@Override
public CompletableFuture<Consumer<T>> subscribeAsync()
    //...

The subscribeAsync method first applies the dead-letter queue and retry queue configuration, then delegates to the client's subscribeAsync:

@Override
public CompletableFuture<Consumer<T>> subscribeAsync()
// ...
return applyDLQConfig.thenCompose(__ -> {
    if (interceptorList == null || interceptorList.size() == 0) {
        return client.subscribeAsync(conf, schema, null);
    } else {
        return client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
    }
});

Establishing the consumer connection depends on the client configuration. When subscribeAsync is called, the client first validates the configuration, e.g. whether the subscription name and topic names are legal. After validation, the client calls its own singleTopicSubscribeAsync or multiTopicSubscribeAsync to set up the connection.

public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema,
                                                             ConsumerInterceptors<T> interceptors) {
// ...
if (conf.getTopicNames().size() == 1) {
    return singleTopicSubscribeAsync(conf, schema, interceptors);
} else {
    return multiTopicSubscribeAsync(conf, schema, interceptors);
}

Since partition load balancing is what we care about, the interesting path is the multi-consumer one. (Note that a single partitioned topic name still enters through singleTopicSubscribeAsync, which checks the partition metadata and likewise ends up creating a MultiTopicsConsumerImpl with one sub-consumer per partition, so the logic below applies either way.) Let's look at multiTopicSubscribeAsync:

private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf,
                                                                        Schema<T> schema,
                                                                        ConsumerInterceptors<T> interceptors) {
    CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();

    ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
            externalExecutorProvider, consumerSubscribedFuture, schema, interceptors,
            true /* createTopicIfDoesNotExist */);
    
    consumers.add(consumer);
    
    return consumerSubscribedFuture;
}

multiTopicSubscribeAsync itself is quite simple: it instantiates the consumer and adds the object to the consumers container, so the connection setup must happen inside the constructor. Let's see what new MultiTopicsConsumerImpl does.

In org.apache.pulsar.client.impl.MultiTopicsConsumerImpl#MultiTopicsConsumerImpl, the consumer's resources are initialized; the constructor then invokes the core method startReceivingMessages and completes the subscribe future.

this.partitionedTopics = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.allTopicPartitionsNumber = new AtomicInteger(0);
this.startMessageId = (MessageIdAdv) startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
this.paused = conf.isStartPaused();
// ...
if (allTopicPartitionsNumber.get() > getCurrentReceiverQueueSize()) {
    setCurrentReceiverQueueSize(allTopicPartitionsNumber.get());
}
setState(State.Ready);
// We have successfully created N consumers, so we can start receiving messages now
startReceivingMessages(new ArrayList<>(consumers.values()));
log.info("[{}] [{}] Created topics consumer with {} sub-consumers",
    topic, subscription, allTopicPartitionsNumber.get());
subscribeFuture().complete(MultiTopicsConsumerImpl.this);

The core of message pulling lives in startReceivingMessages, implemented as follows:

private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}",
                topic, newConsumers.size(), getState());
    }

    if (getState() == State.Ready) {
        newConsumers.forEach(consumer -> {
            consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(),
                    consumer.getCurrentReceiverQueueSize());
            internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true));
        });
    }
}
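The increaseAvailablePermits call above feeds Pulsar's permit-based flow control: at startup the consumer grants the broker as many permits as its receiver queue can hold, and the broker may push at most that many messages before waiting for new permits. A minimal sketch of the accounting, using a hypothetical Broker class (these are not Pulsar's real types, just an illustration of the idea):

```java
// Hypothetical sketch of the permit-based flow control that
// increaseAvailablePermits participates in; not Pulsar's actual classes.
public class PermitSketch {
    static class Broker {
        private int permits = 0;             // messages the broker may still push

        void onFlowPermits(int n) {          // consumer grants n more permits
            permits += n;
        }

        // Push one message if the consumer has granted a permit.
        boolean tryPush() {
            if (permits == 0) {
                return false;
            }
            permits--;
            return true;
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        int receiverQueueSize = 3;
        broker.onFlowPermits(receiverQueueSize); // like the initial permit grant at subscribe time
        int delivered = 0;
        while (broker.tryPush()) {
            delivered++;
        }
        // The broker could push exactly receiverQueueSize messages, then stalls
        // until the consumer grants more permits as it drains its queue.
        System.out.println(delivered);
    }
}
```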

Server Side

The broker's request entry point is org.apache.pulsar.common.protocol.PulsarDecoder, where every request is deserialized from protobuf. Consumption corresponds to the SUBSCRIBE command, so let's locate that case:

case SUBSCRIBE:
checkArgument(cmd.hasSubscribe());
try {
    interceptCommand(cmd);
    handleSubscribe(cmd.getSubscribe());
} catch (InterceptException e) {
    writeAndFlush(ctx, Commands.newError(cmd.getSubscribe().getRequestId(),
            getServerError(e.getErrorCode()), e.getMessage()));
}
break;

This lands in ServerCnx#handleSubscribe. After a series of parameter checks, configuration initialization, and topic-metadata handling, the flow reaches the following block:

SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
        .subscriptionName(subscriptionName)
        .consumerId(consumerId).subType(subType)
        .priorityLevel(priorityLevel)
        .consumerName(consumerName).isDurable(isDurable)
        .startMessageId(startMessageId).metadata(metadata)
        .readCompacted(readCompacted)
        .initialPosition(initialPosition)
        .startMessageRollbackDurationSec(startMessageRollbackDurationSec)
        .replicatedSubscriptionStateArg(isReplicated)
        .keySharedMeta(keySharedMeta)
        .subscriptionProperties(subscriptionProperties)
        .consumerEpoch(consumerEpoch)
        .schemaType(schema == null ? null : schema.getType())
        .build();
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
    return topic.addSchemaIfIdleOrCheckCompatible(schema)
            .thenCompose(v -> topic.subscribe(option));
} else {
    return topic.subscribe(option);
}

Following topic.subscribe, the code path is PersistentTopic#subscribe -> PersistentTopic#internalSubscribe. Inside internalSubscribe, one step adds the consumer to the subscription:

return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
    if (subscription instanceof PersistentSubscription persistentSubscription) {
        checkBackloggedCursor(persistentSubscription);
    }
    // ...
});

Digging into addConsumerToSubscription, it enforces the consumer limits and then adds the consumer to the subscription:

protected CompletableFuture<Void> addConsumerToSubscription(Subscription subscription, Consumer consumer) {
    if (isConsumersExceededOnTopic()) {
        log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", topic);
        return FutureUtil.failedFuture(new ConsumerBusyException("Topic reached max consumers limit"));
    }

    if (isSameAddressConsumersExceededOnTopic(consumer)) {
        log.warn("[{}] Attempting to add consumer to topic which reached max same address consumers limit", topic);
        return FutureUtil.failedFuture(new ConsumerBusyException("Topic reached max same address consumers limit"));
    }

    return subscription.addConsumer(consumer);
}

Continuing into subscription.addConsumer, we can see that addConsumer switches on the subscription type and handles each one. For Failover:

case Failover:
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
    // For non partition topics, use a negative index so
    // dispatcher won't sort consumers before picking
    // an active consumer for the topic.
    partitionIndex = -1;
}

if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
    previousDispatcher = dispatcher;
    dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
                    partitionIndex, topic, this);
}
break;
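TopicName.getPartitionIndex derives the index from the -partition-N suffix that Pulsar appends to each per-partition internal topic name. A rough, simplified re-implementation of that rule (the real parsing lives in org.apache.pulsar.common.naming.TopicName; this sketch only mirrors the suffix convention):

```java
// Simplified sketch of how a partition index is recovered from a
// per-partition topic name; not the real TopicName implementation.
public class PartitionIndexSketch {
    static final String PARTITION_SUFFIX = "-partition-";

    // Returns the partition index encoded in the topic name,
    // or -1 for non-partitioned topics.
    static int getPartitionIndex(String topicName) {
        int idx = topicName.lastIndexOf(PARTITION_SUFFIX);
        if (idx < 0) {
            return -1;
        }
        try {
            return Integer.parseInt(topicName.substring(idx + PARTITION_SUFFIX.length()));
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(getPartitionIndex("persistent://public/default/my-topic-partition-3"));
        System.out.println(getPartitionIndex("persistent://public/default/my-topic"));
    }
}
```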

PersistentDispatcherSingleActiveConsumer implements the Dispatcher interface; its parent class AbstractDispatcherSingleActiveConsumer contains the method pickAndScheduleActiveConsumer, which is exactly where the Failover balancing happens. I've annotated the basic idea with comments:

protected boolean pickAndScheduleActiveConsumer() {
    // The consumer list must not be empty
    checkArgument(!consumers.isEmpty());
    // Tracks whether any consumer has a non-default priority level
    AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false);
    // Sort consumers by priority level first, then by consumer name
    consumers.sort((c1, c2) -> {
        int priority = c1.getPriorityLevel() - c2.getPriorityLevel();
        if (priority != 0) {
            hasPriorityConsumer.set(true);
            return priority;
        }
        return c1.consumerName().compareTo(c2.consumerName());
    });
    // Number of consumers in this subscription
    int consumersSize = consumers.size();
    // If priority levels are set, only the highest-priority group
    // of consumers competes for the partitions
    if (hasPriorityConsumer.get()) {
        int highestPriorityLevel = consumers.get(0).getPriorityLevel();
        for (int i = 0; i < consumers.size(); i++) {
            if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) {
                consumersSize = i;
                break;
            }
        }
    }
    // partitionIndex is this topic's partition number; taking it modulo the
    // consumer count spreads the partitions evenly across the consumers
    int index = partitionIndex >= 0
            ? partitionIndex % consumersSize
            : peekConsumerIndexFromHashRing(makeHashRing(consumersSize));

    // Pick the consumer at that index and set it as this topic's active consumer
    Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));

    Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
    if (prevConsumer == activeConsumer) {
        // Active consumer did not change. Do nothing at this point
        return false;
    } else {
        // If the active consumer is changed, send notification.
        scheduleReadOnActiveConsumer();
        return true;
    }
}
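The selection step can be condensed into a few lines. The sketch below is a simplified re-implementation (a record standing in for Pulsar's Consumer, only the sort/truncate/modulo logic): sort by priority level then name, restrict the candidate set to the highest-priority group, and index it with partition % size. Note that in Pulsar a lower priority number means higher priority.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified re-implementation of pickAndScheduleActiveConsumer's
// selection step; C is a stand-in for Pulsar's Consumer.
public class PickSketch {
    record C(String name, int priority) {}

    static String pickActive(List<C> consumers, int partitionIndex) {
        List<C> sorted = new ArrayList<>(consumers);
        // Priority level first (lower number = higher priority), then name
        sorted.sort(Comparator.comparingInt(C::priority).thenComparing(C::name));
        // Truncate the candidate set to the highest-priority group
        int size = sorted.size();
        int top = sorted.get(0).priority();
        for (int i = 0; i < sorted.size(); i++) {
            if (sorted.get(i).priority() != top) {
                size = i;
                break;
            }
        }
        return sorted.get(partitionIndex % size).name();
    }

    public static void main(String[] args) {
        // Two priority-0 consumers and one priority-1 standby:
        // only c1/c2 share the partitions; c3 waits as a fallback.
        List<C> group = List.of(new C("c2", 0), new C("c1", 0), new C("c3", 1));
        for (int p = 0; p < 4; p++) {
            System.out.println("partition-" + p + " -> " + pickActive(group, p));
        }
    }
}
```

Running this shows partitions alternating between c1 and c2, while c3 only becomes eligible once the higher-priority consumers are gone.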

Summary

Whenever a consumer joins or leaves the subscription, pickAndScheduleActiveConsumer is triggered. Its core logic: sort the subscription's consumers, take the current topic's partition index modulo the consumer count, and the consumer at the resulting index in the sorted list becomes the active consumer for that partition's topic.
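This also answers the original question: because the consumers are sorted before the modulo, the mapping depends only on the membership of the subscription, not on the order in which clients connected, so a disconnect/reconnect re-derives the same balanced assignment. A small demonstration under the same simplified model (equal-priority consumers, name-sorted, partition % count):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Demonstrates that the Failover assignment is deterministic and balanced
// for equal-priority consumers (simplified model, not Pulsar's classes).
public class BalanceSketch {
    // Assign each partition to sorted(names).get(partition % n),
    // mirroring the dispatcher's selection rule.
    static Map<Integer, String> assign(List<String> names, int partitions) {
        List<String> sorted = names.stream().sorted().toList();
        Map<Integer, String> owner = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            owner.put(p, sorted.get(p % sorted.size()));
        }
        return owner;
    }

    public static void main(String[] args) {
        // Different connection orders, same result: sorting by name makes
        // the mapping independent of who (re)connected first.
        Map<Integer, String> a = assign(List.of("c1", "c2", "c3"), 6);
        Map<Integer, String> b = assign(List.of("c3", "c1", "c2"), 6);
        System.out.println(a.equals(b));
        System.out.println(a); // each consumer owns 2 of the 6 partitions
    }
}
```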