你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

8. 源码分析之ConsumeQueue

2021/11/30 2:46:08

源码分析之ConsumeQueue

消息发送时数据在ConsumeQueue的落地

在这里插入图片描述

​ 连续发送5条消息,消息是不定长,首先所有信息先放入 Commitlog中,每一条消息放入Commitlog的时候都需要上锁,确保顺序的写入。

当Commitlog写成功了之后。数据再同步到ConsunmeQueue中

​ 并且数据一条条分发,这个是一个典型的轮询

​ Queue Offset 代表一个Queue中的第几条消息

​ Logic Offset就是Queue Offset*20 因为每一条ConsumeQueue中的消息长度都是20.

​ Physical Offset,这个是在 Commitlog中每一条消息偏移量。

这种设计非常的巧妙:

​ 查找消息的时候,可以直按根据队列的消息序号,计算出索引的全局位置(比如序号2,就知道偏移量是20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息、这里面比较耗时两个操作就是分别找到索引和消息所在文件,这两次查找是差不多的,都可以抽象成:

​ 因为每个索引文件或者消息文件的长度的是固定的,对于每一组文件,都维护了一个由小到大有序的文件数组。查找文件的时候,直接通过计算即可获取文件在数组中的序号:

​ 文件在数组中的序号=(全局位置-第一个文件的文件名)/文件固定大小

​ 在通过序号在数组中获取数据的时间复杂度是0(1),二次查找文件的时间复杂度也是是:0(1)+0(1) =0 (1),所以消费时查找数据的时间复杂度也是O(1)。

入口:ReputMessageService.doReput(独立线程)

DefaultMessageStore. start()

// org.apache.rocketmq.store.DefaultMessageStore#start
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
         maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(),
         this.commitLog.getConfirmOffset());
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start(); // here

ReputMessageService.run()

// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput(); // here
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
private void doReput() {
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
        //主要是构建ConsumerQueue和Index
        //reputFromOffset:构建ConsumerQueue和Index的进度
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            ...
                DefaultMessageStore.this.doDispatch(dispatchRequest); // here
            ...
        }
    }
}
// org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // 构建供消费端使用的逻辑队列
                DefaultMessageStore.this.putMessagePositionInfo(request); // here
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}
// org.apache.rocketmq.store.DefaultMessageStore#putMessagePositionInfo
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

public void putMessagePositionInfoWrapper(DispatchRequest request) {
...
    // 构建供消费端使用的逻辑队列
    boolean result = this.putMessagePositionInfo(
    request.getCommitLogOffset(), request.getMsgSize(), 
    tagsCode, request.getConsumeQueueOffset());
}

// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
                                       final long cqOffset) {
    ...
	MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); // here
    if (mappedFile != null) {
        return mappedFile.appendMessage(this.byteBufferIndex.array()); // here
    }
}
// org.apache.rocketmq.store.MappedFile#appendMessage(byte[])
public boolean appendMessage(final byte[] data) {
    int currentPos = this.wrotePosition.get();
    if ((currentPos + data.length) <= this.fileSize) {
        try {
            this.fileChannel.position(currentPos);
            this.fileChannel.write(ByteBuffer.wrap(data));
        } catch (Throwable e) {
            log.error("Error occurred when append message to mappedFile.", e);
        }
        this.wrotePosition.addAndGet(data.length);
        return true;
    }
    return false;
}

异步刷盘

// org.apache.rocketmq.store.DefaultMessageStore#DefaultMessageStore
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, 
                            final BrokerStatsManager brokerStatsManager,
                            final MessageArrivingListener messageArrivingListener,
                            final BrokerConfig brokerConfig) throws IOException {
     this.messageArrivingListener = messageArrivingListener;
     this.brokerConfig = brokerConfig;
     this.messageStoreConfig = messageStoreConfig;
     this.brokerStatsManager = brokerStatsManager;
     this.allocateMappedFileService = new AllocateMappedFileService(this);
     if (messageStoreConfig.isEnableDLegerCommitLog()) {
         this.commitLog = new DLedgerCommitLog(this);
     } else {
         this.commitLog = new CommitLog(this);
     }
     this.consumeQueueTable = new ConcurrentHashMap<>(32);

     this.flushConsumeQueueService = new FlushConsumeQueueService(); // here
 }
// 
private void doFlush(int retryTimes) {
...
    if (0 == flushConsumeQueueLeastPages) {
        if (logicsMsgTimestamp > 0) {
            DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
        }
        DefaultMessageStore.this.getStoreCheckpoint().flush(); // here
    }
}