changeset 9:d6d2a630f4a6
Access to underlying file data wrapped into its own Access object, implemented with FileChannel and ByteBuffer
author | Artem Tikhomirov <tikhomirov.artem@gmail.com> |
---|---|
date | Sat, 25 Dec 2010 04:45:59 +0100 |
parents | a78c980749e3 |
children | 382cfe9463db |
files | build.xml design.txt src/com/tmate/hgkit/ll/DigestHelper.java src/com/tmate/hgkit/ll/HgRepository.java src/com/tmate/hgkit/ll/LocalHgRepo.java src/com/tmate/hgkit/ll/Nodeid.java src/com/tmate/hgkit/ll/RevlogIndexStreamAccess.java src/com/tmate/hgkit/ll/RevlogStream.java |
diffstat | 8 files changed, 332 insertions(+), 98 deletions(-) |
--- a/build.xml	Thu Dec 23 01:31:40 2010 +0100
+++ b/build.xml	Sat Dec 25 04:45:59 2010 +0100
@@ -6,9 +6,9 @@
 	<target name="samples" depends="build">
-		<echo message="History of a specific file"/>
+		<echo message="History of a specific file(s)"/>
 		<java classpath="hgkit.jar" classname="com.tmate.hgkit.console.Log">
-			<arg line="design.txt"/>
+			<arg line="design.txt .classpath src/com/tmate/hgkit/ll/LocalHgRepo.java"/>
 		</java>
 		<echo message="Whole repo log"/>
@@ -16,7 +16,7 @@
 		<echo message="Content of a file"/>
 		<java classpath="hgkit.jar" classname="com.tmate.hgkit.console.Cat">
-			<arg line="design.txt"/>
+			<arg line="src/com/tmate/hgkit/ll/Revlog.java"/>
 		</java>
 	</target>
--- a/design.txt	Thu Dec 23 01:31:40 2010 +0100
+++ b/design.txt	Sat Dec 25 04:45:59 2010 +0100
@@ -23,8 +23,23 @@
+ support patch from baseRev + few deltas (although done in a way patches are applied one by one instead of accumulated)
+ command-line samples (-R, filenames) (Log & Cat) to show on any repo
+buildfile + run samples
+*input stream impl + lifecycle. Step forward with FileChannel and ByteBuffer, although questionable accomplishment (looks bit complicated, cumbersome)
-input stream impl + lifecycle
+calculate sha1 digest for file to see I can deal with nodeid
 delta merge
 Changeset to get index (local revision number)
+
+
+>>>> Effective file read/data access
+ReadOperation, Revlog does: repo.getFileSystem().run(this.file, new ReadOperation(), long start=0, long end = -1)
+ReadOperation gets buffer (of whatever size, as decided by FS impl), parses it and then reports if needs more data.
+This helps to ensure streams are closed after reading, allows caching (if the same file (or LRU) is read few times in sequence)
+and allows buffer management (i.e. reuse. Single buffer for all reads).
+Scheduling multiple operations (in future, to deal with writes - single queue for FS operations - no locks?)
+
+File access:
+* NIO and mapped files - should be fast. Although seems to give less control on mem usage.
+* Regular InputStreams and chunked stream on top - allocate List<byte[]>, each (but last) chunk of fixed size (depending on initial file size)
+
+<<<<<
\ No newline at end of file
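The ">>>> Effective file read/data access" note above sketches a callback-style contract where the file-system layer owns the stream lifecycle and the operation only parses buffers. A rough Java rendering of that idea follows; none of these types exist in the code yet, and `FileSystemAccess` plus the `ReadOperation` signature are purely hypothetical illustrations of the note.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical sketch of the "ReadOperation" idea from design.txt: the operation
// consumes whatever buffer the FS implementation fills and says whether it needs more.
interface ReadOperation {
	boolean consume(ByteBuffer chunk) throws IOException;
}

class FileSystemAccess {
	// The FS owns open/feed/close, which gives the "streams are closed after reading"
	// guarantee and lets a single buffer be reused for all reads of one run.
	void run(File file, ReadOperation op, long start, long end) throws IOException {
		FileChannel fc = new FileInputStream(file).getChannel();
		try {
			ByteBuffer buf = ByteBuffer.allocate(8 * 1024); // size decided here, not by the caller
			fc.position(start);
			long limit = end < 0 ? fc.size() : end;
			while (fc.position() < limit && fc.read(buf) != -1) {
				buf.flip();
				boolean wantsMore = op.consume(buf);
				buf.clear();
				if (!wantsMore) {
					break;
				}
			}
		} finally {
			fc.close(); // lifecycle handled centrally, regardless of how parsing went
		}
	}
}
```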
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/com/tmate/hgkit/ll/DigestHelper.java	Sat Dec 25 04:45:59 2010 +0100
@@ -0,0 +1,62 @@
+/**
+ * Copyright (c) 2010 Artem Tikhomirov
+ */
+package com.tmate.hgkit.ll;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ *
+ * @author artem
+ */
+public class DigestHelper {
+	private MessageDigest sha1;
+
+	public DigestHelper() {
+	}
+
+	private MessageDigest getSHA1() {
+		if (sha1 == null) {
+			try {
+				sha1 = MessageDigest.getInstance("SHA-1");
+			} catch (NoSuchAlgorithmException ex) {
+				// could hardly happen, JDK from Sun always has sha1.
+				ex.printStackTrace(); // FIXME log error
+			}
+		}
+		return sha1;
+	}
+
+	// XXX perhaps, digest functions should throw an exception, as it's caller responsibility to deal with eof, etc
+	public String sha1(byte[] data) {
+		MessageDigest alg = getSHA1();
+		byte[] digest = alg.digest(data);
+		assert digest.length == 20;
+		return toHexString(digest, 0, 20);
+	}
+
+	public byte[] sha1(InputStream is /*ByteBuffer*/) throws IOException {
+		MessageDigest alg = getSHA1();
+		byte[] buf = new byte[1024];
+		int c;
+		while ((c = is.read(buf)) != -1) {
+			alg.update(buf, 0, c);
+		}
+		byte[] digest = alg.digest();
+		return digest;
+	}
+
+	public String toHexString(byte[] data, final int offset, final int count) {
+		char[] result = new char[count << 1];
+		final String hexDigits = "0123456789abcdef";
+		final int end = offset+count;
+		for (int i = offset, j = 0; i < end; i++) {
+			result[j++] = hexDigits.charAt((data[i] >>> 4) & 0x0F);
+			result[j++] = hexDigits.charAt(data[i] & 0x0F);
+		}
+		return new String(result);
+	}
+}
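DigestHelper is self-contained, so a quick usage sketch (not part of the changeset) shows the two entry points agreeing. In Mercurial terms this is the building block for the "calculate sha1 digest ... deal with nodeid" item above: a revision's nodeid is the SHA-1 of the two parent nodeids (byte-wise smaller parent first) followed by the revision text.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;

import com.tmate.hgkit.ll.DigestHelper;

public class DigestHelperDemo {
	public static void main(String[] args) throws IOException {
		DigestHelper dh = new DigestHelper();
		byte[] content = "revision content".getBytes("UTF-8");
		// both entry points shall agree: one digests an in-memory array,
		// the other streams the data through MessageDigest.update()
		String fromArray = dh.sha1(content);
		byte[] fromStream = dh.sha1(new ByteArrayInputStream(content));
		System.out.println(fromArray);
		System.out.println(fromArray.equals(dh.toHexString(fromStream, 0, 20))); // true
	}
}
```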
--- a/src/com/tmate/hgkit/ll/HgRepository.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/HgRepository.java	Sat Dec 25 04:45:59 2010 +0100
@@ -30,16 +30,10 @@
 		isInvalid = invalid;
 	}
 
-	public void log() {
-		Revlog clog = getChangelog();
-		assert clog != null;
-		// TODO get data to the client
-	}
-
 	public final Changelog getChangelog() {
 		if (this.changelog == null) {
 			// might want delegate to protected createChangelog() some day
-			RevlogStream content = resolve("store/00changelog.i"); // XXX perhaps, knowledge about filenames should be in LocalHgRepo?
+			RevlogStream content = resolve(toStoragePath("00changelog.i", false)); // XXX perhaps, knowledge about filenames should be in LocalHgRepo?
 			this.changelog = new Changelog(this, content);
 		}
 		return this.changelog;
@@ -61,8 +55,10 @@
 
 	public abstract String getLocation();
 
+	protected abstract String toStoragePath(String path, boolean isData);
+
 	/**
 	 * Perhaps, should be separate interface, like ContentLookup
 	 */
-	protected abstract RevlogStream resolve(String string);
+	protected abstract RevlogStream resolve(String repositoryPath);
 }
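A brief usage note, not from the commit: getChangelog() stays lazy and cached; what changed is only that the "store/00changelog.i" knowledge now sits behind the new toStoragePath() indirection. A minimal sketch of the caller's view:

```java
import com.tmate.hgkit.ll.Changelog;
import com.tmate.hgkit.ll.HgRepository;

// Illustrative only: subclasses decide how "00changelog.i" maps to a path under .hg/,
// resolve() then wraps that file into a RevlogStream on the first request.
class ChangelogAccessExample {
	static Changelog changelogOf(HgRepository repo) {
		Changelog clog = repo.getChangelog(); // first call: resolve(toStoragePath("00changelog.i", false))
		assert clog == repo.getChangelog();   // later calls return the cached instance
		return clog;
	}
}
```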
--- a/src/com/tmate/hgkit/ll/LocalHgRepo.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/LocalHgRepo.java	Sat Dec 25 04:45:59 2010 +0100
@@ -94,8 +94,12 @@
 		}
 	}
 
+	// FIXME document what path argument is, whether it includes .i or .d, and whether it's 'normalized' (slashes) or not.
+	// since .hg/store keeps both .i files and files without extension (e.g. fncache), guees, for data == false
+	// we shall assume path has extension
 	// FIXME much more to be done, see store.py:_hybridencode
 	// @see http://mercurial.selenic.com/wiki/CaseFoldingPlan
+	@Override
 	protected String toStoragePath(String path, boolean data) {
 		path = normalize(path);
 		final String STR_STORE = "store/";
@@ -183,11 +187,12 @@
 	private static char[] toHexByte(int ch, char[] buf) {
 		assert buf.length > 1;
 		final String hexDigits = "0123456789abcdef";
-		buf[0] = hexDigits.charAt((ch & 0x00F0) >> 4);
+		buf[0] = hexDigits.charAt((ch & 0x00F0) >>> 4);
 		buf[1] = hexDigits.charAt(ch & 0x0F);
 		return buf;
 	}
 
+	// TODO handle . and .. (although unlikely to face them from GUI client)
 	private static String normalize(String path) {
 		path = path.replace('\\', '/').replace("//", "/");
 		if (path.startsWith("/")) {
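The FIXME above points at store.py:_hybridencode; the part toStoragePath() already needs is Mercurial's plain store filename encoding. A sketch of that encoding follows — case-folding only, deliberately leaving out the "~xx" escaping of reserved characters and the hashing of overlong paths that _hybridencode adds. The expected result shape in the comment is an assumption about where this method is heading, not a quote from the code.

```java
// Sketch of Mercurial's basic store filename encoding (store.py:encodefilename).
// Assumed target shape for data files, e.g.:
//   toStoragePath("src/Main.java", true)  ->  "store/data/src/_main.java.i"
static String encodeFilename(String path) {
	StringBuilder sb = new StringBuilder(path.length());
	for (int i = 0; i < path.length(); i++) {
		char ch = path.charAt(i);
		if (ch == '_') {
			sb.append("__");                                   // '_' is the escape char, double it
		} else if (ch >= 'A' && ch <= 'Z') {
			sb.append('_').append(Character.toLowerCase(ch));  // case-fold with '_' marker
		} else {
			sb.append(ch);
		}
	}
	return sb.toString();
}
```

The point of the encoding is to keep store file names unambiguous on case-insensitive file systems, which is what the CaseFoldingPlan link above is about.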
--- a/src/com/tmate/hgkit/ll/Nodeid.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/Nodeid.java	Sat Dec 25 04:45:59 2010 +0100
@@ -3,8 +3,6 @@
  */
 package com.tmate.hgkit.ll;
 
-import java.math.BigInteger;
-import java.util.Formatter;
 
 /**
  * @see mercurial/node.py
@@ -24,15 +22,13 @@
 
 	@Override
 	public String toString() {
-		// FIXME temp impl.
-		// BEWARE, if binaryData[0] > 0x80, BigInteger treats it as negative
-		return new BigInteger(binaryData).toString();
+		return new DigestHelper().toHexString(binaryData, 0, 20);
 	}
 
 	// binascii.unhexlify()
 	public static Nodeid fromAscii(byte[] asciiRepresentation, int offset, int length) {
 		assert length % 2 == 0; // Python's binascii.hexlify convert each byte into 2 digits
-		byte[] data = new byte[length / 2]; // XXX use known size instead? nodeid is always 20 bytes
+		byte[] data = new byte[length >>> 1]; // XXX use known size instead? nodeid is always 20 bytes
 		for (int i = 0, j = offset; i < data.length; i++) {
 			int hiNibble = Character.digit(asciiRepresentation[j++], 16);
 			int lowNibble = Character.digit(asciiRepresentation[j++], 16);
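A small round-trip sketch for the new hex handling (not part of the commit; it assumes fromAscii() wraps the decoded 20 bytes into the returned Nodeid instance). The null revision's nodeid — forty zeroes — serves as a known value:

```java
import com.tmate.hgkit.ll.Nodeid;

public class NodeidDemo {
	public static void main(String[] args) {
		// 40 hex digits of Mercurial's null revision ("no parent" nodeid)
		String hex = "0000000000000000000000000000000000000000";
		byte[] ascii = hex.getBytes();
		Nodeid nid = Nodeid.fromAscii(ascii, 0, ascii.length);
		// toString() now renders via DigestHelper.toHexString() instead of BigInteger,
		// so leading zero bytes and the 0x80 sign issue no longer distort the output
		System.out.println(nid); // 0000000000000000000000000000000000000000
	}
}
```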
--- a/src/com/tmate/hgkit/ll/RevlogIndexStreamAccess.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/RevlogIndexStreamAccess.java	Sat Dec 25 04:45:59 2010 +0100
@@ -24,7 +24,7 @@
 
 	void readRevlogV0Record() throws IOException {
-		DataInput di = stream.getIndexStream();
+		DataInput di = null; //FIXME stream.getIndexStream();
 		int offset = di.readInt();
 		int compressedLen = di.readInt();
 		int baseRevision = di.readInt();
@@ -42,7 +42,7 @@
 
 	// another subclass?
 	void readRevlogNGRecord() throws IOException {
-		DataInput di = stream.getIndexStream();
+		DataInput di = null; //FIXME stream.getIndexStream();
 		long l = di.readLong();
 		long offset = l >>> 16;
 		int flags = (int) (l & 0X0FFFF);
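For reference, the 64-byte RevlogNG index record that readRevlogNGRecord() and RevlogStream.iterate() walk through can be decoded from a plain ByteBuffer like this (standalone sketch, mirroring the reads in the diff; ByteBuffer is big-endian by default, which matches the on-disk format):

```java
import java.nio.ByteBuffer;

// One 64-byte RevlogNG (REVLOGV1) index entry: offset and flags are packed into the
// first 8 bytes, the 20-byte nodeid is padded with 12 zero bytes to 32.
final class RevlogNGRecord {
	long offset; int flags;
	int compressedLen, actualLen, baseRevision, linkRevision;
	int parent1Revision, parent2Revision;
	byte[] nodeid = new byte[20];

	static RevlogNGRecord parse(ByteBuffer b) {
		RevlogNGRecord r = new RevlogNGRecord();
		long l = b.getLong();           // bytes 0..7
		r.offset = l >>> 16;            // high 48 bits: position of the revision data
		r.flags = (int) (l & 0xFFFF);   // low 16 bits
		r.compressedLen = b.getInt();   // bytes 8..11
		r.actualLen = b.getInt();       // bytes 12..15, uncompressed length
		r.baseRevision = b.getInt();    // bytes 16..19, base of the delta chain
		r.linkRevision = b.getInt();    // bytes 20..23
		r.parent1Revision = b.getInt(); // bytes 24..27
		r.parent2Revision = b.getInt(); // bytes 28..31
		b.get(r.nodeid, 0, 20);         // bytes 32..51, nodeid proper
		b.position(b.position() + 12);  // bytes 52..63, zero padding
		return r;
	}
}
```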
--- a/src/com/tmate/hgkit/ll/RevlogStream.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/RevlogStream.java	Sat Dec 25 04:45:59 2010 +0100
@@ -5,14 +5,12 @@
 
 import static com.tmate.hgkit.ll.HgRepository.TIP;
 
-import java.io.BufferedInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -37,30 +35,33 @@
 		this.indexFile = indexFile;
 	}
 
-	private void detectVersion() {
-		
+	/*package*/ DataAccess getIndexStream() {
+		return create(indexFile);
 	}
 
-	/*package*/ DataInput getIndexStream() {
-		DataInputStream dis = null;
-		try {
-			dis = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile)));
-		} catch (FileNotFoundException ex) {
-			ex.printStackTrace();
-			// should not happen, we checked for existence
-		}
-		return dis;
-	}
-
-	/*package*/ DataInput getDataStream() {
+	/*package*/ DataAccess getDataStream() {
 		final String indexName = indexFile.getName();
 		File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d");
+		return create(dataFile);
+	}
+
+	private DataAccess create(File f) {
+		if (!f.exists()) {
+			return new DataAccess();
+		}
 		try {
-			return new DataInputStream(new BufferedInputStream(new FileInputStream(dataFile)));
-		} catch (FileNotFoundException ex) {
-			ex.printStackTrace();
-			return null;
+			FileChannel fc = new FileInputStream(f).getChannel();
+			final int MAPIO_MAGIC_BOUNDARY = 100 * 1024;
+			if (fc.size() > MAPIO_MAGIC_BOUNDARY) {
+				return new MemoryMapFileAccess(fc, fc.size());
+			} else {
+				return new FileAccess(fc, fc.size());
+			}
+		} catch (IOException ex) {
+			// unlikely to happen, we've made sure file exists.
+			ex.printStackTrace(); // FIXME log error
 		}
+		return new DataAccess(); // non-null, empty.
 	}
 
 	public int revisionCount() {
@@ -68,6 +69,8 @@
 		return index.size();
 	}
 
+	private final int REVLOGV1_RECORD_SIZE = 64;
+
 	// should be possible to use TIP, ALL, or -1, -2, -n notation of Hg
 	// ? boolean needsNodeid
 	public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) {
@@ -90,10 +93,10 @@
 		}
 		// XXX may cache [start .. end] from index with a single read (pre-read)
-		DataInput diIndex = null, diData = null;
-		diIndex = getIndexStream();
+		DataAccess daIndex = null, daData = null;
+		daIndex = getIndexStream();
 		if (needData && !inline) {
-			diData = getDataStream();
+			daData = getDataStream();
 		}
 		try {
 			byte[] lastData = null;
@@ -105,29 +108,30 @@
 			} else {
 				i = start;
 			}
-			diIndex.skipBytes(inline ? (int) index.get(i).offset : start * 64);
+
+			daIndex.seek(inline ? (int) index.get(i).offset : start * REVLOGV1_RECORD_SIZE);
 			for (; i <= end; i++ ) {
-				long l = diIndex.readLong();
+				long l = daIndex.readLong();
 				long offset = l >>> 16;
 				int flags = (int) (l & 0X0FFFF);
-				int compressedLen = diIndex.readInt();
-				int actualLen = diIndex.readInt();
-				int baseRevision = diIndex.readInt();
-				int linkRevision = diIndex.readInt();
-				int parent1Revision = diIndex.readInt();
-				int parent2Revision = diIndex.readInt();
+				int compressedLen = daIndex.readInt();
+				int actualLen = daIndex.readInt();
+				int baseRevision = daIndex.readInt();
+				int linkRevision = daIndex.readInt();
+				int parent1Revision = daIndex.readInt();
+				int parent2Revision = daIndex.readInt();
 				byte[] buf = new byte[32];
 				// XXX Hg keeps 12 last bytes empty, we move them into front here
-				diIndex.readFully(buf, 12, 20);
-				diIndex.skipBytes(12);
+				daIndex.readBytes(buf, 12, 20);
+				daIndex.skip(12);
 				byte[] data = null;
 				if (needData) {
 					byte[] dataBuf = new byte[compressedLen];
 					if (inline) {
-						diIndex.readFully(dataBuf);
+						daIndex.readBytes(dataBuf, 0, compressedLen);
 					} else {
-						diData.skipBytes((int) index.get(i).offset); // FIXME not skip but seek!!! (skip would work only for the first time)
-						diData.readFully(dataBuf);
+						daData.seek(index.get(i).offset);
+						daData.readBytes(dataBuf, 0, compressedLen);
 					}
 					if (dataBuf[0] == 0x78 /* 'x' */) {
 						try {
@@ -169,7 +173,7 @@
 					}
 				} else {
 					if (inline) {
-						diIndex.skipBytes(compressedLen);
+						daIndex.skip(compressedLen);
 					}
 				}
 				if (!extraReadsToBaseRev || i >= start) {
@@ -177,13 +181,13 @@
 				}
 				lastData = data;
 			}
-		} catch (EOFException ex) {
-			// should not happen as long as we read inside known boundaries
-			throw new IllegalStateException(ex);
 		} catch (IOException ex) {
 			throw new IllegalStateException(ex); // FIXME need better handling
 		} finally {
-			hackCloseFileStreams(diIndex, diData); // FIXME HACK!!!
+			daIndex.done();
+			if (daData != null) {
+				daData.done();
+			}
 		}
 	}
 
@@ -192,59 +196,49 @@
 			return;
 		}
 		ArrayList<IndexEntry> res = new ArrayList<IndexEntry>();
-		DataInput di = getIndexStream();
+		DataAccess da = getIndexStream();
 		try {
-			int versionField = di.readInt();
-			di.readInt(); // just to skip next 2 bytes of offset + flags
+			int versionField = da.readInt();
+			da.readInt(); // just to skip next 2 bytes of offset + flags
 			final int INLINEDATA = 1 << 16;
 			inline = (versionField & INLINEDATA) != 0;
 			long offset = 0; // first offset is always 0, thus Hg uses it for other purposes
-			while(true) { // EOFExcepiton should get us outta here. FIXME Our inputstream should has explicit no-more-data indicator
-				int compressedLen = di.readInt();
+			while(true) {
+				int compressedLen = da.readInt();
 				// 8+4 = 12 bytes total read here
-				int actualLen = di.readInt();
-				int baseRevision = di.readInt();
+				int actualLen = da.readInt();
+				int baseRevision = da.readInt();
 				// 12 + 8 = 20 bytes read here
 //				int linkRevision = di.readInt();
 //				int parent1Revision = di.readInt();
 //				int parent2Revision = di.readInt();
 //				byte[] nodeid = new byte[32];
 				if (inline) {
-					res.add(new IndexEntry(offset + 64*res.size(), baseRevision));
-					di.skipBytes(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size)
+					res.add(new IndexEntry(offset + REVLOGV1_RECORD_SIZE * res.size(), baseRevision));
+					da.skip(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size)
 				} else {
 					res.add(new IndexEntry(offset, baseRevision));
-					di.skipBytes(3*4 + 32);
+					da.skip(3*4 + 32);
 				}
-				long l = di.readLong();
-				offset = l >>> 16;
+				if (da.nonEmpty()) {
+					long l = da.readLong();
+					offset = l >>> 16;
+				} else {
+					// fine, done then
+					index = res;
+					break;
+				}
 			}
-		} catch (EOFException ex) {
-			// fine, done then
-			index = res;
 		} catch (IOException ex) {
-			ex.printStackTrace();
-			// too bad, no outline then
+			ex.printStackTrace(); // log error
+			// too bad, no outline then.
 			index = Collections.emptyList();
+		} finally {
+			da.done();
 		}
-		hackCloseFileStreams(di, null); // FIXME HACK!!!
+
 	}
 
-	// FIXME HACK to deal with File/FileStream nature of out data source. Won't need this once implement
-	// own DataInput based on bytearray chunks or RandomAccessFile
-	private void hackCloseFileStreams(DataInput index, DataInput data) {
-		try {
-			if (index != null) {
-				((DataInputStream) index).close();
-			}
-			if (data != null) {
-				((DataInputStream) data).close();
-			}
-		} catch (IOException ex) {
-			ex.printStackTrace();
-		}
-	}
-
 	// perhaps, package-local or protected, if anyone else from low-level needs them
 	// XXX think over if we should keep offset in case of separate data file - we read the field anyway. Perhaps, distinct entry classes for Inline and non-inline indexes?
@@ -291,4 +285,170 @@
 		}
 	}
 
+	/*package-local*/ class DataAccess {
+		public boolean nonEmpty() {
+			return false;
+		}
+		// absolute positioning
+		public void seek(long offset) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+		// relative positioning
+		public void skip(int bytes) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+		// shall be called once this object no longer needed
+		public void done() {
+			// no-op in this empty implementation
+		}
+		public int readInt() throws IOException {
+			byte[] b = new byte[4];
+			readBytes(b, 0, 4);
+			return b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
+		}
+		public long readLong() throws IOException {
+			byte[] b = new byte[8];
+			readBytes(b, 0, 8);
+			int i1 = b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
+			int i2 = b[4] << 24 | (b[5] & 0xFF) << 16 | (b[6] & 0xFF) << 8 | (b[7] & 0xFF);
+			return ((long) i1) << 32 | ((long) i2 & 0xFFFFFFFF);
+		}
+		public void readBytes(byte[] buf, int offset, int length) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	// DOESN'T WORK YET
+	private class MemoryMapFileAccess extends DataAccess {
+		private FileChannel fileChannel;
+		private final long size;
+		private long position = 0;
+
+		public MemoryMapFileAccess(FileChannel fc, long channelSize) {
+			fileChannel = fc;
+			size = channelSize;
+		}
+
+		@Override
+		public void seek(long offset) {
+			position = offset;
+		}
+
+		@Override
+		public void skip(int bytes) throws IOException {
+			position += bytes;
+		}
+
+		private boolean fill() throws IOException {
+			final int BUFFER_SIZE = 8 * 1024;
+			long left = size - position;
+			MappedByteBuffer rv = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, left < BUFFER_SIZE ? left : BUFFER_SIZE);
+			position += rv.capacity();
+			return rv.hasRemaining();
+		}
+
+		@Override
+		public void done() {
+			if (fileChannel != null) {
+				try {
+					fileChannel.close();
+				} catch (IOException ex) {
+					ex.printStackTrace(); // log debug
+				}
+				fileChannel = null;
+			}
+		}
+	}
+
+	private class FileAccess extends DataAccess {
+		private FileChannel fileChannel;
+		private final long size;
+		private ByteBuffer buffer;
+		private long bufferStartInFile = 0; // offset of this.buffer in the file.
+
+		public FileAccess(FileChannel fc, long channelSize) {
+			fileChannel = fc;
+			size = channelSize;
+			final int BUFFER_SIZE = 8 * 1024;
+			// XXX once implementation is more or less stable,
+			// may want to try ByteBuffer.allocateDirect() to see
+			// if there's any performance gain.
+			buffer = ByteBuffer.allocate(size < BUFFER_SIZE ? (int) size : BUFFER_SIZE);
+			buffer.flip(); // or .limit(0) to indicate it's empty
+		}
+
+		@Override
+		public boolean nonEmpty() {
+			return bufferStartInFile + buffer.position() < size;
+		}
+
+		@Override
+		public void seek(long offset) throws IOException {
+			if (offset < bufferStartInFile + buffer.limit() && offset >= bufferStartInFile) {
+				buffer.position((int) (offset - bufferStartInFile));
+			} else {
+				// out of current buffer, invalidate it (force re-read)
+				// XXX or ever re-read it right away?
+				bufferStartInFile = offset;
+				buffer.clear();
+				buffer.limit(0); // or .flip() to indicate we switch to reading
+				fileChannel.position(offset);
+			}
+		}
+
+		@Override
+		public void skip(int bytes) throws IOException {
+			final int newPos = buffer.position() + bytes;
+			if (newPos >= 0 && newPos < buffer.limit()) {
+				// no need to move file pointer, just rewind/seek buffer
+				buffer.position(newPos);
+			} else {
+				//
+				seek(fileChannel.position()+ bytes);
+			}
+		}
+
+		private boolean fill() throws IOException {
+			if (!buffer.hasRemaining()) {
+				bufferStartInFile += buffer.limit();
+				buffer.clear();
+				if (bufferStartInFile < size) { // just in case there'd be any exception on EOF, not -1
+					fileChannel.read(buffer);
+					// may return -1 when EOF, but empty will reflect this, hence no explicit support here
+				}
+				buffer.flip();
+			}
+			return buffer.hasRemaining();
+		}
+
+		@Override
+		public void readBytes(byte[] buf, int offset, int length) throws IOException {
+			final int tail = buffer.remaining();
+			if (tail >= length) {
+				buffer.get(buf, offset, length);
+			} else {
+				buffer.get(buf, offset, tail);
+				if (fill()) {
+					buffer.get(buf, offset + tail, length - tail);
+				} else {
+					throw new IOException(); // shall not happen provided stream contains expected data and no attempts to read past nonEmpty() == false are made.
+				}
+			}
+		}
+
+		@Override
+		public void done() {
+			if (buffer != null) {
+				buffer = null;
+			}
+			if (fileChannel != null) {
+				try {
+					fileChannel.close();
+				} catch (IOException ex) {
+					ex.printStackTrace(); // log debug
+				}
+				fileChannel = null;
+			}
+		}
+	}
 }
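One subtlety in the DataAccess integer decoding deserves a note. The standalone sketch below (not part of the changeset) reproduces the big-endian byte composition and checks it against DataInputStream, which is what the replaced DataInput-based code relied on. Note the long literal in the low-word mask of readLong(): with the int literal 0xFFFFFFFF the mask sign-extends to -1L and clobbers the high word whenever the low 32 bits happen to have their top bit set.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class BigEndianDecodeCheck {

	// same big-endian composition as DataAccess.readInt()
	static int readInt(byte[] b) {
		return b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
	}

	// as in DataAccess.readLong(), but with the low word masked by a *long* literal
	static long readLong(byte[] b) {
		int i1 = readInt(b);
		int i2 = b[4] << 24 | (b[5] & 0xFF) << 16 | (b[6] & 0xFF) << 8 | (b[7] & 0xFF);
		return ((long) i1) << 32 | ((long) i2 & 0xFFFFFFFFL);
	}

	public static void main(String[] args) throws IOException {
		// low word chosen with its top bit set, which is exactly where an int-literal mask would fail
		byte[] sample = ByteBuffer.allocate(8).putLong(0x11223344F2345678L).array();
		DataInputStream dis = new DataInputStream(new ByteArrayInputStream(sample));
		System.out.println(readInt(sample) == dis.readInt());   // true
		dis.reset(); // ByteArrayInputStream resets to the start by default
		System.out.println(readLong(sample) == dis.readLong()); // true
	}
}
```

The rest of the FileAccess logic — seek within the current buffer when possible, otherwise invalidate and reposition the channel, and fill() topping the buffer up on demand — is what replaces the old skipBytes/EOFException-driven control flow in iterate() and initOutline().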