/**
 * Copyright (c) 2010 Artem Tikhomirov
 */
package com.tmate.hgkit.ll;

import static com.tmate.hgkit.ll.HgRepository.TIP;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

/**
 * Reader for a single Mercurial revlog (an index file plus, for non-inline revlogs, a sibling data file).
 *
 * ? Single RevlogStream per file per repository with accessor to record access session (e.g. with back/forward operations),
 * or numerous RevlogStream with separate representation of the underlying data (cached, lazy ChunkStream)?
 *
 * @author artem
 * @see <a href="http://mercurial.selenic.com/wiki/Revlog">Revlog</a>
 * @see <a href="http://mercurial.selenic.com/wiki/RevlogNG">RevlogNG</a>
 */
public class RevlogStream {

	/** Size of the fixed part of a RevlogNG index record, in bytes. */
	private static final int REVLOGV1_RECORD_SIZE = 64;

	private List<IndexEntry> index; // indexed access highly needed
	private boolean inline = false;
	private final File indexFile;

	RevlogStream(File indexFile) {
		this.indexFile = indexFile;
	}

	/**
	 * Opens a fresh accessor over the index file.
	 * Caller is responsible for invoking {@link DataAccess#done()} on the result.
	 */
	/*package*/ DataAccess getIndexStream() {
		return create(indexFile);
	}

	/**
	 * Opens a fresh accessor over the data file, which is the index file name with its last character replaced by 'd'.
	 * Caller is responsible for invoking {@link DataAccess#done()} on the result.
	 */
	/*package*/ DataAccess getDataStream() {
		final String indexName = indexFile.getName();
		File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d");
		return create(dataFile);
	}

	/**
	 * Picks an access strategy by file size: memory-mapped windows for files above 100K, plain buffered channel reads otherwise.
	 *
	 * @return never <code>null</code>; an empty {@link DataAccess} when the file is missing or could not be opened
	 */
	private DataAccess create(File f) {
		if (!f.exists()) {
			return new DataAccess();
		}
		FileInputStream fis = null;
		try {
			fis = new FileInputStream(f);
			FileChannel fc = fis.getChannel();
			final long size = fc.size();
			final int MAPIO_MAGIC_BOUNDARY = 100 * 1024;
			if (size > MAPIO_MAGIC_BOUNDARY) {
				return new MemoryMapFileAccess(fc, size);
			} else {
				return new FileAccess(fc, size);
			}
		} catch (IOException ex) {
			// unlikely to happen, we've made sure file exists.
			ex.printStackTrace(); // FIXME log error
			if (fis != null) {
				// do not leak the descriptor when the failure happened after the file was opened
				try {
					fis.close();
				} catch (IOException ignored) {
					// nothing sensible to do, the original failure is already reported
				}
			}
		}
		return new DataAccess(); // non-null, empty.
	}

	/**
	 * @return number of revisions recorded in the index, 0 if the index is missing or unreadable
	 */
	public int revisionCount() {
		initOutline();
		return index.size();
	}

