Lucene Core Code Analysis Report 13

2021/12/20 8:30:16

2021SC@SDUSC

Closing the IndexWriter Object

Code:

writer.close();
--> IndexWriter.closeInternal(boolean)
 --> (1) Write the index information from memory to disk: flush(waitForMerges, true, true);
 --> (2) Perform segment merging: mergeScheduler.merge(this);

Segment merging will be discussed in a later chapter; here we only discuss the process of writing the index information from memory to disk.
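For context, here is a minimal usage sketch of the call that triggers this path. It assumes the Lucene 2.9/3.0-era API that this report analyzes (constructor and Field flags differ in later versions) and is an illustration, not code from the report itself. The doFlush path it triggers is shown next.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class CloseWriterDemo {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir,
        new StandardAnalyzer(Version.LUCENE_30), true,
        IndexWriter.MaxFieldLength.UNLIMITED);

    Document doc = new Document();
    doc.add(new Field("content", "lucene index writer flush",
        Field.Store.YES, Field.Index.ANALYZED));
    writer.addDocument(doc); // buffered in memory by DocumentsWriter

    // close() -> closeInternal() -> flush(...) writes the buffered index to dir,
    // after which mergeScheduler.merge(this) may merge segments.
    writer.close();
  }
}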
Code:
IndexWriter.flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes)
--> IndexWriter.doFlush(boolean flushDocStores, boolean flushDeletes)
 --> IndexWriter.doFlushInternal(boolean flushDocStores, boolean flushDeletes)

Writing the index to disk involves the following steps:

Obtain the name of the segment to write: String segment = docWriter.getSegment();
Have DocumentsWriter write its cached information into the segment: docWriter.flush(flushDocStores);
Create the new segment info object: newSegment = new SegmentInfo(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx());
Prepare the document deletions: docWriter.pushDeletes();
Create the cfs segment: docWriter.createCompoundFile(segment);
Apply the document deletions: applyDeletes();

Obtaining the Segment Name to Write

Code:

SegmentInfo newSegment = null;
final int numDocs = docWriter.getNumDocsInRAM(); // total number of documents
String docStoreSegment = docWriter.getDocStoreSegment(); // name of the segment that stored fields and term vectors are written to, "_0"
int docStoreOffset = docWriter.getDocStoreOffset(); // offset of stored fields and term vectors within that segment
String segment = docWriter.getSegment(); // segment name, "_0"

As described in detail in the chapter on Lucene's index file structure, stored fields and term vectors can be stored in a different segment from the indexed fields.
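To make concrete which data ends up in which files, here is a hedged sketch of how a field is configured at indexing time in the same API generation (illustrative only, not code from this report): Field.Store controls the stored-field files (fdx/fdt), Field.Index the inverted index, and Field.TermVector the term-vector files (tvx/tvd/tvf).

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;

public class FieldConfigSketch {
  // Illustrative only (Lucene 2.9/3.x-style Field flags assumed):
  static Document makeDoc() {
    Document doc = new Document();
    doc.add(new Field("body",
        "stored fields and term vectors may live in their own doc-store segment",
        Field.Store.YES,                           // goes to the stored-field files fdx/fdt
        Field.Index.ANALYZED,                      // goes to the inverted-index files
        Field.TermVector.WITH_POSITIONS_OFFSETS)); // goes to the term-vector files tvx/tvd/tvf
    return doc;
  }
}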

Writing the Cached Content to the Segment

Code:

flushedDocCount = docWriter.flush(flushDocStores); 

This process consists of the following two phases:
1. Close the stored-field and term-vector stores following the basic indexing chain.
2. Write the index results into the segment following the structure of the basic indexing chain.

Closing Stored Fields and Term Vectors Along the Basic Indexing Chain

Code:

closeDocStore(); 
flushState.numDocsInStore = 0; 

Following the structure of the basic indexing chain, this closes the stored-field and term-vector stores:
consumer(DocFieldProcessor).closeDocStore(flushState);
consumer(DocInverter).closeDocStore(state);
consumer(TermsHash).closeDocStore(state);
consumer(FreqProxTermsWriter).closeDocStore(state);
if (nextTermsHash != null) nextTermsHash.closeDocStore(state);
consumer(TermVectorsTermsWriter).closeDocStore(state);
endConsumer(NormsWriter).closeDocStore(state);
fieldsWriter(StoredFieldsWriter).closeDocStore(state);
Of these, the two closeDocStore implementations that do the real work are:

Closing term vectors:

TermVectorsTermsWriter.closeDocStore(SegmentWriteState)
void closeDocStore(final SegmentWriteState state) throws IOException {
  if (tvx != null) {
    // For documents that store no term vectors, write zeros into the tvd file;
    // even without term vectors, a slot is still reserved in tvx and tvd.
    fill(state.numDocsInStore - docWriter.getDocStoreOffset());
    // Close the output streams of the tvx, tvf and tvd files
    tvx.close();
    tvf.close();
    tvd.close();
    tvx = null;
    // Record the written file names so that they can later be combined into a single cfs file
    state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
    state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
    state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
    // Remove them from DocumentsWriter's openFiles member; they may later be deleted by IndexFileDeleter
    docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
    docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
    docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
    lastDocID = 0;
  }
}

Closing stored fields:

StoredFieldsWriter.closeDocStore(SegmentWriteState)
public void closeDocStore(SegmentWriteState state) throws IOException {
  // Close the fdx and fdt output streams
  fieldsWriter.close();
    --> fieldsStream.close();
    --> indexStream.close();
  fieldsWriter = null;
  lastDocID = 0;
  // Record the written file names
  state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_EXTENSION);
  state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_INDEX_EXTENSION);
  state.docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_EXTENSION);
  state.docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_INDEX_EXTENSION);
}

Writing the Index Results to the Segment Along the Basic Indexing Chain

Code:

consumer(DocFieldProcessor).flush(threads, flushState);
  // Recycle fieldHash for the next round of indexing; for efficiency, the objects in the indexing chain are reused.
  Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> childThreadsAndFields =
      new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
  for (DocConsumerPerThread thread : threads) {
    DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
    childThreadsAndFields.put(perThread.consumer, perThread.fields());
    perThread.trimFields(state);
  }
  // Write the stored fields
  --> fieldsWriter(StoredFieldsWriter).flush(state);
  // Write the indexed fields
  --> consumer(DocInverter).flush(childThreadsAndFields, state);
  // Write the field metadata and record the file name so the cfs file can be generated later
  --> final String fileName = state.segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION);
  --> fieldInfos.write(state.directory, fileName);
  --> state.flushedFiles.add(fileName);

This process also follows the basic indexing chain:
consumer(DocFieldProcessor).flush(…);
consumer(DocInverter).flush(…);
consumer(TermsHash).flush(…);
consumer(FreqProxTermsWriter).flush(…);
if (nextTermsHash != null) nextTermsHash.flush(…);
consumer(TermVectorsTermsWriter).flush(…);
endConsumer(NormsWriter).flush(…);
fieldsWriter(StoredFieldsWriter).flush(…);
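The listing above reflects the "chain of consumers" design: each link first does its own flush work and then delegates downstream. The following is a stripped-down conceptual sketch of that pattern with made-up class names; it is not Lucene's actual code.

import java.io.IOException;

// Conceptual sketch of the indexing-chain pattern (names are illustrative).
interface ChainLink {
  void flush() throws IOException;
}

class TermsHashLink implements ChainLink {
  private final ChainLink consumer;      // e.g. the postings writer
  private final ChainLink nextTermsHash; // e.g. the secondary chain for term vectors, may be null

  TermsHashLink(ChainLink consumer, ChainLink nextTermsHash) {
    this.consumer = consumer;
    this.nextTermsHash = nextTermsHash;
  }

  public void flush() throws IOException {
    consumer.flush();            // write this link's own data (postings)
    if (nextTermsHash != null) {
      nextTermsHash.flush();     // then let the secondary chain write its data (term vectors)
    }
  }
}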

Writing the Stored Fields

Code:

StoredFieldsWriter.flush(SegmentWriteState state) {
  if (state.numDocsInStore > 0) {
    initFieldsWriter();
    fill(state.numDocsInStore - docWriter.getDocStoreOffset());
  }
  if (fieldsWriter != null)
    fieldsWriter.flush();
}

As the code shows, this would write the fdx and fdt files; however, the closeDocStore call above has already written them, set state.numDocsInStore to zero, and set fieldsWriter to null, so nothing actually happens here.

Writing the Indexed Fields

Code:

DocInverter.flush(Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>, SegmentWriteState)
  // Write the postings and term vector information
  --> consumer(TermsHash).flush(childThreadsAndFields, state);
  // Write the normalization factors (norms)
  --> endConsumer(NormsWriter).flush(endChildThreadsAndFields, state);

Writing the Postings and Term Vector Information

Code:

TermsHash.flush(Map<InvertedDocConsumerPerThread, Collection<InvertedDocConsumerPerField>>, SegmentWriteState)
  // Write the postings information
  --> consumer(FreqProxTermsWriter).flush(childThreadsAndFields, state);
  // Recycle the RawPostingList objects
  --> shrinkFreePostings(threadsAndFields, state);
  // Write the term vector information
  --> if (nextTermsHash != null) nextTermsHash.flush(nextThreadsAndFields, state);
  --> consumer(TermVectorsTermsWriter).flush(childThreadsAndFields, state);

Writing the Postings Information

Code:

FreqProxTermsWriter.flush(Map<TermsHashConsumerPerThread, Collection<TermsHashConsumerPerField>>, SegmentWriteState)

(a) Sort all fields by name so that fields with the same name can be processed together

 Collections.sort(allFields); 
 final int numAllFields = allFields.size(); 

(b) Create the postings writer object

 final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);
 int start = 0; 

(c) For each field

 while(start < numAllFields) { 

(c-1) Find all fields with the same name

 final FieldInfo fieldInfo = allFields.get(start).fieldInfo; 
 final String fieldName = fieldInfo.name; 
 int end = start+1; 
 while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName)) 
 end++; 
 FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start]; 
 for(int i=start;i<end;i++) { 
 fields[i-start] = allFields.get(i); 
 fieldInfo.storePayloads |= fields[i-start].hasPayloads; 
 } 

(c-2) Append the postings of the same-named fields to the file

 appendPostings(fields, consumer); 

(c-3) Free up memory

 for(int i=0;i<fields.length;i++) { 
 TermsHashPerField perField = fields[i].termsHashPerField; 
 int numPostings = perField.numPostings;
 perField.reset(); 
 perField.shrinkHash(numPostings); 
 fields[i].reset(); 
 } 
 start = end; 
 } 

(d) Close the postings writer object

 consumer.finish(); 
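Steps (a) and (c-1) together are the usual "sort by key, then group adjacent equal keys" pattern: because the fields coming from different threads are sorted by name, every group of same-named fields occupies a contiguous range [start, end). A self-contained illustration with hypothetical field names (not Lucene code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class GroupFieldsByName {
  public static void main(String[] args) {
    // Stand-ins for per-thread field instances; in Lucene these would be
    // FreqProxTermsWriterPerField objects from different indexing threads.
    List<String> fieldNames = new ArrayList<>(
        Arrays.asList("title", "body", "title", "author", "body"));
    Collections.sort(fieldNames);              // (a) sort so equal names become adjacent

    int start = 0;
    while (start < fieldNames.size()) {        // (c) handle one group per iteration
      String name = fieldNames.get(start);
      int end = start + 1;
      while (end < fieldNames.size() && fieldNames.get(end).equals(name)) {
        end++;                                 // (c-1) extend the group of same-named fields
      }
      System.out.println(name + ": " + (end - start) + " per-thread instance(s)");
      start = end;                             // move on to the next group
    }
  }
}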

(b) Creating the postings writer object
Code:

public FormatPostingsFieldsWriter(SegmentWriteState state, FieldInfos fieldInfos) throws IOException {
  dir = state.directory;
  segment = state.segmentName;
  totalNumDocs = state.numDocs;
  this.fieldInfos = fieldInfos;
  // Used to write the tii and tis files
  termsOut = new TermInfosWriter(dir, segment, fieldInfos, state.termIndexInterval);
  // Used to write the skip lists of the freq and prox files
  skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval, termsOut.maxSkipLevels, totalNumDocs, null, null);
  // Record the written file names
  state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_EXTENSION));
  state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));
  // Using the two writers above, write to the segment in the required format
  termsWriter = new FormatPostingsTermsWriter(state, this);
}

(c-2) Appending the postings of the same-named fields to the file
Code:

FreqProxTermsWriter.appendPostings(FreqProxTermsWriterPerField[], FormatPostingsFieldsConsumer) {
  int numFields = fields.length;
  final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields];
  for (int i = 0; i < numFields; i++) {
    FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i]);
    boolean result = fms.nextTerm(); // for every field, position on its first term (Term)
  }
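The excerpt stops here; the remainder of appendPostings walks the per-thread term lists in sorted order, merging the postings of equal terms before writing each term once. As a conceptual illustration only (not the actual Lucene code), such a multi-way merge over sorted term lists can be sketched like this:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class TermMergeSketch {
  // A cursor over one sorted term list; a rough stand-in for FreqProxFieldMergeState.
  static class Cursor {
    final Iterator<String> it;
    String current;
    Cursor(Iterator<String> it) { this.it = it; advance(); }
    boolean advance() { current = it.hasNext() ? it.next() : null; return current != null; }
  }

  public static void main(String[] args) {
    List<List<String>> sortedTermLists = Arrays.asList(
        Arrays.asList("apple", "lucene", "search"),
        Arrays.asList("index", "lucene", "zebra"));

    // Order cursors by their current term, smallest first.
    PriorityQueue<Cursor> queue =
        new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.current));
    for (List<String> terms : sortedTermLists) {
      Cursor c = new Cursor(terms.iterator());
      if (c.current != null) queue.add(c);
    }

    while (!queue.isEmpty()) {
      String term = queue.peek().current;
      List<Cursor> sameTerm = new ArrayList<>();
      // Pull every cursor positioned on the current smallest term.
      while (!queue.isEmpty() && queue.peek().current.equals(term)) {
        sameTerm.add(queue.poll());
      }
      System.out.println("write term '" + term + "' merged from " + sameTerm.size() + " list(s)");
      for (Cursor c : sameTerm) {
        if (c.advance()) queue.add(c); // re-insert cursors that still have terms left
      }
    }
  }
}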