JEXP                       JEXP

"Neues" von I/O

Obwohl Java’s NIO (New I/O) Subsystem schon im Jahre 2002 mit Java 1.4 im Rahmen von JSR 51 eingeführt wurde, haben die meisten Entwickler damit nur wenig zu tun, und selbst jetzt sind es zumeist Bibliotheken und hochperformante Server, die diese Infrastruktur-APIs nutzen. Java 7 brachte dann ein Update mit NIO.2 (JSR 203), das sich vor allem auf Dateioperationen und asynchrone Kanäle erstreckte.

Dabei bietet NIO diverse praktische Features, die schnelle, nicht-blockierende Interaktion mit Dateisystem, Netzwerk-Puffern, Speicherzugriff und Interprozesskommunikation unterstützen und gar nicht so schwierig zu nutzen sind, wenn man weiss, wie. Die verschiedenen Klassen und APIs sind auf diverse Packages verstreut und bieten gemeinsam direkten Zugriff auf low-level Betriebssystemfunktionen, die besonders für geschwindigkeitskritische Aufgaben einen Unterschied machen. Damit ist auch ersichtlich, dass die Operation von Java-Anwendungen, die NIO benutzen viel mehr an das jeweilige Betriebssystem gekoppelt sind, als mit der Nutzung der allgemeinen Input/Output Bibliotheken.

In diesem Artikel werde ich die APIs von NIO.2 kurz vorstellen, den klassischen NIO APIs wird ein zukünftiger Artikel gewidmet sein.

Schnelles Lesen von Dateien

Für viele sequentielle Operationen sind die klassischen I/O APIs mehr als schnell genug, so kommt man mit java.io.File, den Input- und OutputStreams und besonders deren gepufferten Varianten (BufferedInputStream) ziemlich weit.

Als ein Beispiel: Das Lesen einer 4 GB grossen Datei in einer Schleife dauert mit BufferedInputStream, nur 7 Sekunden (Beispielcode), also immerhin knapp 550 MB pro Sekunde. Das Aufsummieren im Schleifenkörper soll die Verarbeitung der gelesenen Informationen repräsentieren.

private long measureReadFile(File file) throws IOException {
    byte[] buffer = new byte[MB*16];
    BufferedInputStream is = new BufferedInputStream(new FileInputStream(file), MB);
    int read;
    long count=0, sum = 0;
    long time = System.currentTimeMillis();
    while ((read = is.read(buffer))!=-1) {
        for (int i=0;i<read;i++) {
            count++;
            sum += buffer[i];
        }
    }
    is.close();
    System.out.printf("reading %s time %d ms, size %d MB%n", file, (System.currentTimeMillis() - time), count / MB);
    return sum;
}
// writing target/big.file time 5175 ms, size 4000 MB
// reading target/big.file time 7065 ms, size 4000 MB

Zum Vergleich, das Kommandozeilentool dd (Block-Kopierer) benötigt immer noch 5.8 Sekunden für die gleiche Aufgabe.

dd if=~/trash/test.db/neostore.relationshipstore.db of=/dev/null bs=1048576
4011+1 records in
4011+1 records out
4206698718 bytes transferred in 5.880777 secs (715330441 bytes/sec)

Mit NIO sieht das wie folgt aus:

public long measureReadFileFileChannel(File file) throws Exception {
    ByteBuffer buffer = ByteBuffer.allocate(MB * 16);
    FileChannel channel = new RandomAccessFile(file, "r").getChannel();
    int read;
    long count=0, sum = 0;
    long time = System.currentTimeMillis();
    while ((read = channel.read(buffer))!=-1) {
        buffer.flip();
        for (int i=0;i<read;i++) {
            count++;
            sum += buffer.get();
        }
        buffer.clear();
    }
    channel.close();
    System.out.printf("reading %s time %d ms, size %d MB%n", file, (System.currentTimeMillis() - time), count / MB);
    return sum;
}
// reading target/big.file time 6634 ms, size 4000 MB

Wie immer sollte man also sorgfältig messen, ob es wirklich die IO-Leistung ist, die ein System beeinträchtigt, oder andere Flaschenhälse im Programm viel mehr Leistung kosten.

API Übersicht

Die NIO API gibt es nun schon seit 12 Jahren und unterstützt Funktionen wie:

  • (halb-)automatisches Mapping von Dateien in den Speicher

  • File-Channel API für bequemen Zugriff auf Betriebsysstemfunktionen für Dateihandles und Dateioperationen

  • Puffer-Abstraktion für kontinuierliche Speicherbereiche, die sich sowohl auf dem als auch ausserhalb des Heaps befinden können

  • Selektoren für nicht-blockierende Notifikation für Aktivität auf Kanälen

