JEXP                       JEXP

Stromschnellen, oder wo man mit Java 8 Streams aufpassen muss.

Stromschnellen, oder wo man mit Java 8 Streams aufpassen muss.

Wie hier und vielerorts beschrieben sind die bekanntesten Features von Java 8 Lambda’s und die Streams-API.

Diese erlauben eine kompakte und elegante Notation von Operationen auf Datenströmen und -containern. Damit nähert sich Java den funktionalen Ansätzen anderer Sprachen wie C#, Scala, Groovy, Ruby an, die schon seit langem erlauben, Operationen in einer kompakten Syntax darzustellen.

Lambda’s könnten als syntaktische Vereinfachung der Nutzung von anonymen inneren Klassen interpretiert werden, besonders solchen die nur eine überschreibbare Methode enthalten (Single Abstract Method = SAM). Diese SAM’s sind Interfaces und Klassen, die maximal eine abstrakte Methode deklarieren, es können aber durchaus noch mehr nicht-abstrakte Methoden existieren. Um diese Art von Interfaces explizit zu dokumentieren wurde mit Java8 auch die Annotation @FunctionalInterface eingeführt.

Hier ein paar Bespiele:

Consumer<T>, Transformer, Reducer, …​

Die Collections API wurde um Methoden und Konstrukte (Streams-API) erweitert, die eine datenstrombasierte Sicht auf die enthaltenen Informationen zulässt und aufeinanderfolgende Operationen durch Verkettung von Methodenaufrufen mit Lambda-Codeblöcken als Parameter ermöglicht.

Diese Operationen umfassen Filterung, Transformation, Aggregation, Gruppierung.

Streams-Beispiel:
Map<Integer, List<Integer>> ungeradeQuadrateIn100Gruppen =
  range(1, 1_000)
   .filter(i -> i % 2 == 1)
   .map(i -> i * i)
   .mapToObj((i) -> i)
   .collect(Collectors.groupingBy((i) -> i % 100));

Operationen die auf einem Stream aufgerufen wurden, werden erst ausgeführt, wenn der Stream abgeschlossen wird, z.B mit .collect(), .toArray() oder .sum(). Die Operationen werden auch soweit wie möglich nach Bedarf (lazy) soweit nötig auf den Daten ausgeführt. Für einige Operationen wie Sortierung usw. ist das natürlich nicht möglich.

Soweit, so gut.

Ein weiteres Feature, das in Vorträgen und Artikeln über die Java8 Neuerungen immer hervorgehoben wurde, ist die leichte Parallelisierung von Operationen. Besonders im Zeitalter von Multi-Core Servern ist das natürlich ein wichtiges Argument, da man mit möglichst wenig Aufwand die vorhandene Hardware-Parallelität ausnutzen möchte.

Einfach ein stream.parallelStream() einfügen und schon wird magischerweise alles viel schneller.

Unter der Haube wird für die Parallelisierung von Operationen das Fork-Join-Framework genutzt, das von Doug Lea schon von einigen Jahren entwickelt wurde und welches bisher als separate Bibliothek verfügbar war. Ich hatte über einen Aspekt davon (ParallelArray) kurz im Artikel über GPars berichtet. Fork-Join ein teile-und-herrsche Ansatz, der die Datenmengen fortlaufender segmentiert, bis sie klein genug sind für eine sequentielle Abarbeitung. Jeder Thread arbeitet eine Warteschlange von Operationen ab und kann bei Leerlauf anderen Threads Arbeit abnehmen (Work-Stealing), indem er Operationen vom Ende ihrer Warteschlange entfernt. Alle Arbeitsthreads dieses Parallelisierungsmechanismus werden in einem Threadpool gemanaged, der über die ganze JVM geteilt wird. Das ist der Common-Fork-Join-Pool.

Wo liegt jetzt eigentlich das Problem?

Es ist nicht nur ein singuläres Problem sondern mehrere, die sich auch noch gegenseitig ungünstig beeinflussen.

Zum einen gibt einige Spezifika des Fork-Join Ansatzes die beachtet werden müssen damit man den Mechanismus auch tatsächlich ausnutzen kann. Fork-Join ist für klassische Massendatenverarbeitung gemacht, d.h. ein Datensegment sollte sinnvollerweise ca. 10000 Operationen zur Abarbeitung benötigen, um den Overhead des Managements der Parallelisierung zu kompensieren. D.h. eine einfache Operation mit vielen Daten oder bei weniger Daten entsprechend komplexere Operationen. Dass bedeutet aber auch, dass die Datenmengen die verarbeitet werden sollten mehrere Millionen oder Millarden gleichartige Elemente umfassen sollten, damit Fork-Join seine Stärken ausspielen kann.

Beispiel: Berechnung von Primzahlen bis zu einem Max-Wert, O(sqrt(n))

