{
    // Reads the whole file in bufferSize-sized segments through an
    // AsynchronousFileChannel, adds up the (signed) value of every byte read,
    // and returns that sum once all segments have completed.

    // Size in bytes of each pooled buffer, and therefore of each read request.
    final int bufferSize = MB;
    // Number of pooled buffers; this also caps how many reads are in flight at once.
    final int poolSize = 100;
    final long fileSize = file.length();

    // One latch count per segment; round up so a trailing partial segment is counted too.
    int segments = (int) (fileSize / bufferSize);
    if ((long) segments * bufferSize < fileSize) {
        segments++;
    }
    final CountDownLatch latch = new CountDownLatch(segments);
    System.out.printf("segments = %d file size %d buffer size %d%n",
            segments, fileSize, bufferSize);

    // Pre-allocate the buffer pool. A buffer is taken before each read is issued and
    // handed back by the completion handler, which throttles the submitting loop.
    final BlockingQueue<ByteBuffer> buffers = new ArrayBlockingQueue<>(poolSize);
    for (int i = 0; i < poolSize; i++) {
        buffers.offer(ByteBuffer.allocate(bufferSize));
    }

    final long time = System.currentTimeMillis();
    // Atomic accumulators: they are updated from the completion handlers, which the
    // channel may invoke on threads other than the one running this method.
    final AtomicLong sum = new AtomicLong();
    final AtomicLong totalCount = new AtomicLong();

    // try-with-resources: the channel is closed even if waiting for a buffer or for
    // the latch is interrupted, or if issuing a read throws.
    try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
            Paths.get(file.getAbsolutePath()), StandardOpenOption.READ)) {
        for (long position = 0; position < fileSize; position += bufferSize) {
            final ByteBuffer buffer = buffers.poll(5, TimeUnit.SECONDS);
            if (buffer == null) {
                // poll() returns null on timeout; fail with a clear message
                // instead of a NullPointerException on the next line.
                throw new IllegalStateException(
                        "No free buffer became available within 5 seconds (position "
                                + position + ")");
            }
            buffer.clear();
            String attachment = "Position " + position + " Segment " + position / bufferSize;
            channel.read(buffer, position, attachment, new CompletionHandlerFI<Integer, String>() {
                @Override
                public void done(Throwable exc, Integer bytesRead, String attachment) {
                    try {
                        if (exc != null) {
                            System.err.println("Error" + exc);
                            return;
                        }
                        buffer.flip();
                        int localSum = 0;
                        for (int i = 0; i < bytesRead; i++) {
                            localSum += buffer.get();
                        }
                        long totalSum = sum.addAndGet(localSum);
                        totalCount.addAndGet(bytesRead);
                        System.out.printf("%s bytes read: %d bytes, localSum: %d totalSum (currently): %d "+
                                "latches %d total-count %d%n",
                                attachment, bytesRead, localSum, totalSum,
                                latch.getCount(), totalCount.get());
                    } finally {
                        // Always recycle the buffer and release the latch, including on
                        // failure; otherwise a single failed read would leak a buffer
                        // and leave latch.await() below blocked forever.
                        buffers.offer(buffer);
                        latch.countDown();
                    }
                }
            });
        }
        // Wait until every segment's handler has run (successfully or not).
        latch.await();
    }

    System.out.printf("reading %s time %d ms, size %d MB%n total-read %d total-sum %d",
            file, (System.currentTimeMillis() - time), fileSize / MB,totalCount.get(),sum.get());
    return sum.get();
}
// Result:
segments = 4000 file size 4194304000 buffer size 1048576
reading target/big.file time 3994 ms, size 4000 MB
total-read 4194304000 total-sum 6815744000