NIO.2 fügt folgende Funktionen hinzu:

  • neue Path und Filesystem-Abstraktionen

  • erweiterte Datei-Operationen z.b. Metadatenzugriff, Änderungsnotifikationen für Verzeichnisse und mehr

  • asynchrone File-Channel-API und Socket-Channel mit Multicast-Unterstützung (Asynchronous[File,Socket,ServerSocket,Datagram]Channel)

Dateioperationen mit NIO.2

Mit NIO.2 gibt es eine Reihe von neuen APIs für die Interaktion mit dem Dateisystem und Dateien. Wer bisher mit java.io.File auf verschiedenen Betriebsysstemen gearbeitet hat, vermisste sicherlich schmerzlich eine konsistente, umfassende und effiziente Dateisystem-Abstraktion. NIO.2 bringt diese als FileSystem und dazu gehörend Path für Dateien und Verzeichnisse mit. Daneben gibt es Unterstützung für den Zugriff auf eine Vielzahl von Dateiattributen.

Einfachen Zugriff auf FileSystem und Path erlauben folgende Methoden:

FileSystem fs = FileSystems.getDefault();
Path path = fs.getPath("file.txt");

Path path = Paths.get("file.txt");

Neue Methoden, die man aus File nicht kennt, sind

  • path.startsWith/endsWith(Path)

  • path.normalize() → normalisiert relative Pfade

  • path.relativize(Path other) - konstruiert einen relativen Pfad zwischen diesem und dem Ziel-Pfad

  • path.toRealPath(LinkOption…​) → löst relative Pfade und symbolische Links auf

  • path.resolveSibling(Path other) → erzeugt einen Pfad aus dem aktuellen Parent und dem übergebenen Pfad

  • path.register(WatchService, WatchEvent.Kind) registiert einen Überwachungsmechanismus für Dateisystem-Änderungen (Kind ist z.B. StandardWatchEventKinds.ENTRY_MODIFY)

Dateisystemüberwachung

Wir kennen das von Editoren und IDEs, Auto-Compile-Tools oder Upload-Diensten. Wenn sich der Inhalt eines Verzeichnisses ändert, wird die geänderte Datei in der UI aktualisiert bzw. erneut verarbeitet. Bisher mussten Java-Tools dafür auf native Bibliotheken zugreifen oder selbst einen Poll-Thread laufen lassen. Jetzt ist diese Funktionalität in NIO.2 enthalten.

Wie kann ich selbst über Änderungen am Dateisystem informiert werden? Indem ich einen WatchService auf einem konkreten Pfad registriere, die interessanten Änderungsoperationen angebe und mich dann regelmässig nach Aktivitäten erkundige.

Hier ein Beispiel für das automatische Kopieren von Ressourcen in ein "target"-Verzeichnis.

Path source = Paths.get("src/main/resources");
Path target = Paths.get("target");

WatchService watchService = FileSystems.getDefault().newWatchService();
source.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
for (;;) {
    WatchKey key = watchService.poll(10, TimeUnit.SECONDS);
    if (key == null) break; // timeout
    for (WatchEvent evt : key.pollEvents()) {
        Path file = (Path) evt.context();
        Path sourceFile = source.resolve(file);
        Path targetFile = target.resolve(sourceFile);
        System.out.println("Event " + evt.kind() + " file " + file+" source "+sourceFile+" target "+targetFile);
        Files.createDirectories(targetFile.getParent());
        Files.copy(sourceFile, targetFile);
        key.reset();
    }
}

Traversierung von Dateibäumen

Eine API, deren Funktionalität man bisher immer über File.listFiles und FileNameFilter rekursiv implementieren musste, ist jetzt FileSystemVisitor enthalten, sie kann genutzt werden, um effizient über einen Dateisystembaum zu navigieren. Für Verzeichnisse gibt es zusätzlich Steuerungsmethoden, die über den Rückgabewert den weiteren Verlauf der Traversierung bestimmen. Für den Fehlerfall gibt es spezielle Callbacks die die aufgetretene Exception übergeben bekommen.

Die API ist:

public interface FileVisitor<T> {
	FileVisitResult preVisitDirectory(T dir, BasicFileAttributes attrs);
	FileVisitResult visitFile(T file, BasicFileAttributes attrs)
	FileVisitResult visitFileFailed(T file, IOException exc);
	FileVisitResult postVisitDirectory(T dir, IOException exc);
}

