diff src/com/tmate/hgkit/ll/RevlogStream.java @ 9:d6d2a630f4a6

Access to underlaying file data wrapped into own Access object, implemented with FileChannel and ByteBuffer
author Artem Tikhomirov <tikhomirov.artem@gmail.com>
date Sat, 25 Dec 2010 04:45:59 +0100
parents 5abe5af181bd
children 382cfe9463db
line wrap: on
line diff
--- a/src/com/tmate/hgkit/ll/RevlogStream.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/RevlogStream.java	Sat Dec 25 04:45:59 2010 +0100
@@ -5,14 +5,12 @@
 
 import static com.tmate.hgkit.ll.HgRepository.TIP;
 
-import java.io.BufferedInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -37,30 +35,33 @@
 		this.indexFile = indexFile;
 	}
 
-	private void detectVersion() {
-		
+	/*package*/ DataAccess getIndexStream() {
+		return create(indexFile);
 	}
 
-	/*package*/ DataInput getIndexStream() {
-		DataInputStream dis = null;
-		try {
-			dis = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile)));
-		} catch (FileNotFoundException ex) {
-			ex.printStackTrace();
-			// should not happen, we checked for existence
-		}
-		return dis;
-	}
-
-	/*package*/ DataInput getDataStream() {
+	/*package*/ DataAccess getDataStream() {
 		final String indexName = indexFile.getName();
 		File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d");
+		return create(dataFile);
+	}
+	
+	private DataAccess create(File f) {
+		if (!f.exists()) {
+			return new DataAccess();
+		}
 		try {
-			return new DataInputStream(new BufferedInputStream(new FileInputStream(dataFile)));
-		} catch (FileNotFoundException ex) {
-			ex.printStackTrace();
-			return null;
+			FileChannel fc = new FileInputStream(f).getChannel();
+			final int MAPIO_MAGIC_BOUNDARY = 100 * 1024;
+			if (fc.size() > MAPIO_MAGIC_BOUNDARY) {
+				return new MemoryMapFileAccess(fc, fc.size());
+			} else {
+				return new FileAccess(fc, fc.size());
+			}
+		} catch (IOException ex) {
+			// unlikely to happen, we've made sure file exists.
+			ex.printStackTrace(); // FIXME log error
 		}
+		return new DataAccess(); // non-null, empty.
 	}
 
 	public int revisionCount() {
@@ -68,6 +69,8 @@
 		return index.size();
 	}
 
+	private final int REVLOGV1_RECORD_SIZE = 64;
+
 	// should be possible to use TIP, ALL, or -1, -2, -n notation of Hg
 	// ? boolean needsNodeid
 	public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) {
@@ -90,10 +93,10 @@
 		}
 		// XXX may cache [start .. end] from index with a single read (pre-read)
 		
-		DataInput diIndex = null, diData = null;
-		diIndex = getIndexStream();
+		DataAccess daIndex = null, daData = null;
+		daIndex = getIndexStream();
 		if (needData && !inline) {
-			diData = getDataStream();
+			daData = getDataStream();
 		}
 		try {
 			byte[] lastData = null;
@@ -105,29 +108,30 @@
 			} else {
 				i = start;
 			}
-			diIndex.skipBytes(inline ? (int) index.get(i).offset : start * 64);
+			
+			daIndex.seek(inline ? (int) index.get(i).offset : start * REVLOGV1_RECORD_SIZE);
 			for (; i <= end; i++ ) {
-				long l = diIndex.readLong();
+				long l = daIndex.readLong();
 				long offset = l >>> 16;
 				int flags = (int) (l & 0X0FFFF);
-				int compressedLen = diIndex.readInt();
-				int actualLen = diIndex.readInt();
-				int baseRevision = diIndex.readInt();
-				int linkRevision = diIndex.readInt();
-				int parent1Revision = diIndex.readInt();
-				int parent2Revision = diIndex.readInt();
+				int compressedLen = daIndex.readInt();
+				int actualLen = daIndex.readInt();
+				int baseRevision = daIndex.readInt();
+				int linkRevision = daIndex.readInt();
+				int parent1Revision = daIndex.readInt();
+				int parent2Revision = daIndex.readInt();
 				byte[] buf = new byte[32];
 				// XXX Hg keeps 12 last bytes empty, we move them into front here
-				diIndex.readFully(buf, 12, 20);
-				diIndex.skipBytes(12);
+				daIndex.readBytes(buf, 12, 20);
+				daIndex.skip(12);
 				byte[] data = null;
 				if (needData) {
 					byte[] dataBuf = new byte[compressedLen];
 					if (inline) {
-						diIndex.readFully(dataBuf);
+						daIndex.readBytes(dataBuf, 0, compressedLen);
 					} else {
-						diData.skipBytes((int) index.get(i).offset); // FIXME not skip but seek!!! (skip would work only for the first time)
-						diData.readFully(dataBuf);
+						daData.seek(index.get(i).offset);
+						daData.readBytes(dataBuf, 0, compressedLen);
 					}
 					if (dataBuf[0] == 0x78 /* 'x' */) {
 						try {
@@ -169,7 +173,7 @@
 					}
 				} else {
 					if (inline) {
-						diIndex.skipBytes(compressedLen);
+						daIndex.skip(compressedLen);
 					}
 				}
 				if (!extraReadsToBaseRev || i >= start) {
@@ -177,13 +181,13 @@
 				}
 				lastData = data;
 			}
-		} catch (EOFException ex) {
-			// should not happen as long as we read inside known boundaries
-			throw new IllegalStateException(ex);
 		} catch (IOException ex) {
 			throw new IllegalStateException(ex); // FIXME need better handling
 		} finally {
-			hackCloseFileStreams(diIndex, diData); // FIXME HACK!!!
+			daIndex.done();
+			if (daData != null) {
+				daData.done();
+			}
 		}
 	}
 	
