2021SC@SDUSC
Closing the IndexWriter Object
Code:
writer.close();
--> IndexWriter.closeInternal(boolean)
--> (1) Write the in-memory index information to disk: flush(waitForMerges, true, true);
--> (2) Perform segment merging: mergeScheduler.merge(this);
Segment merging will be discussed in a later chapter; here we only cover the process of writing the index information from memory to disk.
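For context, here is a minimal usage sketch of the close path, assuming the Lucene 2.9/3.0-era API that this analysis follows; the index directory, analyzer, and field contents are illustrative only.
import java.io.File;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class CloseWriterSketch {
    public static void main(String[] args) throws Exception {
        Directory dir = FSDirectory.open(new File("index")); // illustrative index directory
        IndexWriter writer = new IndexWriter(dir, new StandardAnalyzer(Version.LUCENE_30),
                true, IndexWriter.MaxFieldLength.UNLIMITED);
        Document doc = new Document();
        doc.add(new Field("content", "hello lucene", Field.Store.YES, Field.Index.ANALYZED));
        writer.addDocument(doc); // buffered in DocumentsWriter's memory
        writer.close();          // closeInternal -> flush(...) to disk, then mergeScheduler.merge(this)
        dir.close();
    }
}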
Code:
IndexWriter.flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes)
--> IndexWriter.doFlush(boolean flushDocStores, boolean flushDeletes)
--> IndexWriter.doFlushInternal(boolean flushDocStores, boolean flushDeletes)
Writing the index to disk involves the following steps (condensed into a single sketch after this list):
Get the name of the segment to be written: String segment = docWriter.getSegment();
Have DocumentsWriter write its buffered content to the segment: docWriter.flush(flushDocStores);
Create the new segment's metadata object: newSegment = new SegmentInfo(segment, flushedDocCount,
directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile,
docWriter.hasProx());
Prepare the document deletions: docWriter.pushDeletes();
Generate the cfs (compound) segment file: docWriter.createCompoundFile(segment);
Apply the deletions: applyDeletes();
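Roughly, these steps run in the following order. This is a condensed paraphrase of the list above, not the verbatim doFlushInternal body; locking, error handling, and several conditional branches are omitted.
String segment = docWriter.getSegment();               // segment name to write, e.g. "_0"
int flushedDocCount = docWriter.flush(flushDocStores); // write buffered postings, stored fields, vectors
SegmentInfo newSegment = new SegmentInfo(segment, flushedDocCount, directory, false, true,
        docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx());
docWriter.pushDeletes();                               // queue the buffered deletes
docWriter.createCompoundFile(segment);                 // bundle the flushed files into a cfs file (only when compound format is enabled)
applyDeletes();                                        // apply the deletes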
Getting the Segment Name to Write
Code:
SegmentInfo newSegment = null;
final int numDocs = docWriter.getNumDocsInRAM(); // total number of documents
String docStoreSegment = docWriter.getDocStoreSegment(); // name of the segment the stored fields and term vectors are written to, "_0"
int docStoreOffset = docWriter.getDocStoreOffset(); // offset of this segment's documents within the doc store segment
String segment = docWriter.getSegment(); // segment name, "_0"
As discussed in detail in the chapter on Lucene's index file structure, the stored fields and term vectors can be kept in a different segment from the indexed (inverted) fields.
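For illustration, when several flushed segments share one doc store, docStoreOffset is the slot of this segment's first document inside the shared doc-store files, so locating a document in the shared store is a simple offset addition. The variable names below are illustrative, not Lucene fields.
// Conceptual sketch: locating a segment's document inside a shared doc store.
int docStoreOffset = 100; // this segment's documents start at slot 100 of the shared fdx/tvx files
int docInSegment = 5;     // the 6th document of this segment
int docInStore = docStoreOffset + docInSegment; // slot 105 in the shared doc store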
Writing the Buffered Content to the Segment
Code:
flushedDocCount = docWriter.flush(flushDocStores);
This process consists of the following two phases:
1. Close the stored-field and term-vector writers along the basic indexing chain
2. Write the indexing results to the segment along the basic indexing chain
Closing the Stored-Field and Term-Vector Writers Along the Basic Indexing Chain
Code:
closeDocStore();
flushState.numDocsInStore = 0;
Following the structure of the basic indexing chain, this closes the stored-field and term-vector writers:
consumer(DocFieldProcessor).closeDocStore(flushState);
consumer(DocInverter).closeDocStore(state);
consumer(TermsHash).closeDocStore(state);
consumer(FreqProxTermsWriter).closeDocStore(state);
if (nextTermsHash != null) nextTermsHash.closeDocStore(state);
consumer(TermVectorsTermsWriter).closeDocStore(state);
endConsumer(NormsWriter).closeDocStore(state);
fieldsWriter(StoredFieldsWriter).closeDocStore(state);
Of these, the two closeDocStore implementations that do substantive work are the following:
Closing the term vectors:
TermVectorsTermsWriter.closeDocStore(SegmentWriteState)
void closeDocStore(final SegmentWriteState state) throws IOException {
if (tvx != null) {
//For documents that have no term vectors, write zeros into the tvd file; even without term vectors, each document still keeps a slot in tvx and tvd
fill(state.numDocsInStore - docWriter.getDocStoreOffset());
//Close the output streams of the tvx, tvf, and tvd files
tvx.close();
tvf.close();
tvd.close();
tvx = null;
//Record the written file names so that, when the cfs file is generated later, these files can be bundled into a single cfs file.
state.flushedFiles.add(state.docStoreSegmentName + "." +
IndexFileNames.VECTORS_INDEX_EXTENSION);
state.flushedFiles.add(state.docStoreSegmentName + "." +
IndexFileNames.VECTORS_FIELDS_EXTENSION);
state.flushedFiles.add(state.docStoreSegmentName + "." +
IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
//Remove the files from DocumentsWriter's openFiles member so that IndexFileDeleter may delete them later
docWriter.removeOpenFile(state.docStoreSegmentName + "." +
IndexFileNames.VECTORS_INDEX_EXTENSION);
docWriter.removeOpenFile(state.docStoreSegmentName + "." +
IndexFileNames.VECTORS_FIELDS_EXTENSION);
docWriter.removeOpenFile(state.docStoreSegmentName + "." +
IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
lastDocID = 0;
}
}
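Conceptually, fill() pads one slot per document that has no term vectors, consistent with the comment above. The sketch below is a simplified rendering of that idea from memory of the 2.9/3.0 code, not the verbatim Lucene method; lastDocID, tvx, tvd, and tvf keep the meaning they have in the code above.
// Simplified sketch of padding documents that have no term vectors.
void fillSketch(int end) throws IOException {
    while (lastDocID < end) {
        tvx.writeLong(tvd.getFilePointer()); // the document still gets an entry in tvx ...
        tvd.writeVInt(0);                    // ... and a zero field count in tvd
        tvx.writeLong(tvf.getFilePointer()); // plus a pointer into tvf
        lastDocID++;
    }
}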
Closing the stored fields:
StoredFieldsWriter.closeDocStore(SegmentWriteState)
public void closeDocStore(SegmentWriteState state) throws IOException {
//Close the output streams of the fdx and fdt files
fieldsWriter.close();
--> fieldsStream.close();
--> indexStream.close();
fieldsWriter = null;
lastDocID = 0;
//Record the written file names
state.flushedFiles.add(state.docStoreSegmentName + "." +
IndexFileNames.FIELDS_EXTENSION);
state.flushedFiles.add(state.docStoreSegmentName + "." +
IndexFileNames.FIELDS_INDEX_EXTENSION);
state.docWriter.removeOpenFile(state.docStoreSegmentName + "." +
IndexFileNames.FIELDS_EXTENSION);
state.docWriter.removeOpenFile(state.docStoreSegmentName + "." +
IndexFileNames.FIELDS_INDEX_EXTENSION);
}
Writing the Indexing Results to the Segment Along the Basic Indexing Chain
Code:
consumer(DocFieldProcessor).flush(threads, flushState);
//Reclaim fieldHash so it can be reused in the next indexing round; for efficiency, the objects in the indexing chain are reused.
Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>
childThreadsAndFields = new HashMap<DocFieldConsumerPerThread,
Collection<DocFieldConsumerPerField>>();
for ( DocConsumerPerThread thread : threads) {
DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
childThreadsAndFields.put(perThread.consumer, perThread.fields());
perThread.trimFields(state);
}
//Write the stored fields
--> fieldsWriter(StoredFieldsWriter).flush(state);
//Write the indexed fields
--> consumer(DocInverter).flush(childThreadsAndFields, state);
//Write the field metadata and record the written file name, for later cfs generation
--> final String fileName = state.segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION);
--> fieldInfos.write(state.directory, fileName);
--> state.flushedFiles.add(fileName);
This process also follows the basic indexing chain:
consumer(DocFieldProcessor).flush(…);
consumer(DocInverter).flush(…);
consumer(TermsHash).flush(…);
consumer(FreqProxTermsWriter).flush(…);
if (nextTermsHash != null) nextTermsHash.flush(…);
consumer(TermVectorsTermsWriter).flush(…);
endConsumer(NormsWriter).flush(…);
fieldsWriter(StoredFieldsWriter).flush(…);
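Both closeDocStore and flush are propagated this way: each consumer does its own work and then delegates to the next consumer in the chain. Below is a minimal sketch of that pattern; the interface and class names are illustrative, not Lucene's actual types, which also carry per-thread and per-field state.
import java.io.IOException;

interface ChainConsumer {
    void flush() throws IOException;
}

class DelegatingConsumer implements ChainConsumer {
    private final String name;
    private final ChainConsumer next; // downstream consumer; null at the end of the chain

    DelegatingConsumer(String name, ChainConsumer next) {
        this.name = name;
        this.next = next;
    }

    public void flush() throws IOException {
        System.out.println(name + ".flush()"); // this consumer's own work would go here
        if (next != null) {
            next.flush();                      // then pass the call down the chain
        }
    }
}
For example, new DelegatingConsumer("DocInverter", new DelegatingConsumer("TermsHash", null)).flush() invokes the consumers in chain order, which is exactly the shape of the call lists above.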
Writing the Stored Fields
Code:
StoredFieldsWriter.flush(SegmentWriteState state) {
if (state.numDocsInStore > 0) {
initFieldsWriter();
fill(state.numDocsInStore - docWriter.getDocStoreOffset());
}
if (fieldsWriter != null)
fieldsWriter.flush();
}
As the code shows, this step would write the fdx and fdt files; but the closeDocStore call above has already written them, set state.numDocsInStore to zero, and set fieldsWriter to null, so nothing is actually done here.
Writing the Indexed Fields
Code:
DocInverter.flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>,
SegmentWriteState)
//Write the postings (inverted lists) and term-vector information
--> consumer(TermsHash).flush(childThreadsAndFields, state);
//Write the normalization factors (norms)
--> endConsumer(NormsWriter).flush(endChildThreadsAndFields, state);
Writing the Postings and Term-Vector Information
Code:
TermsHash.flush(Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>,
SegmentWriteState)
//Write the postings information
--> consumer(FreqProxTermsWriter).flush(childThreadsAndFields, state);
//Reclaim the RawPostingList objects
--> shrinkFreePostings(threadsAndFields, state);
//Write the term-vector information
--> if (nextTermsHash != null) nextTermsHash.flush(nextThreadsAndFields, state);
--> consumer(TermVectorsTermsWriter).flush(childThreadsAndFields, state);
Writing the Postings Information
Code:
FreqProxTermsWriter.flush(Map<TermsHashConsumerPerThread,
Collection<TermsHashConsumerPerField>>, SegmentWriteState)
(a) Sort all fields by name so that fields with the same name can be processed together
Collections.sort(allFields);
final int numAllFields = allFields.size();
(b) Create the postings writer
final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state,
fieldInfos);
int start = 0;
(c) For each field
while(start < numAllFields) {
(c-1) Find all fields with the same name
final FieldInfo fieldInfo = allFields.get(start).fieldInfo;
final String fieldName = fieldInfo.name;
int end = start+1;
while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))
end++;
FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];
for(int i=start;i<end;i++) {
fields[i-start] = allFields.get(i);
fieldInfo.storePayloads |= fields[i-start].hasPayloads;
}
(c-2) Append the postings of the same-named fields to the file
appendPostings(fields, consumer);
(c-3) Free the space
for(int i=0;i<fields.length;i++) {
TermsHashPerField perField = fields[i].termsHashPerField;
int numPostings = perField.numPostings;
perField.reset();
perField.shrinkHash(numPostings);
fields[i].reset();
}
start = end;
}
(d) Close the postings writer
consumer.finish();
(b) Creating the Postings Writer
Code:
public FormatPostingsFieldsWriter(SegmentWriteState state, FieldInfos fieldInfos) throws
IOException {
dir = state.directory;
segment = state.segmentName;
totalNumDocs = state.numDocs;
this.fieldInfos = fieldInfos;
//Used to write the tii and tis files
termsOut = new TermInfosWriter(dir, segment, fieldInfos, state.termIndexInterval);
//Used to write the skip lists of the freq and prox files
skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval, termsOut.maxSkipLevels,
totalNumDocs, null, null);
//Record the written file names
state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_EXTENSION));
state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));
//Use the two writers above to write to the segment in the defined format
termsWriter = new FormatPostingsTermsWriter(state, this);
}
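For reference, TermInfosWriter writes every term to the tis file and, once every termIndexInterval terms (128 by default in this era of Lucene), also writes an index entry to the tii file. The sketch below only illustrates that selection logic; the helper methods and the terms array are hypothetical.
int termIndexInterval = 128;          // default term index interval
String[] terms = allTermsInOrder();   // hypothetical helper: all terms in sorted order
for (int i = 0; i < terms.length; i++) {
    writeTis(terms[i]);               // hypothetical: every term goes into the tis file
    if (i % termIndexInterval == 0) {
        writeTii(terms[i]);           // hypothetical: every 128th term is also indexed in tii
    }
}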
(c) Appending the Postings of the Same-Named Fields to the File
Code:
FreqProxTermsWriter.appendPostings(FreqProxTermsWriterPerField[],
FormatPostingsFieldsConsumer) {
int numFields = fields.length;
final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields];
for(int i=0;i<numFields;i++) {
FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i]);
boolean result = fms.nextTerm(); //For every field, fetch its first term (Term)
}