Benutzt wird sie wie folgt, hier benutzen wir den Adapter SimpleFileVisitor zum Kopieren eines Verzeichnisbaumes:

Path target = Paths.get("data"); Path source = Paths.get("backup"); Files.walkFileTree(source, EnumSet.of(FileVisitOption.FOLLOW_LINKS), Integer.MAX_VALUE, new SimpleFileVisitor<Path>() { @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { Path targetdir = target.resolve(source.relativize(dir)); try { Files.copy(dir, targetdir); } catch (FileAlreadyExistsException e) { if (!Files.isDirectory(targetdir)) throw e; } return CONTINUE; } @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { Files.copy(file, target.resolve(source.relativize(file))); return CONTINUE; } });

Asynchrone Operationen

Zusätzlich zu den den Channel-APIs in NIO wurden in NIO.2 asynchrone Varianten zusätzlich bereitgestellt. Diese erlauben für Operationen wie Verbinden, Lesen und Schreiben asynchrone Operationsausführung.

Dabei können deren Methoden entweder java.util.concurrent.Future Instanzen zurückggeben, auf die dann erst bei Bedarf mit ihrer get()-Operation zugegriffen wird. Alternativ nehmen sie einen java.nio.channels.CompletionHandler entgegen, dessen 2 Methoden für den Erfolgs- und Fehlerfall neben dem eigentlichen Parametern (Ergebnis bzw. Exception) auch noch ein weiteres Objekt (Attachment) vom originalen Aufruf durchreichen können.

public interface CompletionHandler<V,A> {
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

Asynchrone Dateioperationen

Speziell für Dateioperationen kann der AsynchronousFileChannel für nichtblockierende Operationen genutzt werden. Eine neue Instanz davon kann direkt per statische "open" Factory-Methode erzeugt werden. Dieser Methode können auch diverse Flags zum Steuern der Interaktion mit der Datei mitgegeben werden. Das sind zum Beispiel: StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE wobei die letzte interessant für temporäre Dateien ist.

Die Methoden des AsynchronousFileChannel sind ähnlich derer des FileChannel, nur jeweils in 2 Varianten für den asynchronen Aufruf ausgelegt (Future und CompletionHandler). Ein weiterer Unterschied besteht darin, dass es hier keinen internen Zeiger auf eine aktuelle Position in der Datei gibt. Das ergibt sich aus der nichtdeterministischen, asynchronen Ausführung der Operationen. Daher muss immer eine absolute Postion in der Datei mitgegeben werden. Beim Lesen und Schreiben in/aus einem Puffer, wird als Ergebnis stets die Anzahl der verarbeiteten Bytes geliefert.

Hier das asynchrone Beispiel für das Lesen aus einer Datei:

{
    int bufferSize = MB;
    long fileSize = file.length();

    // count-down-latch for waiting for total completion
    int segments = (int) (fileSize / bufferSize);
    if ((long)segments * bufferSize < fileSize) segments++;
    CountDownLatch latch = new CountDownLatch(segments);

    System.out.printf("segments = %d file size %d buffer size %d%n",
                       segments, fileSize, bufferSize);

    // preparing a pool of buffers
    BlockingQueue<ByteBuffer> buffers = new ArrayBlockingQueue<>(100);
    for (int i=0;i<100;i++) {
      buffers.offer(ByteBuffer.allocate(bufferSize));
    }

    long time = System.currentTimeMillis();
    final AtomicLong sum = new AtomicLong();
    final AtomicLong totalCount = new AtomicLong();
    AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(file.getAbsolutePath()),
            StandardOpenOption.READ);
    for (long position=0; position < fileSize; position += bufferSize) {
      final ByteBuffer buffer = buffers.poll(5, TimeUnit.SECONDS);
      buffer.clear();
      String attachment = "Position " + position + " Segment " + position / bufferSize;
      channel.read(buffer,position, attachment,new CompletionHandlerFI<Integer, String>() {
        @Override
        public void done(Throwable exc, Integer bytesRead, String attachment) {
          if (exc != null) {
            System.err.println("Error"+ exc);
            return;
          }
          buffer.flip();
          int localSum = 0;
          for (int i=0;i<bytesRead;i++) {
            localSum += buffer.get();
          }
          long totalSum = sum.addAndGet(localSum);
          totalCount.addAndGet(bytesRead);
          System.out.printf("%s bytes read: %d bytes, localSum: %d totalSum (currently): %d "+
                            "latches %d total-count %d%n",
                            attachment, bytesRead, localSum, totalSum,
                            latch.getCount(), totalCount.get());
          buffers.offer(buffer);
          latch.countDown();
        }
    });
    }
    latch.await();
    channel.close();
    System.out.printf("reading %s time %d ms, size %d MB%n total-read %d total-sum %d",
                      file, (System.currentTimeMillis() - time), fileSize / MB,totalCount.get(),sum.get());
    return sum.get();
}

