changeset 9:d6d2a630f4a6

Access to underlaying file data wrapped into own Access object, implemented with FileChannel and ByteBuffer
author Artem Tikhomirov <tikhomirov.artem@gmail.com>
date Sat, 25 Dec 2010 04:45:59 +0100
parents a78c980749e3
children 382cfe9463db
files build.xml design.txt src/com/tmate/hgkit/ll/DigestHelper.java src/com/tmate/hgkit/ll/HgRepository.java src/com/tmate/hgkit/ll/LocalHgRepo.java src/com/tmate/hgkit/ll/Nodeid.java src/com/tmate/hgkit/ll/RevlogIndexStreamAccess.java src/com/tmate/hgkit/ll/RevlogStream.java
diffstat 8 files changed, 332 insertions(+), 98 deletions(-) [+]
line wrap: on
line diff
--- a/build.xml	Thu Dec 23 01:31:40 2010 +0100
+++ b/build.xml	Sat Dec 25 04:45:59 2010 +0100
@@ -6,9 +6,9 @@
 
     <target name="samples" depends="build">
     	
-    	<echo message="History of a specific file"/>
+    	<echo message="History of a specific file(s)"/>
     	<java classpath="hgkit.jar" classname="com.tmate.hgkit.console.Log">
-    		<arg line="design.txt"/>
+    		<arg line="design.txt .classpath src/com/tmate/hgkit/ll/LocalHgRepo.java"/>
     	</java>
     	
     	<echo message="Whole repo log"/>
@@ -16,7 +16,7 @@
     	
     	<echo message="Content of a file"/>
     	<java classpath="hgkit.jar" classname="com.tmate.hgkit.console.Cat">
-    		<arg line="design.txt"/>
+    		<arg line="src/com/tmate/hgkit/ll/Revlog.java"/>
     	</java>
 
     </target>
--- a/design.txt	Thu Dec 23 01:31:40 2010 +0100
+++ b/design.txt	Sat Dec 25 04:45:59 2010 +0100
@@ -23,8 +23,23 @@
 + support patch from baseRev + few deltas (although done in a way patches are applied one by one instead of accumulated)
 + command-line samples (-R, filenames) (Log & Cat) to show on any repo
 +buildfile + run samples
+*input stream impl + lifecycle. Step forward with FileChannel and ByteBuffer, although questionable accomplishment (looks bit complicated, cumbersome)
 
-input stream impl + lifecycle
+calculate sha1 digest for file to see I can deal with nodeid
 delta merge
 Changeset to get index (local revision number)
 
+
+
+>>>> Effective file read/data access
+ReadOperation, Revlog does: repo.getFileSystem().run(this.file, new ReadOperation(), long start=0, long end = -1)
+ReadOperation gets buffer (of whatever size, as decided by FS impl), parses it and then  reports if needs more data.
+This helps to ensure streams are closed after reading, allows caching (if the same file (or LRU) is read few times in sequence)
+and allows buffer management (i.e. reuse. Single buffer for all reads). 
+Scheduling multiple operations (in future, to deal with writes - single queue for FS operations - no locks?)
+
+File access:
+* NIO and mapped files - should be fast. Although seems to give less control on mem usage. 
+* Regular InputStreams and  chunked stream on top - allocate List<byte[]>, each (but last) chunk of fixed size (depending on initial file size) 
+
+<<<<<
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/com/tmate/hgkit/ll/DigestHelper.java	Sat Dec 25 04:45:59 2010 +0100
@@ -0,0 +1,62 @@
+/**
+ * Copyright (c) 2010 Artem Tikhomirov 
+ */
+package com.tmate.hgkit.ll;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ *
+ * @author artem
+ */
+public class DigestHelper {
+	private MessageDigest sha1;
+
+	public DigestHelper() {
+	}
+	
+	private MessageDigest getSHA1() {
+		if (sha1 == null) {
+			try {
+				sha1 = MessageDigest.getInstance("SHA-1");
+			} catch (NoSuchAlgorithmException ex) {
+				// could hardly happen, JDK from Sun always has sha1.
+				ex.printStackTrace(); // FIXME log error
+			}
+		}
+		return sha1;
+	}
+
+	// XXX perhaps, digest functions should throw an exception, as it's caller responsibility to deal with eof, etc
+	public String sha1(byte[] data) {
+		MessageDigest alg = getSHA1();
+		byte[] digest = alg.digest(data);
+		assert digest.length == 20;
+		return toHexString(digest, 0, 20);
+	}
+
+	public byte[] sha1(InputStream is /*ByteBuffer*/) throws IOException {
+		MessageDigest alg = getSHA1();
+		byte[] buf = new byte[1024];
+		int c;
+		while ((c = is.read(buf)) != -1) {
+			alg.update(buf, 0, c);
+		}
+		byte[] digest = alg.digest();
+		return digest;
+	}
+
+	public String toHexString(byte[] data, final int offset, final int count) {
+		char[] result = new char[count << 1];
+		final String hexDigits = "0123456789abcdef";
+		final int end = offset+count;
+		for (int i = offset, j = 0; i < end; i++) {
+			result[j++] = hexDigits.charAt((data[i] >>> 4) & 0x0F);
+			result[j++] = hexDigits.charAt(data[i] & 0x0F);
+		}
+		return new String(result);
+	}
+}
--- a/src/com/tmate/hgkit/ll/HgRepository.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/HgRepository.java	Sat Dec 25 04:45:59 2010 +0100
@@ -30,16 +30,10 @@
 		isInvalid = invalid;
 	}
 
