tikhomirov@10: /*
tikhomirov@584:  * Copyright (c) 2010-2013 TMate Software Ltd
tikhomirov@74:  *  
tikhomirov@74:  * This program is free software; you can redistribute it and/or modify
tikhomirov@74:  * it under the terms of the GNU General Public License as published by
tikhomirov@74:  * the Free Software Foundation; version 2 of the License.
tikhomirov@74:  *
tikhomirov@74:  * This program is distributed in the hope that it will be useful,
tikhomirov@74:  * but WITHOUT ANY WARRANTY; without even the implied warranty of
tikhomirov@74:  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
tikhomirov@74:  * GNU General Public License for more details.
tikhomirov@74:  *
tikhomirov@74:  * For information on how to redistribute this software under
tikhomirov@74:  * the terms of a license other than GNU General Public License
tikhomirov@102:  * contact TMate Software at support@hg4j.com
tikhomirov@10:  */
tikhomirov@74: package org.tmatesoft.hg.internal;
tikhomirov@10: 
tikhomirov@456: import static org.tmatesoft.hg.util.LogFacility.Severity.Error;
tikhomirov@456: import static org.tmatesoft.hg.util.LogFacility.Severity.Warn;
tikhomirov@456: 
tikhomirov@10: import java.io.File;
tikhomirov@10: import java.io.FileInputStream;
tikhomirov@534: import java.io.FileOutputStream;
tikhomirov@10: import java.io.IOException;
tikhomirov@10: import java.nio.ByteBuffer;
tikhomirov@10: import java.nio.MappedByteBuffer;
tikhomirov@10: import java.nio.channels.FileChannel;
tikhomirov@10: 
tikhomirov@617: import org.tmatesoft.hg.core.HgIOException;
tikhomirov@295: import org.tmatesoft.hg.core.SessionContext;
tikhomirov@425: import org.tmatesoft.hg.util.LogFacility;
tikhomirov@158: 
tikhomirov@10: /**
tikhomirov@74:  * 
tikhomirov@74:  * @author Artem Tikhomirov
tikhomirov@74:  * @author TMate Software Ltd.
tikhomirov@10:  */
tikhomirov@10: public class DataAccessProvider {
tikhomirov@338: 	/**
tikhomirov@338: 	 * Boundary to start using file memory mapping instead of regular file access, in bytes.  
tikhomirov@338: 	 * Set to 0 to indicate mapping files into memory shall not be used.
tikhomirov@338: 	 * If set to -1, file of any size would be mapped in memory.
tikhomirov@338: 	 */
tikhomirov@338: 	public static final String CFG_PROPERTY_MAPIO_LIMIT				= "hg4j.dap.mapio_limit";
tikhomirov@338: 	public static final String CFG_PROPERTY_MAPIO_BUFFER_SIZE		= "hg4j.dap.mapio_buffer";
tikhomirov@338: 	public static final String CFG_PROPERTY_FILE_BUFFER_SIZE		= "hg4j.dap.file_buffer";
tikhomirov@425: 	
tikhomirov@425: 	private static final int DEFAULT_MAPIO_LIMIT = 100 * 1024; // 100 kB
tikhomirov@425: 	private static final int DEFAULT_FILE_BUFFER =   8 * 1024; // 8 kB
tikhomirov@425: 	private static final int DEFAULT_MAPIO_BUFFER = DEFAULT_MAPIO_LIMIT; // same as default boundary
tikhomirov@10: 
tikhomirov@10: 	private final int mapioMagicBoundary;
tikhomirov@456: 	private final int bufferSize, mapioBufSize;
tikhomirov@295: 	private final SessionContext context;
tikhomirov@584: 	
tikhomirov@295: 	public DataAccessProvider(SessionContext ctx) {
tikhomirov@456: 		context = ctx;
tikhomirov@456: 		PropertyMarshal pm = new PropertyMarshal(ctx);
tikhomirov@494: 		mapioMagicBoundary = mapioBoundaryValue(pm.getInt(CFG_PROPERTY_MAPIO_LIMIT, DEFAULT_MAPIO_LIMIT));
tikhomirov@456: 		bufferSize = pm.getInt(CFG_PROPERTY_FILE_BUFFER_SIZE, DEFAULT_FILE_BUFFER);
tikhomirov@456: 		mapioBufSize = pm.getInt(CFG_PROPERTY_MAPIO_BUFFER_SIZE, DEFAULT_MAPIO_BUFFER);
tikhomirov@338: 	}
tikhomirov@338: 	
tikhomirov@456: 	public DataAccessProvider(SessionContext ctx, int mapioBoundary, int regularBufferSize, int mapioBufferSize) {
tikhomirov@295: 		context = ctx;
tikhomirov@494: 		mapioMagicBoundary = mapioBoundaryValue(mapioBoundary);
tikhomirov@27: 		bufferSize = regularBufferSize;
tikhomirov@456: 		mapioBufSize = mapioBufferSize;
tikhomirov@584: 	}
tikhomirov@584: 	
tikhomirov@494: 	// ensure contract of CFG_PROPERTY_MAPIO_LIMIT, for mapioBoundary == 0 use MAX_VALUE so that no file is memmap-ed
tikhomirov@494: 	private static int mapioBoundaryValue(int mapioBoundary) {
tikhomirov@494: 		return mapioBoundary == 0 ? Integer.MAX_VALUE : mapioBoundary;
tikhomirov@494: 	}
tikhomirov@494: 
tikhomirov@606: 	public DataAccess createReader(File f, boolean shortRead) {
tikhomirov@10: 		if (!f.exists()) {
tikhomirov@10: 			return new DataAccess();
tikhomirov@10: 		}
tikhomirov@10: 		try {
tikhomirov@619: 			FileInputStream fis = new FileInputStream(f);
tikhomirov@619: 			long flen = f.length();
tikhomirov@606: 			if (!shortRead && flen > mapioMagicBoundary) {
tikhomirov@26: 				// TESTS: bufLen of 1024 was used to test MemMapFileAccess
tikhomirov@619: 				return new MemoryMapFileAccess(fis, flen, mapioBufSize, context.getLog());
tikhomirov@10: 			} else {
tikhomirov@10: 				// XXX once implementation is more or less stable,
tikhomirov@10: 				// may want to try ByteBuffer.allocateDirect() to see
tikhomirov@10: 				// if there's any performance gain. 
tikhomirov@338: 				boolean useDirectBuffer = false; // XXX might be another config option
tikhomirov@158: 				// TESTS: bufferSize of 100 was used to check buffer underflow states when readBytes reads chunks bigger than bufSize
tikhomirov@619: 				return new FileAccess(fis, flen, bufferSize, useDirectBuffer, context.getLog());
tikhomirov@10: 			}
tikhomirov@10: 		} catch (IOException ex) {
tikhomirov@10: 			// unlikely to happen, we've made sure file exists.
tikhomirov@456: 			context.getLog().dump(getClass(), Error, ex, null);
tikhomirov@10: 		}
tikhomirov@10: 		return new DataAccess(); // non-null, empty.
tikhomirov@10: 	}
tikhomirov@534: 	
tikhomirov@618: 	public DataSerializer createWriter(final Transaction tr, File f, boolean createNewIfDoesntExist) {
tikhomirov@534: 		if (!f.exists() && !createNewIfDoesntExist) {
tikhomirov@534: 			return new DataSerializer();
tikhomirov@534: 		}
tikhomirov@618: 		// TODO invert RevlogStreamWriter to send DataSource here instead of grabbing DataSerializer
tikhomirov@618: 		// to control the moment transaction gets into play and whether it fails or not
tikhomirov@618: 		return new TransactionAwareFileSerializer(tr, f);
tikhomirov@534: 	}
tikhomirov@10: 
tikhomirov@10: 	private static class MemoryMapFileAccess extends DataAccess {
tikhomirov@619: 		private FileInputStream fileStream;
tikhomirov@10: 		private FileChannel fileChannel;
tikhomirov@425: 		private long position = 0; // always points to buffer's absolute position in the file
tikhomirov@425: 		private MappedByteBuffer buffer;
tikhomirov@420: 		private final long size;
tikhomirov@26: 		private final int memBufferSize;
tikhomirov@425: 		private final LogFacility logFacility;
tikhomirov@10: 
tikhomirov@619: 		public MemoryMapFileAccess(FileInputStream fis, long channelSize, int bufferSize, LogFacility log) {
tikhomirov@619: 			fileStream = fis;
tikhomirov@619: 			fileChannel = fis.getChannel();
tikhomirov@10: 			size = channelSize;
tikhomirov@425: 			logFacility = log;
tikhomirov@420: 			memBufferSize = bufferSize > channelSize ? (int) channelSize : bufferSize; // no reason to waste memory more than there's data 
tikhomirov@10: 		}
tikhomirov@10: 
tikhomirov@10: 		@Override
tikhomirov@26: 		public boolean isEmpty() {
tikhomirov@26: 			return position + (buffer == null ? 0 : buffer.position()) >= size;
tikhomirov@26: 		}
tikhomirov@26: 		
tikhomirov@26: 		@Override
tikhomirov@420: 		public DataAccess reset() throws IOException {
tikhomirov@420: 			longSeek(0);
tikhomirov@420: 			return this;
tikhomirov@420: 		}
tikhomirov@420: 
tikhomirov@420: 		@Override
tikhomirov@158: 		public int length() {
tikhomirov@420: 			return Internals.ltoi(longLength());
tikhomirov@420: 		}
tikhomirov@420: 		
tikhomirov@420: 		@Override
tikhomirov@420: 		public long longLength() {
tikhomirov@51: 			return size;
tikhomirov@51: 		}
tikhomirov@51: 		
tikhomirov@51: 		@Override
tikhomirov@420: 		public void longSeek(long offset) {
tikhomirov@26: 			assert offset >= 0;
tikhomirov@26: 			// offset may not necessarily be further than current position in the file (e.g. rewind) 
tikhomirov@26: 			if (buffer != null && /*offset is within buffer*/ offset >= position && (offset - position) < buffer.limit()) {
tikhomirov@420: 				// cast is ok according to check above
tikhomirov@420: 				buffer.position(Internals.ltoi(offset - position));
tikhomirov@26: 			} else {
tikhomirov@26: 				position = offset;
tikhomirov@26: 				buffer = null;
tikhomirov@26: 			}
tikhomirov@10: 		}
tikhomirov@10: 
tikhomirov@10: 		@Override
tikhomirov@420: 		public void seek(int offset) {
tikhomirov@420: 			longSeek(offset);
tikhomirov@420: 		}
tikhomirov@420: 
tikhomirov@420: 		@Override
tikhomirov@10: 		public void skip(int bytes) throws IOException {
tikhomirov@26: 			assert bytes >= 0;
tikhomirov@26: 			if (buffer == null) {
tikhomirov@26: 				position += bytes;
tikhomirov@26: 				return;
tikhomirov@26: 			}
tikhomirov@26: 			if (buffer.remaining() > bytes) {
tikhomirov@26: 				buffer.position(buffer.position() + bytes);
tikhomirov@26: 			} else {
tikhomirov@26: 				position += buffer.position() + bytes;
tikhomirov@26: 				buffer = null;
tikhomirov@26: 			}
tikhomirov@10: 		}
tikhomirov@10: 
tikhomirov@26: 		private void fill() throws IOException {
tikhomirov@26: 			if (buffer != null) {
tikhomirov@26: 				position += buffer.position(); 
tikhomirov@26: 			}
tikhomirov@10: 			long left = size - position;
tikhomirov@440: 			for (int i = 0; i < 3; i++) {
tikhomirov@440: 				try {
tikhomirov@440: 					buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, left < memBufferSize ? left : memBufferSize);
tikhomirov@440: 					return;
tikhomirov@440: 				} catch (IOException ex) {
tikhomirov@440: 					if (i == 2) {
tikhomirov@440: 						throw ex;
tikhomirov@440: 					}
tikhomirov@440: 					if (i == 0) {
tikhomirov@440: 						// if first attempt failed, try to free some virtual memory, see Issue 30 for details
tikhomirov@456: 						logFacility.dump(getClass(), Warn, ex, "Memory-map failed, gonna try gc() to free virtual memory");
tikhomirov@440: 					}
tikhomirov@440: 					try {
tikhomirov@440: 						buffer = null;
tikhomirov@440: 						System.gc();
tikhomirov@440: 						Thread.sleep((1+i) * 1000);
tikhomirov@440: 					} catch (Throwable t) {
tikhomirov@456: 						logFacility.dump(getClass(), Error, t, "Bad luck");
tikhomirov@440: 					}
tikhomirov@440: 				}
tikhomirov@440: 			}
tikhomirov@26: 		}
tikhomirov@26: 
tikhomirov@26: 		@Override
tikhomirov@26: 		public void readBytes(byte[] buf, int offset, int length) throws IOException {
tikhomirov@26: 			if (buffer == null || !buffer.hasRemaining()) {
tikhomirov@26: 				fill();
tikhomirov@26: 			}
tikhomirov@26: 			// XXX in fact, we may try to create a MappedByteBuffer of exactly length size here, and read right away
tikhomirov@26: 			while (length > 0) {
tikhomirov@26: 				int tail = buffer.remaining();
tikhomirov@26: 				if (tail == 0) {
tikhomirov@26: 					throw new IOException();
tikhomirov@26: 				}
tikhomirov@26: 				if (tail >= length) {
tikhomirov@26: 					buffer.get(buf, offset, length);
tikhomirov@26: 				} else {
tikhomirov@26: 					buffer.get(buf, offset, tail);
tikhomirov@26: 					fill();
tikhomirov@26: 				}
tikhomirov@26: 				offset += tail;
tikhomirov@26: 				length -= tail;
tikhomirov@26: 			}
tikhomirov@26: 		}
tikhomirov@26: 
tikhomirov@26: 		@Override
tikhomirov@26: 		public byte readByte() throws IOException {
tikhomirov@26: 			if (buffer == null || !buffer.hasRemaining()) {
tikhomirov@26: 				fill();
tikhomirov@26: 			}
tikhomirov@26: 			if (buffer.hasRemaining()) {
tikhomirov@26: 				return buffer.get();
tikhomirov@26: 			}
tikhomirov@26: 			throw new IOException();
tikhomirov@10: 		}
tikhomirov@10: 
tikhomirov@10: 		@Override
tikhomirov@10: 		public void done() {
tikhomirov@26: 			buffer = null;
tikhomirov@619: 			if (fileStream != null) {
tikhomirov@654: 				new FileUtils(logFacility, this).closeQuietly(fileStream);
tikhomirov@619: 				fileStream = null;
tikhomirov@619: 				fileChannel = null; // channel is closed together with stream
tikhomirov@10: 			}
tikhomirov@10: 		}
tikhomirov@10: 	}
tikhomirov@10: 
tikhomirov@10: 	// (almost) regular file access - FileChannel and buffers.
tikhomirov@10: 	private static class FileAccess extends DataAccess {
tikhomirov@619: 		private FileInputStream fileStream;
tikhomirov@10: 		private FileChannel fileChannel;
tikhomirov@10: 		private ByteBuffer buffer;
tikhomirov@420: 		private long bufferStartInFile = 0; // offset of this.buffer in the file.
tikhomirov@425: 		private final long size;
tikhomirov@425: 		private final LogFacility logFacility;
tikhomirov@10: 
tikhomirov@619: 		public FileAccess(FileInputStream fis, long channelSize, int bufferSizeHint, boolean useDirect, LogFacility log) {
tikhomirov@619: 			fileStream = fis;
tikhomirov@619: 			fileChannel = fis.getChannel();
tikhomirov@10: 			size = channelSize;
tikhomirov@425: 			logFacility = log;
tikhomirov@420: 			final int capacity = size < bufferSizeHint ? (int) size : bufferSizeHint;
tikhomirov@10: 			buffer = useDirect ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
tikhomirov@10: 			buffer.flip(); // or .limit(0) to indicate it's empty
tikhomirov@10: 		}
tikhomirov@10: 		
tikhomirov@10: 		@Override
tikhomirov@10: 		public boolean isEmpty() {
tikhomirov@10: 			return bufferStartInFile + buffer.position() >= size;
tikhomirov@10: 		}
tikhomirov@10: 		
tikhomirov@10: 		@Override
tikhomirov@420: 		public DataAccess reset() throws IOException {
tikhomirov@420: 			longSeek(0);
tikhomirov@420: 			return this;
tikhomirov@420: 		}
tikhomirov@420: 
tikhomirov@420: 		@Override
tikhomirov@158: 		public int length() {
tikhomirov@420: 			return Internals.ltoi(longLength());
tikhomirov@51: 		}
tikhomirov@51: 		
tikhomirov@51: 		@Override
tikhomirov@420: 		public long longLength() {
tikhomirov@420: 			return size;
tikhomirov@51: 		}
tikhomirov@420: 
tikhomirov@51: 		@Override
tikhomirov@420: 		public void longSeek(long offset) throws IOException {
tikhomirov@23: 			if (offset > size) {
tikhomirov@404: 				throw new IllegalArgumentException(String.format("Can't seek to %d for the file of size %d (buffer start:%d)", offset, size, bufferStartInFile));
tikhomirov@23: 			}
tikhomirov@10: 			if (offset < bufferStartInFile + buffer.limit() && offset >= bufferStartInFile) {
tikhomirov@420: 				// cast to int is safe, we've checked we fit into buffer
tikhomirov@420: 				buffer.position(Internals.ltoi(offset - bufferStartInFile));
tikhomirov@10: 			} else {
tikhomirov@10: 				// out of current buffer, invalidate it (force re-read) 
tikhomirov@10: 				// XXX or ever re-read it right away?
tikhomirov@10: 				bufferStartInFile = offset;
tikhomirov@10: 				buffer.clear();
tikhomirov@10: 				buffer.limit(0); // or .flip() to indicate we switch to reading
tikhomirov@10: 				fileChannel.position(offset);
tikhomirov@10: 			}
tikhomirov@10: 		}
tikhomirov@10: 
tikhomirov@10: 		@Override
tikhomirov@420: 		public void seek(int offset) throws IOException {
tikhomirov@420: 			longSeek(offset);
tikhomirov@420: 		}
tikhomirov@420: 
tikhomirov@420: 		@Override
tikhomirov@10: 		public void skip(int bytes) throws IOException {
tikhomirov@10: 			final int newPos = buffer.position() + bytes;
tikhomirov@10: 			if (newPos >= 0 && newPos < buffer.limit()) {
tikhomirov@10: 				// no need to move file pointer, just rewind/seek buffer 
tikhomirov@10: 				buffer.position(newPos);
tikhomirov@10: 			} else {
tikhomirov@10: 				//
tikhomirov@420: 				longSeek(bufferStartInFile + newPos);
tikhomirov@10: 			}
tikhomirov@10: 		}
tikhomirov@10: 
tikhomirov@10: 		private boolean fill() throws IOException {
tikhomirov@10: 			if (!buffer.hasRemaining()) {
tikhomirov@10: 				bufferStartInFile += buffer.limit();
tikhomirov@10: 				buffer.clear();
tikhomirov@10: 				if (bufferStartInFile < size) { // just in case there'd be any exception on EOF, not -1 
tikhomirov@10: 					fileChannel.read(buffer);
tikhomirov@10: 					// may return -1 when EOF, but empty will reflect this, hence no explicit support here   
tikhomirov@10: 				}
tikhomirov@10: 				buffer.flip();
tikhomirov@10: 			}
tikhomirov@10: 			return buffer.hasRemaining();
tikhomirov@10: 		}
tikhomirov@10: 
tikhomirov@10: 		@Override
tikhomirov@10: 		public void readBytes(byte[] buf, int offset, int length) throws IOException {
tikhomirov@26: 			if (!buffer.hasRemaining()) {
tikhomirov@26: 				fill();
tikhomirov@26: 			}
tikhomirov@26: 			while (length > 0) {
tikhomirov@26: 				int tail = buffer.remaining();
tikhomirov@26: 				if (tail == 0) {
tikhomirov@26: 					throw new IOException(); // shall not happen provided stream contains expected data and no attempts to read past isEmpty() == true are made.
tikhomirov@26: 				}
tikhomirov@26: 				if (tail >= length) {
tikhomirov@26: 					buffer.get(buf, offset, length);
tikhomirov@10: 				} else {
tikhomirov@26: 					buffer.get(buf, offset, tail);
tikhomirov@26: 					fill();
tikhomirov@10: 				}
tikhomirov@26: 				offset += tail;
tikhomirov@26: 				length -= tail;
tikhomirov@10: 			}
tikhomirov@10: 		}
tikhomirov@10: 
tikhomirov@10: 		@Override
tikhomirov@10: 		public byte readByte() throws IOException {
tikhomirov@10: 			if (buffer.hasRemaining()) {
tikhomirov@10: 				return buffer.get();
tikhomirov@10: 			}
tikhomirov@10: 			if (fill()) {
tikhomirov@10: 				return buffer.get();
tikhomirov@10: 			}
tikhomirov@10: 			throw new IOException();
tikhomirov@10: 		}
tikhomirov@10: 
tikhomirov@10: 		@Override
tikhomirov@10: 		public void done() {
tikhomirov@619: 			buffer = null;
tikhomirov@619: 			if (fileStream != null) {
tikhomirov@654: 				new FileUtils(logFacility, this).closeQuietly(fileStream);
tikhomirov@619: 				fileStream = null;
tikhomirov@10: 				fileChannel = null;
tikhomirov@10: 			}
tikhomirov@10: 		}
tikhomirov@10: 	}
tikhomirov@618: 	
tikhomirov@618: 	/**
tikhomirov@618: 	 * Appends serialized changes to the end of the file
tikhomirov@618: 	 */
tikhomirov@618: 	private static class TransactionAwareFileSerializer extends DataSerializer {
tikhomirov@618: 		
tikhomirov@618: 		private final Transaction transaction;
tikhomirov@618: 		private final File file;
tikhomirov@618: 		private FileOutputStream fos;
tikhomirov@618: 		private File transactionFile;
tikhomirov@618: 		private boolean writeFailed = false;
tikhomirov@534: 
tikhomirov@618: 		public TransactionAwareFileSerializer(Transaction tr, File f) {
tikhomirov@618: 			transaction = tr;
tikhomirov@618: 			file = f;
tikhomirov@534: 		}
tikhomirov@534: 		
tikhomirov@534: 		@Override
tikhomirov@618: 		public void write(byte[] data, int offset, int length) throws HgIOException {
tikhomirov@618: 			try {
tikhomirov@618: 				if (fos == null) {
tikhomirov@618: 					transactionFile = transaction.prepare(file);
tikhomirov@618: 					fos = new FileOutputStream(transactionFile, true);
tikhomirov@618: 				}
tikhomirov@618: 				fos.write(data, offset, length);
tikhomirov@618: 				fos.flush();
tikhomirov@618: 			} catch (IOException ex) {
tikhomirov@618: 				writeFailed = true;
tikhomirov@618: 				transaction.failure(transactionFile, ex);
tikhomirov@618: 				throw new HgIOException("Write failure", ex, transactionFile);
tikhomirov@534: 			}
tikhomirov@534: 		}
tikhomirov@534: 		
tikhomirov@534: 		@Override
tikhomirov@618: 		public void done() throws HgIOException {
tikhomirov@618: 			if (fos != null) {
tikhomirov@618: 				assert transactionFile != null;
tikhomirov@618: 				try {
tikhomirov@618: 					fos.close();
tikhomirov@618: 					if (!writeFailed) {
tikhomirov@618: 						// XXX, Transaction#done() assumes there's no error , but perhaps it's easier to 
tikhomirov@618: 						// rely on #failure(), and call #done() always (or change #done() to #success()
tikhomirov@618: 						transaction.done(transactionFile);
tikhomirov@618: 					}
tikhomirov@618: 					fos = null;
tikhomirov@618: 				} catch (IOException ex) {
tikhomirov@618: 					if (!writeFailed) {
tikhomirov@618: 						// do not eclipse original exception
tikhomirov@618: 						transaction.failure(transactionFile, ex);
tikhomirov@618: 					}
tikhomirov@618: 					throw new HgIOException("Write failure", ex, transactionFile);
tikhomirov@618: 				}
tikhomirov@534: 			}
tikhomirov@534: 		}
tikhomirov@534: 	}
tikhomirov@10: }