// Ergebnis:
segments = 4000 file size 4194304000 buffer size 1048576
reading target/big.file time 3994 ms, size 4000 MB
 total-read 4194304000 total-sum 6815744000

Asynchrone Netzwerkkommunikation

Sowohl für den Server-Socket als auch für die serverseitige Verarbeitung von Client-Anfragen, sowie für Client-Verbindungen sind asynchrone Kanäle verfügbar.

Nachdem man einen Server-Kanal mittels bind() auf ein Interface + Port gebunden hat, kann man mittels serverChannel.accept() potentielle Client-Verbindungen als Future erhalten. Diese Future blockiert beim Aufruf von get() solange bis wirklich eine Client-Verbindung vorhanden ist (man kann natürlich auch mit Timeouts und Polling arbeiten) und liefert dann direkt den Kanal für die Verbindung zum Client zurück, der asynchron aus Puffern (Buffer) lesen, bzw. in sie schreiben kann.

Hier ein kleines Beispiel für die Client-Server Kommunikation:

Ein Server, der auf Nachrichten lauscht und in Großbuchstaben zurückgibt:

String attachment = "Accepted Connection on " + address;
server.accept(attachment, new CompletionHandlerFI<AsynchronousSocketChannel, String>() {
    @Override
    public void done(Throwable exc, AsynchronousSocketChannel worker, String attachment) throws Exception {
        if (exc != null)
            System.err.println("Exception while listening on " + attachment + " " + exc.getMessage());
        else {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1000);
                int bytesRead;
                while ((bytesRead = worker.read(buffer).get(10, TimeUnit.SECONDS)) != -1) {
                    String content = new String(buffer.array());
                    System.out.println("Message: " + content + " bytes-read " + bytesRead);
                    ByteBuffer response = ByteBuffer.wrap(content.toUpperCase().getBytes());
                    worker.write(response).get();
                }
            } finally {
                worker.close();
            }
        }
    }
});
Thread.currentThread().join();
server.close();

Der Client würde so aussehen:

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
client.connect(new InetSocketAddress(PORT)).get();

ByteBuffer buffer = ByteBuffer.wrap("ping".getBytes());
Integer bytesWritten = client.write(buffer).get();
System.out.println("Message: " + new String(buffer.array()) + " bytes-written " + bytesWritten);
buffer.flip();
Integer bytesRead = client.read(buffer).get();
System.out.println("Response: " + new String(buffer.array()) + " bytes-read " + bytesRead);

client.close();

Internes Management

Wie werden die asynchronen Kanäle intern in der JVM gehandhabt? Es gibt verschiedene Thread-Pools die jeweils für eine Gruppe von Kanälen zuständig sind. Diese Gruppen kann man selbst erzeugen und dann beim Erstellen der Kanäle selektieren, standardmässig landet alles im globalen Pool. Leider gilt das aber nur für die Netzwerkkanäle und nicht für die FileChannels, da die Trennung von Netzwerk-IO und Dateisystem-IO sonst nicht sicherzustellen ist. Für den AsynchronousFileChannel kann man der open-Methode daher nur einen ExecutorPool mitgeben.

AsynchronousChannelGroup group =
    AsynchronousChannelGroup.withFixedThreadPool(32, Executors.defaultThreadFactory());
AsynchronousServerSocketChannel channel =
	    AsynchronousServerSocketChannel.open(group);

Fazit

NIO ist schnell und nicht schwer zu benutzen. Die NIO.2 APIs bringen einige sehr nützliche Funktionalitäten mit, die nicht zu schwer zu nutzen sind. Bei den Asynchronen Kanälen ist es zwar etwas aufwendiger durch Futures und CompletionHandler, aber man bekommt dadurch eine bessere Skalierbarkeit über viele CPUs.

Last updated 2014-05-30 13:27:42 CEST
Impressum - Twitter - GitHub - StackOverflow - LinkedIn