JEXP                       JEXP

Hinter den Kulissen: GPars (groovy parallel systems)

Die meisten Entwickler, die Brian Goetz' "Java Concurrency in Practice" gelesen haben, bekamen erst grosse Augen, dann ein nachdenkliches Gesicht und legten das Buch mit einem "das ist mir zu hoch beseite". Nun ist nebenläufiges Programmieren keine Sache vor der man einfach die Augen verschliessen kann und dann geht sie vorbei. Diese "schönen" Zeiten sind vorüber. Da die von Java angebotenen low-level Mechanismen, um korrekt nebenläufig zu programmieren zu können - "synchronized", wait(), notify[All](), "volatile" und "final" weder trivial noch gut zu komplexen Systemen komponierbar sind, haben sich diverse Autoren daran gemacht, das Leben für Entwickler einfacher zu gestalten.

Da sind zum einen Doug Lea’s Helfer in java.util.concurrent, das schon vielerorts beschriebene Fork-Join-Framework von Java 7, Akka, die Scala Aktoren Bibliothek, die Ansätze in Clojure aus der letzten Kolumne und das schon früher beschriebene Disruptor Pattern von LMAX. Neben diesen gibt es noch viele weitere Ansätze.

Einem davon, dem GPars Framework, wollen wir heute ins Getriebe schauen. GPars ist ein Groovy Framework von Václav Pech, das teilweise in Johannes Links' Artikel in JavaSpektrum 05/10 aus der Nutzersicht vorgestellt wurde. GPars stellt eine Vielzahl von Möglichkeiten bereit, in Groovy und Java Programmen mit einfach zu benutzenden Konzepten, parallele Programmierung anzugehen, ohne sich mit den Basisprimitiven herumzuärgern. Damit wird auch die Fehlerträchtigkeit der nebenläufigen Programmierung deutlich reduziert. Nur zum kleinen Teil wird die Mächtigkeit von Groovy benutzt, um z.B. mit Closures und DSLs übersichtlichen Code zu ermöglichen. Insgesamt stellt GPars Mechanismen bereit, die genauso gut in Java genutzt werden können.

Mit GPars bekommt man ein ganzes Potpourri an Konstrukten bereitgestellt, das man in seiner nebenläufigen Programmierung nutzen kann:

  • "parallel collections" ermöglichen Operationen auf Datencontainern (halb)automatisch zu parallelisieren

  • Map/Reduce, eine Umsetzung des bekannten Google Mechanismus zur parallelen Verarbeitung grosser Datenmengen

  • asynchrone Funktionen erlauben existierende Funktionen in asynchrone Wrapper umzuwandeln und mittels einer DSL zu verketten

  • Aktoren bieten den von Erlang und Akka genutzen Ansatz leichtgewichtige Nachrichtensender und -empfänger mit lokalen Zustand zu vernetzen

  • Dataflow ist eine workflow-artige Herangehensweise, um Aufgaben mittels Datenflüssen zu verketten und zu synchronisieren

  • Kanban-Flow steuert den Parallelitätsgrad wie im Kanbanprozess aus der Lean-Produktion mit einer festen Anzahl an Datencontainern

Von allen Angeboten, die GPars offeriert, will ich heute nur auf die "Parallel Collections" und "Dataflow" eingehen. Den Abschluss soll ein kleiner Ausblick auf Dierk Königs interessanten Kanban Flow Ansatz bilden.

Als Beispielprogramm möchte ich eine serielle Groovy Variante des "Game Of Life" nutzen, die ich vor längerem geschrieben habe. Der Game Of Life Algorithmus ist sehr vielfältig einsetzbar, z.b. für Code-Retreats und Sprachenvergleich. Ausserdem konnte ich diese Gelegenheit nutzen, um endlich einmal parallele Versionen davon zu schreiben.

GPars ist ein CodeHaus open-source Projekt das jetzt als Teil von Groovy ausgeliefert wird, somit ist es unkompliziert sich die Quellen zu besorgen und in die Tiefen der Implementierung abzutauchen. Glücklicherweise ist GPars (anders als Doug Lea’s Schöpfungen) sehr gut strukturiert, dokumentiert und leicht lesbar. Danke Václav!

Basis-Infrastruktur in GPars