	/**
	 * Walks revisions <code>[start..end]</code> (both inclusive) and reports each one to the inspector.
	 * When data is requested and <code>start</code> is stored as a patch, revisions from its base revision
	 * up to <code>start</code> are read as well (to rebuild content), but are not reported.
	 *
	 * @param start first revision to report, or {@link HgRepository#TIP}
	 * @param end last revision to report, or {@link HgRepository#TIP}
	 * @param needData whether revision content shall be read and passed to the inspector (<code>null</code> is passed otherwise)
	 * @param inspector callback to receive revision records
	 * @throws IllegalArgumentException if either boundary is outside of the index or <code>end &lt; start</code>
	 * @throws IllegalStateException if reading of the underlying files fails
	 */
	// should be possible to use TIP, ALL, or -1, -2, -n notation of Hg
	// ? boolean needsNodeid
	public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) {
		initOutline();
		final int indexSize = index.size();
		if (indexSize == 0) {
			return;
		}
		if (end == TIP) {
			end = indexSize - 1;
		}
		if (start == TIP) {
			start = indexSize - 1;
		}
		if (start < 0 || start >= indexSize) {
			throw new IllegalArgumentException("Bad left range boundary " + start);
		}
		if (end < start || end >= indexSize) {
			throw new IllegalArgumentException("Bad right range boundary " + end);
		}
		// XXX may cache [start .. end] from index with a single read (pre-read)

		DataAccess daIndex = getIndexStream();
		DataAccess daData = null;
		if (needData && !inline) {
			daData = getDataStream();
		}
		try {
			byte[] lastData = null;
			int i;
			boolean extraReadsToBaseRev = false;
			if (needData && index.get(start).baseRevision < start) {
				i = index.get(start).baseRevision;
				extraReadsToBaseRev = true;
			} else {
				i = start;
			}

			// position at the record of the first revision to READ (i), which may precede the first one to REPORT (start)
			daIndex.seek(inline ? index.get(i).offset : (long) i * REVLOGV1_RECORD_SIZE);
			for (; i <= end; i++) {
				daIndex.readLong(); // 6 bytes of offset + 2 bytes of flags; offset is taken from the outline instead
				int compressedLen = daIndex.readInt();
				int actualLen = daIndex.readInt();
				int baseRevision = daIndex.readInt();
				int linkRevision = daIndex.readInt();
				int parent1Revision = daIndex.readInt();
				int parent2Revision = daIndex.readInt();
				byte[] buf = new byte[32];
				// XXX Hg keeps 12 last bytes empty, we move them into front here
				daIndex.readBytes(buf, 12, 20);
				daIndex.skip(12);
				byte[] data = null;
				if (needData) {
					byte[] dataBuf = new byte[compressedLen];
					if (inline) {
						daIndex.readBytes(dataBuf, 0, compressedLen);
					} else {
						daData.seek(index.get(i).offset);
						daData.readBytes(dataBuf, 0, compressedLen);
					}
					data = decode(dataBuf, actualLen);
					if (baseRevision != i) { // XXX not sure if this is the right way to detect a patch
						// this is a patch against content of the previous revision
						data = apply(lastData, actualLen, parsePatch(data));
					}
				} else if (inline) {
					daIndex.skip(compressedLen);
				}
				if (!extraReadsToBaseRev || i >= start) {
					inspector.next(i, actualLen, baseRevision, linkRevision, parent1Revision, parent2Revision, buf, data);
				}
				lastData = data;
			}
		} catch (IOException ex) {
			throw new IllegalStateException(ex); // FIXME need better handling
		} finally {
			daIndex.done();
			if (daData != null) {
				daData.done();
			}
		}
	}

	/**
	 * Reads the index once to learn, for each revision, where its record/data lives and what its base revision is.
	 * Leaves {@link #index} non-null; it is empty when the index file is missing, empty or unreadable.
	 */
	private void initOutline() {
		if (index != null && !index.isEmpty()) {
			return;
		}
		ArrayList<IndexEntry> res = new ArrayList<IndexEntry>();
		DataAccess da = getIndexStream();
		try {
			if (!da.nonEmpty()) {
				// missing or empty index file, nothing to read
				index = Collections.emptyList();
				return;
			}
			int versionField = da.readInt();
			da.readInt(); // just to skip next 2 bytes of offset + flags
			final int INLINEDATA = 1 << 16;
			inline = (versionField & INLINEDATA) != 0;
			long offset = 0; // first offset is always 0, thus Hg uses it for other purposes
			while (true) {
				int compressedLen = da.readInt();
				// 8+4 = 12 bytes total read here
				da.readInt(); // actual length, not needed for the outline
				int baseRevision = da.readInt();
				// 12 + 8 = 20 bytes read here
				if (inline) {
					// actual position of the record in the .i file: data offset + size of all preceding index records
					res.add(new IndexEntry(offset + (long) REVLOGV1_RECORD_SIZE * res.size(), baseRevision));
					da.skip(3 * 4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size)
				} else {
					res.add(new IndexEntry(offset, baseRevision));
					da.skip(3 * 4 + 32);
				}
				if (da.nonEmpty()) {
					long l = da.readLong();
					offset = l >>> 16;
				} else {
					// fine, done then
					index = res;
					break;
				}
			}
		} catch (IOException ex) {
			ex.printStackTrace(); // log error
			// too bad, no outline then.
			index = Collections.emptyList();
		} finally {
			da.done();
		}
	}

	/**
	 * Turns a raw revlog chunk into plain bytes according to its first byte:
	 * 'x' (0x78) - zlib-compressed, 'u' (0x75) - uncompressed with a one-byte marker, anything else - returned as is.
	 * An empty chunk is returned as is.
	 */
	private static byte[] decode(byte[] dataBuf, int actualLen) {
		if (dataBuf.length == 0) {
			return dataBuf;
		}
		if (dataBuf[0] == 0x78 /* 'x' */) {
			return inflate(dataBuf, actualLen);
		}
		if (dataBuf[0] == 0x75 /* 'u' */) {
			byte[] data = new byte[dataBuf.length - 1];
			System.arraycopy(dataBuf, 1, data, 0, data.length);
			return data;
		}
		// XXX Python impl in fact throws exception when there's not 'x', 'u' or '0'
		// but I don't see reason not to return data as is
		return dataBuf;
	}

	/**
	 * Inflates a complete zlib stream, growing the output as needed.
	 *
	 * @param sizeHint initial guess of the output size; any value is accepted
	 * @return inflated bytes, or an empty array if the stream is malformed
	 */
	private static byte[] inflate(byte[] compressed, int sizeHint) {
		Inflater zlib = new Inflater();
		try {
			zlib.setInput(compressed, 0, compressed.length);
			byte[] result = new byte[Math.max(sizeHint, 64)];
			int resultLen = 0;
			while (!zlib.finished()) {
				if (resultLen == result.length) {
					byte[] grown = new byte[result.length * 2];
					System.arraycopy(result, 0, grown, 0, resultLen);
					result = grown;
				}
				int n = zlib.inflate(result, resultLen, result.length - resultLen);
				if (n == 0 && (zlib.needsInput() || zlib.needsDictionary())) {
					// truncated stream (or preset dictionary required): no way to make progress, keep what we've got
					break;
				}
				resultLen += n;
			}
			byte[] data = new byte[resultLen];
			System.arraycopy(result, 0, data, 0, resultLen);
			return data;
		} catch (DataFormatException ex) {
			ex.printStackTrace();
			return new byte[0]; // FIXME need better failure strategy
		} finally {
			zlib.end();
		}
	}

	/**
	 * Splits patch data into records. Each record is three big-endian ints (start, end, length of new data)
	 * followed by that many bytes of new data. Empty input gives an empty list.
	 */
	private static List<PatchRecord> parsePatch(byte[] data) {
		LinkedList<PatchRecord> patches = new LinkedList<PatchRecord>();
		int x = 0;
		while (x < data.length) {
			int p1 = bigEndianInt(data, x);
			int p2 = bigEndianInt(data, x + 4);
			int len = bigEndianInt(data, x + 8);
			patches.add(new PatchRecord(p1, p2, len, data, x + 12));
			x += 12 + len;
		}
		return patches;
	}

	private static int bigEndianInt(byte[] src, int at) {
		return ((src[at] & 0xFF) << 24) | ((src[at + 1] & 0xFF) << 16) | ((src[at + 2] & 0xFF) << 8) | (src[at + 3] & 0xFF);
	}

	// perhaps, package-local or protected, if anyone else from low-level needs them
	// XXX think over if we should keep offset in case of separate data file - we read the field anyway.
	// Perhaps, distinct entry classes for Inline and non-inline indexes?
	private static class IndexEntry {
		// for separate .i and .d - copy of index record entry,
		// for inline index - actual offset of the record in the .i file (record entry + revision * record size)
		public final long offset;
		//public final int length; // data past fixed record (need to decide whether including header size or not), and whether length is of compressed data or not
		public final int baseRevision;

		public IndexEntry(long o, int baseRev) {
			offset = o;
			baseRevision = baseRev;
		}
	}

	/**
	 * Applies patch records, in order, to the base content: bytes of the base between records are copied,
	 * bytes in <code>[start, end)</code> of each record are replaced with the record's data.
	 *
	 * @param outcomeLen expected length of the result, used to size the working buffer
	 */
	// mpatch.c : apply()
	// FIXME need to implement patch merge (fold, combine, gather and discard from aforementioned mpatch.[c|py]), also see Revlog and Mercurial PDF
	private static byte[] apply(byte[] baseRevisionContent, int outcomeLen, List<PatchRecord> patch) {
		byte[] tempBuf = new byte[outcomeLen]; // XXX
		int last = 0, destIndex = 0;
		for (PatchRecord pr : patch) {
			System.arraycopy(baseRevisionContent, last, tempBuf, destIndex, pr.start - last);
			destIndex += pr.start - last;
			System.arraycopy(pr.data, 0, tempBuf, destIndex, pr.data.length);
			destIndex += pr.data.length;
			last = pr.end;
		}
		System.arraycopy(baseRevisionContent, last, tempBuf, destIndex, baseRevisionContent.length - last);
		destIndex += baseRevisionContent.length - last; // total length
		byte[] rv = new byte[destIndex];
		System.arraycopy(tempBuf, 0, rv, 0, destIndex);
		return rv;
	}

	static class PatchRecord { // copy of struct frag from mpatch.c
		int start, end, len;
		byte[] data;

		public PatchRecord(int p1, int p2, int len, byte[] src, int srcOffset) {
			start = p1;
			end = p2;
			this.len = len;
			data = new byte[len];
			System.arraycopy(src, srcOffset, data, 0, len);
		}
	}

	/**
	 * Sequential/positional reader of big-endian data. This base implementation represents an empty source:
	 * {@link #nonEmpty()} is always <code>false</code> and any attempt to read or move throws {@link UnsupportedOperationException}.
	 */
	/*package-local*/ class DataAccess {
		public boolean nonEmpty() {
			return false;
		}
		// absolute positioning
		public void seek(long offset) throws IOException {
			throw new UnsupportedOperationException();
		}
		// relative positioning
		public void skip(int bytes) throws IOException {
			throw new UnsupportedOperationException();
		}
		// shall be called once this object no longer needed
		public void done() {
			// no-op in this empty implementation
		}
		public int readInt() throws IOException {
			byte[] b = new byte[4];
			readBytes(b, 0, 4);
			return b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
		}
		public long readLong() throws IOException {
			byte[] b = new byte[8];
			readBytes(b, 0, 8);
			int i1 = b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
			int i2 = b[4] << 24 | (b[5] & 0xFF) << 16 | (b[6] & 0xFF) << 8 | (b[7] & 0xFF);
			// long mask is essential: with an int mask the low word gets sign-extended over the high word
			return ((long) i1) << 32 | (i2 & 0xFFFFFFFFL);
		}
		public void readBytes(byte[] buf, int offset, int length) throws IOException {
			throw new UnsupportedOperationException();
		}
	}

	/**
	 * Access through a read-only memory-mapped window that slides over the file as reading proceeds.
	 */
	private class MemoryMapFileAccess extends DataAccess {
		private FileChannel fileChannel;
		private final long size;
		// offset in the file of the current window start, or of the next byte to read when there's no window
		private long position = 0;
		private MappedByteBuffer buffer;

		public MemoryMapFileAccess(FileChannel fc, long channelSize) {
			fileChannel = fc;
			size = channelSize;
		}

		private long logicalPosition() {
			return buffer == null ? position : position + buffer.position();
		}

		@Override
		public boolean nonEmpty() {
			return logicalPosition() < size;
		}

		@Override
		public void seek(long offset) {
			if (buffer != null && offset >= position && offset - position < buffer.limit()) {
				// target is inside the current window, no need to re-map
				buffer.position((int) (offset - position));
			} else {
				position = offset;
				buffer = null; // window is mapped lazily on next read
			}
		}

		@Override
		public void skip(int bytes) throws IOException {
			seek(logicalPosition() + bytes);
		}

		// maps the next window, starting at the current logical position
		private void fill() throws IOException {
			final int BUFFER_SIZE = 8 * 1024;
			position = logicalPosition();
			buffer = null;
			long left = size - position;
			if (position < 0 || left <= 0) {
				throw new IOException("Attempt to read outside of the file, position: " + position + ", size: " + size);
			}
			buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, left < BUFFER_SIZE ? left : BUFFER_SIZE);
		}

		@Override
		public void readBytes(byte[] buf, int offset, int length) throws IOException {
			while (length > 0) {
				if (buffer == null || !buffer.hasRemaining()) {
					fill();
				}
				int chunk = Math.min(buffer.remaining(), length);
				buffer.get(buf, offset, chunk);
				offset += chunk;
				length -= chunk;
			}
		}

		@Override
		public void done() {
			buffer = null;
			if (fileChannel != null) {
				try {
					fileChannel.close();
				} catch (IOException ex) {
					ex.printStackTrace(); // log debug
				}
				fileChannel = null;
			}
		}
	}

	/**
	 * Access through plain channel reads into a small heap buffer.
	 */
	private class FileAccess extends DataAccess {
		private FileChannel fileChannel;
		private final long size;
		private ByteBuffer buffer;
		private long bufferStartInFile = 0; // offset of this.buffer in the file.

		public FileAccess(FileChannel fc, long channelSize) {
			fileChannel = fc;
			size = channelSize;
			final int BUFFER_SIZE = 8 * 1024;
			// XXX once implementation is more or less stable,
			// may want to try ByteBuffer.allocateDirect() to see
			// if there's any performance gain.
			buffer = ByteBuffer.allocate(size < BUFFER_SIZE ? (int) size : BUFFER_SIZE);
			buffer.flip(); // or .limit(0) to indicate it's empty
		}

		@Override
		public boolean nonEmpty() {
			return bufferStartInFile + buffer.position() < size;
		}

		@Override
		public void seek(long offset) throws IOException {
			if (offset < bufferStartInFile + buffer.limit() && offset >= bufferStartInFile) {
				buffer.position((int) (offset - bufferStartInFile));
			} else {
				// out of current buffer, invalidate it (force re-read)
				// XXX or ever re-read it right away?
				bufferStartInFile = offset;
				buffer.clear();
				buffer.limit(0); // or .flip() to indicate we switch to reading
				fileChannel.position(offset);
			}
		}

		@Override
		public void skip(int bytes) throws IOException {
			final long newPos = (long) buffer.position() + bytes;
			if (newPos >= 0 && newPos < buffer.limit()) {
				// no need to move file pointer, just rewind/seek buffer
				buffer.position((int) newPos);
			} else {
				// channel's own position is at the END of the buffered window, hence can't be used as a base here;
				// the logical position is the buffer start plus position within the buffer
				seek(bufferStartInFile + newPos);
			}
		}

		// reads next portion of the file once the buffer is exhausted; tells whether there's anything to read
		private boolean fill() throws IOException {
			if (!buffer.hasRemaining()) {
				bufferStartInFile += buffer.limit();
				buffer.clear();
				if (bufferStartInFile < size) { // just in case there'd be any exception on EOF, not -1
					fileChannel.read(buffer);
					// may return -1 when EOF, but empty will reflect this, hence no explicit support here
				}
				buffer.flip();
			}
			return buffer.hasRemaining();
		}

		@Override
		public void readBytes(byte[] buf, int offset, int length) throws IOException {
			// loop, as requested length may span more than one buffer
			while (length > 0) {
				if (!buffer.hasRemaining() && !fill()) {
					// shall not happen provided stream contains expected data and no attempts to read past nonEmpty() == false are made.
					throw new IOException("Unexpected end of file, " + length + " more bytes expected");
				}
				int chunk = Math.min(buffer.remaining(), length);
				buffer.get(buf, offset, chunk);
				offset += chunk;
				length -= chunk;
			}
		}

		@Override
		public void done() {
			if (buffer != null) {
				buffer = null;
			}
			if (fileChannel != null) {
				try {
					fileChannel.close();
				} catch (IOException ex) {
					ex.printStackTrace(); // log debug
				}
				fileChannel = null;
			}
		}
	}
}