Mercurial > jhg
comparison src/com/tmate/hgkit/ll/RevlogStream.java @ 9:d6d2a630f4a6
Access to underlying file data wrapped into own Access object, implemented with FileChannel and ByteBuffer
| author | Artem Tikhomirov <tikhomirov.artem@gmail.com> |
|---|---|
| date | Sat, 25 Dec 2010 04:45:59 +0100 |
| parents | 5abe5af181bd |
| children | 382cfe9463db |
comparison
equal
deleted
inserted
replaced
| 8:a78c980749e3 | 9:d6d2a630f4a6 |
|---|---|
| 3 */ | 3 */ |
| 4 package com.tmate.hgkit.ll; | 4 package com.tmate.hgkit.ll; |
| 5 | 5 |
| 6 import static com.tmate.hgkit.ll.HgRepository.TIP; | 6 import static com.tmate.hgkit.ll.HgRepository.TIP; |
| 7 | 7 |
| 8 import java.io.BufferedInputStream; | |
| 9 import java.io.DataInput; | |
| 10 import java.io.DataInputStream; | |
| 11 import java.io.EOFException; | |
| 12 import java.io.File; | 8 import java.io.File; |
| 13 import java.io.FileInputStream; | 9 import java.io.FileInputStream; |
| 14 import java.io.FileNotFoundException; | |
| 15 import java.io.IOException; | 10 import java.io.IOException; |
| 11 import java.nio.ByteBuffer; | |
| 12 import java.nio.MappedByteBuffer; | |
| 13 import java.nio.channels.FileChannel; | |
| 16 import java.util.ArrayList; | 14 import java.util.ArrayList; |
| 17 import java.util.Collections; | 15 import java.util.Collections; |
| 18 import java.util.LinkedList; | 16 import java.util.LinkedList; |
| 19 import java.util.List; | 17 import java.util.List; |
| 20 import java.util.zip.DataFormatException; | 18 import java.util.zip.DataFormatException; |
| 35 | 33 |
| 36 RevlogStream(File indexFile) { | 34 RevlogStream(File indexFile) { |
| 37 this.indexFile = indexFile; | 35 this.indexFile = indexFile; |
| 38 } | 36 } |
| 39 | 37 |
| 40 private void detectVersion() { | 38 /*package*/ DataAccess getIndexStream() { |
| 41 | 39 return create(indexFile); |
| 42 } | 40 } |
| 43 | 41 |
| 44 /*package*/ DataInput getIndexStream() { | 42 /*package*/ DataAccess getDataStream() { |
| 45 DataInputStream dis = null; | |
| 46 try { | |
| 47 dis = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile))); | |
| 48 } catch (FileNotFoundException ex) { | |
| 49 ex.printStackTrace(); | |
| 50 // should not happen, we checked for existence | |
| 51 } | |
| 52 return dis; | |
| 53 } | |
| 54 | |
| 55 /*package*/ DataInput getDataStream() { | |
| 56 final String indexName = indexFile.getName(); | 43 final String indexName = indexFile.getName(); |
| 57 File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d"); | 44 File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d"); |
| 45 return create(dataFile); | |
| 46 } | |
| 47 | |
| 48 private DataAccess create(File f) { | |
| 49 if (!f.exists()) { | |
| 50 return new DataAccess(); | |
| 51 } | |
| 58 try { | 52 try { |
| 59 return new DataInputStream(new BufferedInputStream(new FileInputStream(dataFile))); | 53 FileChannel fc = new FileInputStream(f).getChannel(); |
| 60 } catch (FileNotFoundException ex) { | 54 final int MAPIO_MAGIC_BOUNDARY = 100 * 1024; |
| 61 ex.printStackTrace(); | 55 if (fc.size() > MAPIO_MAGIC_BOUNDARY) { |
| 62 return null; | 56 return new MemoryMapFileAccess(fc, fc.size()); |
| 63 } | 57 } else { |
| 58 return new FileAccess(fc, fc.size()); | |
| 59 } | |
| 60 } catch (IOException ex) { | |
| 61 // unlikely to happen, we've made sure file exists. | |
| 62 ex.printStackTrace(); // FIXME log error | |
| 63 } | |
| 64 return new DataAccess(); // non-null, empty. | |
| 64 } | 65 } |
| 65 | 66 |
| 66 public int revisionCount() { | 67 public int revisionCount() { |
| 67 initOutline(); | 68 initOutline(); |
| 68 return index.size(); | 69 return index.size(); |
| 69 } | 70 } |
| 71 | |
| 72 private final int REVLOGV1_RECORD_SIZE = 64; | |
| 70 | 73 |
| 71 // should be possible to use TIP, ALL, or -1, -2, -n notation of Hg | 74 // should be possible to use TIP, ALL, or -1, -2, -n notation of Hg |
| 72 // ? boolean needsNodeid | 75 // ? boolean needsNodeid |
| 73 public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) { | 76 public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) { |
| 74 initOutline(); | 77 initOutline(); |
| 88 if (end < start || end >= indexSize) { | 91 if (end < start || end >= indexSize) { |
| 89 throw new IllegalArgumentException("Bad right range boundary " + end); | 92 throw new IllegalArgumentException("Bad right range boundary " + end); |
| 90 } | 93 } |
| 91 // XXX may cache [start .. end] from index with a single read (pre-read) | 94 // XXX may cache [start .. end] from index with a single read (pre-read) |
| 92 | 95 |
| 93 DataInput diIndex = null, diData = null; | 96 DataAccess daIndex = null, daData = null; |
| 94 diIndex = getIndexStream(); | 97 daIndex = getIndexStream(); |
| 95 if (needData && !inline) { | 98 if (needData && !inline) { |
| 96 diData = getDataStream(); | 99 daData = getDataStream(); |
| 97 } | 100 } |
| 98 try { | 101 try { |
| 99 byte[] lastData = null; | 102 byte[] lastData = null; |
| 100 int i; | 103 int i; |
| 101 boolean extraReadsToBaseRev = false; | 104 boolean extraReadsToBaseRev = false; |
| 103 i = index.get(start).baseRevision; | 106 i = index.get(start).baseRevision; |
| 104 extraReadsToBaseRev = true; | 107 extraReadsToBaseRev = true; |
| 105 } else { | 108 } else { |
| 106 i = start; | 109 i = start; |
| 107 } | 110 } |
| 108 diIndex.skipBytes(inline ? (int) index.get(i).offset : start * 64); | 111 |
| 112 daIndex.seek(inline ? (int) index.get(i).offset : start * REVLOGV1_RECORD_SIZE); | |
| 109 for (; i <= end; i++ ) { | 113 for (; i <= end; i++ ) { |
| 110 long l = diIndex.readLong(); | 114 long l = daIndex.readLong(); |
| 111 long offset = l >>> 16; | 115 long offset = l >>> 16; |
| 112 int flags = (int) (l & 0X0FFFF); | 116 int flags = (int) (l & 0X0FFFF); |
| 113 int compressedLen = diIndex.readInt(); | 117 int compressedLen = daIndex.readInt(); |
| 114 int actualLen = diIndex.readInt(); | 118 int actualLen = daIndex.readInt(); |
| 115 int baseRevision = diIndex.readInt(); | 119 int baseRevision = daIndex.readInt(); |
| 116 int linkRevision = diIndex.readInt(); | 120 int linkRevision = daIndex.readInt(); |
| 117 int parent1Revision = diIndex.readInt(); | 121 int parent1Revision = daIndex.readInt(); |
| 118 int parent2Revision = diIndex.readInt(); | 122 int parent2Revision = daIndex.readInt(); |
| 119 byte[] buf = new byte[32]; | 123 byte[] buf = new byte[32]; |
| 120 // XXX Hg keeps 12 last bytes empty, we move them into front here | 124 // XXX Hg keeps 12 last bytes empty, we move them into front here |
| 121 diIndex.readFully(buf, 12, 20); | 125 daIndex.readBytes(buf, 12, 20); |
| 122 diIndex.skipBytes(12); | 126 daIndex.skip(12); |
| 123 byte[] data = null; | 127 byte[] data = null; |
| 124 if (needData) { | 128 if (needData) { |
| 125 byte[] dataBuf = new byte[compressedLen]; | 129 byte[] dataBuf = new byte[compressedLen]; |
| 126 if (inline) { | 130 if (inline) { |
| 127 diIndex.readFully(dataBuf); | 131 daIndex.readBytes(dataBuf, 0, compressedLen); |
| 128 } else { | 132 } else { |
| 129 diData.skipBytes((int) index.get(i).offset); // FIXME not skip but seek!!! (skip would work only for the first time) | 133 daData.seek(index.get(i).offset); |
| 130 diData.readFully(dataBuf); | 134 daData.readBytes(dataBuf, 0, compressedLen); |
| 131 } | 135 } |
| 132 if (dataBuf[0] == 0x78 /* 'x' */) { | 136 if (dataBuf[0] == 0x78 /* 'x' */) { |
| 133 try { | 137 try { |
| 134 Inflater zlib = new Inflater(); | 138 Inflater zlib = new Inflater(); |
| 135 zlib.setInput(dataBuf, 0, compressedLen); | 139 zlib.setInput(dataBuf, 0, compressedLen); |
| 167 byte[] baseRevContent = lastData; | 171 byte[] baseRevContent = lastData; |
| 168 data = apply(baseRevContent, actualLen, patches); | 172 data = apply(baseRevContent, actualLen, patches); |
| 169 } | 173 } |
| 170 } else { | 174 } else { |
| 171 if (inline) { | 175 if (inline) { |
| 172 diIndex.skipBytes(compressedLen); | 176 daIndex.skip(compressedLen); |
| 173 } | 177 } |
| 174 } | 178 } |
| 175 if (!extraReadsToBaseRev || i >= start) { | 179 if (!extraReadsToBaseRev || i >= start) { |
| 176 inspector.next(i, actualLen, baseRevision, linkRevision, parent1Revision, parent2Revision, buf, data); | 180 inspector.next(i, actualLen, baseRevision, linkRevision, parent1Revision, parent2Revision, buf, data); |
| 177 } | 181 } |
| 178 lastData = data; | 182 lastData = data; |
| 179 } | 183 } |
| 180 } catch (EOFException ex) { | |
| 181 // should not happen as long as we read inside known boundaries | |
| 182 throw new IllegalStateException(ex); | |
| 183 } catch (IOException ex) { | 184 } catch (IOException ex) { |
| 184 throw new IllegalStateException(ex); // FIXME need better handling | 185 throw new IllegalStateException(ex); // FIXME need better handling |
| 185 } finally { | 186 } finally { |
| 186 hackCloseFileStreams(diIndex, diData); // FIXME HACK!!! | 187 daIndex.done(); |
| 188 if (daData != null) { | |
| 189 daData.done(); | |
| 190 } | |
| 187 } | 191 } |
| 188 } | 192 } |
| 189 | 193 |
| 190 private void initOutline() { | 194 private void initOutline() { |
| 191 if (index != null && !index.isEmpty()) { | 195 if (index != null && !index.isEmpty()) { |
| 192 return; | 196 return; |
| 193 } | 197 } |
| 194 ArrayList<IndexEntry> res = new ArrayList<IndexEntry>(); | 198 ArrayList<IndexEntry> res = new ArrayList<IndexEntry>(); |
| 195 DataInput di = getIndexStream(); | 199 DataAccess da = getIndexStream(); |
| 196 try { | 200 try { |
| 197 int versionField = di.readInt(); | 201 int versionField = da.readInt(); |
| 198 di.readInt(); // just to skip next 2 bytes of offset + flags | 202 da.readInt(); // just to skip next 2 bytes of offset + flags |
| 199 final int INLINEDATA = 1 << 16; | 203 final int INLINEDATA = 1 << 16; |
| 200 inline = (versionField & INLINEDATA) != 0; | 204 inline = (versionField & INLINEDATA) != 0; |
| 201 long offset = 0; // first offset is always 0, thus Hg uses it for other purposes | 205 long offset = 0; // first offset is always 0, thus Hg uses it for other purposes |
| 202 while(true) { // EOFExcepiton should get us outta here. FIXME Our inputstream should has explicit no-more-data indicator | 206 while(true) { |
| 203 int compressedLen = di.readInt(); | 207 int compressedLen = da.readInt(); |
| 204 // 8+4 = 12 bytes total read here | 208 // 8+4 = 12 bytes total read here |
| 205 int actualLen = di.readInt(); | 209 int actualLen = da.readInt(); |
| 206 int baseRevision = di.readInt(); | 210 int baseRevision = da.readInt(); |
| 207 // 12 + 8 = 20 bytes read here | 211 // 12 + 8 = 20 bytes read here |
| 208 // int linkRevision = di.readInt(); | 212 // int linkRevision = di.readInt(); |
| 209 // int parent1Revision = di.readInt(); | 213 // int parent1Revision = di.readInt(); |
| 210 // int parent2Revision = di.readInt(); | 214 // int parent2Revision = di.readInt(); |
| 211 // byte[] nodeid = new byte[32]; | 215 // byte[] nodeid = new byte[32]; |
| 212 if (inline) { | 216 if (inline) { |
| 213 res.add(new IndexEntry(offset + 64*res.size(), baseRevision)); | 217 res.add(new IndexEntry(offset + REVLOGV1_RECORD_SIZE * res.size(), baseRevision)); |
| 214 di.skipBytes(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size) | 218 da.skip(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size) |
| 215 } else { | 219 } else { |
| 216 res.add(new IndexEntry(offset, baseRevision)); | 220 res.add(new IndexEntry(offset, baseRevision)); |
| 217 di.skipBytes(3*4 + 32); | 221 da.skip(3*4 + 32); |
| 218 } | 222 } |
| 219 long l = di.readLong(); | 223 if (da.nonEmpty()) { |
| 220 offset = l >>> 16; | 224 long l = da.readLong(); |
| 221 } | 225 offset = l >>> 16; |
| 222 } catch (EOFException ex) { | 226 } else { |
| 223 // fine, done then | 227 // fine, done then |
| 224 index = res; | 228 index = res; |
| 229 break; | |
| 230 } | |
| 231 } | |
| 225 } catch (IOException ex) { | 232 } catch (IOException ex) { |
| 226 ex.printStackTrace(); | 233 ex.printStackTrace(); // log error |
| 227 // too bad, no outline then | 234 // too bad, no outline then. |
| 228 index = Collections.emptyList(); | 235 index = Collections.emptyList(); |
| 229 } | 236 } finally { |
| 230 hackCloseFileStreams(di, null); // FIXME HACK!!! | 237 da.done(); |
| 238 } | |
| 239 | |
| 231 } | 240 } |
| 232 | 241 |
| 233 // FIXME HACK to deal with File/FileStream nature of out data source. Won't need this once implement | |
| 234 // own DataInput based on bytearray chunks or RandomAccessFile | |
| 235 private void hackCloseFileStreams(DataInput index, DataInput data) { | |
| 236 try { | |
| 237 if (index != null) { | |
| 238 ((DataInputStream) index).close(); | |
| 239 } | |
| 240 if (data != null) { | |
| 241 ((DataInputStream) data).close(); | |
| 242 } | |
| 243 } catch (IOException ex) { | |
| 244 ex.printStackTrace(); | |
| 245 } | |
| 246 } | |
| 247 | |
| 248 | 242 |
| 249 // perhaps, package-local or protected, if anyone else from low-level needs them | 243 // perhaps, package-local or protected, if anyone else from low-level needs them |
| 250 // XXX think over if we should keep offset in case of separate data file - we read the field anyway. Perhaps, distinct entry classes for Inline and non-inline indexes? | 244 // XXX think over if we should keep offset in case of separate data file - we read the field anyway. Perhaps, distinct entry classes for Inline and non-inline indexes? |
| 251 private static class IndexEntry { | 245 private static class IndexEntry { |
| 252 public final long offset; // for separate .i and .d - copy of index record entry, for inline index - actual offset of the record in the .i file (record entry + revision * record size)) | 246 public final long offset; // for separate .i and .d - copy of index record entry, for inline index - actual offset of the record in the .i file (record entry + revision * record size)) |
| 289 data = new byte[len]; | 283 data = new byte[len]; |
| 290 System.arraycopy(src, srcOffset, data, 0, len); | 284 System.arraycopy(src, srcOffset, data, 0, len); |
| 291 } | 285 } |
| 292 } | 286 } |
| 293 | 287 |
| 288 /*package-local*/ class DataAccess { | |
| 289 public boolean nonEmpty() { | |
| 290 return false; | |
| 291 } | |
| 292 // absolute positioning | |
| 293 public void seek(long offset) throws IOException { | |
| 294 throw new UnsupportedOperationException(); | |
| 295 } | |
| 296 // relative positioning | |
| 297 public void skip(int bytes) throws IOException { | |
| 298 throw new UnsupportedOperationException(); | |
| 299 } | |
| 300 // shall be called once this object no longer needed | |
| 301 public void done() { | |
| 302 // no-op in this empty implementation | |
| 303 } | |
| 304 public int readInt() throws IOException { | |
| 305 byte[] b = new byte[4]; | |
| 306 readBytes(b, 0, 4); | |
| 307 return b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF); | |
| 308 } | |
| 309 public long readLong() throws IOException { | |
| 310 byte[] b = new byte[8]; | |
| 311 readBytes(b, 0, 8); | |
| 312 int i1 = b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF); | |
| 313 int i2 = b[4] << 24 | (b[5] & 0xFF) << 16 | (b[6] & 0xFF) << 8 | (b[7] & 0xFF); | |
| 314 return ((long) i1) << 32 | ((long) i2 & 0xFFFFFFFF); | |
| 315 } | |
| 316 public void readBytes(byte[] buf, int offset, int length) throws IOException { | |
| 317 throw new UnsupportedOperationException(); | |
| 318 } | |
| 319 } | |
| 320 | |
| 321 // DOESN'T WORK YET | |
| 322 private class MemoryMapFileAccess extends DataAccess { | |
| 323 private FileChannel fileChannel; | |
| 324 private final long size; | |
| 325 private long position = 0; | |
| 326 | |
| 327 public MemoryMapFileAccess(FileChannel fc, long channelSize) { | |
| 328 fileChannel = fc; | |
| 329 size = channelSize; | |
| 330 } | |
| 331 | |
| 332 @Override | |
| 333 public void seek(long offset) { | |
| 334 position = offset; | |
| 335 } | |
| 336 | |
| 337 @Override | |
| 338 public void skip(int bytes) throws IOException { | |
| 339 position += bytes; | |
| 340 } | |
| 341 | |
| 342 private boolean fill() throws IOException { | |
| 343 final int BUFFER_SIZE = 8 * 1024; | |
| 344 long left = size - position; | |
| 345 MappedByteBuffer rv = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, left < BUFFER_SIZE ? left : BUFFER_SIZE); | |
| 346 position += rv.capacity(); | |
| 347 return rv.hasRemaining(); | |
| 348 } | |
| 349 | |
| 350 @Override | |
| 351 public void done() { | |
| 352 if (fileChannel != null) { | |
| 353 try { | |
| 354 fileChannel.close(); | |
| 355 } catch (IOException ex) { | |
| 356 ex.printStackTrace(); // log debug | |
| 357 } | |
| 358 fileChannel = null; | |
| 359 } | |
| 360 } | |
| 361 } | |
| 362 | |
| 363 private class FileAccess extends DataAccess { | |
| 364 private FileChannel fileChannel; | |
| 365 private final long size; | |
| 366 private ByteBuffer buffer; | |
| 367 private long bufferStartInFile = 0; // offset of this.buffer in the file. | |
| 368 | |
| 369 public FileAccess(FileChannel fc, long channelSize) { | |
| 370 fileChannel = fc; | |
| 371 size = channelSize; | |
| 372 final int BUFFER_SIZE = 8 * 1024; | |
| 373 // XXX once implementation is more or less stable, | |
| 374 // may want to try ByteBuffer.allocateDirect() to see | |
| 375 // if there's any performance gain. | |
| 376 buffer = ByteBuffer.allocate(size < BUFFER_SIZE ? (int) size : BUFFER_SIZE); | |
| 377 buffer.flip(); // or .limit(0) to indicate it's empty | |
| 378 } | |
| 379 | |
| 380 @Override | |
| 381 public boolean nonEmpty() { | |
| 382 return bufferStartInFile + buffer.position() < size; | |
| 383 } | |
| 384 | |
| 385 @Override | |
| 386 public void seek(long offset) throws IOException { | |
| 387 if (offset < bufferStartInFile + buffer.limit() && offset >= bufferStartInFile) { | |
| 388 buffer.position((int) (offset - bufferStartInFile)); | |
| 389 } else { | |
| 390 // out of current buffer, invalidate it (force re-read) | |
| 391 // XXX or ever re-read it right away? | |
| 392 bufferStartInFile = offset; | |
| 393 buffer.clear(); | |
| 394 buffer.limit(0); // or .flip() to indicate we switch to reading | |
| 395 fileChannel.position(offset); | |
| 396 } | |
| 397 } | |
| 398 | |
| 399 @Override | |
| 400 public void skip(int bytes) throws IOException { | |
| 401 final int newPos = buffer.position() + bytes; | |
| 402 if (newPos >= 0 && newPos < buffer.limit()) { | |
| 403 // no need to move file pointer, just rewind/seek buffer | |
| 404 buffer.position(newPos); | |
| 405 } else { | |
| 406 // | |
| 407 seek(fileChannel.position()+ bytes); | |
| 408 } | |
| 409 } | |
| 410 | |
| 411 private boolean fill() throws IOException { | |
| 412 if (!buffer.hasRemaining()) { | |
| 413 bufferStartInFile += buffer.limit(); | |
| 414 buffer.clear(); | |
| 415 if (bufferStartInFile < size) { // just in case there'd be any exception on EOF, not -1 | |
| 416 fileChannel.read(buffer); | |
| 417 // may return -1 when EOF, but empty will reflect this, hence no explicit support here | |
| 418 } | |
| 419 buffer.flip(); | |
| 420 } | |
| 421 return buffer.hasRemaining(); | |
| 422 } | |
| 423 | |
| 424 @Override | |
| 425 public void readBytes(byte[] buf, int offset, int length) throws IOException { | |
| 426 final int tail = buffer.remaining(); | |
| 427 if (tail >= length) { | |
| 428 buffer.get(buf, offset, length); | |
| 429 } else { | |
| 430 buffer.get(buf, offset, tail); | |
| 431 if (fill()) { | |
| 432 buffer.get(buf, offset + tail, length - tail); | |
| 433 } else { | |
| 434 throw new IOException(); // shall not happen provided stream contains expected data and no attempts to read past nonEmpty() == false are made. | |
| 435 } | |
| 436 } | |
| 437 } | |
| 438 | |
| 439 @Override | |
| 440 public void done() { | |
| 441 if (buffer != null) { | |
| 442 buffer = null; | |
| 443 } | |
| 444 if (fileChannel != null) { | |
| 445 try { | |
| 446 fileChannel.close(); | |
| 447 } catch (IOException ex) { | |
| 448 ex.printStackTrace(); // log debug | |
| 449 } | |
| 450 fileChannel = null; | |
| 451 } | |
| 452 } | |
| 453 } | |
| 294 } | 454 } |