GPars nutzt unter der Haube die existierenden Java Bibliotheken, um die parallele Ausführung von Funktionen zu steuern. Vor allem die ThreadPools der ExecutorServices und das in JSR-166 entwickelte in Java7 vorhandene Fork-Join-Framework werden zur Anwendung gebracht. Um diese Funktionalität auch auf Java 6 zur Verfügung zu stellen, wird Doug Lea’s 166y Bibliothek für Fork-Join zusammen mit 166extras für Parallel-Arrays eingebunden.

Die PGroup ist ein wichtiges Basiskonzept, sie erlaubt eine logische Gruppe von Aktoren, Flows, Operatoren oder parallelen Collections zu erzeugen, welche einen gemeinsamen ThreadPool nutzen und über die PGroup gesteuert werden können. Die PGroup bietet eine Menge Methoden zur Erzeugung der o.a. beteiligten Elemente und Konfiguration des ThreadPools.

Beispielsweise wird hier ein DataFlow-Task erzeugt. (auf den später noch eingegangen wird):

public DataflowVariable task(final Callable callable) {
    final DataflowVariable result = new DataflowVariable();
    threadPool.execute(new Runnable() {
        public void run() {
            Dataflow.activeParallelGroup.set(PGroup.this);
            try {
                result.bind(callable.call());
            } catch (Exception e) {
                    result.bind(e);
            } finally {
                Dataflow.activeParallelGroup.remove();
            }
        }
    });
    return result;
}

Für das Ergebnis des Tasks wird eine Datenflussvariable angelegt an die dann entweder Ergebnis oder Exception gebunden wird. Dann wird die aktuelle PGroup als aktiv gesetzt, der Task dem ThreadPool übergeben und nach erfolgreicher Ausführung die PGroup wieder deaktiviert.

Parallel Collections

Zuerst einmal das obligatorische, einfache Beispiel:

import static groovyx.gpars.GParsPool.withPool
// oder
import static groovyx.gpars.GParsExecutorsPool.withPool
withPool { pool ->
   (1.10000).toList().findAllParallel { it % 2 == 0}
       .collectParallel { it * it }
       .groupByParallel { it % 10 }
}

Was passiert hier? withPool ist eine Methode aus GParsPool und nutzt eine Groovy Category, um den innerhalb des Blocks genutzten Collections die parallelen Methoden (findAllParallel, collectParallel, usw.) hinzuzufügen. Es gibt auch die Möglichkeit die Collection-Klasse manuell anzureichern, das passiert dann mit ParallelEnhancer.enhanceClass(HashSet) (oder enhanceInstance() pro Objekt). Mittels collection.makeConcurrent() werden die originalen Methoden wie collect, sort usw. gekapselt und werden dann parallel ausgeführt, so dass nicht einmal Quellcodeänderungen notwendig sind.

Ausserdem fügt withPool einem internen Stack einen neuen Verweis auf den aktuellen PGroup (ParallelGroup, s.o.) hinzu, der für die aktuelle Ausführung genutzt wird. Mit optionalen Parametern von withPool lässt sich dieser ThreadPool noch konfigurieren.

Was passiert nun in diesen parallelen Methoden?

Die beiden Varianten von GParsPool benutzen sehr verschiedene Mechanismen zur parallelen Ausführung von Methoden auf Containern.

GParsPool macht es sich einfach und bedient sich der Parallel Arrays von Doug Lea (aus der 166extras Bibliothek). Diese basieren auf Fork-Join und nutzen einen Fork-Join Pool zur Ausführung. Da es sich hierbei um eine Java Bibliothek handelt, kann sie auch problemlos in eigenen Projekten direkt eingesetzt werden.

Eine wichtige Aussage im Fork-Join Framework [ForkJoinTask] ist, dass Container so in Fragmente zerlegt werden sollten, das zwischen 100 und 10000 Basis-Operationen pro Task erfolgen, um optimal ausgeführt zu werden. Kleinere Fragmente machen den Overhead zu teuer, wohingegen zu grosse Fragmente zu lange dauern um optimal verteilt zu werden.

Für Parallel Arrays ist anzumerken, dass ihre Erzeugung mit O(n) relativ kostspielig ist, sie aber in der Ausführung sehr gut skalieren. Die Implementierung von collectParallel() wie folgt aus:

static <T> Collection<T> collectParallel(Collection collection, Closure<? extends T> cl) {
    return GParsPoolUtilHelper.createPAFromCollection(collection, retrievePool()).
           withMapping(new ClosureMapper(new CallClosure(cl))).all().asList();
}

Zuerst wird ein Parallel Array aus der Liste erzeugt (wenn es nicht schon eines ist). withMapping() ist die collect-Funktion von Parallel Arrays, der dann ein (doppelter) Wrapper für die übergebene Closure mitgegeben wird. Mittels all().asList() werden alle Elemente des Mappings zurückgeliefert und in eine Liste gewandelt.

Prinzipiell können Parallel Arrays dedizierte Operatoren übergeben werden. Um optimale Ausführung auch für primitive Datentypen zu erlauben gibt es diese deklariert für alle möglichen Typ-Kombinationen (für Filter und Mappings). Ebenso sind die angebotenen Methoden "dupliziert" um der kombinatorischen Explosionsvielfalt gerecht zu werden. In typischer Doug Lea Manier manifestiert sich das ganze in einer höchst komplexen Klasse mit ca. 8000 Zeilen Code.

Generell ist ein Operator, wie erwartet, definiert als:

interface Ops.Op<A,R> { R op(A a); }

In der Implementierung von [PAGameOfLife] wird deutlich, wie man parallel Arrays in Java einsetzen kann.

