Die meisten Entwickler, die Brian Goetz' "Java Concurrency in Practice" gelesen haben, bekamen erst grosse
Augen, dann ein nachdenkliches Gesicht und legten das Buch mit einem "das ist mir zu hoch beseite". Nun ist
nebenläufiges Programmieren keine Sache vor der man einfach die Augen verschliessen kann und dann geht sie
vorbei. Diese "schönen" Zeiten sind vorüber. Da die von Java angebotenen low-level Mechanismen, um korrekt nebenläufig zu
programmieren zu können - "synchronized", wait(), notify[All](), "volatile" und "final" weder trivial noch gut zu
komplexen Systemen komponierbar sind, haben sich diverse Autoren daran gemacht, das Leben für Entwickler
einfacher zu gestalten.
Da sind zum einen Doug Lea’s Helfer in java.util.concurrent, das schon vielerorts beschriebene
Fork-Join-Framework von Java 7, Akka, die Scala Aktoren Bibliothek, die Ansätze in Clojure aus der letzten
Kolumne und das schon früher beschriebene Disruptor Pattern von LMAX. Neben diesen gibt es noch viele weitere
Ansätze.
Einem davon, dem GPars Framework, wollen wir heute ins Getriebe schauen. GPars ist ein Groovy Framework von
Václav Pech, das teilweise in Johannes Links' Artikel in JavaSpektrum 05/10 aus der Nutzersicht vorgestellt
wurde. GPars stellt eine Vielzahl von Möglichkeiten bereit, in Groovy und Java Programmen mit einfach zu
benutzenden Konzepten, parallele Programmierung anzugehen, ohne sich mit den Basisprimitiven herumzuärgern.
Damit wird auch die Fehlerträchtigkeit der nebenläufigen Programmierung deutlich reduziert. Nur zum kleinen
Teil wird die Mächtigkeit von Groovy benutzt, um z.B. mit Closures und DSLs übersichtlichen Code zu
ermöglichen. Insgesamt stellt GPars Mechanismen bereit, die genauso gut in Java genutzt werden können.
Mit GPars bekommt man ein ganzes Potpourri an Konstrukten bereitgestellt, das man in seiner nebenläufigen
Programmierung nutzen kann:
-
"parallel collections" ermöglichen Operationen auf Datencontainern (halb)automatisch zu parallelisieren
-
Map/Reduce, eine Umsetzung des bekannten Google Mechanismus zur parallelen Verarbeitung grosser Datenmengen
-
asynchrone Funktionen erlauben existierende Funktionen in asynchrone Wrapper umzuwandeln und mittels einer DSL zu verketten
-
Aktoren bieten den von Erlang und Akka genutzen Ansatz leichtgewichtige Nachrichtensender und -empfänger mit lokalen Zustand zu vernetzen
-
Dataflow ist eine workflow-artige Herangehensweise, um Aufgaben mittels Datenflüssen zu verketten und zu synchronisieren
-
Kanban-Flow steuert den Parallelitätsgrad wie im Kanbanprozess aus der Lean-Produktion mit einer festen Anzahl an Datencontainern
Von allen Angeboten, die GPars offeriert, will ich heute nur auf die "Parallel Collections" und "Dataflow" eingehen. Den
Abschluss soll ein kleiner Ausblick auf Dierk Königs interessanten Kanban Flow Ansatz bilden.
Als Beispielprogramm möchte ich eine serielle Groovy Variante des "Game Of Life" nutzen, die ich vor längerem
geschrieben habe. Der Game Of Life Algorithmus ist sehr vielfältig einsetzbar, z.b. für Code-Retreats und
Sprachenvergleich. Ausserdem konnte ich diese Gelegenheit nutzen, um endlich einmal parallele Versionen davon zu
schreiben.
GPars ist ein CodeHaus open-source Projekt das jetzt als Teil von Groovy ausgeliefert wird, somit ist es unkompliziert
sich die Quellen zu besorgen und in die Tiefen der Implementierung abzutauchen. Glücklicherweise ist GPars
(anders als Doug Lea’s Schöpfungen) sehr gut strukturiert, dokumentiert und leicht lesbar. Danke Václav!
Basis-Infrastruktur in GPars
GPars nutzt unter der Haube die existierenden Java Bibliotheken, um die parallele Ausführung von Funktionen zu
steuern. Vor allem die ThreadPools der ExecutorServices und das in JSR-166 entwickelte in Java7 vorhandene
Fork-Join-Framework werden zur Anwendung gebracht. Um diese Funktionalität auch auf Java 6 zur Verfügung zu
stellen, wird Doug Lea’s 166y Bibliothek für Fork-Join zusammen mit 166extras für Parallel-Arrays eingebunden.
Die PGroup
ist ein wichtiges Basiskonzept, sie erlaubt eine logische Gruppe von Aktoren, Flows, Operatoren oder parallelen Collections zu
erzeugen, welche einen gemeinsamen ThreadPool nutzen und über die PGroup gesteuert werden können. Die PGroup
bietet eine Menge Methoden zur Erzeugung der o.a. beteiligten Elemente und Konfiguration des ThreadPools.
Beispielsweise wird hier ein DataFlow-Task erzeugt. (auf den später noch eingegangen wird):
public DataflowVariable task(final Callable callable) {
final DataflowVariable result = new DataflowVariable();
threadPool.execute(new Runnable() {
public void run() {
Dataflow.activeParallelGroup.set(PGroup.this);
try {
result.bind(callable.call());
} catch (Exception e) {
result.bind(e);
} finally {
Dataflow.activeParallelGroup.remove();
}
}
});
return result;
}
Für das Ergebnis des Tasks wird eine Datenflussvariable angelegt an die dann entweder Ergebnis oder Exception
gebunden wird. Dann wird die aktuelle PGroup als aktiv gesetzt, der Task dem ThreadPool übergeben und
nach erfolgreicher Ausführung die PGroup wieder deaktiviert.
Parallel Collections
Zuerst einmal das obligatorische, einfache Beispiel:
import static groovyx.gpars.GParsPool.withPool
// oder
import static groovyx.gpars.GParsExecutorsPool.withPool
withPool { pool ->
(1.10000).toList().findAllParallel { it % 2 == 0}
.collectParallel { it * it }
.groupByParallel { it % 10 }
}
Was passiert hier? withPool
ist eine Methode aus GParsPool
und nutzt eine Groovy Category
, um den innerhalb
des Blocks genutzten Collections die parallelen Methoden (findAllParallel
, collectParallel
, usw.) hinzuzufügen. Es gibt auch die
Möglichkeit die Collection-Klasse manuell anzureichern, das passiert dann mit
ParallelEnhancer.enhanceClass(HashSet)
(oder enhanceInstance()
pro Objekt). Mittels
collection.makeConcurrent()
werden die originalen Methoden wie collect
, sort
usw. gekapselt
und werden dann parallel ausgeführt, so dass nicht einmal Quellcodeänderungen notwendig sind.
Ausserdem fügt withPool
einem internen Stack einen neuen Verweis auf den aktuellen PGroup
(ParallelGroup, s.o.) hinzu, der für die aktuelle Ausführung genutzt wird. Mit optionalen Parametern von withPool
lässt sich dieser ThreadPool noch konfigurieren.
Was passiert nun in diesen parallelen Methoden?
Die beiden Varianten von GParsPool
benutzen sehr verschiedene Mechanismen zur parallelen Ausführung von
Methoden auf Containern.
GParsPool macht es sich einfach und bedient sich der Parallel Arrays von Doug Lea (aus der 166extras
Bibliothek). Diese basieren auf Fork-Join und nutzen einen Fork-Join Pool zur Ausführung. Da es sich hierbei
um eine Java Bibliothek handelt, kann sie auch problemlos in eigenen Projekten direkt eingesetzt werden.
Eine wichtige Aussage im Fork-Join Framework [ForkJoinTask] ist, dass Container so in Fragmente zerlegt werden
sollten, das zwischen 100 und 10000 Basis-Operationen pro Task erfolgen, um optimal ausgeführt zu werden. Kleinere
Fragmente machen den Overhead zu teuer, wohingegen zu grosse Fragmente zu lange dauern um optimal verteilt zu
werden.
Für Parallel Arrays ist anzumerken, dass ihre Erzeugung mit O(n) relativ kostspielig ist, sie aber in der
Ausführung sehr gut skalieren. Die Implementierung von collectParallel()
wie folgt aus:
static <T> Collection<T> collectParallel(Collection collection, Closure<? extends T> cl) {
return GParsPoolUtilHelper.createPAFromCollection(collection, retrievePool()).
withMapping(new ClosureMapper(new CallClosure(cl))).all().asList();
}
Zuerst wird ein Parallel Array aus der Liste erzeugt (wenn es nicht schon eines ist). withMapping()
ist die
collect-Funktion von Parallel Arrays, der dann ein (doppelter) Wrapper für die übergebene Closure mitgegeben
wird. Mittels all().asList()
werden alle Elemente des Mappings zurückgeliefert und in eine Liste gewandelt.
Prinzipiell können Parallel Arrays dedizierte Operatoren übergeben werden. Um optimale Ausführung auch für
primitive Datentypen zu erlauben gibt es diese deklariert für alle möglichen Typ-Kombinationen (für Filter und
Mappings). Ebenso sind die angebotenen Methoden "dupliziert" um der kombinatorischen Explosionsvielfalt
gerecht zu werden. In typischer Doug Lea Manier manifestiert sich das ganze in einer höchst komplexen Klasse
mit ca. 8000 Zeilen Code.
Generell ist ein Operator, wie erwartet, definiert als:
interface Ops.Op<A,R> {
R op(A a);
}
In der Implementierung von [PAGameOfLife] wird deutlich, wie man parallel Arrays in Java einsetzen kann.
Ein paar andere Methoden von ParallelArray sind:
withFilter(predicate)
, withMapping(mapping)
, withBounds(from,to)
, sort()
, allUniqueElements()
,
all()
, reduce(reducer,base)
, apply(procedure)
, max()
Die ursprüngliche GPars-Implementierung für parallele Container bedient sich der Java 5 ThreadPools
und lässt für
die meisten Operationen einfach einen Task pro Container-Element ablaufen. Dort wäre die Bildung entsprechend
grosser Fragmente schon hilfreich, um den Overhead zu minimieren. In GParsExecutorsPoolUtil
werden die
Methoden deklariert, beispielsweise (vereinfacht):
public static def collectParallel(Object collection, Closure cl) {
return processResult(collection.collect(callParallel(cl)))
}
private static Future callParallel(Closure task) {
final ExecutorService pool = GParsExecutorsPool.retrieveCurrentPool()
if (!pool) throw new IllegalStateException("No ExecutorService ...")
return pool.submit(new Callable() {
@Override
Object call() {
return task()
}
})
}
static List<Object> processResult(List<Future<Object>> futures) {
final Collection<Throwable> exceptions = new ArrayList<Throwable>()
final List<Object> result = futures.collect {
try {
return it.get()
} catch (Throwable e) {
exceptions.add(e)
return e
}
}
if (exceptions.empty) return result
throw new AsyncException("Some asynchronous operations failed. ${exceptions}", exceptions)
}
Die Closure cl
mit der Operation für die Liste wird also pro Element der Collection in einem ThreadPool ausgeführt und die
Ergebnisse aus den zurückgegebenen Futures
dann anschliessend aufgesammelt und etwaige Exceptions
herausgezogen.
Auch das ist ebenso einfach direkt in Java umzusetzen. Den Charme von GPars macht die sinnvolle und einfache
API zu Nutzung dieser Features aus, ohne dass man in die Tiefen der Implementierung absteigen muss und sich
mit Pools, Konvertierungen, Wrappern und den low-level APIs herumschlagen muss.
Wo Licht ist, ist natürlich auch Schatten. An meinem "Game of Life" Beispiel [PGoLStatic] wird schnell deutlich, dass die
hohen Kosten der Erzeugung von Parallel Arrays nur für grosse, zu verarbeitende Datenmengen sinnvoll sind.
Daher hatte meine initiale Parallelisierung des Problems eher negative Auswirkungen. Erst mit einem entsprechend
grossen Bord wurde eine Einsparung von ca. 50% auf 4 Kernen sichtbar. Dazu war es aber auch noch notwendig,
die Gemütlichkeit von Groovy wie wir es kennen, zu verlassen. In der Kommunikation mit Václav erfuhr ich,
dass Groovy im dynamischen Modus Methodenaufrufe synchronisiert. Und zwar um Änderungen an den
Metaklassen determistisch zu bestimmten Zeitpunkten anzuwenden (ähnlich der Savepoints des JIT in Hotspot).
Somit wurde mein hübsches Programm mit Typdeklarationen und @CompileStatic
Annotationen "verschandelt", um dem zu
entgehen. Erst danach zeigte sich der gewünschte Leistungszuwachs. Natürlich stellt sich dann die Frage,
ob es nicht sinnvoller wäre, dann gleich ein Java Programm zu schreiben und in den sauren Apfel zu beissen und
ParallelArrays direkt zu verwenden.
DataFlow - Datenflüsse zur Arbeitssynchronisierung
Im zweiten Teil der Kolumne geht es um die Nutzung von Datenfluss-Modellierung via GPars.
Der Datenfluss Ansatz nimmt Anleihen bei bekannten Workflowmechanismen. Wie Mitarbeiter in einer Produktion
erst beginnen können, wenn all ihre Ausgangsstoffe vorhanden sind, so können Workflow-Prozesse erst beginnen
wenn ihre Vorbedingungen erfüllt sind und Daten bereitstehen. Eine ähnliche Herangehensweise ist auch von
Rule-Engines bekannt.
Auch hier erst einmal ein einfaches Beispiel von Dataflows. In 3 getrennten Aufgaben werden x und y Werte
zugewiesen und dann separat addiert. Dabei können die ersten beiden Aufgaben parallel ausgeführt werden und
die finale Ausgabe von z erfolgt erst, nachdem der Wert bereitsteht. Schön ist, dass die
Parallelitäts-Implementierung im Programm nicht prominent ist und die eigentliche Geschäftslogik überschattet.
Die im folgenden genutzte Dataflows
-Klasse ist ein Groovy-Bean das dynamisch Properties als DataFlowVariables
hinzufügt, so dass deren Nutzung noch bequemer wird. Die Lese- und Schreibzugriffe werden dann auf die dahinterliegenden
Datenflussvariablen-Instanzen weitergeleitet.
import groovyx.gpars.dataflow.*
import static groovyx.gpars.dataflow.Dataflow.task
final def flows = new Dataflows()
task {
flows.z = flows.x + flows.y
}
println "Result: ${flows.z}"
Im Datenfluss Ansatz wird die Synchronisierung potentiell paralleler Aufgaben über die Verfügbarkeit von
Variableninhalten vorgenommen. Normale Datenflussvariablen können nur einmal geschrieben werden und stehen
dann sofort allen darauf wartenden Beteiligten zur Verfügung. Wenn alle Inhalte, die für eine Operation
benötigt werden bereitstehen, kann diese Fortschritt machen, entweder bis zum Ende oder bis zur nächsten,
nicht erfüllten Abhängigkeit.
Es sollte darauf geachtet werden, dass jede Datenfluss-Aufgabe klein und überschaubar ist, deterministisch zum
Ende kommt und Variablen erst dann liest, wenn es sie wirklich benötigt.
Durch die Datenfluss Modellierung und den Fokus auf Daten statt Prozesse, können folgende Eigenschaften
erreicht werden:
Livelocks treten nicht auf da die Aufgaben nicht selbst über die Blockierung und Freigabe von Resource
bestimmen können.
Deadlocks können, wie schon bei Neo4j beschrieben einfach als Zyklen im Abhängigkeitsgraph erkannt werden und
sie treten immer wieder an denselben Stellen auf. Wenn z.B. Aufgabe 1 erst beta
liest und dann alpha
setzt,
dagegen Aufgabe 2 umgekehrt erst alpha
liest und dann beta
setzt, dann verklemmen sie sich immer wieder.
Aus den überschaubaren Tasks können komplexere Aufgaben durch Komposition entstehen. Etwas das mit den
Basis-Synchronisationsmechanismen von Java nicht ohne weiteres möglich ist. Brian Goetz sagt in seinem Buch [JCP]:
"Locking is not composable."
Wie funktioniert Dataflow nun in GPars.
Das Kernstück bilden DataflowVariables
, die aus der Basisklasse DataflowExpression
erben und eine Vielzahl
von Methoden anbieten, von denen uns erst einmal nur getVal()
und bind()
interessieren.
Wenn der Wert der Datenflussvariable mittels getVal()
gelesen werden soll und er noch nicht gebunden wurde,
wird der aktuelle Thread in eine verkettete Liste eingereiht und mittels LockSupport.park()
angehalten. In
bind()
bzw. doBindImpl()
wird der Variablenwert gesetzt und die Kette der Threads durchgegangen und sie wieder
aufgeweckt. So kann man sich einfach verdeutlichen, dass Tasks die gerade in einem ThreadPool ablaufen beim
Lesen der Datenflussvariable blockiert werden bis der Wert zur Verfügung steht. Die Ablaufreihenfolge
ergibt sich direkt aus der Verfügbarkeit der Werte die zur Folge hat, dass der Task dann weiterlaufen kann.
Ein kritischer Punkt ist hier natürlich die Größe des Threadpools, der ausreichen muss, um alle potentiell
blockierten Tasks parallel aufzunehmen, damit die Tasks, die die Werte ermitteln bzw. setzen noch Fortschritt
machen können. Die Ausführung der Tasks im Pool ist im ersten Code-Beispiel (aus PGroup) im Artikel dargestellt.
Es gibt auch die Möglichkeit mittels getValAsync() einen GPars Aktor (oder MessageStream, s.u. der
DataflowOperatorActor) dann eine Nachricht zu schicken, wenn der Wert gebunden wurde. Diese werden auch in die
o.a. verkettete Liste eingereiht und werden in doBindImpl() benachrichtigt.
// aus DataflowExpression, leicht gekürzt und abgewandelt
@Override
public T getVal() throws InterruptedException {
ThreadLink threadLink = null;
while (state.get() != S_VALUE_BOUND) {
if (threadLink == null) {
threadLink = new ThreadLink(Thread.currentThread(), null, null, null);
}
final ThreadLink previous = threads.get();
if (previous == alreadyProcessing) break;
threadLink.previous = previous;
if (threads.compareAndSet(previous, threadLink)) {
// ok, we are in the queue, so writer is responsible to process us
while (state.get() != S_VALUE_BOUND) {
LockSupport.park();
if (Thread.currentThread().isInterrupted()) handleInterruption(threadLink);
}
break;
}
}
return value;
}
protected void doBindImpl(final T value) {
this.value = value;
state.set(S_VALUE_BOUND);
final ThreadLink threadLink = threads.getAndSet(alreadyProcessing);
for (ThreadLink current = threadLink; current != null; current = current.previous) {
if (current.compareAndSet(false, true)) {
if (current.thread != null) {
// geparkten Thread aufwecken
LockSupport.unpark(current.thread);
} else {
// Nachricht an Aktor
if (current.callback != null) {
scheduleCallback(current.attachment, current.callback);
}
}
}
}
}
Es gibt in GPars viele Möglichkeiten mittels einer DSL (bzw. Groovy Operatoren) Datenflüsse miteinander zu
verketten bzw. wie Leitungssysteme oder logische Schaltungen aufzusplitten, abzuzweigen, wieder
zusammenzuführen oder per 1-aus-n Auswahl zu schalten. Für komplexere Verknüpfungen stehen auch Broadcasts
bzw. 1-zu-n Kommunikation mittels DataFlowStream
zur Verfügung.
Bei den Operatoren kann man zwischen zwei Mechanismen unterscheiden, der einfachere sind die schon
diskutierten Tasks die andere stellen eher generische Datenfluss-Operatoren dar, die eine Reihe von Eingabe-
und Ausgabe-Kanälen, ihren Parallelitätsgrad und den auszuführenden Code deklarieren. Diese
DataflowOperators
basieren auf einem allgemeinem DataFlowProcessor
, der auch noch für Selektoren (1 aus n,
DataflowSelector
) genutzt wird.
Interessanterweise bildet ein Aktor das Herzstück des DataflowProcessors
, an den die Datenfluss-Kanäle
abgelegt werden und dessen Empfangsmechanismen für das Sammeln der erwarteten Eingabewerte zuständig sind
(neben denen die schon aus den Eingabe-Kanälen direkt gelesen werden können).
Wenn alle geforderten Werte vorhanden sind wird der in Form einer Closure bereitgstellte Code mit diesen Parametern ausgeführt und der Aktor/DatenflussOperator steht für die nächste
Runde bereit. Der Nachrichtenempfang kümmert sich auch um das sanfte oder abrupte Ableben des Aktors beim Empfang der entsprechenden Nachrichten ("StopGently" oder "PoisonPill").
class DataflowOperatorActor extends DataflowProcessorActor {
private Map values = new HashMap(10);
....
final void afterStart() {
queryInputs(true);
}
private void queryInputs(final boolean initialRun) {
for (int i = 0; i < inputs.size(); i++) {
final DataflowReadChannel input = (DataflowReadChannel) inputs.get(i);
if (initialRun || !(input instanceof DataflowVariable)) {
// Registrierung dieses Aktors bei der Variablen, s.o. mit dem
// "Attachment" i also dem Parameter-Index
input.getValAsync(i, this);
} else {
try {
values.put(i, input.getVal()); // blockierendes Lesen aus dem Kanal
} catch (InterruptedException e) {
throw new IllegalStateException("....", e);
}
}
}
}
...
@Override
public final void onMessage(final Object message) {
if (message instanceof StopGently) {
stoppingGently = true;
return;
}
// eigentlicher Wert
final Object result = msg.get("result");
// numerischer Index
final Object attachment = msg.get("attachment");
if (checkPoison(result)) return;
values.put(attachment, result);
if (values.size() == inputs.size()) {
// ... sortieren nach Index
startTask(arrivedValues);
values = new HashMap(values.size()); // reset
// ... etwaiger stop
}
}
// callback in doBindImpl aus DataFlowExpression / DataFlowVariable
protected void scheduleCallback(final Object attachment, final MessageStream callback) {
if (attachment == null) {
callback.send(value);
} else {
Map message = new HashMap();
message.put(ATTACHMENT, attachment);
message.put(RESULT, value);
callback.send(message);
}
}
Das "attachment" bildet der numerische (Parameter-)Index des Wertes das vorher im erwähnten doBindImpl
der
DataFlowVariable zusammen mit dem eigentlichen Wert an den registrierten Ziel-Aktor geschickt wird.
Registriert wird es z.b. im queryInputs()
des Aktors.
Ganz allgemein erfolgt die Verkettung von Datenfluss-Operatoren mittels DataFlowChannels
von denen
DataFlowVariable
eine Inkarnation darstellt. Andere Varianten, die auch das Binden mehrerer Werte oder
Variablen erlauben sind DataFlowQueue
oder für 1:n / n:m Kommunikation ein DataflowBroadcast
der auf in
einem synchronisierendem WriteAdapter gekapselten DataFlowStreams
basiert. Der Broadcast erlaubt es beliebig
viele ReadChannels zu erzeugen, die dann alle ähnlich wie Datenflussvariablen (und intern auch darauf
aufbauend) mit getVal()
auf die in den Broadcast geschriebenen Werte zugreifen können. Es stellt also eine Art
Publish-Subscribe Modell dar.
Der DataflowStream auf dem das ganze basiert nutzt StreamCore<T>
das eine typisch funktionale Datenstruktur
aus Head (DataFlowVariable<T>
) und Rest (AtomicReference<StreamCore<T>>
) darstellt und bei allen Operationen
ebenso wie funktionale Äquivalente auf die rekursive Abarbeitung alá stream.apply(op) = op(head) &
rest.apply(oop) setzt.