@@ -192,59 +196,49 @@
 			return;
 		}
 		ArrayList<IndexEntry> res = new ArrayList<IndexEntry>();
-		DataInput di = getIndexStream();
+		DataAccess da = getIndexStream();
 		try {
-			int versionField = di.readInt();
-			di.readInt(); // just to skip next 2 bytes of offset + flags
+			int versionField = da.readInt();
+			da.readInt(); // just to skip next 2 bytes of offset + flags
 			final int INLINEDATA = 1 << 16;
 			inline = (versionField & INLINEDATA) != 0;
 			long offset = 0; // first offset is always 0, thus Hg uses it for other purposes
-			while(true) { // EOFExcepiton should get us outta here. FIXME Our inputstream should has explicit no-more-data indicator
-				int compressedLen = di.readInt();
+			while(true) {
+				int compressedLen = da.readInt();
 				// 8+4 = 12 bytes total read here
-				int actualLen = di.readInt();
-				int baseRevision = di.readInt();
+				int actualLen = da.readInt();
+				int baseRevision = da.readInt();
 				// 12 + 8 = 20 bytes read here
 //				int linkRevision = di.readInt();
 //				int parent1Revision = di.readInt();
 //				int parent2Revision = di.readInt();
 //				byte[] nodeid = new byte[32];
 				if (inline) {
-					res.add(new IndexEntry(offset + 64*res.size(), baseRevision));
-					di.skipBytes(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size)
+					res.add(new IndexEntry(offset + REVLOGV1_RECORD_SIZE * res.size(), baseRevision));
+					da.skip(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size)
 				} else {
 					res.add(new IndexEntry(offset, baseRevision));
-					di.skipBytes(3*4 + 32);
+					da.skip(3*4 + 32);
 				}
-				long l = di.readLong();
-				offset = l >>> 16;
+				if (da.nonEmpty()) {
+					long l = da.readLong();
+					offset = l >>> 16;
+				} else {
+					// fine, done then
+					index = res;
+					break;
+				}
 			}
-		} catch (EOFException ex) {
-			// fine, done then
-			index = res;
 		} catch (IOException ex) {
-			ex.printStackTrace();
-			// too bad, no outline then
+			ex.printStackTrace(); // log error
+			// too bad, no outline then.
 			index = Collections.emptyList();
+		} finally {
+			da.done();
 		}
-		hackCloseFileStreams(di, null); // FIXME HACK!!!
+		
 	}
 	
-	// FIXME HACK to deal with File/FileStream nature of out data source. Won't need this once implement
-	// own DataInput based on bytearray chunks or RandomAccessFile
-	private void hackCloseFileStreams(DataInput index, DataInput data) {
-		try {
-			if (index != null) {
-				((DataInputStream) index).close();
-			}
-			if (data != null) {
-				((DataInputStream) data).close();
-			}
-		} catch (IOException ex) {
-			ex.printStackTrace();
-		}
-	}
-
 
 	// perhaps, package-local or protected, if anyone else from low-level needs them
 	// XXX think over if we should keep offset in case of separate data file - we read the field anyway. Perhaps, distinct entry classes for Inline and non-inline indexes?
@@ -291,4 +285,170 @@
 		}
 	}
 