-	public void log() {
-		Revlog clog = getChangelog();
-		assert clog != null;
-		// TODO get data to the client
-	}
-
 	public final Changelog getChangelog() {
 		if (this.changelog == null) {
 			// might want delegate to protected createChangelog() some day
-			RevlogStream content = resolve("store/00changelog.i"); // XXX perhaps, knowledge about filenames should be in LocalHgRepo?
+			RevlogStream content = resolve(toStoragePath("00changelog.i", false)); // XXX perhaps, knowledge about filenames should be in LocalHgRepo?
 			this.changelog = new Changelog(this, content);
 		}
 		return this.changelog;
@@ -61,8 +55,10 @@
 	public abstract String getLocation();
 
 
+	protected abstract String toStoragePath(String path, boolean isData);
+
 	/**
 	 * Perhaps, should be separate interface, like ContentLookup
 	 */
-	protected abstract RevlogStream resolve(String string);
+	protected abstract RevlogStream resolve(String repositoryPath);
 }
--- a/src/com/tmate/hgkit/ll/LocalHgRepo.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/LocalHgRepo.java	Sat Dec 25 04:45:59 2010 +0100
@@ -94,8 +94,12 @@
 		}
 	}
 
+	// FIXME document what path argument is, whether it includes .i or .d, and whether it's 'normalized' (slashes) or not.
+	// since .hg/store keeps both .i files and files without extension (e.g. fncache), guees, for data == false 
+	// we shall assume path has extension
 	// FIXME much more to be done, see store.py:_hybridencode
 	// @see http://mercurial.selenic.com/wiki/CaseFoldingPlan
