Mercurial > jhg
diff src/com/tmate/hgkit/ll/RevlogStream.java @ 9:d6d2a630f4a6
Access to underlaying file data wrapped into own Access object, implemented with FileChannel and ByteBuffer
author | Artem Tikhomirov <tikhomirov.artem@gmail.com> |
---|---|
date | Sat, 25 Dec 2010 04:45:59 +0100 |
parents | 5abe5af181bd |
children | 382cfe9463db |
line wrap: on
line diff
--- a/src/com/tmate/hgkit/ll/RevlogStream.java Thu Dec 23 01:31:40 2010 +0100 +++ b/src/com/tmate/hgkit/ll/RevlogStream.java Sat Dec 25 04:45:59 2010 +0100 @@ -5,14 +5,12 @@ import static com.tmate.hgkit.ll.HgRepository.TIP; -import java.io.BufferedInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.EOFException; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; @@ -37,30 +35,33 @@ this.indexFile = indexFile; } - private void detectVersion() { - + /*package*/ DataAccess getIndexStream() { + return create(indexFile); } - /*package*/ DataInput getIndexStream() { - DataInputStream dis = null; - try { - dis = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile))); - } catch (FileNotFoundException ex) { - ex.printStackTrace(); - // should not happen, we checked for existence - } - return dis; - } - - /*package*/ DataInput getDataStream() { + /*package*/ DataAccess getDataStream() { final String indexName = indexFile.getName(); File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d"); + return create(dataFile); + } + + private DataAccess create(File f) { + if (!f.exists()) { + return new DataAccess(); + } try { - return new DataInputStream(new BufferedInputStream(new FileInputStream(dataFile))); - } catch (FileNotFoundException ex) { - ex.printStackTrace(); - return null; + FileChannel fc = new FileInputStream(f).getChannel(); + final int MAPIO_MAGIC_BOUNDARY = 100 * 1024; + if (fc.size() > MAPIO_MAGIC_BOUNDARY) { + return new MemoryMapFileAccess(fc, fc.size()); + } else { + return new FileAccess(fc, fc.size()); + } + } catch (IOException ex) { + // unlikely to happen, we've made sure file exists. + ex.printStackTrace(); // FIXME log error } + return new DataAccess(); // non-null, empty. } public int revisionCount() { @@ -68,6 +69,8 @@ return index.size(); } + private final int REVLOGV1_RECORD_SIZE = 64; + // should be possible to use TIP, ALL, or -1, -2, -n notation of Hg // ? boolean needsNodeid public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) { @@ -90,10 +93,10 @@ } // XXX may cache [start .. end] from index with a single read (pre-read) - DataInput diIndex = null, diData = null; - diIndex = getIndexStream(); + DataAccess daIndex = null, daData = null; + daIndex = getIndexStream(); if (needData && !inline) { - diData = getDataStream(); + daData = getDataStream(); } try { byte[] lastData = null; @@ -105,29 +108,30 @@ } else { i = start; } - diIndex.skipBytes(inline ? (int) index.get(i).offset : start * 64); + + daIndex.seek(inline ? (int) index.get(i).offset : start * REVLOGV1_RECORD_SIZE); for (; i <= end; i++ ) { - long l = diIndex.readLong(); + long l = daIndex.readLong(); long offset = l >>> 16; int flags = (int) (l & 0X0FFFF); - int compressedLen = diIndex.readInt(); - int actualLen = diIndex.readInt(); - int baseRevision = diIndex.readInt(); - int linkRevision = diIndex.readInt(); - int parent1Revision = diIndex.readInt(); - int parent2Revision = diIndex.readInt(); + int compressedLen = daIndex.readInt(); + int actualLen = daIndex.readInt(); + int baseRevision = daIndex.readInt(); + int linkRevision = daIndex.readInt(); + int parent1Revision = daIndex.readInt(); + int parent2Revision = daIndex.readInt(); byte[] buf = new byte[32]; // XXX Hg keeps 12 last bytes empty, we move them into front here - diIndex.readFully(buf, 12, 20); - diIndex.skipBytes(12); + daIndex.readBytes(buf, 12, 20); + daIndex.skip(12); byte[] data = null; if (needData) { byte[] dataBuf = new byte[compressedLen]; if (inline) { - diIndex.readFully(dataBuf); + daIndex.readBytes(dataBuf, 0, compressedLen); } else { - diData.skipBytes((int) index.get(i).offset); // FIXME not skip but seek!!! (skip would work only for the first time) - diData.readFully(dataBuf); + daData.seek(index.get(i).offset); + daData.readBytes(dataBuf, 0, compressedLen); } if (dataBuf[0] == 0x78 /* 'x' */) { try { @@ -169,7 +173,7 @@ } } else { if (inline) { - diIndex.skipBytes(compressedLen); + daIndex.skip(compressedLen); } } if (!extraReadsToBaseRev || i >= start) { @@ -177,13 +181,13 @@ } lastData = data; } - } catch (EOFException ex) { - // should not happen as long as we read inside known boundaries - throw new IllegalStateException(ex); } catch (IOException ex) { throw new IllegalStateException(ex); // FIXME need better handling } finally { - hackCloseFileStreams(diIndex, diData); // FIXME HACK!!! + daIndex.done(); + if (daData != null) { + daData.done(); + } } } @@ -192,59 +196,49 @@ return; } ArrayList<IndexEntry> res = new ArrayList<IndexEntry>(); - DataInput di = getIndexStream(); + DataAccess da = getIndexStream(); try { - int versionField = di.readInt(); - di.readInt(); // just to skip next 2 bytes of offset + flags + int versionField = da.readInt(); + da.readInt(); // just to skip next 2 bytes of offset + flags final int INLINEDATA = 1 << 16; inline = (versionField & INLINEDATA) != 0; long offset = 0; // first offset is always 0, thus Hg uses it for other purposes - while(true) { // EOFExcepiton should get us outta here. FIXME Our inputstream should has explicit no-more-data indicator - int compressedLen = di.readInt(); + while(true) { + int compressedLen = da.readInt(); // 8+4 = 12 bytes total read here - int actualLen = di.readInt(); - int baseRevision = di.readInt(); + int actualLen = da.readInt(); + int baseRevision = da.readInt(); // 12 + 8 = 20 bytes read here // int linkRevision = di.readInt(); // int parent1Revision = di.readInt(); // int parent2Revision = di.readInt(); // byte[] nodeid = new byte[32]; if (inline) { - res.add(new IndexEntry(offset + 64*res.size(), baseRevision)); - di.skipBytes(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size) + res.add(new IndexEntry(offset + REVLOGV1_RECORD_SIZE * res.size(), baseRevision)); + da.skip(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size) } else { res.add(new IndexEntry(offset, baseRevision)); - di.skipBytes(3*4 + 32); + da.skip(3*4 + 32); } - long l = di.readLong(); - offset = l >>> 16; + if (da.nonEmpty()) { + long l = da.readLong(); + offset = l >>> 16; + } else { + // fine, done then + index = res; + break; + } } - } catch (EOFException ex) { - // fine, done then - index = res; } catch (IOException ex) { - ex.printStackTrace(); - // too bad, no outline then + ex.printStackTrace(); // log error + // too bad, no outline then. index = Collections.emptyList(); + } finally { + da.done(); } - hackCloseFileStreams(di, null); // FIXME HACK!!! + } - // FIXME HACK to deal with File/FileStream nature of out data source. Won't need this once implement - // own DataInput based on bytearray chunks or RandomAccessFile - private void hackCloseFileStreams(DataInput index, DataInput data) { - try { - if (index != null) { - ((DataInputStream) index).close(); - } - if (data != null) { - ((DataInputStream) data).close(); - } - } catch (IOException ex) { - ex.printStackTrace(); - } - } - // perhaps, package-local or protected, if anyone else from low-level needs them // XXX think over if we should keep offset in case of separate data file - we read the field anyway. Perhaps, distinct entry classes for Inline and non-inline indexes? @@ -291,4 +285,170 @@ } } + /*package-local*/ class DataAccess { + public boolean nonEmpty() { + return false; + } + // absolute positioning + public void seek(long offset) throws IOException { + throw new UnsupportedOperationException(); + } + // relative positioning + public void skip(int bytes) throws IOException { + throw new UnsupportedOperationException(); + } + // shall be called once this object no longer needed + public void done() { + // no-op in this empty implementation + } + public int readInt() throws IOException { + byte[] b = new byte[4]; + readBytes(b, 0, 4); + return b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF); + } + public long readLong() throws IOException { + byte[] b = new byte[8]; + readBytes(b, 0, 8); + int i1 = b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF); + int i2 = b[4] << 24 | (b[5] & 0xFF) << 16 | (b[6] & 0xFF) << 8 | (b[7] & 0xFF); + return ((long) i1) << 32 | ((long) i2 & 0xFFFFFFFF); + } + public void readBytes(byte[] buf, int offset, int length) throws IOException { + throw new UnsupportedOperationException(); + } + } + + // DOESN'T WORK YET + private class MemoryMapFileAccess extends DataAccess { + private FileChannel fileChannel; + private final long size; + private long position = 0; + + public MemoryMapFileAccess(FileChannel fc, long channelSize) { + fileChannel = fc; + size = channelSize; + } + + @Override + public void seek(long offset) { + position = offset; + } + + @Override + public void skip(int bytes) throws IOException { + position += bytes; + } + + private boolean fill() throws IOException { + final int BUFFER_SIZE = 8 * 1024; + long left = size - position; + MappedByteBuffer rv = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, left < BUFFER_SIZE ? left : BUFFER_SIZE); + position += rv.capacity(); + return rv.hasRemaining(); + } + + @Override + public void done() { + if (fileChannel != null) { + try { + fileChannel.close(); + } catch (IOException ex) { + ex.printStackTrace(); // log debug + } + fileChannel = null; + } + } + } + + private class FileAccess extends DataAccess { + private FileChannel fileChannel; + private final long size; + private ByteBuffer buffer; + private long bufferStartInFile = 0; // offset of this.buffer in the file. + + public FileAccess(FileChannel fc, long channelSize) { + fileChannel = fc; + size = channelSize; + final int BUFFER_SIZE = 8 * 1024; + // XXX once implementation is more or less stable, + // may want to try ByteBuffer.allocateDirect() to see + // if there's any performance gain. + buffer = ByteBuffer.allocate(size < BUFFER_SIZE ? (int) size : BUFFER_SIZE); + buffer.flip(); // or .limit(0) to indicate it's empty + } + + @Override + public boolean nonEmpty() { + return bufferStartInFile + buffer.position() < size; + } + + @Override + public void seek(long offset) throws IOException { + if (offset < bufferStartInFile + buffer.limit() && offset >= bufferStartInFile) { + buffer.position((int) (offset - bufferStartInFile)); + } else { + // out of current buffer, invalidate it (force re-read) + // XXX or ever re-read it right away? + bufferStartInFile = offset; + buffer.clear(); + buffer.limit(0); // or .flip() to indicate we switch to reading + fileChannel.position(offset); + } + } + + @Override + public void skip(int bytes) throws IOException { + final int newPos = buffer.position() + bytes; + if (newPos >= 0 && newPos < buffer.limit()) { + // no need to move file pointer, just rewind/seek buffer + buffer.position(newPos); + } else { + // + seek(fileChannel.position()+ bytes); + } + } + + private boolean fill() throws IOException { + if (!buffer.hasRemaining()) { + bufferStartInFile += buffer.limit(); + buffer.clear(); + if (bufferStartInFile < size) { // just in case there'd be any exception on EOF, not -1 + fileChannel.read(buffer); + // may return -1 when EOF, but empty will reflect this, hence no explicit support here + } + buffer.flip(); + } + return buffer.hasRemaining(); + } + + @Override + public void readBytes(byte[] buf, int offset, int length) throws IOException { + final int tail = buffer.remaining(); + if (tail >= length) { + buffer.get(buf, offset, length); + } else { + buffer.get(buf, offset, tail); + if (fill()) { + buffer.get(buf, offset + tail, length - tail); + } else { + throw new IOException(); // shall not happen provided stream contains expected data and no attempts to read past nonEmpty() == false are made. + } + } + } + + @Override + public void done() { + if (buffer != null) { + buffer = null; + } + if (fileChannel != null) { + try { + fileChannel.close(); + } catch (IOException ex) { + ex.printStackTrace(); // log debug + } + fileChannel = null; + } + } + } }