+	/*package-local*/ class DataAccess {
+		public boolean nonEmpty() {
+			return false;
+		}
+		// absolute positioning
+		public void seek(long offset) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+		// relative positioning
+		public void skip(int bytes) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+		// shall be called once this object no longer needed
+		public void done() {
+			// no-op in this empty implementation
+		}
+		public int readInt() throws IOException {
+			byte[] b = new byte[4];
+			readBytes(b, 0, 4);
+			return b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
+		}
+		public long readLong() throws IOException {
+			byte[] b = new byte[8];
+			readBytes(b, 0, 8);
+			int i1 = b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
+			int i2 = b[4] << 24 | (b[5] & 0xFF) << 16 | (b[6] & 0xFF) << 8 | (b[7] & 0xFF);
+			return ((long) i1) << 32 | ((long) i2 & 0xFFFFFFFF);
+		}
+		public void readBytes(byte[] buf, int offset, int length) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	// DOESN'T WORK YET 
+	private class MemoryMapFileAccess extends DataAccess {
+		private FileChannel fileChannel;
+		private final long size;
+		private long position = 0;
+
+		public MemoryMapFileAccess(FileChannel fc, long channelSize) {
+			fileChannel = fc;
+			size = channelSize;
+		}
+
+		@Override
+		public void seek(long offset) {
+			position = offset;
+		}
+
+		@Override
+		public void skip(int bytes) throws IOException {
+			position += bytes;
+		}
+
+		private boolean fill() throws IOException {
+			final int BUFFER_SIZE = 8 * 1024;
+			long left = size - position;
+			MappedByteBuffer rv = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, left < BUFFER_SIZE ? left : BUFFER_SIZE);
+			position += rv.capacity();
+			return rv.hasRemaining();
+		}
+
+		@Override
+		public void done() {
+			if (fileChannel != null) {
+				try {
+					fileChannel.close();
+				} catch (IOException ex) {
+					ex.printStackTrace(); // log debug
+				}
+				fileChannel = null;
+			}
+		}
+	}
+
+	private class FileAccess extends DataAccess {
+		private FileChannel fileChannel;
+		private final long size;
+		private ByteBuffer buffer;
+		private long bufferStartInFile = 0; // offset of this.buffer in the file.
+
+		public FileAccess(FileChannel fc, long channelSize) {
+			fileChannel = fc;
+			size = channelSize;
+			final int BUFFER_SIZE = 8 * 1024;
+			// XXX once implementation is more or less stable,
+			// may want to try ByteBuffer.allocateDirect() to see
+			// if there's any performance gain.
+			buffer = ByteBuffer.allocate(size < BUFFER_SIZE ? (int) size : BUFFER_SIZE);
+			buffer.flip(); // or .limit(0) to indicate it's empty
+		}
+		
+		@Override
+		public boolean nonEmpty() {
+			return bufferStartInFile + buffer.position() < size;
+		}
+		
+		@Override
+		public void seek(long offset) throws IOException {
+			if (offset < bufferStartInFile + buffer.limit() && offset >= bufferStartInFile) {
+				buffer.position((int) (offset - bufferStartInFile));
+			} else {
+				// out of current buffer, invalidate it (force re-read) 
+				// XXX or ever re-read it right away?
+				bufferStartInFile = offset;
+				buffer.clear();
+				buffer.limit(0); // or .flip() to indicate we switch to reading
+				fileChannel.position(offset);
+			}
+		}
+
+		@Override
+		public void skip(int bytes) throws IOException {
+			final int newPos = buffer.position() + bytes;
+			if (newPos >= 0 && newPos < buffer.limit()) {
+				// no need to move file pointer, just rewind/seek buffer 
+				buffer.position(newPos);
+			} else {
+				//
+				seek(fileChannel.position()+ bytes);
+			}
+		}
+
+		private boolean fill() throws IOException {
+			if (!buffer.hasRemaining()) {
+				bufferStartInFile += buffer.limit();
+				buffer.clear();
+				if (bufferStartInFile < size) { // just in case there'd be any exception on EOF, not -1 
+					fileChannel.read(buffer);
+					// may return -1 when EOF, but empty will reflect this, hence no explicit support here   
+				}
+				buffer.flip();
+			}
+			return buffer.hasRemaining();
+		}
+
+		@Override
+		public void readBytes(byte[] buf, int offset, int length) throws IOException {
+			final int tail = buffer.remaining();
+			if (tail >= length) {
+				buffer.get(buf, offset, length);
+			} else {
+				buffer.get(buf, offset, tail);
+				if (fill()) {
+					buffer.get(buf, offset + tail, length - tail);
+				} else {
+					throw new IOException(); // shall not happen provided stream contains expected data and no attempts to read past nonEmpty() == false are made. 
+				}
+			}
+		}
+
+		@Override
+		public void done() {
+			if (buffer != null) {
+				buffer = null;
+			}
+			if (fileChannel != null) {
+				try {
+					fileChannel.close();
+				} catch (IOException ex) {
+					ex.printStackTrace(); // log debug
+				}
+				fileChannel = null;
+			}
+		}
+	}
 }