+	@Override
 	protected String toStoragePath(String path, boolean data) {
 		path = normalize(path);
 		final String STR_STORE = "store/";
@@ -183,11 +187,12 @@
 	private static char[] toHexByte(int ch, char[] buf) {
 		assert buf.length > 1;
 		final String hexDigits = "0123456789abcdef";
-		buf[0] = hexDigits.charAt((ch & 0x00F0) >> 4);
+		buf[0] = hexDigits.charAt((ch & 0x00F0) >>> 4);
 		buf[1] = hexDigits.charAt(ch & 0x0F);
 		return buf;
 	}
 
+	// TODO handle . and .. (although unlikely to face them from GUI client)
 	private static String normalize(String path) {
 		path = path.replace('\\', '/').replace("//", "/");
 		if (path.startsWith("/")) {
--- a/src/com/tmate/hgkit/ll/Nodeid.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/Nodeid.java	Sat Dec 25 04:45:59 2010 +0100
@@ -3,8 +3,6 @@
  */
 package com.tmate.hgkit.ll;
 
-import java.math.BigInteger;
-import java.util.Formatter;
 
 /**
  * @see mercurial/node.py
@@ -24,15 +22,13 @@
 
 	@Override
 	public String toString() {
-		// FIXME temp impl.
-		// BEWARE, if binaryData[0] > 0x80, BigInteger treats it as negative  
-		return new BigInteger(binaryData).toString();
+		return new DigestHelper().toHexString(binaryData, 0, 20);
 	}
 
 	// binascii.unhexlify()
 	public static Nodeid fromAscii(byte[] asciiRepresentation, int offset, int length) {
 		assert length % 2 == 0; // Python's binascii.hexlify convert each byte into 2 digits
-		byte[] data = new byte[length / 2]; // XXX use known size instead? nodeid is always 20 bytes
+		byte[] data = new byte[length >>> 1]; // XXX use known size instead? nodeid is always 20 bytes
 		for (int i = 0, j = offset; i < data.length; i++) {
 			int hiNibble = Character.digit(asciiRepresentation[j++], 16);
 			int lowNibble = Character.digit(asciiRepresentation[j++], 16);
--- a/src/com/tmate/hgkit/ll/RevlogIndexStreamAccess.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/RevlogIndexStreamAccess.java	Sat Dec 25 04:45:59 2010 +0100
@@ -24,7 +24,7 @@
 
 	
 	void readRevlogV0Record() throws IOException {
-		DataInput di = stream.getIndexStream();
+		DataInput di = null; //FIXME stream.getIndexStream();
 		int offset = di.readInt();
 		int compressedLen = di.readInt();
 		int baseRevision = di.readInt();
@@ -42,7 +42,7 @@
 	
 	// another subclass?
 	void readRevlogNGRecord() throws IOException {
-		DataInput di = stream.getIndexStream();
+		DataInput di = null; //FIXME stream.getIndexStream();
 		long l = di.readLong();
 		long offset = l >>> 16;
 		int flags = (int) (l & 0X0FFFF);
--- a/src/com/tmate/hgkit/ll/RevlogStream.java	Thu Dec 23 01:31:40 2010 +0100
+++ b/src/com/tmate/hgkit/ll/RevlogStream.java	Sat Dec 25 04:45:59 2010 +0100
@@ -5,14 +5,12 @@
 
 import static com.tmate.hgkit.ll.HgRepository.TIP;
 
-import java.io.BufferedInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -37,30 +35,33 @@
 		this.indexFile = indexFile;
 	}
 
-	private void detectVersion() {
-		
+	/*package*/ DataAccess getIndexStream() {
+		return create(indexFile);
 	}
 
-	/*package*/ DataInput getIndexStream() {
-		DataInputStream dis = null;
-		try {
-			dis = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile)));
-		} catch (FileNotFoundException ex) {
-			ex.printStackTrace();
-			// should not happen, we checked for existence
-		}
-		return dis;
-	}
-
-	/*package*/ DataInput getDataStream() {
+	/*package*/ DataAccess getDataStream() {
 		final String indexName = indexFile.getName();
 		File dataFile = new File(indexFile.getParentFile(), indexName.substring(0, indexName.length() - 1) + "d");
+		return create(dataFile);
+	}
+	
+	private DataAccess create(File f) {
+		if (!f.exists()) {
+			return new DataAccess();
+		}
 		try {
-			return new DataInputStream(new BufferedInputStream(new FileInputStream(dataFile)));
-		} catch (FileNotFoundException ex) {
-			ex.printStackTrace();
-			return null;
+			FileChannel fc = new FileInputStream(f).getChannel();
+			final int MAPIO_MAGIC_BOUNDARY = 100 * 1024;
+			if (fc.size() > MAPIO_MAGIC_BOUNDARY) {
+				return new MemoryMapFileAccess(fc, fc.size());
+			} else {
+				return new FileAccess(fc, fc.size());
+			}
+		} catch (IOException ex) {
+			// unlikely to happen, we've made sure file exists.
+			ex.printStackTrace(); // FIXME log error
 		}
+		return new DataAccess(); // non-null, empty.
 	}
 
 	public int revisionCount() {
@@ -68,6 +69,8 @@
 		return index.size();
 	}
 
+	private final int REVLOGV1_RECORD_SIZE = 64;
+
 	// should be possible to use TIP, ALL, or -1, -2, -n notation of Hg
 	// ? boolean needsNodeid
 	public void iterate(int start, int end, boolean needData, Revlog.Inspector inspector) {
@@ -90,10 +93,10 @@
 		}
 		// XXX may cache [start .. end] from index with a single read (pre-read)
 		
-		DataInput diIndex = null, diData = null;
-		diIndex = getIndexStream();
+		DataAccess daIndex = null, daData = null;
+		daIndex = getIndexStream();
 		if (needData && !inline) {
-			diData = getDataStream();
+			daData = getDataStream();
 		}
 		try {
 			byte[] lastData = null;
@@ -105,29 +108,30 @@
 			} else {
 				i = start;
 			}
-			diIndex.skipBytes(inline ? (int) index.get(i).offset : start * 64);
+			
+			daIndex.seek(inline ? (int) index.get(i).offset : start * REVLOGV1_RECORD_SIZE);
 			for (; i <= end; i++ ) {
-				long l = diIndex.readLong();
+				long l = daIndex.readLong();
 				long offset = l >>> 16;
 				int flags = (int) (l & 0X0FFFF);
-				int compressedLen = diIndex.readInt();
-				int actualLen = diIndex.readInt();
-				int baseRevision = diIndex.readInt();
-				int linkRevision = diIndex.readInt();
-				int parent1Revision = diIndex.readInt();
-				int parent2Revision = diIndex.readInt();
+				int compressedLen = daIndex.readInt();
+				int actualLen = daIndex.readInt();
+				int baseRevision = daIndex.readInt();
+				int linkRevision = daIndex.readInt();
+				int parent1Revision = daIndex.readInt();
+				int parent2Revision = daIndex.readInt();
 				byte[] buf = new byte[32];
 				// XXX Hg keeps 12 last bytes empty, we move them into front here
-				diIndex.readFully(buf, 12, 20);
-				diIndex.skipBytes(12);
+				daIndex.readBytes(buf, 12, 20);
+				daIndex.skip(12);
 				byte[] data = null;
 				if (needData) {
 					byte[] dataBuf = new byte[compressedLen];
 					if (inline) {
-						diIndex.readFully(dataBuf);
+						daIndex.readBytes(dataBuf, 0, compressedLen);
 					} else {
-						diData.skipBytes((int) index.get(i).offset); // FIXME not skip but seek!!! (skip would work only for the first time)
-						diData.readFully(dataBuf);
+						daData.seek(index.get(i).offset);
+						daData.readBytes(dataBuf, 0, compressedLen);
 					}
 					if (dataBuf[0] == 0x78 /* 'x' */) {
 						try {
@@ -169,7 +173,7 @@
 					}
 				} else {
 					if (inline) {
-						diIndex.skipBytes(compressedLen);
+						daIndex.skip(compressedLen);
 					}
 				}
 				if (!extraReadsToBaseRev || i >= start) {
@@ -177,13 +181,13 @@
 				}
 				lastData = data;
 			}
-		} catch (EOFException ex) {
-			// should not happen as long as we read inside known boundaries
-			throw new IllegalStateException(ex);
 		} catch (IOException ex) {
 			throw new IllegalStateException(ex); // FIXME need better handling
 		} finally {
-			hackCloseFileStreams(diIndex, diData); // FIXME HACK!!!
+			daIndex.done();
+			if (daData != null) {
+				daData.done();
+			}
 		}
 	}
 	
