Clojure Async Channels: Kindergarden Party

Let’s learn about Clojure async channels by organizing a kindergarden party.

The parents bake cake and bring them to the event. Cake has to be cut in pieces, put on tables etc. Everybody is doing something to make it a nice event.

Preparation

Create a new leiningen project and update the project.clj to

(defproject sample "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [org.clojure/core.async "REPLACE_THIS_WITH_THE_LATEST_VERSION_ON_http://clojure.github.io/core.async/"]
                 [org.jgroups/jgroups "3.4.4.Final"]])

Handing over a cake

Anne is preparing the table with the cakes. She takes a cake and puts it on a table.

Open a REPL session to execute the code.

(require '[clojure.core.async :refer :all])

Create a channel:

(def anne (chan))

Let Anne wait in the background:

(thread (println "Put" (<!! anne) "on the table"))

<!! reads from a channel and waits until something is put onto the channel.

A crib for me: The channel is !! and you send something to it >!! or read from it <!!

Pass the cake to the channel.

(>!! anne "Apple cake")

And we see:

Put Apple cake on the table

A channel is blocking. If you write to a channel, you will wait until a read from this channel happens.

Tuning our code a bit

Our code is not very efficient.

(thread (println "Put" (<!! anne) "on the table"))

This code created a thread waiting in the background. This blocks a JVM thread even if Anne is doing nothing. You can verify it by starting a jconsole.

To improve, we can use a go block. It runs in the background but releases the background thread when Anne is doing nothing. In a go block <! is used instead of <!!

(go (println "Put" (<! anne) "on the table"))

Next, let Anne take over the job for a while and simulate parents bringing a couple of cakes over time.

Anne:

(go (loop [cake (<! anne)]
       (if cake
          (do (println (str "Anne puts " cake " cake on the table"))
              (Thread/sleep 1000)
              (recur (<! anne)))
          (println (str "I, Anne go home")))))

Or a bit shorter:

(go-loop [cake (<! anne)]
	     (if cake
	         (do (println (str "Anne puts " cake " cake on the table"))
	             (Thread/sleep 1000)
	             (recur (<! anne)))
	         (println (str "I, Anne go home"))))

The parents:

(go (doseq [cake ["Apple" "Pineapple" "Chocolade" "Creme"]]
	    (println "About to deliver cake " cake)
	    (>! anne cake)
	    (Thread/sleep (rand-int 1000))))

A closed channel returns nil. If you close the channel, Anne is going home. :

(close! anne)

The problem we are facing is that parents are queuing up, as there are too many cakes. So we setup a small table where the parents can at least put 2 cakes, which are then taken by Anne. Basically, we create a buffered channel.

(def anne (chan 2))

Holger starts helping Anne now. So we introduce the small table as channel and both Anne and Holger fetch cake from it to place it on the tables.

(defn put-cake-on-the-table [name in-table]
	(go-loop [cake (<! in-table)]
	       (if cake
	         (do (println (str name " puts " cake " cake on the table"))
	             (Thread/sleep 1000)
	             (recur (<! in-table)))
	         (println (str "I," name " go home")))))
	
(def small-table (chan 2))
(put-cake-on-the-table "Anne" small-table)
(put-cake-on-the-table "Holger" small-table)

Let’s introduce a method for delivering cakes.

(defn deliver-some-cakes [table]
	(go (doseq [cake ["Apple" "Pineapple" "Chocolade" "Creme"]]
	    (println (str "About to deliver cake " cake))
	    (>! table cake)
	    (Thread/sleep (rand-int 1000)))))
	
(deliver-some-cakes small-table)	

Splitting work

Anne and Holger are going to specialize. Anne takes only fruit cakes whereas Holger takes all other cakes.

We can achieve this using a splitter.

A cake is now a map: {:name “Apple” :type "fruit"}

First we enhance the put-cake-on-the-table and deliver-some-cake methods to work with a map.

(defn put-cake-on-the-table [name in-table]
	(go-loop [{cake :name} (<! in-table)]
	       (if cake
	         (do (println (str name " puts " cake " cake on the table"))
	             (Thread/sleep 2000)
	             (recur (<! in-table)))
	         (println (str "I," name " go home")))))
	
(defn deliver-some-cakes [in-table]
	(go (doseq [{:keys [name] :as cake} [{:name "Apple" :type "fruit"}
	                          {:name "Pineapple" :type "fruit"}
	                          {:name "Chocolade" :type "other"}
	                          {:name "Creme" :type "other"}]]
	    (println (str "About to deliver cake " name))
	    (>! in-table cake)
	    (println (str "Delivered cake " name))
	    (Thread/sleep (rand-int 1000)))))

Then we can split the work.

(def small-table (chan 2))
(let [[cake-for-anne cake-for-holger] (split #(= (:type %) "fruit") small-table)]
	(put-cake-on-the-table "Anne" cake-for-anne)
	(put-cake-on-the-table "Holger" cake-for-holger))
(deliver-some-cakes small-table)

Next Karl is going to help out. He is making small labels for every cake, before Anne and Holger put them on the tables.

(defn make-label [my-name in-table me]
	(go-loop [{name :name :as cake} (<! in-table)]
	       (if cake
	         (do (println (str my-name " makes label for " name))
	             (>! me cake)
	             (recur (<! in-table))))))

(def karl (chan))
(def small-table (chan 2))
(let [[cake-for-anne cake-for-holger] (split #(= (:type %) "fruit") karl)]
	(put-cake-on-the-table "Anne" cake-for-anne)
	(put-cake-on-the-table "Holger" cake-for-holger))
(make-label "Karl" small-table karl)
(deliver-some-cakes small-table)

Merging work

Judie has a nice hand writing and starts writing labels as well. Anne and Holger can now take cakes from both of them.

We can merge messages from two channels into one using merge.

(def karl (chan))
(def judie (chan))
(def small-table (chan 2))
(let [[cake-for-anne cake-for-holger] (split #(= (:type %) "fruit") 
(merge [karl judie]))]
	(put-cake-on-the-table "Anne" cake-for-anne)
	(put-cake-on-the-table "Holger" cake-for-holger))
(make-label "Karl" small-table karl)
(make-label "Judie" small-table judie)
(deliver-some-cakes small-table)

Alternating

As more and more cakes are delivered, the small entry table has not enough capacity. We set up a second table and ask Karl and Judie to take cakes from both tables.

If no cake is their, they just do nothing for a moment.

(defn make-label [my-name in-tables me]
	(go-loop [[{name :name :as cake} _] (alts! in-tables :default "nothing")]
	       (if cake
	         (do
	           (if (= cake "nothing")
	             (do (println "Do nothing for a moment")
	                 (Thread/sleep 500))
	             (do (println (str my-name " makes label for " name))
	                 (>! me cake)))
	           (recur (alts! in-tables :default "nothing")))
	         (println (str my-name " goes home")))))

(def karl (chan))
(def judie (chan))
(def small-table-1 (chan 2))
(def small-table-2 (chan 2))
(let [[cake-for-anne cake-for-holger] (split #(= (:type %) "fruit")
	                                         (merge [karl judie]))]
	(put-cake-on-the-table "Anne" cake-for-anne)
	(put-cake-on-the-table "Holger" cake-for-holger))
(make-label "Karl"  [small-table-1 small-table-2] karl)
(make-label "Judie" [small-table-1 small-table-2] judie)
(deliver-some-cakes small-table-1)

The function alts! alternates between a vector of channel. It none of the channel contains messages, it can block or fall back to a default value.

(alts! in-tables :default "nothing")

You can use alts! to write to different channels as well.

Timeout

Karl and Judie decide not wait for ever for cakes. If no cake is delivered for a long time, they go home.

This can be achieved by adding a timeout channel to the alts! function. It creates a channel which closes after the defined milliseconds. A closed channel returns nil.

The modified method looks like

(defn make-label [my-name in-tables me]
	(go-loop [[{name :name :as cake} _] (alts! (cons (timeout 5000) in-tables))]
		(if cake
	       (do (println (str my-name " makes label for " name))
	           (>! me cake)
	           (recur (alts! (cons (timeout 500) in-tables))))
	       (println (do (println (str my-name " does nothing for a moment"))
	                    (Thread/sleep 500))))))
							

(def karl (chan))
(def judie (chan))
(def small-table-1 (chan 2))
(def small-table-2 (chan 2))
(let [[cake-for-anne cake-for-holger] 
	(split #(= (:type %) "fruit") (merge [karl judie]))]
	(put-cake-on-the-table "Anne" cake-for-anne)
	(put-cake-on-the-table "Holger" cake-for-holger))
(make-label "Karl"  [small-table-1 small-table-2] karl)
(make-label "Judie" [small-table-1 small-table-2] judie)
(deliver-some-cakes small-table-1)

Network

If we need more computing power, we can start distributing the application across multiple computers. So we need a channel, which actually writes to a remote channel.

Let’s write a simple solution using reliable messaging via JGroups.

The basic usage pattern is to create a channel, connect to a cluster name, set a receiver to react on incoming messages and to use the channel to send messages.

Use the following to use the JGroups default configuration. It uses UDP multicast for node detection.

(def c (org.jgroups.JChannel.))

Or specify a customized configuration if you like.

(def c (org.jgroups.JChannel. (clojure.java.io/resource "jgroups.xml")))

(def r (reify org.jgroups.Receiver
         (receive [this msg]
           (println (.getObject msg) (.getSrc msg)))
         (viewAccepted [this view]
           (println view))))
(.setReceiver c r)
(.connect c "foo")
(.send c nil "hello")

If you struggle sending messages, have a look in the JGroups documentation. It has a trouble shooting section.

It is easier, to use JGroups messaging, if we provide a couple of utility classes. Let’s built a channel which sends messages to remote addresses and a receiver which passes
the message to a local channel.

Here is the source code.

(ns sample.jgroups-async
	(:import (org.jgroups Channel Address Receiver))
	(:require [clojure.core.async.impl.protocols :as protocols]
	      [clojure.core.async.impl.channels :as channels]
	      [clojure.core.async :refer [close! >!!]]))

clojure.core.async defines protocols. If you implement WritePort, then you can use your channel with the async API >!! to send messages.

(deftype JgroupsWriteChannel [^Channel jgroups-channel ^Address target-address]
	protocols/WritePort
	(protocols/put! [port val fn0-handler]
		(when (nil? val)
			(throw (IllegalArgumentException. "Can't put nil on Jgroups channel")))
			(.send jgroups-channel (org.jgroups.Message. target-address nil val))
			(channels/box nil))
	protocols/Channel
	(protocols/close! [chan]
		(.close jgroups-channel)))

A factory method bootstraps JGroups and creates instances of the JGroupsWriteChannel.

(defn create-write-channel [cluster-name target-address]
	(let [jc (org.jgroups.JChannel.)]
	(.setReceiver jc (reify Receiver
	                   (viewAccepted [this view]
	                     (println view))))
	(.connect jc cluster-name)
	(JgroupsWriteChannel. jc target-address)))

The receiver is composed of a JGroups channel and a async channel.

(deftype JgroupsReceiver [jgroups-channel local-channel]
	protocols/Channel
	(protocols/close! [chan]
	(.close jgroups-channel)
	(close! local-channel)))

A factory method bootstraps JGroups and creates instances of JgroupsReceiver.

(defn create-remote-receiver
	[cluster-name port]
	(let [jc (org.jgroups.JChannel.)]
		(.setReceiver jc (reify Receiver
	    	(receive [this msg] (>!! port (.getObject msg)))
        	(viewAccepted [this view] (println view))))
	(.connect jc cluster-name)
	(JgroupsReceiver. jc port)))

Let’s play with it.

(require '[clojure.core.async :refer :all])
(def anne (create-remote-receiver "foo" (chan)))
(def joe (create-write-channel "foo" (.getAddress(.jgroups_channel anne))))
(go-loop [m (<! (:local-channel anne))] 
	(if m (do (println "Anne " m)
		(recur (<! (:local-channel anne))))))

Now, we can send messages around

(>!! joe "Hello Anne")

Once you are done, close the JGroups cluster and the channels.

	
(doseq [c [anne joe]] (close! c))

As an exercise, try to split the cake preparation steps across two nodes.

Summary

If you ever wanted to parallelize processing in an application, Clojure async is a great tool set to achieve this. I hope you enjoyed the article.

Sebastian Hennebrueder