tikhomirov@10: /* tikhomirov@584: * Copyright (c) 2010-2013 TMate Software Ltd tikhomirov@74: * tikhomirov@74: * This program is free software; you can redistribute it and/or modify tikhomirov@74: * it under the terms of the GNU General Public License as published by tikhomirov@74: * the Free Software Foundation; version 2 of the License. tikhomirov@74: * tikhomirov@74: * This program is distributed in the hope that it will be useful, tikhomirov@74: * but WITHOUT ANY WARRANTY; without even the implied warranty of tikhomirov@74: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the tikhomirov@74: * GNU General Public License for more details. tikhomirov@74: * tikhomirov@74: * For information on how to redistribute this software under tikhomirov@74: * the terms of a license other than GNU General Public License tikhomirov@102: * contact TMate Software at support@hg4j.com tikhomirov@10: */ tikhomirov@74: package org.tmatesoft.hg.internal; tikhomirov@10: tikhomirov@456: import static org.tmatesoft.hg.util.LogFacility.Severity.Error; tikhomirov@456: import static org.tmatesoft.hg.util.LogFacility.Severity.Warn; tikhomirov@456: tikhomirov@10: import java.io.File; tikhomirov@10: import java.io.FileInputStream; tikhomirov@534: import java.io.FileOutputStream; tikhomirov@10: import java.io.IOException; tikhomirov@10: import java.nio.ByteBuffer; tikhomirov@10: import java.nio.MappedByteBuffer; tikhomirov@10: import java.nio.channels.FileChannel; tikhomirov@10: tikhomirov@617: import org.tmatesoft.hg.core.HgIOException; tikhomirov@295: import org.tmatesoft.hg.core.SessionContext; tikhomirov@425: import org.tmatesoft.hg.util.LogFacility; tikhomirov@158: tikhomirov@10: /** tikhomirov@74: * tikhomirov@74: * @author Artem Tikhomirov tikhomirov@74: * @author TMate Software Ltd. tikhomirov@10: */ tikhomirov@10: public class DataAccessProvider { tikhomirov@338: /** tikhomirov@338: * Boundary to start using file memory mapping instead of regular file access, in bytes. tikhomirov@338: * Set to 0 to indicate mapping files into memory shall not be used. tikhomirov@338: * If set to -1, file of any size would be mapped in memory. tikhomirov@338: */ tikhomirov@338: public static final String CFG_PROPERTY_MAPIO_LIMIT = "hg4j.dap.mapio_limit"; tikhomirov@338: public static final String CFG_PROPERTY_MAPIO_BUFFER_SIZE = "hg4j.dap.mapio_buffer"; tikhomirov@338: public static final String CFG_PROPERTY_FILE_BUFFER_SIZE = "hg4j.dap.file_buffer"; tikhomirov@425: tikhomirov@425: private static final int DEFAULT_MAPIO_LIMIT = 100 * 1024; // 100 kB tikhomirov@425: private static final int DEFAULT_FILE_BUFFER = 8 * 1024; // 8 kB tikhomirov@425: private static final int DEFAULT_MAPIO_BUFFER = DEFAULT_MAPIO_LIMIT; // same as default boundary tikhomirov@10: tikhomirov@10: private final int mapioMagicBoundary; tikhomirov@456: private final int bufferSize, mapioBufSize; tikhomirov@295: private final SessionContext context; tikhomirov@584: tikhomirov@295: public DataAccessProvider(SessionContext ctx) { tikhomirov@456: context = ctx; tikhomirov@456: PropertyMarshal pm = new PropertyMarshal(ctx); tikhomirov@494: mapioMagicBoundary = mapioBoundaryValue(pm.getInt(CFG_PROPERTY_MAPIO_LIMIT, DEFAULT_MAPIO_LIMIT)); tikhomirov@456: bufferSize = pm.getInt(CFG_PROPERTY_FILE_BUFFER_SIZE, DEFAULT_FILE_BUFFER); tikhomirov@456: mapioBufSize = pm.getInt(CFG_PROPERTY_MAPIO_BUFFER_SIZE, DEFAULT_MAPIO_BUFFER); tikhomirov@338: } tikhomirov@338: tikhomirov@456: public DataAccessProvider(SessionContext ctx, int mapioBoundary, int regularBufferSize, int mapioBufferSize) { tikhomirov@295: context = ctx; tikhomirov@494: mapioMagicBoundary = mapioBoundaryValue(mapioBoundary); tikhomirov@27: bufferSize = regularBufferSize; tikhomirov@456: mapioBufSize = mapioBufferSize; tikhomirov@584: } tikhomirov@584: tikhomirov@494: // ensure contract of CFG_PROPERTY_MAPIO_LIMIT, for mapioBoundary == 0 use MAX_VALUE so that no file is memmap-ed tikhomirov@494: private static int mapioBoundaryValue(int mapioBoundary) { tikhomirov@494: return mapioBoundary == 0 ? Integer.MAX_VALUE : mapioBoundary; tikhomirov@494: } tikhomirov@494: tikhomirov@606: public DataAccess createReader(File f, boolean shortRead) { tikhomirov@10: if (!f.exists()) { tikhomirov@10: return new DataAccess(); tikhomirov@10: } tikhomirov@10: try { tikhomirov@617: FileChannel fc = new FileInputStream(f).getChannel(); // FIXME SHALL CLOSE FIS, not only channel tikhomirov@420: long flen = fc.size(); tikhomirov@606: if (!shortRead && flen > mapioMagicBoundary) { tikhomirov@26: // TESTS: bufLen of 1024 was used to test MemMapFileAccess tikhomirov@425: return new MemoryMapFileAccess(fc, flen, mapioBufSize, context.getLog()); tikhomirov@10: } else { tikhomirov@10: // XXX once implementation is more or less stable, tikhomirov@10: // may want to try ByteBuffer.allocateDirect() to see tikhomirov@10: // if there's any performance gain. tikhomirov@338: boolean useDirectBuffer = false; // XXX might be another config option tikhomirov@158: // TESTS: bufferSize of 100 was used to check buffer underflow states when readBytes reads chunks bigger than bufSize tikhomirov@425: return new FileAccess(fc, flen, bufferSize, useDirectBuffer, context.getLog()); tikhomirov@10: } tikhomirov@10: } catch (IOException ex) { tikhomirov@10: // unlikely to happen, we've made sure file exists. tikhomirov@456: context.getLog().dump(getClass(), Error, ex, null); tikhomirov@10: } tikhomirov@10: return new DataAccess(); // non-null, empty. tikhomirov@10: } tikhomirov@534: tikhomirov@618: public DataSerializer createWriter(final Transaction tr, File f, boolean createNewIfDoesntExist) { tikhomirov@534: if (!f.exists() && !createNewIfDoesntExist) { tikhomirov@534: return new DataSerializer(); tikhomirov@534: } tikhomirov@618: // TODO invert RevlogStreamWriter to send DataSource here instead of grabbing DataSerializer tikhomirov@618: // to control the moment transaction gets into play and whether it fails or not tikhomirov@618: return new TransactionAwareFileSerializer(tr, f); tikhomirov@534: } tikhomirov@10: tikhomirov@10: private static class MemoryMapFileAccess extends DataAccess { tikhomirov@10: private FileChannel fileChannel; tikhomirov@425: private long position = 0; // always points to buffer's absolute position in the file tikhomirov@425: private MappedByteBuffer buffer; tikhomirov@420: private final long size; tikhomirov@26: private final int memBufferSize; tikhomirov@425: private final LogFacility logFacility; tikhomirov@10: tikhomirov@425: public MemoryMapFileAccess(FileChannel fc, long channelSize, int bufferSize, LogFacility log) { tikhomirov@10: fileChannel = fc; tikhomirov@10: size = channelSize; tikhomirov@425: logFacility = log; tikhomirov@420: memBufferSize = bufferSize > channelSize ? (int) channelSize : bufferSize; // no reason to waste memory more than there's data tikhomirov@10: } tikhomirov@10: tikhomirov@10: @Override tikhomirov@26: public boolean isEmpty() { tikhomirov@26: return position + (buffer == null ? 0 : buffer.position()) >= size; tikhomirov@26: } tikhomirov@26: tikhomirov@26: @Override tikhomirov@420: public DataAccess reset() throws IOException { tikhomirov@420: longSeek(0); tikhomirov@420: return this; tikhomirov@420: } tikhomirov@420: tikhomirov@420: @Override tikhomirov@158: public int length() { tikhomirov@420: return Internals.ltoi(longLength()); tikhomirov@420: } tikhomirov@420: tikhomirov@420: @Override tikhomirov@420: public long longLength() { tikhomirov@51: return size; tikhomirov@51: } tikhomirov@51: tikhomirov@51: @Override tikhomirov@420: public void longSeek(long offset) { tikhomirov@26: assert offset >= 0; tikhomirov@26: // offset may not necessarily be further than current position in the file (e.g. rewind) tikhomirov@26: if (buffer != null && /*offset is within buffer*/ offset >= position && (offset - position) < buffer.limit()) { tikhomirov@420: // cast is ok according to check above tikhomirov@420: buffer.position(Internals.ltoi(offset - position)); tikhomirov@26: } else { tikhomirov@26: position = offset; tikhomirov@26: buffer = null; tikhomirov@26: } tikhomirov@10: } tikhomirov@10: tikhomirov@10: @Override tikhomirov@420: public void seek(int offset) { tikhomirov@420: longSeek(offset); tikhomirov@420: } tikhomirov@420: tikhomirov@420: @Override tikhomirov@10: public void skip(int bytes) throws IOException { tikhomirov@26: assert bytes >= 0; tikhomirov@26: if (buffer == null) { tikhomirov@26: position += bytes; tikhomirov@26: return; tikhomirov@26: } tikhomirov@26: if (buffer.remaining() > bytes) { tikhomirov@26: buffer.position(buffer.position() + bytes); tikhomirov@26: } else { tikhomirov@26: position += buffer.position() + bytes; tikhomirov@26: buffer = null; tikhomirov@26: } tikhomirov@10: } tikhomirov@10: tikhomirov@26: private void fill() throws IOException { tikhomirov@26: if (buffer != null) { tikhomirov@26: position += buffer.position(); tikhomirov@26: } tikhomirov@10: long left = size - position; tikhomirov@440: for (int i = 0; i < 3; i++) { tikhomirov@440: try { tikhomirov@440: buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, position, left < memBufferSize ? left : memBufferSize); tikhomirov@440: return; tikhomirov@440: } catch (IOException ex) { tikhomirov@440: if (i == 2) { tikhomirov@440: throw ex; tikhomirov@440: } tikhomirov@440: if (i == 0) { tikhomirov@440: // if first attempt failed, try to free some virtual memory, see Issue 30 for details tikhomirov@456: logFacility.dump(getClass(), Warn, ex, "Memory-map failed, gonna try gc() to free virtual memory"); tikhomirov@440: } tikhomirov@440: try { tikhomirov@440: buffer = null; tikhomirov@440: System.gc(); tikhomirov@440: Thread.sleep((1+i) * 1000); tikhomirov@440: } catch (Throwable t) { tikhomirov@456: logFacility.dump(getClass(), Error, t, "Bad luck"); tikhomirov@440: } tikhomirov@440: } tikhomirov@440: } tikhomirov@26: } tikhomirov@26: tikhomirov@26: @Override tikhomirov@26: public void readBytes(byte[] buf, int offset, int length) throws IOException { tikhomirov@26: if (buffer == null || !buffer.hasRemaining()) { tikhomirov@26: fill(); tikhomirov@26: } tikhomirov@26: // XXX in fact, we may try to create a MappedByteBuffer of exactly length size here, and read right away tikhomirov@26: while (length > 0) { tikhomirov@26: int tail = buffer.remaining(); tikhomirov@26: if (tail == 0) { tikhomirov@26: throw new IOException(); tikhomirov@26: } tikhomirov@26: if (tail >= length) { tikhomirov@26: buffer.get(buf, offset, length); tikhomirov@26: } else { tikhomirov@26: buffer.get(buf, offset, tail); tikhomirov@26: fill(); tikhomirov@26: } tikhomirov@26: offset += tail; tikhomirov@26: length -= tail; tikhomirov@26: } tikhomirov@26: } tikhomirov@26: tikhomirov@26: @Override tikhomirov@26: public byte readByte() throws IOException { tikhomirov@26: if (buffer == null || !buffer.hasRemaining()) { tikhomirov@26: fill(); tikhomirov@26: } tikhomirov@26: if (buffer.hasRemaining()) { tikhomirov@26: return buffer.get(); tikhomirov@26: } tikhomirov@26: throw new IOException(); tikhomirov@10: } tikhomirov@10: tikhomirov@10: @Override tikhomirov@10: public void done() { tikhomirov@26: buffer = null; tikhomirov@10: if (fileChannel != null) { tikhomirov@10: try { tikhomirov@10: fileChannel.close(); tikhomirov@10: } catch (IOException ex) { tikhomirov@456: logFacility.dump(getClass(), Warn, ex, null); tikhomirov@10: } tikhomirov@10: fileChannel = null; tikhomirov@10: } tikhomirov@10: } tikhomirov@10: } tikhomirov@10: tikhomirov@10: // (almost) regular file access - FileChannel and buffers. tikhomirov@10: private static class FileAccess extends DataAccess { tikhomirov@10: private FileChannel fileChannel; tikhomirov@10: private ByteBuffer buffer; tikhomirov@420: private long bufferStartInFile = 0; // offset of this.buffer in the file. tikhomirov@425: private final long size; tikhomirov@425: private final LogFacility logFacility; tikhomirov@10: tikhomirov@425: public FileAccess(FileChannel fc, long channelSize, int bufferSizeHint, boolean useDirect, LogFacility log) { tikhomirov@10: fileChannel = fc; tikhomirov@10: size = channelSize; tikhomirov@425: logFacility = log; tikhomirov@420: final int capacity = size < bufferSizeHint ? (int) size : bufferSizeHint; tikhomirov@10: buffer = useDirect ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity); tikhomirov@10: buffer.flip(); // or .limit(0) to indicate it's empty tikhomirov@10: } tikhomirov@10: tikhomirov@10: @Override tikhomirov@10: public boolean isEmpty() { tikhomirov@10: return bufferStartInFile + buffer.position() >= size; tikhomirov@10: } tikhomirov@10: tikhomirov@10: @Override tikhomirov@420: public DataAccess reset() throws IOException { tikhomirov@420: longSeek(0); tikhomirov@420: return this; tikhomirov@420: } tikhomirov@420: tikhomirov@420: @Override tikhomirov@158: public int length() { tikhomirov@420: return Internals.ltoi(longLength()); tikhomirov@51: } tikhomirov@51: tikhomirov@51: @Override tikhomirov@420: public long longLength() { tikhomirov@420: return size; tikhomirov@51: } tikhomirov@420: tikhomirov@51: @Override tikhomirov@420: public void longSeek(long offset) throws IOException { tikhomirov@23: if (offset > size) { tikhomirov@404: throw new IllegalArgumentException(String.format("Can't seek to %d for the file of size %d (buffer start:%d)", offset, size, bufferStartInFile)); tikhomirov@23: } tikhomirov@10: if (offset < bufferStartInFile + buffer.limit() && offset >= bufferStartInFile) { tikhomirov@420: // cast to int is safe, we've checked we fit into buffer tikhomirov@420: buffer.position(Internals.ltoi(offset - bufferStartInFile)); tikhomirov@10: } else { tikhomirov@10: // out of current buffer, invalidate it (force re-read) tikhomirov@10: // XXX or ever re-read it right away? tikhomirov@10: bufferStartInFile = offset; tikhomirov@10: buffer.clear(); tikhomirov@10: buffer.limit(0); // or .flip() to indicate we switch to reading tikhomirov@10: fileChannel.position(offset); tikhomirov@10: } tikhomirov@10: } tikhomirov@10: tikhomirov@10: @Override tikhomirov@420: public void seek(int offset) throws IOException { tikhomirov@420: longSeek(offset); tikhomirov@420: } tikhomirov@420: tikhomirov@420: @Override tikhomirov@10: public void skip(int bytes) throws IOException { tikhomirov@10: final int newPos = buffer.position() + bytes; tikhomirov@10: if (newPos >= 0 && newPos < buffer.limit()) { tikhomirov@10: // no need to move file pointer, just rewind/seek buffer tikhomirov@10: buffer.position(newPos); tikhomirov@10: } else { tikhomirov@10: // tikhomirov@420: longSeek(bufferStartInFile + newPos); tikhomirov@10: } tikhomirov@10: } tikhomirov@10: tikhomirov@10: private boolean fill() throws IOException { tikhomirov@10: if (!buffer.hasRemaining()) { tikhomirov@10: bufferStartInFile += buffer.limit(); tikhomirov@10: buffer.clear(); tikhomirov@10: if (bufferStartInFile < size) { // just in case there'd be any exception on EOF, not -1 tikhomirov@10: fileChannel.read(buffer); tikhomirov@10: // may return -1 when EOF, but empty will reflect this, hence no explicit support here tikhomirov@10: } tikhomirov@10: buffer.flip(); tikhomirov@10: } tikhomirov@10: return buffer.hasRemaining(); tikhomirov@10: } tikhomirov@10: tikhomirov@10: @Override tikhomirov@10: public void readBytes(byte[] buf, int offset, int length) throws IOException { tikhomirov@26: if (!buffer.hasRemaining()) { tikhomirov@26: fill(); tikhomirov@26: } tikhomirov@26: while (length > 0) { tikhomirov@26: int tail = buffer.remaining(); tikhomirov@26: if (tail == 0) { tikhomirov@26: throw new IOException(); // shall not happen provided stream contains expected data and no attempts to read past isEmpty() == true are made. tikhomirov@26: } tikhomirov@26: if (tail >= length) { tikhomirov@26: buffer.get(buf, offset, length); tikhomirov@10: } else { tikhomirov@26: buffer.get(buf, offset, tail); tikhomirov@26: fill(); tikhomirov@10: } tikhomirov@26: offset += tail; tikhomirov@26: length -= tail; tikhomirov@10: } tikhomirov@10: } tikhomirov@10: tikhomirov@10: @Override tikhomirov@10: public byte readByte() throws IOException { tikhomirov@10: if (buffer.hasRemaining()) { tikhomirov@10: return buffer.get(); tikhomirov@10: } tikhomirov@10: if (fill()) { tikhomirov@10: return buffer.get(); tikhomirov@10: } tikhomirov@10: throw new IOException(); tikhomirov@10: } tikhomirov@10: tikhomirov@10: @Override tikhomirov@10: public void done() { tikhomirov@10: if (buffer != null) { tikhomirov@10: buffer = null; tikhomirov@10: } tikhomirov@10: if (fileChannel != null) { tikhomirov@10: try { tikhomirov@10: fileChannel.close(); tikhomirov@10: } catch (IOException ex) { tikhomirov@456: logFacility.dump(getClass(), Warn, ex, null); tikhomirov@10: } tikhomirov@10: fileChannel = null; tikhomirov@10: } tikhomirov@10: } tikhomirov@10: } tikhomirov@618: tikhomirov@618: /** tikhomirov@618: * Appends serialized changes to the end of the file tikhomirov@618: */ tikhomirov@618: private static class TransactionAwareFileSerializer extends DataSerializer { tikhomirov@618: tikhomirov@618: private final Transaction transaction; tikhomirov@618: private final File file; tikhomirov@618: private FileOutputStream fos; tikhomirov@618: private File transactionFile; tikhomirov@618: private boolean writeFailed = false; tikhomirov@534: tikhomirov@618: public TransactionAwareFileSerializer(Transaction tr, File f) { tikhomirov@618: transaction = tr; tikhomirov@618: file = f; tikhomirov@534: } tikhomirov@534: tikhomirov@534: @Override tikhomirov@618: public void write(byte[] data, int offset, int length) throws HgIOException { tikhomirov@618: try { tikhomirov@618: if (fos == null) { tikhomirov@618: transactionFile = transaction.prepare(file); tikhomirov@618: fos = new FileOutputStream(transactionFile, true); tikhomirov@618: } tikhomirov@618: fos.write(data, offset, length); tikhomirov@618: fos.flush(); tikhomirov@618: } catch (IOException ex) { tikhomirov@618: writeFailed = true; tikhomirov@618: transaction.failure(transactionFile, ex); tikhomirov@618: throw new HgIOException("Write failure", ex, transactionFile); tikhomirov@534: } tikhomirov@534: } tikhomirov@534: tikhomirov@534: @Override tikhomirov@618: public void done() throws HgIOException { tikhomirov@618: if (fos != null) { tikhomirov@618: assert transactionFile != null; tikhomirov@618: try { tikhomirov@618: fos.close(); tikhomirov@618: if (!writeFailed) { tikhomirov@618: // XXX, Transaction#done() assumes there's no error , but perhaps it's easier to tikhomirov@618: // rely on #failure(), and call #done() always (or change #done() to #success() tikhomirov@618: transaction.done(transactionFile); tikhomirov@618: } tikhomirov@618: fos = null; tikhomirov@618: } catch (IOException ex) { tikhomirov@618: if (!writeFailed) { tikhomirov@618: // do not eclipse original exception tikhomirov@618: transaction.failure(transactionFile, ex); tikhomirov@618: } tikhomirov@618: throw new HgIOException("Write failure", ex, transactionFile); tikhomirov@618: } tikhomirov@534: } tikhomirov@534: } tikhomirov@534: } tikhomirov@10: }