Mercurial > jhg
comparison src/com/tmate/hgkit/ll/RevlogStream.java @ 9:d6d2a630f4a6
Access to underlying file data wrapped into its own Access object, implemented with FileChannel and ByteBuffer
author | Artem Tikhomirov <tikhomirov.artem@gmail.com> |
---|---|
date | Sat, 25 Dec 2010 04:45:59 +0100 |
parents | 5abe5af181bd |
children | 382cfe9463db |
comparison
equal
deleted
inserted
replaced
8:a78c980749e3 | 9:d6d2a630f4a6 |
---|---|
3 */ | 3 */ |
4 package com.tmate.hgkit.ll; | 4 package com.tmate.hgkit.ll; |
5 | 5 |
6 import static com.tmate.hgkit.ll.HgRepository.TIP; | 6 import static com.tmate.hgkit.ll.HgRepository.TIP; |
7 | 7 |
8 import java.io.BufferedInputStream; | |
9 import java.io.DataInput; | |
10 import java.io.DataInputStream; | |
11 import java.io.EOFException; | |
12 import java.io.File; | 8 import java.io.File; |
13 import java.io.FileInputStream; | 9 import java.io.FileInputStream; |
14 import java.io.FileNotFoundException; | |
15 import java.io.IOException; | 10 import java.io.IOException; |
11 import java.nio.ByteBuffer; | |
12 import java.nio.MappedByteBuffer; | |
13 import java.nio.channels.FileChannel; | |
16 import java.util.ArrayList; | 14 import java.util.ArrayList; |
17 import java.util.Collections; | 15 import java.util.Collections; |
18 import java.util.LinkedList; | 16 import java.util.LinkedList; |
19 import java.util.List; | 17 import java.util.List; |
20 import java.util.zip.DataFormatException; | 18 import java.util.zip.DataFormatException; |
35 | 33 |
36 RevlogStream(File indexFile) { | 34 RevlogStream(File indexFile) { |
37 this.indexFile = indexFile; | 35 this.indexFile = indexFile; |
38 } | 36 } |
39 | 37 |
40 private void detectVersion() { | 38 /*package*/ DataAccess getIndexStream() { |
41 | 39 return create(indexFile); |
42 } | 40 } |
43 | 41 |
44 /*package*/ DataInput getIndexStream() { | 42 /*package*/ DataAccess getDataStream() { |
45 DataInputStream dis = null; | |
46 try { | |
47 dis = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile))); | |
48 } catch (FileNotFoundException ex) { | |
49 ex.printStackTrace(); | |
50 // should not happen, we checked for existence | |
51 } | |
52 return dis; | |
53 } | |
54 | |
55 /*package*/ DataInput getDataStream() { | |
56 final String indexName = indexFile.getName(); | 43 final String indexName = indexFile.getName(); |
57 File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d"); | 44 File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d"); |
45 return create(dataFile); | |
46 } | |
47 | |
48 private DataAccess create(File f) { | |
49 if (!f.exists()) { | |
50 return new DataAccess(); | |
51 } | |
58 try { | 52 try { |
59 return new DataInputStream(new BufferedInputStream(new FileInputStream(dataFile))); | 53 FileChannel fc = new FileInputStream(f).getChannel(); |
60 } catch (FileNotFoundException ex) { | 54 final int MAPIO_MAGIC_BOUNDARY = 100 * 1024; |
61 ex.printStackTrace(); | 55 if (fc.size() > MAPIO_MAGIC_BOUNDARY) { |
62 return null; | 56 return new MemoryMapFileAccess(fc, fc.size()); |
63 } | 57 } else { |
58 return new FileAccess(fc, fc.size()); | |
59 } | |
60 } catch (IOException ex) { | |
61 // unlikely to happen, we've made sure file exists. | |
62 ex.printStackTrace(); // FIXME log error | |
63 } | |
64 return new DataAccess(); // non-null, empty. | |
64 } | 65 } |
65 | 66 |
66 public int revisionCount() { | 67 public int revisionCount() { |
67 initOutline(); | 68 initOutline(); |
68 return index.size(); | 69 return index.size(); |
69 } | 70 } |
71 | |
72 private final int REVLOGV1_RECORD_SIZE = 64; | |
70 | 73 |
71 // should be possible to use TIP, ALL, or -1, -2, -n notation of Hg | 74 // should be possible to use TIP, ALL, or -1, -2, -n notation of Hg |
72 // ? boolean needsNodeid | 75 // ? boolean needsNodeid |
73 public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) { | 76 public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) { |
74 initOutline(); | 77 initOutline(); |
88 if (end < start || end >= indexSize) { | 91 if (end < start || end >= indexSize) { |
89 throw new IllegalArgumentException("Bad right range boundary " + end); | 92 throw new IllegalArgumentException("Bad right range boundary " + end); |
90 } | 93 } |
91 // XXX may cache [start .. end] from index with a single read (pre-read) | 94 // XXX may cache [start .. end] from index with a single read (pre-read) |
92 | 95 |
93 DataInput diIndex = null, diData = null; | 96 DataAccess daIndex = null, daData = null; |
94 diIndex = getIndexStream(); | 97 daIndex = getIndexStream(); |
95 if (needData && !inline) { | 98 if (needData && !inline) { |
96 diData = getDataStream(); | 99 daData = getDataStream(); |
97 } | 100 } |
98 try { | 101 try { |
99 byte[] lastData = null; | 102 byte[] lastData = null; |
100 int i; | 103 int i; |
101 boolean extraReadsToBaseRev = false; | 104 boolean extraReadsToBaseRev = false; |
103 i = index.get(start).baseRevision; | 106 i = index.get(start).baseRevision; |
104 extraReadsToBaseRev = true; | 107 extraReadsToBaseRev = true; |
105 } else { | 108 } else { |
106 i = start; | 109 i = start; |
107 } | 110 } |
108 diIndex.skipBytes(inline ? (int) index.get(i).offset : start * 64); | 111 |
112 daIndex.seek(inline ? (int) index.get(i).offset : start * REVLOGV1_RECORD_SIZE); | |
109 for (; i <= end; i++ ) { | 113 for (; i <= end; i++ ) { |
110 long l = diIndex.readLong(); | 114 long l = daIndex.readLong(); |
111 long offset = l >>> 16; | 115 long offset = l >>> 16; |
112 int flags = (int) (l & 0X0FFFF); | 116 int flags = (int) (l & 0X0FFFF); |
113 int compressedLen = diIndex.readInt(); | 117 int compressedLen = daIndex.readInt(); |
114 int actualLen = diIndex.readInt(); | 118 int actualLen = daIndex.readInt(); |
115 int baseRevision = diIndex.readInt(); | 119 int baseRevision = daIndex.readInt(); |
116 int linkRevision = diIndex.readInt(); | 120 int linkRevision = daIndex.readInt(); |
117 int parent1Revision = diIndex.readInt(); | 121 int parent1Revision = daIndex.readInt(); |
118 int parent2Revision = diIndex.readInt(); | 122 int parent2Revision = daIndex.readInt(); |
119 byte[] buf = new byte[32]; | 123 byte[] buf = new byte[32]; |
120 // XXX Hg keeps 12 last bytes empty, we move them into front here | 124 // XXX Hg keeps 12 last bytes empty, we move them into front here |
121 diIndex.readFully(buf, 12, 20); | 125 daIndex.readBytes(buf, 12, 20); |
122 diIndex.skipBytes(12); | 126 daIndex.skip(12); |
123 byte[] data = null; | 127 byte[] data = null; |
124 if (needData) { | 128 if (needData) { |
125 byte[] dataBuf = new byte[compressedLen]; | 129 byte[] dataBuf = new byte[compressedLen]; |
126 if (inline) { | 130 if (inline) { |
127 diIndex.readFully(dataBuf); | 131 daIndex.readBytes(dataBuf, 0, compressedLen); |
128 } else { | 132 } else { |
129 diData.skipBytes((int) index.get(i).offset); // FIXME not skip but seek!!! (skip would work only for the first time) | 133 daData.seek(index.get(i).offset); |
130 diData.readFully(dataBuf); | 134 daData.readBytes(dataBuf, 0, compressedLen); |
131 } | 135 } |
132 if (dataBuf[0] == 0x78 /* 'x' */) { | 136 if (dataBuf[0] == 0x78 /* 'x' */) { |
133 try { | 137 try { |
134 Inflater zlib = new Inflater(); | 138 Inflater zlib = new Inflater(); |
135 zlib.setInput(dataBuf, 0, compressedLen); | 139 zlib.setInput(dataBuf, 0, compressedLen); |
167 byte[] baseRevContent = lastData; | 171 byte[] baseRevContent = lastData; |
168 data = apply(baseRevContent, actualLen, patches); | 172 data = apply(baseRevContent, actualLen, patches); |
169 } | 173 } |
170 } else { | 174 } else { |
171 if (inline) { | 175 if (inline) { |
172 diIndex.skipBytes(compressedLen); | 176 daIndex.skip(compressedLen); |
173 } | 177 } |
174 } | 178 } |
175 if (!extraReadsToBaseRev || i >= start) { | 179 if (!extraReadsToBaseRev || i >= start) { |
176 inspector.next(i, actualLen, baseRevision, linkRevision, parent1Revision, parent2Revision, buf, data); | 180 inspector.next(i, actualLen, baseRevision, linkRevision, parent1Revision, parent2Revision, buf, data); |
177 } | 181 } |
178 lastData = data; | 182 lastData = data; |
179 } | 183 } |
180 } catch (EOFException ex) { | |
181 // should not happen as long as we read inside known boundaries | |
182 throw new IllegalStateException(ex); | |
183 } catch (IOException ex) { | 184 } catch (IOException ex) { |
184 throw new IllegalStateException(ex); // FIXME need better handling | 185 throw new IllegalStateException(ex); // FIXME need better handling |
185 } finally { | 186 } finally { |
186 hackCloseFileStreams(diIndex, diData); // FIXME HACK!!! | 187 daIndex.done(); |
188 if (daData != null) { | |
189 daData.done(); | |
190 } | |
187 } | 191 } |
188 } | 192 } |
189 | 193 |
190 private void initOutline() { | 194 private void initOutline() { |
191 if (index != null && !index.isEmpty()) { | 195 if (index != null && !index.isEmpty()) { |
192 return; | 196 return; |
193 } | 197 } |
194 ArrayList<IndexEntry> res = new ArrayList<IndexEntry>(); | 198 ArrayList<IndexEntry> res = new ArrayList<IndexEntry>(); |
195 DataInput di = getIndexStream(); | 199 DataAccess da = getIndexStream(); |
196 try { | 200 try { |
197 int versionField = di.readInt(); | 201 int versionField = da.readInt(); |
198 di.readInt(); // just to skip next 2 bytes of offset + flags | 202 da.readInt(); // just to skip next 2 bytes of offset + flags |
199 final int INLINEDATA = 1 << 16; | 203 final int INLINEDATA = 1 << 16; |
200 inline = (versionField & INLINEDATA) != 0; | 204 inline = (versionField & INLINEDATA) != 0; |
201 long offset = 0; // first offset is always 0, thus Hg uses it for other purposes | 205 long offset = 0; // first offset is always 0, thus Hg uses it for other purposes |
202 while(true) { // EOFExcepiton should get us outta here. FIXME Our inputstream should has explicit no-more-data indicator | 206 while(true) { |
203 int compressedLen = di.readInt(); | 207 int compressedLen = da.readInt(); |
204 // 8+4 = 12 bytes total read here | 208 // 8+4 = 12 bytes total read here |
205 int actualLen = di.readInt(); | 209 int actualLen = da.readInt(); |
206 int baseRevision = di.readInt(); | 210 int baseRevision = da.readInt(); |
207 // 12 + 8 = 20 bytes read here | 211 // 12 + 8 = 20 bytes read here |
208 // int linkRevision = di.readInt(); | 212 // int linkRevision = di.readInt(); |
209 // int parent1Revision = di.readInt(); | 213 // int parent1Revision = di.readInt(); |
210 // int parent2Revision = di.readInt(); | 214 // int parent2Revision = di.readInt(); |
211 // byte[] nodeid = new byte[32]; | 215 // byte[] nodeid = new byte[32]; |
212 if (inline) { | 216 if (inline) { |
213 res.add(new IndexEntry(offset + 64*res.size(), baseRevision)); | 217 res.add(new IndexEntry(offset + REVLOGV1_RECORD_SIZE * res.size(), baseRevision)); |
214 di.skipBytes(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size) | 218 da.skip(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size) |
215 } else { | 219 } else { |
216 res.add(new IndexEntry(offset, baseRevision)); | 220 res.add(new IndexEntry(offset, baseRevision)); |
217 di.skipBytes(3*4 + 32); | 221 da.skip(3*4 + 32); |
218 } | 222 } |
219 long l = di.readLong(); | 223 if (da.nonEmpty()) { |
220 offset = l >>> 16; | 224 long l = da.readLong(); |
221 } | 225 offset = l >>> 16; |
222 } catch (EOFException ex) { | 226 } else { |
223 // fine, done then | 227 // fine, done then |
224 index = res; | 228 index = res; |
229 break; | |
230 } | |
231 } | |
225 } catch (IOException ex) { | 232 } catch (IOException ex) { |
226 ex.printStackTrace(); | 233 ex.printStackTrace(); // log error |
227 // too bad, no outline then | 234 // too bad, no outline then. |
228 index = Collections.emptyList(); | 235 index = Collections.emptyList(); |
229 } | 236 } finally { |
230 hackCloseFileStreams(di, null); // FIXME HACK!!! | 237 da.done(); |
238 } | |
239 | |
231 } | 240 } |
232 | 241 |
233 // FIXME HACK to deal with File/FileStream nature of out data source. Won't need this once implement | |
234 // own DataInput based on bytearray chunks or RandomAccessFile | |
235 private void hackCloseFileStreams(DataInput index, DataInput data) { | |
236 try { | |
237 if (index != null) { | |
238 ((DataInputStream) index).close(); | |
239 } | |
240 if (data != null) { | |
241 ((DataInputStream) data).close(); | |
242 } | |
243 } catch (IOException ex) { | |
244 ex.printStackTrace(); | |
245 } | |
246 } | |
247 | |
248 | 242 |
249 // perhaps, package-local or protected, if anyone else from low-level needs them | 243 // perhaps, package-local or protected, if anyone else from low-level needs them |
250 // XXX think over if we should keep offset in case of separate data file - we read the field anyway. Perhaps, distinct entry classes for Inline and non-inline indexes? | 244 // XXX think over if we should keep offset in case of separate data file - we read the field anyway. Perhaps, distinct entry classes for Inline and non-inline indexes? |
251 private static class IndexEntry { | 245 private static class IndexEntry { |
252 public final long offset; // for separate .i and .d - copy of index record entry, for inline index - actual offset of the record in the .i file (record entry + revision * record size)) | 246 public final long offset; // for separate .i and .d - copy of index record entry, for inline index - actual offset of the record in the .i file (record entry + revision * record size)) |
289 data = new byte[len]; | 283 data = new byte[len]; |
290 System.arraycopy(src, srcOffset, data, 0, len); | 284 System.arraycopy(src, srcOffset, data, 0, len); |
291 } | 285 } |
292 } | 286 } |
293 | 287 |
288 /*package-local*/ class DataAccess { | |
289 public boolean nonEmpty() { | |
290 return false; | |
291 } | |
292 // absolute positioning | |
293 public void seek(long offset) throws IOException { | |
294 throw new UnsupportedOperationException(); | |
295 } | |
296 // relative positioning | |
297 public void skip(int bytes) throws IOException { | |
298 throw new UnsupportedOperationException(); | |
299 } | |
300 // shall be called once this object no longer needed | |
301 public void done() { | |
302 // no-op in this empty implementation | |
303 } | |
304 public int readInt() throws IOException { | |
305 byte[] b = new byte[4]; | |
306 readBytes(b, 0, 4); | |
307 return b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF); | |
308 } | |
309 public long readLong() throws IOException { | |
310 byte[] b = new byte[8]; | |
311 readBytes(b, 0, 8); | |
312 int i1 = b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF); | |
313 int i2 = b[4] << 24 | (b[5] & 0xFF) << 16 | (b[6] & 0xFF) << 8 | (b[7] & 0xFF); | |
314 return ((long) i1) << 32 | ((long) i2 & 0xFFFFFFFF); | |
315 } | |
316 public void readBytes(byte[] buf, int offset, int length) throws IOException { | |
317 throw new UnsupportedOperationException(); | |
318 } | |
319 } | |
320 | |
321 // DOESN'T WORK YET | |
322 private class MemoryMapFileAccess extends DataAccess { | |
323 private FileChannel fileChannel; | |
324 private final long size; | |
325 private long position = 0; | |
326 | |
327 public MemoryMapFileAccess(FileChannel fc, long channelSize) { | |
328 fileChannel = fc; | |
329 size = channelSize; | |
330 } | |
331 | |
332 @Override | |
333 public void seek(long offset) { | |
334 position = offset; | |
335 } | |
336 | |
337 @Override | |
338 public void skip(int bytes) throws IOException { | |
339 position += bytes; | |
340 } | |
341 | |
342 private boolean fill() throws IOException { | |
343 final int BUFFER_SIZE = 8 * 1024; | |
344 long left = size - position; | |
345 MappedByteBuffer rv = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, left < BUFFER_SIZE ? left : BUFFER_SIZE); | |
346 position += rv.capacity(); | |
347 return rv.hasRemaining(); | |
348 } | |
349 | |
350 @Override | |
351 public void done() { | |
352 if (fileChannel != null) { | |
353 try { | |
354 fileChannel.close(); | |
355 } catch (IOException ex) { | |
356 ex.printStackTrace(); // log debug | |
357 } | |
358 fileChannel = null; | |
359 } | |
360 } | |
361 } | |
362 | |
363 private class FileAccess extends DataAccess { | |
364 private FileChannel fileChannel; | |
365 private final long size; | |
366 private ByteBuffer buffer; | |
367 private long bufferStartInFile = 0; // offset of this.buffer in the file. | |
368 | |
369 public FileAccess(FileChannel fc, long channelSize) { | |
370 fileChannel = fc; | |
371 size = channelSize; | |
372 final int BUFFER_SIZE = 8 * 1024; | |
373 // XXX once implementation is more or less stable, | |
374 // may want to try ByteBuffer.allocateDirect() to see | |
375 // if there's any performance gain. | |
376 buffer = ByteBuffer.allocate(size < BUFFER_SIZE ? (int) size : BUFFER_SIZE); | |
377 buffer.flip(); // or .limit(0) to indicate it's empty | |
378 } | |
379 | |
380 @Override | |
381 public boolean nonEmpty() { | |
382 return bufferStartInFile + buffer.position() < size; | |
383 } | |
384 | |
385 @Override | |
386 public void seek(long offset) throws IOException { | |
387 if (offset < bufferStartInFile + buffer.limit() && offset >= bufferStartInFile) { | |
388 buffer.position((int) (offset - bufferStartInFile)); | |
389 } else { | |
390 // out of current buffer, invalidate it (force re-read) | |
391 // XXX or ever re-read it right away? | |
392 bufferStartInFile = offset; | |
393 buffer.clear(); | |
394 buffer.limit(0); // or .flip() to indicate we switch to reading | |
395 fileChannel.position(offset); | |
396 } | |
397 } | |
398 | |
399 @Override | |
400 public void skip(int bytes) throws IOException { | |
401 final int newPos = buffer.position() + bytes; | |
402 if (newPos >= 0 && newPos < buffer.limit()) { | |
403 // no need to move file pointer, just rewind/seek buffer | |
404 buffer.position(newPos); | |
405 } else { | |
406 // | |
407 seek(fileChannel.position()+ bytes); | |
408 } | |
409 } | |
410 | |
411 private boolean fill() throws IOException { | |
412 if (!buffer.hasRemaining()) { | |
413 bufferStartInFile += buffer.limit(); | |
414 buffer.clear(); | |
415 if (bufferStartInFile < size) { // just in case there'd be any exception on EOF, not -1 | |
416 fileChannel.read(buffer); | |
417 // may return -1 when EOF, but empty will reflect this, hence no explicit support here | |
418 } | |
419 buffer.flip(); | |
420 } | |
421 return buffer.hasRemaining(); | |
422 } | |
423 | |
424 @Override | |
425 public void readBytes(byte[] buf, int offset, int length) throws IOException { | |
426 final int tail = buffer.remaining(); | |
427 if (tail >= length) { | |
428 buffer.get(buf, offset, length); | |
429 } else { | |
430 buffer.get(buf, offset, tail); | |
431 if (fill()) { | |
432 buffer.get(buf, offset + tail, length - tail); | |
433 } else { | |
434 throw new IOException(); // shall not happen provided stream contains expected data and no attempts to read past nonEmpty() == false are made. | |
435 } | |
436 } | |
437 } | |
438 | |
439 @Override | |
440 public void done() { | |
441 if (buffer != null) { | |
442 buffer = null; | |
443 } | |
444 if (fileChannel != null) { | |
445 try { | |
446 fileChannel.close(); | |
447 } catch (IOException ex) { | |
448 ex.printStackTrace(); // log debug | |
449 } | |
450 fileChannel = null; | |
451 } | |
452 } | |
453 } | |
294 } | 454 } |