diff src/org/tmatesoft/hg/repo/Revlog.java @ 157:d5268ca7715b

Merged branch wrap-data-access into default for resource-friendly data access. Updated API to promote that friendliness to clients (channels, not byte[]). More exceptions
author Artem Tikhomirov <tikhomirov.artem@gmail.com>
date Wed, 09 Mar 2011 05:22:17 +0100
parents src/com/tmate/hgkit/ll/Revlog.java@9429c7bd1920 src/com/tmate/hgkit/ll/Revlog.java@3959bffb14e9
children 2c3e96674e2a
line wrap: on
line diff
--- a/src/org/tmatesoft/hg/repo/Revlog.java	Wed Mar 02 01:06:09 2011 +0100
+++ b/src/org/tmatesoft/hg/repo/Revlog.java	Wed Mar 09 05:22:17 2011 +0100
@@ -19,6 +19,8 @@
 import static org.tmatesoft.hg.repo.HgRepository.BAD_REVISION;
 import static org.tmatesoft.hg.repo.HgRepository.TIP;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,12 +29,24 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.tmatesoft.hg.core.HgBadStateException;
+import org.tmatesoft.hg.core.HgException;
 import org.tmatesoft.hg.core.Nodeid;
+import org.tmatesoft.hg.internal.DataAccess;
 import org.tmatesoft.hg.internal.RevlogStream;
+import org.tmatesoft.hg.util.ByteChannel;
+import org.tmatesoft.hg.util.CancelSupport;
+import org.tmatesoft.hg.util.CancelledException;
+import org.tmatesoft.hg.util.ProgressSupport;
 
 
 /**
- *
+ * Base class for all Mercurial entities that are serialized in a so called revlog format (changelog, manifest, data files).
+ * 
+ * Implementation note:
+ * Hides actual actual revlog stream implementation and its access methods (i.e. RevlogStream.Inspector), iow shall not expose anything internal
+ * in public methods.
+ *   
  * @author Artem Tikhomirov
  * @author TMate Software Ltd.
  */
@@ -100,22 +114,21 @@
 	 * Access to revision data as is (decompressed, but otherwise unprocessed, i.e. not parsed for e.g. changeset or manifest entries) 
 	 * @param nodeid
 	 */
-	public byte[] content(Nodeid nodeid) {
-		return content(getLocalRevision(nodeid));
+	protected void rawContent(Nodeid nodeid, ByteChannel sink) throws HgException, IOException, CancelledException {
+		rawContent(getLocalRevision(nodeid), sink);
 	}
 	
 	/**
 	 * @param revision - repo-local index of this file change (not a changelog revision number!)
 	 */
-	public byte[] content(int revision) {
-		final byte[][] dataPtr = new byte[1][];
-		RevlogStream.Inspector insp = new RevlogStream.Inspector() {
-			public void next(int revisionNumber, int actualLen, int baseRevision, int linkRevision, int parent1Revision, int parent2Revision, byte[] nodeid, byte[] data) {
-				dataPtr[0] = data;
-			}
-		};
+	protected void rawContent(int revision, ByteChannel sink) throws HgException, IOException, CancelledException {
+		if (sink == null) {
+			throw new IllegalArgumentException();
+		}
+		ContentPipe insp = new ContentPipe(sink, 0);
+		insp.checkCancelled();
 		content.iterate(revision, revision, true, insp);
-		return dataPtr[0];
+		insp.checkFailed();
 	}
 
 	/**
@@ -145,7 +158,7 @@
 			public int p2 = -1;
 			public byte[] nodeid;
 			
-			public void next(int revisionNumber, int actualLen, int baseRevision, int linkRevision, int parent1Revision, int parent2Revision, byte[] nodeid, byte[] data) {
+			public void next(int revisionNumber, int actualLen, int baseRevision, int linkRevision, int parent1Revision, int parent2Revision, byte[] nodeid, DataAccess da) {
 				p1 = parent1Revision;
 				p2 = parent2Revision;
 				this.nodeid = new byte[20];
@@ -203,7 +216,7 @@
 			RevlogStream.Inspector insp = new RevlogStream.Inspector() {
 				final Nodeid[] sequentialRevisionNodeids = new Nodeid[revisionCount];
 				int ix = 0;
-				public void next(int revisionNumber, int actualLen, int baseRevision, int linkRevision, int parent1Revision, int parent2Revision, byte[] nodeid, byte[] data) {
+				public void next(int revisionNumber, int actualLen, int baseRevision, int linkRevision, int parent1Revision, int parent2Revision, byte[] nodeid, DataAccess da) {
 					if (ix != revisionNumber) {
 						// XXX temp code, just to make sure I understand what's going on here
 						throw new IllegalStateException();
@@ -267,4 +280,79 @@
 			return modified;
 		}
 	}
+
+	protected static class ContentPipe implements RevlogStream.Inspector, CancelSupport {
+		private final ByteChannel sink;
+		private final CancelSupport cancelSupport;
+		private Exception failure;
+		private final int offset;
+
+		/**
+		 * @param _sink - cannot be <code>null</code>
+		 * @param seekOffset - when positive, orders to pipe bytes to the sink starting from specified offset, not from the first byte available in DataAccess
+		 */
+		public ContentPipe(ByteChannel _sink, int seekOffset) {
+			assert _sink != null;
+			sink = _sink;
+			cancelSupport = CancelSupport.Factory.get(_sink);
+			offset = seekOffset;
+		}
+		
+		protected void prepare(int revisionNumber, DataAccess da) throws HgException, IOException {
+			if (offset > 0) { // save few useless reset/rewind operations
+				da.seek(offset);
+			}
+		}
+
+		public void next(int revisionNumber, int actualLen, int baseRevision, int linkRevision, int parent1Revision, int parent2Revision, byte[] nodeid, DataAccess da) {
+			try {
+				prepare(revisionNumber, da); // XXX perhaps, prepare shall return DA (sliced, if needed)
+				final ProgressSupport progressSupport = ProgressSupport.Factory.get(sink);
+				ByteBuffer buf = ByteBuffer.allocate(512);
+				progressSupport.start(da.length());
+				while (!da.isEmpty()) {
+					cancelSupport.checkCancelled();
+					da.readBytes(buf);
+					buf.flip();
+					// XXX I may not rely on returned number of bytes but track change in buf position instead.
+					int consumed = sink.write(buf); 
+					// FIXME in fact, bad sink implementation (that consumes no bytes) would result in endless loop. Need to account for this 
+					buf.compact();
+					progressSupport.worked(consumed);
+				}
+				progressSupport.done(); // XXX shall specify whether #done() is invoked always or only if completed successfully.
+			} catch (IOException ex) {
+				recordFailure(ex);
+			} catch (CancelledException ex) {
+				recordFailure(ex);
+			} catch (HgException ex) {
+				recordFailure(ex);
+			}
+		}
+		
+		public void checkCancelled() throws CancelledException {
+			cancelSupport.checkCancelled();
+		}
+
+		protected void recordFailure(Exception ex) {
+			assert failure == null;
+			failure = ex;
+		}
+
+		public void checkFailed() throws HgException, IOException, CancelledException {
+			if (failure == null) {
+				return;
+			}
+			if (failure instanceof IOException) {
+				throw (IOException) failure;
+			}
+			if (failure instanceof CancelledException) {
+				throw (CancelledException) failure;
+			}
+			if (failure instanceof HgException) {
+				throw (HgException) failure;
+			}
+			throw new HgBadStateException(failure);
+		}
+	}
 }