@@ -192,59 +196,49 @@
 			return;
 		}
 		ArrayList<IndexEntry> res = new ArrayList<IndexEntry>();
-		DataInput di = getIndexStream();
+		DataAccess da = getIndexStream();
 		try {
-			int versionField = di.readInt();
-			di.readInt(); // just to skip next 2 bytes of offset + flags
+			int versionField = da.readInt();
+			da.readInt(); // just to skip next 2 bytes of offset + flags
 			final int INLINEDATA = 1 << 16;
 			inline = (versionField & INLINEDATA) != 0;
 			long offset = 0; // first offset is always 0, thus Hg uses it for other purposes
-			while(true) { // EOFExcepiton should get us outta here. FIXME Our inputstream should has explicit no-more-data indicator
-				int compressedLen = di.readInt();
+			while(true) {
+				int compressedLen = da.readInt();
 				// 8+4 = 12 bytes total read here
-				int actualLen = di.readInt();
-				int baseRevision = di.readInt();
+				int actualLen = da.readInt();
+				int baseRevision = da.readInt();
 				// 12 + 8 = 20 bytes read here
 //				int linkRevision = di.readInt();
 //				int parent1Revision = di.readInt();
 //				int parent2Revision = di.readInt();
 //				byte[] nodeid = new byte[32];
 				if (inline) {
-					res.add(new IndexEntry(offset + 64*res.size(), baseRevision));
-					di.skipBytes(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size)
+					res.add(new IndexEntry(offset + REVLOGV1_RECORD_SIZE * res.size(), baseRevision));
+					da.skip(3*4 + 32 + compressedLen); // Check: 44 (skip) + 20 (read) = 64 (total RevlogNG record size)
 				} else {
 					res.add(new IndexEntry(offset, baseRevision));
-					di.skipBytes(3*4 + 32);
+					da.skip(3*4 + 32);
 				}
-				long l = di.readLong();
-				offset = l >>> 16;
+				if (da.nonEmpty()) {
+					long l = da.readLong();
+					offset = l >>> 16;
+				} else {
+					// fine, done then
+					index = res;
+					break;
+				}
 			}
-		} catch (EOFException ex) {
-			// fine, done then
-			index = res;
 		} catch (IOException ex) {
-			ex.printStackTrace();
-			// too bad, no outline then
+			ex.printStackTrace(); // log error
+			// too bad, no outline then.
 			index = Collections.emptyList();
+		} finally {
+			da.done();
 		}
-		hackCloseFileStreams(di, null); // FIXME HACK!!!
+		
 	}
 	
