comparison src/com/tmate/hgkit/ll/RevlogStream.java @ 9:d6d2a630f4a6

Access to the underlying file data is wrapped into its own DataAccess object, implemented with FileChannel and ByteBuffer
author Artem Tikhomirov <tikhomirov.artem@gmail.com>
date Sat, 25 Dec 2010 04:45:59 +0100
parents 5abe5af181bd
children 382cfe9463db
comparison
equal deleted inserted replaced
8:a78c980749e3 9:d6d2a630f4a6
3 */ 3 */
4 package com.tmate.hgkit.ll; 4 package com.tmate.hgkit.ll;
5 5
6 import static com.tmate.hgkit.ll.HgRepository.TIP; 6 import static com.tmate.hgkit.ll.HgRepository.TIP;
7 7
8 import java.io.BufferedInputStream;
9 import java.io.DataInput;
10 import java.io.DataInputStream;
11 import java.io.EOFException;
12 import java.io.File; 8 import java.io.File;
13 import java.io.FileInputStream; 9 import java.io.FileInputStream;
14 import java.io.FileNotFoundException;
15 import java.io.IOException; 10 import java.io.IOException;
11 import java.nio.ByteBuffer;
12 import java.nio.MappedByteBuffer;
13 import java.nio.channels.FileChannel;
16 import java.util.ArrayList; 14 import java.util.ArrayList;
17 import java.util.Collections; 15 import java.util.Collections;
18 import java.util.LinkedList; 16 import java.util.LinkedList;
19 import java.util.List; 17 import java.util.List;
20 import java.util.zip.DataFormatException; 18 import java.util.zip.DataFormatException;
35 33
36 RevlogStream(File indexFile) { 34 RevlogStream(File indexFile) {
37 this.indexFile = indexFile; 35 this.indexFile = indexFile;
38 } 36 }
39 37
40 private void detectVersion() { 38 /*package*/ DataAccess getIndexStream() {
41 39 return create(indexFile);
42 } 40 }
43 41
44 /*package*/ DataInput getIndexStream() { 42 /*package*/ DataAccess getDataStream() {
45 DataInputStream dis = null;
46 try {
47 dis = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile)));
48 } catch (FileNotFoundException ex) {
49 ex.printStackTrace();
50 // should not happen, we checked for existence
51 }
52 return dis;
53 }
54
55 /*package*/ DataInput getDataStream() {
56 final String indexName = indexFile.getName(); 43 final String indexName = indexFile.getName();
57 File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d"); 44 File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d");
45 return create(dataFile);
46 }
47
48 private DataAccess create(File f) {
49 if (!f.exists()) {
50 return new DataAccess();
51 }
58 try { 52 try {
59 return new DataInputStream(new BufferedInputStream(new FileInputStream(dataFile))); 53 FileChannel fc = new FileInputStream(f).getChannel();
60 } catch (FileNotFoundException ex) { 54 final int MAPIO_MAGIC_BOUNDARY = 100 * 1024;
61 ex.printStackTrace(); 55 if (fc.size() > MAPIO_MAGIC_BOUNDARY) {
62 return null; 56 return new MemoryMapFileAccess(fc, fc.size());
63 } 57 } else {
58 return new FileAccess(fc, fc.size());
59 }
60 } catch (IOException ex) {
61 // unlikely to happen, we've made sure file exists.
62 ex.printStackTrace(); // FIXME log error
63 }
64 return new DataAccess(); // non-null, empty.
64 } 65 }
65 66
66 public int revisionCount() { 67 public int revisionCount() {
67 initOutline(); 68 initOutline();
68 return index.size(); 69 return index.size();
69 } 70 }
71
72 private final int REVLOGV1_RECORD_SIZE = 64;
70 73
71 // should be possible to use TIP, ALL, or -1, -2, -n notation of Hg 74 // should be possible to use TIP, ALL, or -1, -2, -n notation of Hg
72 // ? boolean needsNodeid 75 // ? boolean needsNodeid
73 public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) { 76 public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) {
74 initOutline(); 77 initOutline();
88 if (end < start || end >= indexSize) { 91 if (end < start || end >= indexSize) {
89 throw new IllegalArgumentException("Bad right range boundary " + end); 92 throw new IllegalArgumentException("Bad right range boundary " + end);
90 } 93 }
91 // XXX may cache [start .. end] from index with a single read (pre-read) 94 // XXX may cache [start .. end] from index with a single read (pre-read)
92 95
93 DataInput diIndex = null, diData = null; 96 DataAccess daIndex = null, daData = null;
94 diIndex = getIndexStream(); 97 daIndex = getIndexStream();
95 if (needData && !inline) { 98 if (needData && !inline) {
96 diData = getDataStream(); 99 daData = getDataStream();
97 } 100 }
98 try { 101 try {
99 byte[] lastData = null; 102 byte[] lastData = null;
100 int i; 103 int i;
101 boolean extraReadsToBaseRev = false; 104 boolean extraReadsToBaseRev = false;
103 i = index.get(start).baseRevision; 106 i = index.get(start).baseRevision;
104 extraReadsToBaseRev = true; 107 extraReadsToBaseRev = true;
105 } else { 108 } else {
106 i = start; 109 i = start;
107 } 110 }
108 diIndex.skipBytes(inline ? (int) index.get(i).offset : start * 64); 111
112 daIndex.seek(inline ? (int) index.get(i).offset : start * REVLOGV1_RECORD_SIZE);
109 for (; i <= end; i++ ) { 113 for (; i <= end; i++ ) {
110 long l = diIndex.readLong(); 114 long l = daIndex.readLong();
111 long offset = l >>> 16; 115 long offset = l >>> 16;
112 int flags = (int) (l & 0X0FFFF); 116 int flags = (int) (l & 0X0FFFF);
113 int compressedLen = diIndex.readInt(); 117 int compressedLen = daIndex.readInt();
114 int actualLen = diIndex.readInt(); 118 int actualLen = daIndex.readInt();
115 int baseRevision = diIndex.readInt(); 119 int baseRevision = daIndex.readInt();
116 int linkRevision = diIndex.readInt(); 120 int linkRevision = daIndex.readInt();
117 int parent1Revision = diIndex.readInt(); 121 int parent1Revision = daIndex.readInt();
118 int parent2Revision = diIndex.readInt(); 122 int parent2Revision = daIndex.readInt();
119 byte[] buf = new byte[32]; 123 byte[] buf = new byte[32];
120 // XXX Hg keeps 12 last bytes empty, we move them into front here 124 // XXX Hg keeps 12 last bytes empty, we move them into front here
121 diIndex.readFully(buf, 12, 20); 125 daIndex.readBytes(buf, 12, 20);
122 diIndex.skipBytes(12); 126 daIndex.skip(12);
123 byte[] data = null; 127 byte[] data = null;
124 if (needData) { 128 if (needData) {
125 byte[] dataBuf = new byte[compressedLen]; 129 byte[] dataBuf = new byte[compressedLen];
126 if (inline) { 130 if (inline) {
127 diIndex.readFully(dataBuf); 131 daIndex.readBytes(dataBuf, 0, compressedLen);
128 } else { 132 } else {
129 diData.skipBytes((int) index.get(i).offset); // FIXME not skip but seek!!! (skip would work only for the first time) 133 daData.seek(index.get(i).offset);
130 diData.readFully(dataBuf); 134 daData.readBytes(dataBuf, 0, compressedLen);
131 } 135 }
132 if (dataBuf[0] == 0x78 /* 'x' */) { 136 if (dataBuf[0] == 0x78 /* 'x' */) {
133 try { 137 try {
134 Inflater zlib = new Inflater(); 138 Inflater zlib = new Inflater();
135 zlib.setInput(dataBuf, 0, compressedLen); 139 zlib.setInput(dataBuf, 0, compressedLen);
167 byte[] baseRevContent = lastData; 171 byte[] baseRevContent = lastData;
168 data = apply(baseRevContent, actualLen, patches); 172 data = apply(baseRevContent, actualLen, patches);
169 } 173 }
170 } else { 174 } else {
171 if (inline) { 175 if (inline) {
172 diIndex.skipBytes(compressedLen); 176 daIndex.skip(compressedLen);
173 } 177 }
174 } 178 }
175 if (!extraReadsToBaseRev || i >= start) { 179 if (!extraReadsToBaseRev || i >= start) {
176 inspector.next(i, actualLen, baseRevision, linkRevision, parent1Revision, parent2Revision, buf, data); 180 inspector.next(i, actualLen, baseRevision, linkRevision, parent1Revision, parent2Revision, buf, data);
177 } 181 }
178 lastData = data; 182 lastData = data;
179 } 183 }
180 } catch (EOFException ex) {
181 // should not happen as long as we read inside known boundaries
182 throw new IllegalStateException(ex);
183 } catch (IOException ex) { 184 } catch (IOException ex) {
184 throw new IllegalStateException(ex); // FIXME need better handling 185 throw new IllegalStateException(ex); // FIXME need better handling
185 } finally { 186 } finally {
186 hackCloseFileStreams(diIndex, diData); // FIXME HACK!!! 187 daIndex.done();
188 if (daData != null) {
189 daData.done();
190 }
187 } 191 }
188 } 192 }
189 193
190 private void initOutline() { 194 private void initOutline() {
191 if (index != null && !index.isEmpty()) { 195 if (index != null && !index.isEmpty()) {
192 return; 196 return;
193 } 197 }
194 ArrayList<IndexEntry> res = new ArrayList<IndexEntry>(); 198 ArrayList<IndexEntry> res = new ArrayList<IndexEntry>();
195 DataInput di = getIndexStream(); 199 DataAccess da = getIndexStream();
196 try { 200 try {
197 int versionField = di.readInt(); 201 int versionField = da.readInt();
198 di.readInt(); // just to skip next 2 bytes of offset + flags 202 da.readInt(); // just to skip next 2 bytes of offset + flags
199 final int INLINEDATA = 1 << 16; 203 final int INLINEDATA = 1 << 16;
200 inline = (versionField & INLINEDATA) != 0; 204 inline = (versionField & INLINEDATA) != 0;
201 long offset = 0; // first offset is always 0, thus Hg uses it for other purposes 205 long offset = 0; // first offset is always 0, thus Hg uses it for other purposes
202 while(true) { // EOFExcepiton should get us outta here. FIXME Our inputstream should has explicit no-more-data indicator 206 while(true) {
203 int compressedLen = di.readInt(); 207 int compressedLen = da.readInt();
204 // 8+4 = 12 bytes total read here 208 // 8+4 = 12 bytes total read here
205 int actualLen = di.readInt(); 209 int actualLen = da.readInt();
206 int baseRevision = di.readInt(); 210 int baseRevision = da.readInt();
207 // 12 + 8 = 20 bytes read here 211 // 12 + 8 = 20 bytes read here
208 // int linkRevision = di.readInt(); 212 // int linkRevision = di.readInt();
209 // int parent1Revision = di.readInt(); 213 // int parent1Revision = di.readInt();
210 // int parent2Revision = di.readInt(); 214 // int parent2Revision = di.readInt();
211 // byte[] nodeid = new byte[32]; 215 // byte[] nodeid = new byte[32];
212 if (inline) { 216 if (inline) {
213 res.add(new IndexEntry(offset + 64*res.size(), baseRevision)); 217 res.add(new IndexEntry(offset + REVLOGV1_RECORD_SIZE * res.size(), baseRevision));
214 di.skipBytes(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size) 218 da.skip(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size)
215 } else { 219 } else {
216 res.add(new IndexEntry(offset, baseRevision)); 220 res.add(new IndexEntry(offset, baseRevision));
217 di.skipBytes(3*4 + 32); 221 da.skip(3*4 + 32);
218 } 222 }
219 long l = di.readLong(); 223 if (da.nonEmpty()) {
220 offset = l >>> 16; 224 long l = da.readLong();
221 } 225 offset = l >>> 16;
222 } catch (EOFException ex) { 226 } else {
223 // fine, done then 227 // fine, done then
224 index = res; 228 index = res;
229 break;
230 }
231 }
225 } catch (IOException ex) { 232 } catch (IOException ex) {
226 ex.printStackTrace(); 233 ex.printStackTrace(); // log error
227 // too bad, no outline then 234 // too bad, no outline then.
228 index = Collections.emptyList(); 235 index = Collections.emptyList();
229 } 236 } finally {
230 hackCloseFileStreams(di, null); // FIXME HACK!!! 237 da.done();
238 }
239
231 } 240 }
232 241
233 // FIXME HACK to deal with File/FileStream nature of out data source. Won't need this once implement
234 // own DataInput based on bytearray chunks or RandomAccessFile
235 private void hackCloseFileStreams(DataInput index, DataInput data) {
236 try {
237 if (index != null) {
238 ((DataInputStream) index).close();
239 }
240 if (data != null) {
241 ((DataInputStream) data).close();
242 }
243 } catch (IOException ex) {
244 ex.printStackTrace();
245 }
246 }
247
248 242
249 // perhaps, package-local or protected, if anyone else from low-level needs them 243 // perhaps, package-local or protected, if anyone else from low-level needs them
250 // XXX think over if we should keep offset in case of separate data file - we read the field anyway. Perhaps, distinct entry classes for Inline and non-inline indexes? 244 // XXX think over if we should keep offset in case of separate data file - we read the field anyway. Perhaps, distinct entry classes for Inline and non-inline indexes?
251 private static class IndexEntry { 245 private static class IndexEntry {
252 public final long offset; // for separate .i and .d - copy of index record entry, for inline index - actual offset of the record in the .i file (record entry + revision * record size)) 246 public final long offset; // for separate .i and .d - copy of index record entry, for inline index - actual offset of the record in the .i file (record entry + revision * record size))
289 data = new byte[len]; 283 data = new byte[len];
290 System.arraycopy(src, srcOffset, data, 0, len); 284 System.arraycopy(src, srcOffset, data, 0, len);
291 } 285 }
292 } 286 }
293 287
/**
 * Positioned read access to revlog file data.
 * <p>
 * This base implementation represents an empty source:
 * <ul>
 * <li>{@link #nonEmpty()} is always {@code false};</li>
 * <li>{@link #done()} does nothing;</li>
 * <li>{@link #seek(long)}, {@link #skip(int)} and {@link #readBytes(byte[], int, int)}
 * throw {@link UnsupportedOperationException}.</li>
 * </ul>
 * Subclasses provide the actual positioning and reading.
 * {@link #readInt()} and {@link #readLong()} decode big-endian values on top of
 * {@link #readBytes(byte[], int, int)}.
 */