private long countPrimes(int max) {
   return range(1, max).parallel().filter(this::isPrime).count();
}
private boolean isPrime(long n) {
   return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
public void testSum() throws Exception {
    int sum = range(0, TOTAL).sum();
    System.out.println("testSum = " + sum);
}
@org.junit.Test
public void testSumParallel() throws Exception {
    int sum = range(0, TOTAL).parallel().sum();
    System.out.println("testSumParallel = " + sum);
}

@org.junit.Test
public void testSumBatched() throws Exception {
    int sum = range(0, TOTAL / BATCH).map(i -> range(i * BATCH, i * BATCH + BATCH).sum()).sum();
    System.out.println("testSumBatched = " + sum);
}
@org.junit.Test
public void testSumParallelBatched() throws Exception {
    int sum = range(0, TOTAL / BATCH).parallel().map(i -> range(i * BATCH, i * BATCH + BATCH).sum()).sum();
    System.out.println("testSumParallelBatched = " + sum);
}

Test:

  • single thread

  • FJP mit 10 Zahlen pro Operation

  • FJP mit 1000 Zahlen pro Operation

  • FJP mit 100000 Zahlen pro Operation

  • FJP mit 10000000 Zahlen pro Operation

single thread

stream.parallel

Ausser die Numerik fallen mir aber nicht viele Gebiete ein, die solche Datenmengen (Zahlenreihen) direkt im Prozessor oder Speicher generieren würden. Alle anderen Anwendungsfälle müssten diese Daten aus einer Quelle lesen: Dateisystem, Datenbank, Netzwerk usw.

Diese Leseoperationen sind aber blockierend, was uns zum nächsten Problem mit der Implementierung von Fork-Join in Java8 bringt. Fork-Join ist darauf ausgelegt, dass die Operationen self-contained und nebenwirkungsfrei sind, um optimal arbeiten zu können. Blockierende Operationen blockieren auch den Arbeitsthread, und bei genügend blockierenden Operationen sorgt das Work-Stealing dazu, dass bald alle Threads auf Ergebenisse warten und das System keinen Fortschritt mehr macht.

In einer besseren Implementierung gäb es eine extra API für blockierende Operationen, die dann basierend auf Selektoren z.B. über Callbacks ein effizientes Management dieser Blockaden vornehmen könnte. Zumindest aber sollte es einen separaten Pool für blockierende Operationen geben, so dass der Fork-Join-Pool nicht von diesen verstopft wird.

Erschwerend kommt hier dazu, dass nur ein Fork-Join-Pool in der gesamten JVM exisitiert, das ist der Common-Fork-Join-Pool, zugreifbar über ForkJoinPool.commonPool(). Dieser wird nicht nur für die Parallelverarbeitung von Streams sondern auch das parallele Sortieren und asynchrone Aufrufe via CompletionStage benutzt.

Der Grund dafür ist einleuchtend und wird auch bei OSX und iOS (GrandCentralDispatch???) ähnlich gehandhabt. Man will die Systemresourcen optimal ausnutzen indem das Erzeugen und Managen von Threads einer einzigen Stelle im System übertragen wird, die genau weiss, wieviel Resourcen (CPU-Cores) schon belegt oder noch frei sind.

Wenn jeder seine eigenen Threads und Threadpools instanziiert und startet, ist eine Verschwendung und der Wettbewerb um die vorhandenen Ausführungseinheiten schon vorprogrammiert. Ich habe schon ganz schlimme Varianten davon gesehen. Daher ist im Prinzip nichts dagegen einzuwenden.

Nur ist nicht jede Operation im System gleichen Kalibers. Zwischen sehr einfachen Operationen die nur wenige Taktzyklen benötigen bis zu aufwendigen Berechnungen, die im Sekunden- oder sogar Minutenbereich liegen ist alles vertreten. Und dazu kommen noch die blockierenden Operationen von denen man gar nicht weiss, wie lange sie benötigen. (Jedenfalls nicht wenn sie keinen standardmässigen Timeout haben.)

Diese Operationen mit sehr differenzierter Komplexität und Ausführungszeit sollten auch in verschiedenen Pools abgearbeitet werden. Andere Systeme haben Pools verschiedener Prioritäten und einen separaten Pool oder Mechanismus für blockierende Aufrufe.

Und wenn man das nicht hat?

Dann kann ein kleiner Teil der Anwendung durch seine Operationen das komplette Restsystem (JVM) lahm legen. Das ist dann fast wie eine Denial of Service (DOS) Attacke auf das Systems. Selbst wenn man keine Böswilligkeit unterstellt, ist es ein Fettnäpfchen in das man schnell hereintippt,

Leider sind viele Beispiele für die Parallelisierung von Streams genau so aufgebaut. Zum Beispiel hier:

private List<StockInfo> getStockInfo(Stream<String> symbols, int minVolume) {
  return symbols.parallel()
                .map(this::getStockInfo) //slow network operation
                .filter(si -> si.tradeVolume > minVolume)
                .collect(toList());
}

Sie führen für verschiedene Aktionen entfernte Aufrufe (z.B via REST) zu einer Börsen-API aus, um dann die Ergebnisse mittels der Streams API weiterzuverarbeiten. Das geht nur dann gut, wenn 1. das System genügend CPUs besitzt und 2. die entfernten APIs auch schnell genug antworten. Ansonsten kann man damit den Fork-Join-Pool schnell aushungern, so dass andere Module keinen Fortschritt in ihrer Datenverarbeitung mehr machen können.

Besonders in einer komplexeren Anwendung oder in einem App-Server kann das kritisch werden, wenn sowohl die Server-Infrastruktur als auch die Anwendung(en) den gemeinsamen Pool nutzen.

Hier ein Beispiel für den Effekt:

// Fork-Join-Pool mit Parallelität von X

// Start einer Summierung von Fenstern von jeweils einer Million Werten

// starten von X+1 remote Aurufen, die jeweils 30s blockieren bevor sie antworten

// Unsere numerische Berechnung wird für minimal 30 Sekunden blockiert

Man kann zwar seinen eigenen Fork-Join-Pool erzeugen und diesen benutzen, um Aufgaben auszuführen, aber das ist zum einen nicht mehr so elegant und zum anderen öffnet es dem schon genannten Problem der unkontrollierten Erzeugung von Threadpools Tür und Tor.

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

forkJoinPool.submit(() ->
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime)
        .collect(toList())
).get();
Parallelität des ForkJoinPools