-	// FIXME HACK to deal with File/FileStream nature of out data source. Won't need this once implement
-	// own DataInput based on bytearray chunks or RandomAccessFile
-	private void hackCloseFileStreams(DataInput index, DataInput data) {
-		try {
-			if (index != null) {
-				((DataInputStream) index).close();
-			}
-			if (data != null) {
-				((DataInputStream) data).close();
-			}
-		} catch (IOException ex) {
-			ex.printStackTrace();
-		}
-	}
-
 
 	// perhaps, package-local or protected, if anyone else from low-level needs them
 	// XXX think over if we should keep offset in case of separate data file - we read the field anyway. Perhaps, distinct entry classes for Inline and non-inline indexes?
@@ -291,4 +285,170 @@
 		}
 	}
 
+	/*package-local*/ class DataAccess {
+		public boolean nonEmpty() {
+			return false;
+		}
+		// absolute positioning
+		public void seek(long offset) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+		// relative positioning
+		public void skip(int bytes) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+		// shall be called once this object no longer needed
+		public void done() {
+			// no-op in this empty implementation
+		}
+		public int readInt() throws IOException {
+			byte[] b = new byte[4];
+			readBytes(b, 0, 4);
+			return b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
+		}
+		public long readLong() throws IOException {
+			byte[] b = new byte[8];
+			readBytes(b, 0, 8);
+			int i1 = b[0] << 24 | (b[1] & 0xFF) << 16 | (b[2] & 0xFF) << 8 | (b[3] & 0xFF);
+			int i2 = b[4] << 24 | (b[5] & 0xFF) << 16 | (b[6] & 0xFF) << 8 | (b[7] & 0xFF);
+			return ((long) i1) << 32 | ((long) i2 & 0xFFFFFFFF);
+		}
+		public void readBytes(byte[] buf, int offset, int length) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	// DOESN'T WORK YET 
+	private class MemoryMapFileAccess extends DataAccess {
+		private FileChannel fileChannel;
+		private final long size;
+		private long position = 0;
+
+		public MemoryMapFileAccess(FileChannel fc, long channelSize) {
+			fileChannel = fc;
+			size = channelSize;
+		}
+
+		@Override
+		public void seek(long offset) {
+			position = offset;
+		}
+
+		@Override
+		public void skip(int bytes) throws IOException {
+			position += bytes;
+		}
+
+		private boolean fill() throws IOException {
+			final int BUFFER_SIZE = 8 * 1024;
+			long left = size - position;
+			MappedByteBuffer rv = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, left < BUFFER_SIZE ? left : BUFFER_SIZE);
+			position += rv.capacity();
+			return rv.hasRemaining();
+		}
+
+		@Override
+		public void done() {
+			if (fileChannel != null) {
+				try {
+					fileChannel.close();
+				} catch (IOException ex) {
+					ex.printStackTrace(); // log debug
+				}
+				fileChannel = null;
+			}
+		}
+	}
+
+	private class FileAccess extends DataAccess {
+		private FileChannel fileChannel;
+		private final long size;
+		private ByteBuffer buffer;
+		private long bufferStartInFile = 0; // offset of this.buffer in the file.
+
+		public FileAccess(FileChannel fc, long channelSize) {
+			fileChannel = fc;
+			size = channelSize;
+			final int BUFFER_SIZE = 8 * 1024;
+			// XXX once implementation is more or less stable,
+			// may want to try ByteBuffer.allocateDirect() to see
+			// if there's any performance gain.
+			buffer = ByteBuffer.allocate(size < BUFFER_SIZE ? (int) size : BUFFER_SIZE);
+			buffer.flip(); // or .limit(0) to indicate it's empty
+		}
+		
+		@Override
+		public boolean nonEmpty() {
+			return bufferStartInFile + buffer.position() < size;
+		}
+		
+		@Override
+		public void seek(long offset) throws IOException {
+			if (offset < bufferStartInFile + buffer.limit() && offset >= bufferStartInFile) {
+				buffer.position((int) (offset - bufferStartInFile));
+			} else {
+				// out of current buffer, invalidate it (force re-read) 
+				// XXX or ever re-read it right away?
+				bufferStartInFile = offset;
+				buffer.clear();
+				buffer.limit(0); // or .flip() to indicate we switch to reading
+				fileChannel.position(offset);
+			}
+		}
+
+		@Override
+		public void skip(int bytes) throws IOException {
+			final int newPos = buffer.position() + bytes;
+			if (newPos >= 0 && newPos < buffer.limit()) {
+				// no need to move file pointer, just rewind/seek buffer 
+				buffer.position(newPos);
+			} else {
+				//
+				seek(fileChannel.position()+ bytes);
+			}
+		}
+
+		private boolean fill() throws IOException {
+			if (!buffer.hasRemaining()) {
+				bufferStartInFile += buffer.limit();
+				buffer.clear();
+				if (bufferStartInFile < size) { // just in case there'd be any exception on EOF, not -1 
+					fileChannel.read(buffer);
+					// may return -1 when EOF, but empty will reflect this, hence no explicit support here   
+				}
+				buffer.flip();
+			}
+			return buffer.hasRemaining();
+		}
+
+		@Override
+		public void readBytes(byte[] buf, int offset, int length) throws IOException {
+			final int tail = buffer.remaining();
+			if (tail >= length) {
+				buffer.get(buf, offset, length);
+			} else {
+				buffer.get(buf, offset, tail);
+				if (fill()) {
+					buffer.get(buf, offset + tail, length - tail);
+				} else {
+					throw new IOException(); // shall not happen provided stream contains expected data and no attempts to read past nonEmpty() == false are made. 
+				}
+			}
+		}
+
+		@Override
+		public void done() {
+			if (buffer != null) {
+				buffer = null;
+			}
+			if (fileChannel != null) {
+				try {
+					fileChannel.close();
+				} catch (IOException ex) {
+					ex.printStackTrace(); // log debug
+				}
+				fileChannel = null;
+			}
+		}
+	}
 }