class PAGameOfLife {

Ein paar andere Methoden von ParallelArray sind:

withFilter(predicate), withMapping(mapping), withBounds(from,to), sort(), allUniqueElements(), all(), reduce(reducer,base), apply(procedure), max()

Die ursprüngliche GPars-Implementierung für parallele Container bedient sich der Java 5 ThreadPools und lässt für die meisten Operationen einfach einen Task pro Container-Element ablaufen. Dort wäre die Bildung entsprechend grosser Fragmente schon hilfreich, um den Overhead zu minimieren. In GParsExecutorsPoolUtil werden die Methoden deklariert, beispielsweise (vereinfacht):

public static def collectParallel(Object collection, Closure cl) {
    return processResult(collection.collect(callParallel(cl)))
}
private static Future callParallel(Closure task) {
    final ExecutorService pool = GParsExecutorsPool.retrieveCurrentPool()
    if (!pool) throw new IllegalStateException("No ExecutorService ...")
    return pool.submit(new Callable() {
        @Override
        Object call() {
            return task()
        }
    })
}
static List<Object> processResult(List<Future<Object>> futures) {
    final Collection<Throwable> exceptions = new ArrayList<Throwable>()
final List<Object> result = futures.collect {
    try {
        return it.get()
    } catch (Throwable e) {
        exceptions.add(e)
        return e
    }
}
    if (exceptions.empty) return result
    throw new AsyncException("Some asynchronous operations failed. ${exceptions}", exceptions)
}

Die Closure cl mit der Operation für die Liste wird also pro Element der Collection in einem ThreadPool ausgeführt und die Ergebnisse aus den zurückgegebenen Futures dann anschliessend aufgesammelt und etwaige Exceptions herausgezogen.

Auch das ist ebenso einfach direkt in Java umzusetzen. Den Charme von GPars macht die sinnvolle und einfache API zu Nutzung dieser Features aus, ohne dass man in die Tiefen der Implementierung absteigen muss und sich mit Pools, Konvertierungen, Wrappern und den low-level APIs herumschlagen muss.

Wo Licht ist, ist natürlich auch Schatten. An meinem "Game of Life" Beispiel [PGoLStatic] wird schnell deutlich, dass die hohen Kosten der Erzeugung von Parallel Arrays nur für grosse, zu verarbeitende Datenmengen sinnvoll sind. Daher hatte meine initiale Parallelisierung des Problems eher negative Auswirkungen. Erst mit einem entsprechend grossen Bord wurde eine Einsparung von ca. 50% auf 4 Kernen sichtbar. Dazu war es aber auch noch notwendig, die Gemütlichkeit von Groovy wie wir es kennen, zu verlassen. In der Kommunikation mit Václav erfuhr ich, dass Groovy im dynamischen Modus Methodenaufrufe synchronisiert. Und zwar um Änderungen an den Metaklassen determistisch zu bestimmten Zeitpunkten anzuwenden (ähnlich der Savepoints des JIT in Hotspot). Somit wurde mein hübsches Programm mit Typdeklarationen und @CompileStatic Annotationen "verschandelt", um dem zu entgehen. Erst danach zeigte sich der gewünschte Leistungszuwachs. Natürlich stellt sich dann die Frage, ob es nicht sinnvoller wäre, dann gleich ein Java Programm zu schreiben und in den sauren Apfel zu beissen und ParallelArrays direkt zu verwenden.

DataFlow - Datenflüsse zur Arbeitssynchronisierung

Im zweiten Teil der Kolumne geht es um die Nutzung von Datenfluss-Modellierung via GPars.

Der Datenfluss Ansatz nimmt Anleihen bei bekannten Workflowmechanismen. Wie Mitarbeiter in einer Produktion erst beginnen können, wenn all ihre Ausgangsstoffe vorhanden sind, so können Workflow-Prozesse erst beginnen wenn ihre Vorbedingungen erfüllt sind und Daten bereitstehen. Eine ähnliche Herangehensweise ist auch von Rule-Engines bekannt.

Auch hier erst einmal ein einfaches Beispiel von Dataflows. In 3 getrennten Aufgaben werden x und y Werte zugewiesen und dann separat addiert. Dabei können die ersten beiden Aufgaben parallel ausgeführt werden und die finale Ausgabe von z erfolgt erst, nachdem der Wert bereitsteht. Schön ist, dass die Parallelitäts-Implementierung im Programm nicht prominent ist und die eigentliche Geschäftslogik überschattet.

Die im folgenden genutzte Dataflows-Klasse ist ein Groovy-Bean das dynamisch Properties als DataFlowVariables hinzufügt, so dass deren Nutzung noch bequemer wird. Die Lese- und Schreibzugriffe werden dann auf die dahinterliegenden Datenflussvariablen-Instanzen weitergeleitet.

import groovyx.gpars.dataflow.*
import static groovyx.gpars.dataflow.Dataflow.task
final def flows = new Dataflows()
task {
    flows.z = flows.x + flows.y
}
task {
    flows.x = 10
}
task {
    flows.y = 5
}
println "Result: ${flows.z}"

Im Datenfluss Ansatz wird die Synchronisierung potentiell paralleler Aufgaben über die Verfügbarkeit von Variableninhalten vorgenommen. Normale Datenflussvariablen können nur einmal geschrieben werden und stehen dann sofort allen darauf wartenden Beteiligten zur Verfügung. Wenn alle Inhalte, die für eine Operation benötigt werden bereitstehen, kann diese Fortschritt machen, entweder bis zum Ende oder bis zur nächsten, nicht erfüllten Abhängigkeit.

Es sollte darauf geachtet werden, dass jede Datenfluss-Aufgabe klein und überschaubar ist, deterministisch zum Ende kommt und Variablen erst dann liest, wenn es sie wirklich benötigt.

Durch die Datenfluss Modellierung und den Fokus auf Daten statt Prozesse, können folgende Eigenschaften erreicht werden:

  • maximaler Durchsatz

  • keine Livelocks

  • deterministische Deadlocks