/*package-local*/ class DataAccess {
	/**
	 * @return {@code true} if there are unread bytes at the current position;
	 *         always {@code false} for this empty implementation
	 */
	public boolean nonEmpty() {
		return false;
	}

	/**
	 * Moves the read position to an absolute offset.
	 * @throws IOException if repositioning the underlying source fails
	 */
	public void seek(long offset) throws IOException {
		throw new UnsupportedOperationException();
	}

	/**
	 * Moves the read position by {@code bytes} relative to the current position.
	 * @throws IOException if repositioning the underlying source fails
	 */
	public void skip(int bytes) throws IOException {
		throw new UnsupportedOperationException();
	}

	/** Shall be called once this object is no longer needed; no-op in this empty implementation. */
	public void done() {
		// no-op in this empty implementation
	}

	/**
	 * Reads four bytes as a big-endian signed int.
	 * @throws IOException if the bytes cannot be read
	 */
	public int readInt() throws IOException {
		byte[] b = new byte[4];
		readBytes(b, 0, 4);
		return b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
	}

	/**
	 * Reads eight bytes as a big-endian signed long.
	 * @throws IOException if the bytes cannot be read
	 */
	public long readLong() throws IOException {
		byte[] b = new byte[8];
		readBytes(b, 0, 8);
		int i1 = b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
		int i2 = b[4] << 24 | (b[5] & 0xFF) << 16 | (b[6] & 0xFF) << 8 | (b[7] & 0xFF);
		// The mask must be a long literal: the int literal 0xFFFFFFFF is -1 and widens to -1L,
		// which would let a negative low word sign-extend over the high 32 bits.
		return ((long) i1) << 32 | ((long) i2 & 0xFFFFFFFFL);
	}

	/**
	 * Reads exactly {@code length} bytes into {@code buf} starting at {@code offset}.
	 * @throws IOException if the requested bytes cannot be read
	 */
	public void readBytes(byte[] buf, int offset, int length) throws IOException {
		throw new UnsupportedOperationException();
	}
}
320
321 // DOESN'T WORK YET
322 private class MemoryMapFileAccess extends DataAccess {
323 private FileChannel fileChannel;
324 private final long size;
325 private long position = 0;
326
327 public MemoryMapFileAccess(FileChannel fc, long channelSize) {
328 fileChannel = fc;
329 size = channelSize;
330 }
331
332 @Override
333 public void seek(long offset) {
334 position = offset;
335 }
336
337 @Override
338 public void skip(int bytes) throws IOException {
339 position += bytes;
340 }
341
342 private boolean fill() throws IOException {
343 final int BUFFER_SIZE = 8 * 1024;
344 long left = size - position;
345 MappedByteBuffer rv = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, left < BUFFER_SIZE ? left : BUFFER_SIZE);
346 position += rv.capacity();
347 return rv.hasRemaining();
348 }
349
350 @Override
351 public void done() {
352 if (fileChannel != null) {
353 try {
354 fileChannel.close();
355 } catch (IOException ex) {
356 ex.printStackTrace(); // log debug
357 }
358 fileChannel = null;
359 }
360 }
361 }
362
363 private class FileAccess extends DataAccess {
364 private FileChannel fileChannel;
365 private final long size;
366 private ByteBuffer buffer;
367 private long bufferStartInFile = 0; // offset of this.buffer in the file.
368
369 public FileAccess(FileChannel fc, long channelSize) {
370 fileChannel = fc;
371 size = channelSize;
372 final int BUFFER_SIZE = 8 * 1024;
373 // XXX once implementation is more or less stable,
374 // may want to try ByteBuffer.allocateDirect() to see
375 // if there's any performance gain.
376 buffer = ByteBuffer.allocate(size < BUFFER_SIZE ? (int) size : BUFFER_SIZE);
377 buffer.flip(); // or .limit(0) to indicate it's empty
378 }
379
380 @Override
381 public boolean nonEmpty() {
382 return bufferStartInFile + buffer.position() < size;
383 }
384
385 @Override
386 public void seek(long offset) throws IOException {
387 if (offset < bufferStartInFile + buffer.limit() && offset >= bufferStartInFile) {
388 buffer.position((int) (offset - bufferStartInFile));
389 } else {
390 // out of current buffer, invalidate it (force re-read)
391 // XXX or ever re-read it right away?
392 bufferStartInFile = offset;
393 buffer.clear();
394 buffer.limit(0); // or .flip() to indicate we switch to reading
395 fileChannel.position(offset);
396 }
397 }
398
399 @Override
400 public void skip(int bytes) throws IOException {
401 final int newPos = buffer.position() + bytes;
402 if (newPos >= 0 && newPos < buffer.limit()) {
403 // no need to move file pointer, just rewind/seek buffer
404 buffer.position(newPos);
405 } else {
406 //
407 seek(fileChannel.position()+ bytes);
408 }
409 }
410
411 private boolean fill() throws IOException {
412 if (!buffer.hasRemaining()) {
413 bufferStartInFile += buffer.limit();
414 buffer.clear();
415 if (bufferStartInFile < size) { // just in case there'd be any exception on EOF, not -1
416 fileChannel.read(buffer);
417 // may return -1 when EOF, but empty will reflect this, hence no explicit support here
418 }
419 buffer.flip();
420 }
421 return buffer.hasRemaining();
422 }
423
424 @Override
425 public void readBytes(byte[] buf, int offset, int length) throws IOException {
426 final int tail = buffer.remaining();
427 if (tail >= length) {
428 buffer.get(buf, offset, length);
429 } else {
430 buffer.get(buf, offset, tail);
431 if (fill()) {
432 buffer.get(buf, offset + tail, length - tail);
433 } else {
434 throw new IOException(); // shall not happen provided stream contains expected data and no attempts to read past nonEmpty() == false are made.
435 }
436 }
437 }
438
439 @Override
440 public void done() {
441 if (buffer != null) {
442 buffer = null;
443 }
444 if (fileChannel != null) {
445 try {
446 fileChannel.close();
447 } catch (IOException ex) {
448 ex.printStackTrace(); // log debug
449 }
450 fileChannel = null;
451 }
452 }
453 }
294 } 454 }