Mailing List Archive

cvs commit: jakarta-lucene/src/test/org/apache/lucene ThreadSafetyTest.java
cutting 01/09/27 09:27:02

Modified: src/java/org/apache/lucene/index IndexReader.java
IndexWriter.java SegmentReader.java
src/java/org/apache/lucene/store Directory.java
FSDirectory.java RAMDirectory.java
src/test/org/apache/lucene ThreadSafetyTest.java
Added: src/java/org/apache/lucene/store Lock.java
Log:
Added index lock files. Indexing and search are now not just thread
safe, but also "process safe": multiple processes may now search
an index while it is being updated from another process.

Two lock files are used in an index. One is "commit.lock". This is
used to synchronize commits [IndexWriter.close()] with opens
[IndexReader.open()]. Since these actions are short-lived, attempts
to obtain this lock will block for up to ten seconds, which should be
plenty of time, before an exception is thrown.

The second lock file is "write.lock". This is used to enforce the
restriction that only one process should be adding documents to an
index at a time. This is created when an IndexWriter is constructed
and removed when it is closed. If index writing is aborted then this
file must be manually removed. Attempts to index from another process
will immediately throw an exception.

It should be impossible to corrupt an index through the Lucene API.
However, if a Lucene process exits unexpectedly it can leave the index
locked. The remedy is simply to remove all lock files at a time when it
is certain that no processes are accessing the index.

Revision Changes Path
1.2 +16 -11 jakarta-lucene/src/java/org/apache/lucene/index/IndexReader.java

Index: IndexReader.java
===================================================================
RCS file: /home/cvs/jakarta-lucene/src/java/org/apache/lucene/index/IndexReader.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- IndexReader.java 2001/09/18 16:29:53 1.1
+++ IndexReader.java 2001/09/27 16:27:01 1.2
@@ -58,6 +58,7 @@
import java.io.File;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.Lock;
import org.apache.lucene.document.Document;

