WARNING:
hub are depreciated in Play2.1. They are replace by Concurrent.broadcast (which is also not documented). I did a very small example that demonstrate the concept and post it on github: https://github.com/benqua/BroadcasterInPlay2
The Problem
Write a stream based (server sent event, comet or WebSocket) web application where many clients need to get the same information (news feed or any kind of dashboard for example).
Why push the same information to all clients? Because this information may be costly to build or it is important every client gets the exact same, time dependent, information.
The Concepts
Basically we want to push an enumerator (think: “input data stream”) to many web clients.
The Code
First, a simple example without hub, the code of the controller could be:
//no hub: each client gets its own event def serverevents = Action { val timestep = 1000 //getmtime is an Enumarator that provides // a new number (time) every second val getmtime = Enumerator.fromCallback{ () => Promise.timeout( { //time in tenth of sec val currentTime = java.lang.System.currentTimeMillis() / 100 Logger.debug(currentTime toString) Some(currentTime +":ds")}, timestep, TimeUnit.MILLISECONDS ) } // the getmtime enumerator is pushed through the Enumerattee returned // by the apply method of the EventSource object, // to be transformed into a Server Sent Event. // The event is then sent to the client. Ok.stream(getmtime &> EventSource()).as("text/event-stream") }
If you run this example in two browsers, you will see that they get different results. It is also obvious, from the log in the Play console, that a new time list is generated for each client.
There is one line per client per second in the log.
If this small function (getting current time in milliseconds) were resource intensive, resource usage would grow linearly with the number of client. There is one call to the function per client every time step.
There is one line per client per second in the log.
If this small function (getting current time in milliseconds) were resource intensive, resource usage would grow linearly with the number of client. There is one call to the function per client every time step.
Now, let's add the hub and refactor a little bit the controller code:
val timestep = 1000 val timeEnum = Enumerator.fromCallback{ () => Promise.timeout( { val currentTime = java.lang.System.currentTimeMillis() Logger.debug(currentTime toString) Some(currentTime +":ds")}, timestep, TimeUnit.MILLISECONDS ) } // now, the hub that let us // "share" the timeEnum between all clients val hub = Concurrent.hub[String](timeEnum) //with hub: one event generation for all clients; def serverhubevents = Action { implicit val encoder = Comet.CometMessage.stringMessages Ok.stream(hub.getPatchCord &> EventSource()).as("text/event-stream") }
Note the use of the magic and undocumented hub.getPatchCord function which, at least conceptually, returns a copy of the timeEnum Enumerator.
With this code, all clients will get the exact same output. The (potentially) resource intensive function that feed the timeEnum would be called only once every time step, independently of the number of clients.
Conclusion
Hubs are a great way to multiplex an answer to many clients.
Sadly, there is no documentation for the moment.
Open Questions
- First, a Scala question: why do I have to add the “implicit” parameter in the hub version while it is not necessary in the version without hub?
- In the first example, if I remove the Enumerator definition from the end point function and put it in the controller object scope, I get the exact same result. In this case, shouldn't the Enumerator be shared between the clients? I don't know exactly what should happen, but I don't understand why each client gets its own enumerator:
val timestep = 1000 val timeEnum = Enumerator.fromCallback{ () => Promise.timeout( { val currentTime = java.lang.System.currentTimeMillis() Logger.debug(currentTime toString) Some(currentTime +":ds")}, timestep, TimeUnit.MILLISECONDS ) } //why do each client have its own timeEnum // (declared in the controller object scope)?? def servereventsshared = Action { implicit val encoder = Comet.CometMessage.stringMessages Ok.stream(timeEnum &> EventSource()).as("text/event-stream") }
This gives the exact same result as the version without hub.
Last question: how will the server know that a client browser window has been closed and that the stream shouldn't be pushed to this client anymore? no way... ?
Thanks
Thanks to Gaƫtan Renaudeau who wrote a very interesting experiment and blog post using, among other things, hub. His experiment raises my attention to this undocumented Play2 feature.