  • einfacher Code

Livelocks treten nicht auf da die Aufgaben nicht selbst über die Blockierung und Freigabe von Resource bestimmen können.

Deadlocks können, wie schon bei Neo4j beschrieben einfach als Zyklen im Abhängigkeitsgraph erkannt werden und sie treten immer wieder an denselben Stellen auf. Wenn z.B. Aufgabe 1 erst beta liest und dann alpha setzt, dagegen Aufgabe 2 umgekehrt erst alpha liest und dann beta setzt, dann verklemmen sie sich immer wieder.

Aus den überschaubaren Tasks können komplexere Aufgaben durch Komposition entstehen. Etwas das mit den Basis-Synchronisationsmechanismen von Java nicht ohne weiteres möglich ist. Brian Goetz sagt in seinem Buch [JCP]: "Locking is not composable."

Wie funktioniert Dataflow nun in GPars.

Das Kernstück bilden DataflowVariables, die aus der Basisklasse DataflowExpression erben und eine Vielzahl von Methoden anbieten, von denen uns erst einmal nur getVal() und bind() interessieren.

Wenn der Wert der Datenflussvariable mittels getVal() gelesen werden soll und er noch nicht gebunden wurde, wird der aktuelle Thread in eine verkettete Liste eingereiht und mittels LockSupport.park() angehalten. In bind() bzw. doBindImpl() wird der Variablenwert gesetzt und die Kette der Threads durchgegangen und sie wieder aufgeweckt. So kann man sich einfach verdeutlichen, dass Tasks die gerade in einem ThreadPool ablaufen beim Lesen der Datenflussvariable blockiert werden bis der Wert zur Verfügung steht. Die Ablaufreihenfolge ergibt sich direkt aus der Verfügbarkeit der Werte die zur Folge hat, dass der Task dann weiterlaufen kann. Ein kritischer Punkt ist hier natürlich die Größe des Threadpools, der ausreichen muss, um alle potentiell blockierten Tasks parallel aufzunehmen, damit die Tasks, die die Werte ermitteln bzw. setzen noch Fortschritt machen können. Die Ausführung der Tasks im Pool ist im ersten Code-Beispiel (aus PGroup) im Artikel dargestellt.

Es gibt auch die Möglichkeit mittels getValAsync() einen GPars Aktor (oder MessageStream, s.u. der DataflowOperatorActor) dann eine Nachricht zu schicken, wenn der Wert gebunden wurde. Diese werden auch in die o.a. verkettete Liste eingereiht und werden in doBindImpl() benachrichtigt.

// aus DataflowExpression, leicht gekürzt und abgewandelt
@Override
public T getVal() throws InterruptedException {
    ThreadLink threadLink = null;
    while (state.get() != S_VALUE_BOUND) {
        if (threadLink == null) {
            threadLink = new ThreadLink(Thread.currentThread(), null, null, null);
        }
final ThreadLink previous = threads.get();
if (previous == alreadyProcessing) break;
        threadLink.previous = previous;
        if (threads.compareAndSet(previous, threadLink)) {
            // ok, we are in the queue, so writer is responsible to process us
            while (state.get() != S_VALUE_BOUND) {
                LockSupport.park();
                if (Thread.currentThread().isInterrupted()) handleInterruption(threadLink);
            }
            break;
        }
    }
    return value;
}
protected void doBindImpl(final T value) {
    this.value = value;
    state.set(S_VALUE_BOUND);
final ThreadLink threadLink = threads.getAndSet(alreadyProcessing);
    for (ThreadLink current = threadLink; current != null; current = current.previous) {
        if (current.compareAndSet(false, true)) {
            if (current.thread != null) {
 // geparkten Thread aufwecken
                LockSupport.unpark(current.thread);
            } else {
                // Nachricht an Aktor
                if (current.callback != null) {
                    scheduleCallback(current.attachment, current.callback);
                }
            }
        }
    }
}

Es gibt in GPars viele Möglichkeiten mittels einer DSL (bzw. Groovy Operatoren) Datenflüsse miteinander zu verketten bzw. wie Leitungssysteme oder logische Schaltungen aufzusplitten, abzuzweigen, wieder zusammenzuführen oder per 1-aus-n Auswahl zu schalten. Für komplexere Verknüpfungen stehen auch Broadcasts bzw. 1-zu-n Kommunikation mittels DataFlowStream zur Verfügung.

Bei den Operatoren kann man zwischen zwei Mechanismen unterscheiden, der einfachere sind die schon diskutierten Tasks die andere stellen eher generische Datenfluss-Operatoren dar, die eine Reihe von Eingabe- und Ausgabe-Kanälen, ihren Parallelitätsgrad und den auszuführenden Code deklarieren. Diese DataflowOperators basieren auf einem allgemeinem DataFlowProcessor, der auch noch für Selektoren (1 aus n, DataflowSelector) genutzt wird.

Interessanterweise bildet ein Aktor das Herzstück des DataflowProcessors, an den die Datenfluss-Kanäle abgelegt werden und dessen Empfangsmechanismen für das Sammeln der erwarteten Eingabewerte zuständig sind (neben denen die schon aus den Eingabe-Kanälen direkt gelesen werden können).

Wenn alle geforderten Werte vorhanden sind wird der in Form einer Closure bereitgstellte Code mit diesen Parametern ausgeführt und der Aktor/DatenflussOperator steht für die nächste Runde bereit. Der Nachrichtenempfang kümmert sich auch um das sanfte oder abrupte Ableben des Aktors beim Empfang der entsprechenden Nachrichten ("StopGently" oder "PoisonPill").

class DataflowOperatorActor extends DataflowProcessorActor {
    private Map values = new HashMap(10);
....
    final void afterStart() {
        queryInputs(true);
    }
private void queryInputs(final boolean initialRun) {
    for (int i = 0; i < inputs.size(); i++) {
        final DataflowReadChannel input = (DataflowReadChannel) inputs.get(i);
        if (initialRun || !(input instanceof DataflowVariable)) {
// Registrierung dieses Aktors bei der Variablen, s.o. mit dem
// "Attachment" i also dem Parameter-Index
            input.getValAsync(i, this);
        } else {
            try {
                values.put(i, input.getVal()); // blockierendes Lesen aus dem Kanal
            } catch (InterruptedException e) {
                throw new IllegalStateException("....", e);
            }
        }
    }
}
...
@Override
public final void onMessage(final Object message) {
    if (message instanceof StopGently) {
        stoppingGently = true;
        return;
    }
    // eigentlicher Wert
    final Object result = msg.get("result");
    // numerischer Index
    final Object attachment = msg.get("attachment");
    if (checkPoison(result)) return;
    values.put(attachment, result);
    if (values.size() == inputs.size()) {
        // ... sortieren nach Index
        startTask(arrivedValues);
        values = new HashMap(values.size()); // reset
		// ... etwaiger stop
    }
}
// callback in doBindImpl aus DataFlowExpression / DataFlowVariable
protected void scheduleCallback(final Object attachment, final MessageStream callback) {
    if (attachment == null) {
        callback.send(value);
    } else {
        Map message = new HashMap();
        message.put(ATTACHMENT, attachment);
        message.put(RESULT, value);
        callback.send(message);
    }
}

Das "attachment" bildet der numerische (Parameter-)Index des Wertes das vorher im erwähnten doBindImpl der DataFlowVariable zusammen mit dem eigentlichen Wert an den registrierten Ziel-Aktor geschickt wird. Registriert wird es z.b. im queryInputs() des Aktors.

Ganz allgemein erfolgt die Verkettung von Datenfluss-Operatoren mittels DataFlowChannels von denen DataFlowVariable eine Inkarnation darstellt. Andere Varianten, die auch das Binden mehrerer Werte oder Variablen erlauben sind DataFlowQueue oder für 1:n / n:m Kommunikation ein DataflowBroadcast der auf in einem synchronisierendem WriteAdapter gekapselten DataFlowStreams basiert. Der Broadcast erlaubt es beliebig viele ReadChannels zu erzeugen, die dann alle ähnlich wie Datenflussvariablen (und intern auch darauf aufbauend) mit getVal() auf die in den Broadcast geschriebenen Werte zugreifen können. Es stellt also eine Art Publish-Subscribe Modell dar.

Der DataflowStream auf dem das ganze basiert nutzt StreamCore<T> das eine typisch funktionale Datenstruktur aus Head (DataFlowVariable<T>) und Rest (AtomicReference<StreamCore<T>>) darstellt und bei allen Operationen ebenso wie funktionale Äquivalente auf die rekursive Abarbeitung alá stream.apply(op) = op(head) & rest.apply(oop) setzt.

Kanban Flow

Dierk König hat vor einiger Zeit das Kanban Flow Pattern vorgestellt [König Kanbanflow]. Es bedient sich dem aus dem Lean Manufacturing bekannten Kanban Prinzip, das darauf basiert dass ein Beteiligter aktiv nach Arbeit (oder Werkstücken) fragt (Pull-Prinzip), wenn er freie Kapazität hat.

sidebar/box

Die Repräsentation dieser Nachfrage erfolgt durch eine Karte (oder Kiste, "Tray") aus einer limitierten (Work in Progress, WIP) Menge, die an das konkrete Werkstück gekoppelt wird solange es in Bearbeitung ist. D.h. die maximale Anzahl von Stücken im System ist durch die Anzahl Karten vorgegeben, sie sollte auf die Anzahl der Arbeitstationen limitiert sein, so dass keine Zwischenlager notwendig sind und alle Arbeit "just-in-time" erfolgen kann. In solchen Systemen wird auch schnell deutlich, wo ein Flaschenhals existiert. Davor (bzw. am Ende der vorherigen Stationen) würde sich die Arbeit sammeln.

Die Abbildung dieser Karten erfolgt durch KanbanTray-Objekte (also "Kisten"), die die eigentlichen "Produkt"-Daten kapseln und deren Anzahl im System limitiert ist. Nur dadurch kann in KanbanFlow aufbauend Datenflussvariablen und Aufgaben (Tasks) die Parallelität kontrolliert und Deadlocks/Verklemmungen verhindert werden. Das ist auch darin begründet, dass das System erst einmal keine Zyklen enthalten darf. Die feste Anzahl limitiert auch automatisch die Grösse der Warteschlangen im System auf maximal n Elemente, so dass keine Überläufe oder Blockierungen durch zu hohe Produktionsfrequenz auftreten. Die Anzahl von Karten lässt auch leicht die Last auf dem System steuern und es bei WIP==0 anhalten, WIP==1 sequentiell abarbeiten oder WIP > 0 langsam wieder anfahren bis mit WIP==SUM(erzeuger + verbraucher) der optimale Wert erreicht ist.

simples Code Beispiel:

import static groovyx.gpars.dataflow.ProcessingNode.node
import groovyx.gpars.dataflow.KanbanFlow
def producer = node { down -> down << UUID.randomUUID() }
def consumer = node { up   -> println up.take() }
new KanbanFlow().with {
    link producer to consumer
    start()
    Thread.sleep(500)
    stop()
}

Diese einfachste Konfiguration ist ein simpler KanbanLink zwischen einen Erzeuger und einem Verbraucher, die über einen Hauptkanal (downstream) für KanbanTray mit Produkt und Rückkanal (upstream) für die leeren KanbanTrays miteinander verbunden sind. Dabei bestehen die beiden Kanäle aus DataFlowQueues, die jeweils Ausgaben und Eingaben pro Erzeuger und Verbraucher abbilden. Das Hinzufügen und Entfernen von Containern erfolgt durch Setzen eines neuen Objektes auf dem Upstream bzw. Auslesen eines Objektes vom Downstream eines KanbanLinks. Wenn ein Produkt in eine Kiste gelegt wird, bindet sich diese an den Hauptkanal vom Erzeuger zum Verbraucher, für das Entnehmen des Produktes und zurücksenden, bindet sich die Kiste an den Rückkanal zum Erzeuger. Fortschritt auf der jeweils anderen Seite kann erst erfolgen wenn der vorherige Container aus dem jeweiligen Kanal entnommen wurde.

class KanbanTray {
KanbanLink link
Object product
    // Produkt hinzufügen und tray downstream senden
    void bind(product) {
        this.product = product
        link.downstream.bind this
    }
	// Produkt entnehmen und tray upstream senden
    Object take() {
        def result = product
        product = null
        link.upstream.bind this
        return result
    }
}

Für komplexere Setups ist es einfach, mehrere Erzeuber und Verbraucher in 1:n, n:1 oder 1:n:1 Konstellationen zu konfigurieren. Dann würden an der entsprechenden DataFlowQueue einfach mehrere Verbraucher "warten" bzw. mehrere Erzeuger in diesselbe Queue schreiben.

Thread-Sicherheit wird erreicht, indem die Erzeuger und Verbraucher möglichst zustandslos sind, der einzige Zustand ist das Produkt das auf Reise geschickt wird. Und dieser wird immer nur von einem Verarbeitungsknoten zu einer Zeit bearbeitet.

Prinzipiell ist es auch möglich Zyklen im Setup zu haben (muss explizit erlaubt werden). Dann wird ein Erzeuger zu seinem eigenen Verbraucher und bildet einen Generator. Wenn er seinen Zustand nur im Produkt hält ist er per Definition thread-sicher, ansonsten muss der interne Zustand synchronisiert werden.

Last updated 2012-08-05 23:43:01 CEST
Impressum - Twitter - GitHub - StackOverflow - LinkedIn