SignalRChat – Using RxJS to add live notificaitons
I have been playing around a little with Reactive Extensions for javascript and must say that I am really impressed. RxJS a port of the .NET based version which is, as they describe it on their site, “a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.“. So what does this mean? Let me give you an example.
I wanted to add a live notification function to the chat, as soon as someone is typing, three dots should appear next to the user’s name. This means that as soon as the user starts we need to send a message to the server, this message is then broadcasted to all the connected clients. To not flood the server and the users, maximum one message per second should be sent.
Keydown – time since last message >= 1 second, or this is the first keydown in the sequence => send message
Keydown – time since last message < 1 second => do nothing
There is a built in method called Throttle that does almost this, we could use it, but then the first keystroke would get delayed and I did not want this. So being a beginner at RX I struggled for a while trying to get to where I wanted. I came up with a solution that did what it I wanted, but it was not so elegant. Then I found this post on StackOverflow, with a solution by Sergey Aldoukhov that solved the problem in a very elegant way, so I just decided to use it instead. Sergey had created an extension method called OneInTime, this is the code:
Rx.Observable.prototype.OneInTime = function (delay) {
return this
.Take(1)
.Merge(Rx.Observable.Empty().Delay(delay))
.Repeat();
};
So what does this mean? Take(1) is the simple part, we take 1 event from the stream. The merge expression is used to merge multiple IObservable streams into single IObservable stream. Each value on the source streams is projected into the result stream until all source streams complete. The Repeat method with no argument will repeatedly subÂscribe to the same observable collection indefinitely. So by taking one, merging with the delayed empty stream and then calling Repeat(), we take one event from the stream, and then skips all the events during the next second. After one second we repeat. Exactly what we wanted to achieve!
And this is how I would use it in the chat (#msg is the text input):
$("#msg").toObservable("keypress") // Create an observable from the keypress event
.OneInTime(1000)
.Subscribe(function () { chat.userTyping(); });
Pretty neat right?
The code is live at http://signalrchat.apphb.com/ and you can find the source on GitHub.
One thought on “SignalRChat – Using RxJS to add live notificaitons”