Der numerische Parameter des Constructors von ForkJoinPool ist keine maximale Anzahl von Threads, sondern die gewünschte "Parallelität" auf dieser JVM (für Server mit nur einer JVM, implizit "auf diesem Computer"). D.h. wieviele Threads parallel ausgeführt werden, nur bei Blockierung an bestimmten Concurrency-Primitiven (Phaser) werden neue Threads erzeugt, sonst aber nicht!

Die maximale Anzahl von Threads eines ForkJoinPools beträgt 32767, was in den praktisch allen Systemen eine irrelevante Zahl darstellt. Die "Parallelität" ergibt sich aus Runtime.getRuntime().availableProcessors() - 1, dh. auf dual-core Systemen ist sie nur 1 !

Auch sonst ist, nicht darauf zu vertrauen dass dieser Funktionsaufruf den korrekten Wert für "Cores oder Hardware-Threads" liefert, manchmal sind es die maximalen HyperThreads, manchmal die Sockets, manchmal die Cores. Einmal hatte ich sogar auf einem SuperComputer mit CPU-Allocation, dass die Gesamtzahl der Prozessoren zurückgeliefert wurde (4096) anstatt der mir zugeteilten 16 Prozessoren. Zum Glück kann man diesen Wert von aussen mit der System-Property -Djava.util.concurrent.ForkJoinPool.common.parallelism=128 kontrollieren.

Alle Aufgaben die innerhalb eines Pools implizit von .parallel() erzeugt werden, werden im aktuellen Pool gescheduled.

Ein Aspekt, der mir nicht bekannt war, ist das bei der normalen Nutzung von parallel() der aktuelle Thread auch als Worker-Thread benutzt wird. Das ist ein ähnliches Verhalten wie bei der CallerRunsPolicy als RejectedExecutionHandler eines normalen Threadpools (s.u.).

Damit wird er nicht nur blockiert (was ja gewünscht ist), sondern auch mit zum Arbeiten genutzt. Prinzipiell ist das schon ok, nur wenn der aktuelle Thread anders konfiguriert ist, als die Pool-Threads (z.b. Priorität, ThreadLocals, Daemon-Status) bzw. Exceptions während der Ausführung auftreten, dann verhält dieser sich nicht wie die Pool-Threads, was inkonsistentes Verhalten zur Folge haben kann.

Der Common-ForkJoinPool nutzt Daemon Threads, was ggf. nicht gewollt sein kann, man kann die ThreadFactory und den Exception-Handler für den Pool mittels System-Properties konfigurieren.

private ExecutorService createPool(int threads, int queueSize) {
    return new ThreadPoolExecutor(1, threads, KEEP_ALIVE, SECONDS,
             new LinkedBlockingDeque<>(queueSize),
             new ThreadPoolExecutor.CallerRunsPolicy());
}

Ein weiterer Vorteil ist, dass man dem submit() einen Timeout mitgeben kann, dh. wenn die Aktion zulange dauert, wird sie vorzeitig abgebrochen.

Alternatives Beispiel, dass blockierende Aufrufe via Selector und Callback-API (Completion-Stage) abhandelt und nur ihre Ergebnisse in die Fork-Join-Operation einfliessen lassen.

Das funktioniert auch

Blockierende Operationen

Minimale Operator-Size Parallelisierungs-Overhead

Merge of Operations (ala Scala.views)

Last updated 2018-10-21 23:33:58 CEST
Impressum - Twitter - GitHub - StackOverflow - LinkedIn - Medium