/** IndexReader is an abstract class, providing an interface for accessing an
@@ -89,17 +90,21 @@
}

/** Returns an IndexReader reading the index in the given Directory. */
- public static IndexReader open(Directory directory) throws IOException {
- synchronized (directory) {
- SegmentInfos infos = new SegmentInfos();
- infos.read(directory);
- if (infos.size() == 1) // index is optimized
- return new SegmentReader(infos.info(0), true);
-
- SegmentReader[] readers = new SegmentReader[infos.size()];
- for (int i = 0; i < infos.size(); i++)
- readers[i] = new SegmentReader(infos.info(i), i == infos.size() - 1);
- return new SegmentsReader(readers);
+ public static IndexReader open(final Directory directory) throws IOException{
+ synchronized (directory) { // in- & inter-process sync
+ return (IndexReader)new Lock.With(directory.makeLock("commit.lock")) {
+ public Object doBody() throws IOException {
+ SegmentInfos infos = new SegmentInfos();
+ infos.read(directory);
+ if (infos.size() == 1) // index is optimized
+ return new SegmentReader(infos.info(0), true);
+
+ SegmentReader[] readers = new SegmentReader[infos.size()];
+ for (int i = 0; i < infos.size(); i++)
+ readers[i] = new SegmentReader(infos.info(i), i==infos.size()-1);
+ return new SegmentsReader(readers);
+ }
+ }.run();
}
}




1.2 +26 -10 jakarta-lucene/src/java/org/apache/lucene/index/IndexWriter.java

Index: IndexWriter.java
===================================================================
RCS file: /home/cvs/jakarta-lucene/src/java/org/apache/lucene/index/IndexWriter.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- IndexWriter.java 2001/09/18 16:29:53 1.1
+++ IndexWriter.java 2001/09/27 16:27:01 1.2
@@ -62,6 +62,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.Lock;
import org.apache.lucene.store.InputStream;
import org.apache.lucene.store.OutputStream;
import org.apache.lucene.document.Document;
@@ -112,16 +113,25 @@
analyzed with <code>a</code>. If <code>create</code> is true, then a new,
empty index will be created in <code>d</code>, replacing the index already
there, if any. */
- public IndexWriter(Directory d, Analyzer a, boolean create)
+ public IndexWriter(Directory d, Analyzer a, final boolean create)
throws IOException {
directory = d;
analyzer = a;

- synchronized (directory) {
- if (create)
- segmentInfos.write(directory);
- else
- segmentInfos.read(directory);
+ Lock writeLock = directory.makeLock("write.lock");
+ if (!writeLock.obtain()) // obtain write lock
+ throw new IOException("Index locked for write: " + writeLock);
+
+ synchronized (directory) { // in- & inter-process sync
+ new Lock.With(directory.makeLock("commit.lock")) {
+ public Object doBody() throws IOException {
+ if (create)
+ segmentInfos.write(directory);
+ else
+ segmentInfos.read(directory);
+ return null;
+ }
+ }.run();
}
}

@@ -130,6 +140,7 @@
public final synchronized void close() throws IOException {
flushRamSegments();
ramDirectory.close();
+ directory.makeLock("write.lock").release(); // release write lock
directory.close();
}

@@ -286,7 +297,7 @@
int mergedDocCount = 0;
if (infoStream != null) infoStream.print("merging segments");
SegmentMerger merger = new SegmentMerger(directory, mergedName);
- Vector segmentsToDelete = new Vector();
+ final Vector segmentsToDelete = new Vector();
for (int i = minSegment; i < segmentInfos.size(); i++) {
SegmentInfo si = segmentInfos.info(i);
if (infoStream != null)
@@ -307,9 +318,14 @@
segmentInfos.addElement(new SegmentInfo(mergedName, mergedDocCount,
directory));

- synchronized (directory) {
- segmentInfos.write(directory); // commit before deleting
- deleteSegments(segmentsToDelete); // delete now-unused segments
+ synchronized (directory) { // in- & inter-process sync
+ new Lock.With(directory.makeLock("commit.lock")) {
+ public Object doBody() throws IOException {
+ segmentInfos.write(directory); // commit before deleting
+ deleteSegments(segmentsToDelete); // delete now-unused segments
+ return null;
+ }
+ }.run();
}
}




1.2 +9 -3 jakarta-lucene/src/java/org/apache/lucene/index/SegmentReader.java

Index: SegmentReader.java
===================================================================
RCS file: /home/cvs/jakarta-lucene/src/java/org/apache/lucene/index/SegmentReader.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SegmentReader.java 2001/09/18 16:29:54 1.1
+++ SegmentReader.java 2001/09/27 16:27:01 1.2
@@ -61,6 +61,7 @@

import org.apache.lucene.util.BitVector;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.Lock;
import org.apache.lucene.store.InputStream;
import org.apache.lucene.document.Document;

@@ -116,9 +117,14 @@

public final synchronized void close() throws IOException {
if (deletedDocsDirty) {
- synchronized (directory) {
- deletedDocs.write(directory, segment + ".tmp");
- directory.renameFile(segment + ".tmp", segment + ".del");
+ synchronized (directory) { // in- & inter-process sync
+ new Lock.With(directory.makeLock("commit.lock")) {
+ public Object doBody() throws IOException {
+ deletedDocs.write(directory, segment + ".tmp");
+ directory.renameFile(segment + ".tmp", segment + ".del");
+ return null;
+ }
+ }.run();
}
deletedDocsDirty = false;
}



1.2 +5 -0 jakarta-lucene/src/java/org/apache/lucene/store/Directory.java

Index: Directory.java
===================================================================
RCS file: /home/cvs/jakarta-lucene/src/java/org/apache/lucene/store/Directory.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Directory.java 2001/09/18 16:29:59 1.1
+++ Directory.java 2001/09/27 16:27:01 1.2
@@ -108,6 +108,11 @@
abstract public InputStream openFile(String name)
throws IOException, SecurityException;

+ /** Construct a {@link Lock}.
+ * @param name the name of the lock file
+ */
+ abstract public Lock makeLock(String name);
+
/** Closes the store. */
abstract public void close()
throws IOException, SecurityException;



1.2 +23 -2 jakarta-lucene/src/java/org/apache/lucene/store/FSDirectory.java

Index: FSDirectory.java
===================================================================
RCS file: /home/cvs/jakarta-lucene/src/java/org/apache/lucene/store/FSDirectory.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- FSDirectory.java 2001/09/18 16:29:59 1.1
+++ FSDirectory.java 2001/09/27 16:27:02 1.2
@@ -206,6 +206,24 @@
return new FSInputStream(new File(directory, name));
}

+ /** Construct a {@link Lock}.
+ * @param name the name of the lock file
+ */
+ public final Lock makeLock(String name) {
+ final File lockFile = new File(directory, name);
+ return new Lock() {
+ public boolean obtain() throws IOException {
+ return lockFile.createNewFile();
+ }
+ public void release() {
+ lockFile.delete();
+ }
+ public String toString() {
+ return "Lock@" + lockFile;
+ }
+ };
+ }
+
/** Closes the store to future operations. */
public final synchronized void close() throws IOException {
if (--refCount <= 0) {
@@ -214,6 +232,11 @@
}
}
}
+
+ /** For debug output. */
+ public String toString() {
+ return "FSDirectory@" + directory;
+ }
}


@@ -278,8 +301,6 @@
RandomAccessFile file = null;

public FSOutputStream(File path) throws IOException {
- if (path.isFile())
- throw new IOException(path + " already exists");
file = new RandomAccessFile(path, "rw");
}




1.2 +20 -0 jakarta-lucene/src/java/org/apache/lucene/store/RAMDirectory.java

Index: RAMDirectory.java
===================================================================
RCS file: /home/cvs/jakarta-lucene/src/java/org/apache/lucene/store/RAMDirectory.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- RAMDirectory.java 2001/09/18 16:29:59 1.1
+++ RAMDirectory.java 2001/09/27 16:27:02 1.2
@@ -123,6 +123,26 @@
return new RAMInputStream(file);
}

+ /** Construct a {@link Lock}.
+ * @param name the name of the lock file
+ */
+ public final Lock makeLock(final String name) {
+ return new Lock() {
+ public boolean obtain() throws IOException {
+ synchronized (files) {
+ if (!fileExists(name)) {
+ createFile(name).close();
+ return true;
+ }
+ return false;
+ }
+ }
+ public void release() {
+ deleteFile(name);
+ }
+ };
+ }
+
/** Closes the store to future operations. */
public final void close() {
}



1.1 jakarta-lucene/src/java/org/apache/lucene/store/Lock.java

Index: Lock.java
===================================================================
package org.apache.lucene.store;

/* ====================================================================
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Apache" and "Apache Software Foundation" and
* "Apache Lucene" must not be used to endorse or promote products
* derived from this software without prior written permission. For
* written permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* "Apache Lucene", nor may "Apache" appear in their name, without
* prior written permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/

import java.io.IOException;

/** An interprocess mutex lock.
 * <p>Typical use might look like:<pre>
 * new Lock.With(directory.makeLock("my.lock")) {
 *     public Object doBody() {
 *       <i>... code to execute while locked ...</i>
 *     }
 *   }.run();
 * </pre>
 *
 * @author Doug Cutting
 * @see Directory#makeLock(String)
 */

public abstract class Lock {
  /** Attempt to obtain exclusive access.
   *
   * @return true iff exclusive access is obtained
   * @throws IOException if the attempt itself fails
   */
  public abstract boolean obtain() throws IOException;

  /** Release exclusive access. */
  public abstract void release();

  /** Utility class for executing code with exclusive access. */
  public abstract static class With {
    private final Lock lock;
    /** Milliseconds to sleep between two attempts to obtain the lock. */
    private int sleepInterval = 1000;
    /** Total number of attempts made before giving up. */
    private int maxSleeps = 10;

    /** Constructs an executor that will grab the named lock.
     *
     * @param lock the lock to hold while {@link #doBody} runs
     */
    public With(Lock lock) {
      this.lock = lock;
    }

    /** Code to execute with exclusive access.
     *
     * @return an arbitrary result, handed back unchanged by {@link #run}
     * @throws IOException if the guarded work fails
     */
    protected abstract Object doBody() throws IOException;

    /** Calls {@link #doBody} while <i>lock</i> is obtained.  Blocks if lock
     * cannot be obtained immediately.  Retries to obtain lock once per second
     * until it is obtained, or until it has tried ten times (one immediate
     * attempt followed by at most nine sleeps).  The lock, once obtained, is
     * always released, even when {@link #doBody} throws.
     *
     * @return whatever {@link #doBody} returns
     * @throws IOException if the lock cannot be obtained in time, if the
     *         waiting thread is interrupted (its interrupt status is then
     *         restored), or if {@link #doBody} throws
     */
    public Object run() throws IOException {
      boolean locked = false;
      try {
        locked = lock.obtain();
        int sleepCount = 0;
        while (!locked) {
          if (++sleepCount == maxSleeps) {
            throw new IOException("Timed out waiting for: " + lock);
          }
          try {
            Thread.sleep(sleepInterval);
          } catch (InterruptedException e) {
            // Thread.sleep cleared the interrupt flag when it threw; set it
            // again so code further up the stack can still see that the
            // thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new IOException(e.toString());
          }
          locked = lock.obtain();
        }

        return doBody();

      } finally {
        if (locked)                               // never release a lock we
          lock.release();                         // do not hold
      }
    }
  }

}



1.3 +23 -6 jakarta-lucene/src/test/org/apache/lucene/ThreadSafetyTest.java

Index: ThreadSafetyTest.java
===================================================================
RCS file: /home/cvs/jakarta-lucene/src/test/org/apache/lucene/ThreadSafetyTest.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- ThreadSafetyTest.java 2001/09/18 17:35:57 1.2
+++ ThreadSafetyTest.java 2001/09/27 16:27:02 1.3
@@ -69,6 +69,8 @@
private static final Random RANDOM = new Random();
private static Searcher SEARCHER;

+ private static int ITERATIONS = 1;
+
private static int random(int i) { // for JDK 1.1 compatibility
int r = RANDOM.nextInt();
if (r < 0) r = -r;
@@ -85,7 +87,7 @@

public void run() {
try {
- for (int i = 0; i < 1024*16; i++) {
+ for (int i = 0; i < 1024*ITERATIONS; i++) {
Document d = new Document();
int n = RANDOM.nextInt();
d.add(Field.Keyword("id", Integer.toString(n)));
@@ -98,6 +100,9 @@
writer = new IndexWriter("index", ANALYZER, false);
}
}
+
+ writer.close();
+
} catch (Exception e) {
System.out.println(e.toString());
e.printStackTrace();
@@ -117,7 +122,7 @@

public void run() {
try {
- for (int i = 0; i < 1024*8; i++) {
+ for (int i = 0; i < 512*ITERATIONS; i++) {
searchFor(RANDOM.nextInt(), (searcher==null)?SEARCHER:searcher);
if (i%reopenInterval == 0) {
if (searcher == null) {
@@ -150,12 +155,24 @@

public static void main(String[] args) throws Exception {

- IndexWriter writer = new IndexWriter("index", ANALYZER, true);
+ boolean readOnly = false;
+ boolean add = false;

- Thread indexerThread = new IndexerThread(writer);
- indexerThread.start();
+ for (int i = 0; i < args.length; i++) {
+ if ("-ro".equals(args[i]))
+ readOnly = true;
+ if ("-add".equals(args[i]))
+ add = true;
+ }

- Thread.sleep(1000);
+ if (!readOnly) {
+ IndexWriter writer = new IndexWriter("index", ANALYZER, !add);
+
+ Thread indexerThread = new IndexerThread(writer);
+ indexerThread.start();
+
+ Thread.sleep(1000);
+ }

SearcherThread searcherThread1 = new SearcherThread(false);
searcherThread1.start();