Node.js + Socket.io = Real-Time IO.

The use of JavaScript for implementing server-side logic is not breaking news these days, but Node.js is definitely gaining relevance as one of the hottest technologies in this area. There are multiple reasons behind this trend, but one of them is clearly the asynchronous, event-driven model and its advantages for dealing with BigData problems.

When dealing with real-time requirements, Socket.io can play an important role in the web application architecture, providing an abstraction layer for the communication between the browser and the server. The Node.js event-driven model combined with the Socket.io real-time capabilities offers a good solution for facing BigData challenges in domains where real-time behaviour is important.

I’ll try to show some examples of how these two technologies can be combined to implement real-time web operations.

Socket.io based client-server communication

Let’s consider the basic, default configuration of Socket.io, described as follows:

  • Client-side JavaScript
"use strict";
 
jQuery(document).ready(function() {
 
  var socket = io.connect();
 
  socket.on('connect', function() {
    console.log('connected');
  });
  socket.on('disconnect', function() {
    console.log('disconnected');
  });
  socket.on('error', function(err) {
    if(err === 'handshake error') {
      console.log('handshake error', err);
    } else {
      console.log('io error', err);
    }
  });
  socket.on('updates', function(newUpdates) {
    // update the UI here with the assembled data sent by the StreamAssembler
  });
  $("#target").click(function() {
    socket.emit('myEvent');
  });
 });

The script uses jQuery to provide support for UI operations that manipulate the DOM from the ‘updates’ event handler. This event is emitted by the server’s StreamAssembler, which I’ll describe later. Obviously, Socket.io does not require jQuery at all, and the handler could even be defined inside a script tag in the HTML page.

The client script can also emit events through the socket, which will be handled by the server’s Node.js event loop. It’s a bidirectional communication channel.

  • Server-side JavaScript
'use strict';
 
var express = require('express');
var fs = require('fs');
var indexBuffer = fs.readFileSync('index.html').toString();
var app = express();
var io = require('socket.io');
var http = require('http');
var server = http.createServer(app);
 
app.use(express.bodyParser());
app.use('/scripts', express.static(__dirname + '/scripts'));
 
app.get('/',
  function(req, res) {
  console.log('Request to "/" ...');
  res.contentType('text/html');
  res.send(indexBuffer);
});
 
server.listen(8080);
io = io.listen(server);
 
io.configure(function (){
  io.set('log level', 1);
});
 
io.sockets.on('connection', function (socket) {
  console.log('got socket.io connection - id: %s', socket.id);
  // keys (the Redis keys to watch) and redisClient (a connected node_redis
  // client) are assumed to be created elsewhere in the server script
  var assembler = new StreamAssembler(keys, socket, redisClient);
 
  socket.on('myEvent', function() {
    console.log('"myEvent" event received');
  });
 
  socket.on('disconnect', function() {
    // needs to be stopped explicitly or it will continue
    // listening to redis for updates.
    if(assembler)
      assembler.stop();
  });
});

This code represents a minimalistic HTTP server with Socket.io support. It just creates the server using the express module and makes Socket.io listen on that HTTP server. The Socket.io configuration only sets the log level here, but it could be used for other purposes, like authentication.

The server also sets up the StreamAssembler, which is responsible for collecting, aggregating and assembling the raw data retrieved from the database, and for emitting events to the clients (browsers) through the Socket.io communication channel.

Stateful processing and keeping data handy in memory

The Node.js event-driven model eases the development of stateful client/server logic, which is very important when implementing distributed systems devoted to online processing of data streams and for assembling in one place all the context required to service a web request. It also helps to define asynchronous state-machine patterns thanks to the single-threaded approach, so the implementation turns out simpler and easier to debug than typical multi-threaded solutions.
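
As a toy illustration of that single-threaded state-machine style (just a sketch: the protocol, commands and port are made up for this example), a connection handler can keep its whole state in a closure variable and switch on it as data arrives:

'use strict';
 
var net = require('net');
 
net.createServer(function(conn) {
  var state = 'greeting';   // the whole connection state lives in this closure
  var buffered = '';
 
  conn.on('data', function(chunk) {
    buffered += chunk.toString();
    var idx;
    while((idx = buffered.indexOf('\n')) !== -1) {
      var line = buffered.slice(0, idx).trim();
      buffered = buffered.slice(idx + 1);
      switch(state) {       // explicit transitions, no locks involved
        case 'greeting':
          if(line === 'HELLO') { conn.write('OK\n'); state = 'ready'; }
          else conn.write('ERR expected HELLO\n');
          break;
        case 'ready':
          conn.write('ECHO ' + line + '\n');
          break;
      }
    }
  });
}).listen(9000);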

Also, and perhaps even more important when dealing with real-time requirements, in-memory data processing is mandatory to really provide a real-time user experience. Node.js provides a programming model fully aligned with this real-time approach.

So, let’s assume we have access to a large storage system where huge data streams are stored and manipulated, and let’s use Redis as a cache on top of that large storage for real-time purposes.

The StreamAssembler component receives a constant stream of raw data and produces structured data, aggregating data from different sources, always within a window-size limit so that all operations are executed in memory, taking into account the server’s hardware specifications.

It uses the redis-sync module to exploit the Redis replication interface and monitor the Redis storage, looking for commands that alter the database state on specific keys. It might also use the redis-sync module to replicate specific keys from the Redis (cache) database to the main storage (the ‘sediment’), which is larger and usually offers better performance on write operations (Cassandra or HBase, for instance).

function StreamAssembler(keys, socket, redisClient) {
  var that = this;
 
  var updates = {};
  var monitor = null;
 
  function moveServerWindow() {
    console.info('Moving server window');
    // serverWindowLimit (the maximum number of updates kept in memory)
    // is assumed to be defined elsewhere
    var serverList = [];
    var k;
    for (k in updates) { serverList.push([k, updates[k].t]); }
    serverList.sort(function(a, b) {
      return a[1] < b[1] ? -1 : (a[1] > b[1] ? 1 : 0);
    });
    while (serverList.length > serverWindowLimit) {
      var toDelete = serverList.pop();
      delete updates[toDelete[0]];
    }
  }
 
  function addUpdates(results) {
    var idList = [];
    var i, update, t, uk, u;
    for(i = 0; i < results.length; i += 2) {
      update = JSON.parse(results[i]);
      t = results[i + 1];
 
      uk = update.id;
      idList.push(uk);
 
      u = updates[uk];
 
      if(u === undefined) {
        //console.info(uk, 'not seen yet');
        u = {t:t, data:update};
        updates[uk] = u;
      }
    }
    return idList;
  }
 
  function getRawData(key, cb) {
    console.log('Getting raw data from: ' + key);
    redisClient.zrange(key, '-100', '-1', 'withscores',
                       function(err, results) {
      if(err) return cb(err);
      addUpdates(results);
      cb(null);
    });
  } 
 
  function initialUpdate() {
    console.log('initial update');
    moveServerWindow();
    socket.emit('updates', updates);
  } 
 
  that.addRawDataKeys = function addRawDataKeys(keys) {
    var rem = 0;
    var key;
    for(key in keys) {
      ++rem;
      getRawData(keys[key], function(err) {
        if(err) console.error(err);
        --rem;
        if(rem === 0) {
          initialUpdate();
          that.addMonitorKeys(keys);
        }
      });
    }
    if(rem === 0) {
      console.log('No update keys'); // no updates to retrieve
      initialUpdate();
      that.addMonitorKeys(keys);
    }
  };
 
  that.addMonitorKeys = function addMonitorKeys(keys) {
    if (monitor) {
      monitor.addKeys(keys);
    } else {
      console.log('Creating new monitor');
      // m is the monitor module shown at the end of this post,
      // e.g. var m = require('./monitor');
      monitor = new m.Monitor(keys);
      monitor.on('changed', handleCommand);
      monitor.connect();
    }
  } 
 
  that.stop = function() {
    if (monitor) {
      console.log('Stopping monitor');
      monitor.disconnect(handleCommand);
    }
  } 
 
  function handleCommand(key, command, args) {
    var i, t, u;
    var newUpdates = [];
    if(command === 'zadd') {
      var values = [];
      for(i = 0; i < args.length; i += 2) {
        t = Buffer.concat(args[i]).toString();
        u = Buffer.concat(args[i + 1]).toString();
        values.push(u);
        values.push(t);
        newUpdates.push(JSON.parse(u));
      }
      addUpdates(values);
      moveServerWindow();
      socket.emit('dUpdates', newUpdates);
    }
  }
 
  // Kick off the initial data retrieval once everything is defined, so the
  // synchronous "no update keys" path can also reach addMonitorKeys()
  that.addRawDataKeys(keys);
}

The StreamAssembler uses the socket passed as an argument (created by the Node.js server through the socket.io module) to emit two different events: “updates”, for the initial updates retrieved from the Redis database, and “dUpdates”, for the incremental updates detected by the redis-sync monitor.
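
On the client side, a minimal sketch of how these two events might be consumed would look like this (render() is a hypothetical function, e.g. the d3 snippet shown below):

var current = {};                      // client-side copy of the server window
 
socket.on('updates', function(newUpdates) {
  current = newUpdates;                // initial snapshot: { id: {t: ..., data: ...}, ... }
  render(current);
});
 
socket.on('dUpdates', function(newUpdates) {
  newUpdates.forEach(function(u) {     // incremental updates: an array of parsed entries
    current[u.id] = { data: u };
  });
  render(current);
});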

Some examples: system performance monitoring

With the architecture described above in mind, I’ve been playing with Node.js and Socket.io to implement some illustrative examples of how such an architecture works and how to implement real-time communication with browsers.

We could define a simple retriever of system performance data (e.g., top, netstat, …) and feed a Redis database with raw data from several hosts.
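
For instance, a minimal sketch of such a retriever could periodically sample the local machine and push JSON entries into a Redis sorted set scored by timestamp, which is what the StreamAssembler’s zrange ... withscores call expects (the key name and interval here are just assumptions):

'use strict';
 
var os = require('os');
var redis = require('redis');
var client = redis.createClient();
 
setInterval(function() {
  var now = Date.now();
  var sample = {
    id: os.hostname() + ':' + now,     // unique id, used later by the StreamAssembler
    host: os.hostname(),
    load: os.loadavg()[0],
    freemem: os.freemem()
  };
  // raw data goes into a per-host sorted set, scored by timestamp
  client.zadd('perf:' + os.hostname(), now, JSON.stringify(sample), function(err) {
    if(err) console.error(err);
  });
}, 5000);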

The StreamAssembler will transform the raw data into structured data, to be displayed by the browser using the d3.js library.
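
A possible render() function for the client sketch above, using a plain d3 data join (the ‘#metrics’ container and the displayed fields are assumptions, not the actual demo code):

function render(current) {
  var rows = Object.keys(current).map(function(k) { return current[k].data; });
 
  // one div per update, keyed by id so incremental updates reuse existing nodes
  var sel = d3.select('#metrics').selectAll('div.metric')
              .data(rows, function(d) { return d.id; });
  sel.enter().append('div').attr('class', 'metric');
  sel.exit().remove();
 
  // re-select so both new and existing nodes get their text refreshed
  d3.select('#metrics').selectAll('div.metric')
    .text(function(d) { return d.host + ' load: ' + d.load; });
}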

There is a video of the code running available here; check also the source code here. It’s just a small example of the use of Node.js + Socket.io and the StreamAssembler pattern.

Exploiting the Redis replication interface with Node.js

The Redis replication interface can easily be exploited for other purposes by creating a new TCP connection and issuing the SYNC command. The Redis server will use that connection to stream every write command as soon as it is executed.

'use strict';
 
var net = require('net');
var client = net.connect(6379);
 
client.on('connect', function(a) {
  console.log('syncing ...');
  client.write('sync\r\n');
});
client.on('data', function(data) {
  // each chunk contains replicated write commands in the Redis protocol
  console.log('Streaming commands ...');
});
client.on('error', function(err) {
  console.log(err);
});
client.on('end', function() {
  console.log('end');
});

At Igalia, we’ve been working on building smart and distributed components for real-time analysis of data streams, in collaboration with Perceptive Constructs. We are using several of their Redis components to face our BigData and real-time challenges, but perhaps the most useful one has been the redis-sync module.

The redis-sync component is a Node.js module that exploits the Redis replication interface in order to monitor all the write commands executed by the server. It emits different events providing Node.js data structures for the command arguments, which can be handled by higher-level JavaScript applications.

The redis-sync component can take advantage of the rdb-parser component, which parses generic Redis RDB dumps, in order to load all the changes made to the database prior to the SYNC call.

The rdb-parser

The rdb-parser module generates Node.js data structures from Redis RDB dumps or command replies, based on the new Redis Unified Request Protocol. The current development status offers an almost complete parser for all the Redis entity types:

  • REDIS_STRING
  • REDIS_LIST
  • REDIS_SET
  • REDIS_ZSET
  • REDIS_HASH

The rdb-parser emits an ‘entity’ event for every Bulk Reply detected in the buffer, with the following structure:

that.emit('entity', [REDIS_TYPE, key, data]);

The parsing process is triggered with the write(data) function, which feeds the parser a new buffer to parse. The process is implemented using a simple state-machine pattern, and it ensures the data manipulation is binary safe, as required by the Redis unified protocol. The example provided in the repository is quite illustrative. Just type:

node ./test.js < ./dump.rdb
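
For reference, such a test script essentially boils down to something like this sketch, based only on the Parser API used in this post (write()/end() plus the ‘entity’ event):

'use strict';
 
var rdb = require('rdb-parser');
 
var parser = new rdb.Parser();
parser.on('entity', function(e) {
  console.log(e);                      // e is [REDIS_TYPE, key, data]
});
 
process.stdin.on('data', function(chunk) { parser.write(chunk); });
process.stdin.on('end', function() { parser.end(); });
process.stdin.resume();                // old-style streams: start the paused stdin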

The redis-sync module

The redis-sync module uses the Redis replication interface to monitor and stream the Redis commands that modify the database state. This component is very useful for implementing real-time capabilities, but also for migrating data into larger databases, since Redis is a pure in-memory store and memory is too precious to hold everything.
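
As a hedged sketch of that data-migration use case (this is not the actual Igalia code; archiveStore.save() is a hypothetical placeholder for a Cassandra/HBase client call, and the argument layout follows the Monitor example shown later):

'use strict';
 
var redisSync = require('redis-sync');
 
function replicate(watchedKeys, archiveStore) {
  var sync = new redisSync.Sync();
  sync.on('command', function(command, args) {
    if(command !== 'zadd') return;                   // only replicate sorted-set writes
    var key = Buffer.concat(args[0]).toString();
    if(watchedKeys.indexOf(key) === -1) return;      // ignore keys we don't track
    // assuming args is [key, score, member, score, member, ...]
    for(var i = 1; i + 1 < args.length; i += 2) {
      var score = Buffer.concat(args[i]).toString();
      var member = Buffer.concat(args[i + 1]).toString();
      archiveStore.save(key, score, member);         // hypothetical write to the 'sediment'
    }
  });
  sync.on('error', function(err) { console.error(err); });
  sync.connect();                                    // connect to the Redis replication interface
}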

Like the rdb-parser, the redis-sync module is implemented using a simple state-machine pattern and supports both the unified protocol and inline commands; it is binary safe as well. You can check the usage examples here.

We already mentioned that it can use the rdb-parser internally for dealing with the initial SYNC Bulk Reply:

case 'bulkReplyLenR':
  if(data[i] === 10) { // \n
    ++i;
    if((that.listeners('entity').length > 0 || that.listeners('rdb').length > 0) && !readRDB) {
      if(!rdbParser) {
        rdbParser = new rdb.Parser();
        rdbParser.on('entity', function(e) {
          that.emit('entity', e);
        });
        if(that.listeners('rdb').length === 0) {
          rdbParser.on('error', function(err) {
            // stream is used internally, error handling is done at the outer level
          });
        }
      }
      that.emit('rdb', rdbParser);
      startReadingBytes(bulkReplyLen, false,
           function(buf) { rdbParser.write(buf); },
           function() { rdbParser.end(); readRDB = true; rdbParser = undefined; connectedOK(); state = 'ready';});
    } else {
      startReadingBytes(bulkReplyLen, false,
           function(buf) { that.emit('bulkReplyData', buf); },
           function() { that.emit('bulkReplyEnd'); readRDB = true; connectedOK(); state = 'ready';});
    }
  }
  break;

Once the initial SYNC is done and the corresponding Bulk Reply has been parsed by the rdb-parser, the readRDB variable determines whether a new SYNC (reconnection) or a regular command reply is being processed.

In order to receive new commands, just listen to the “command” or “inlineCommand” events:

  • that.emit('command', command, unifiedArgs.slice(1));
  • that.emit('inlineCommand', inlineCommandBuffers);

Monitoring specific keys

A very basic use case for the redis-sync module is to monitor individual keys and trigger specific actions when they change. The listeners of such actions will be notified by the Node.js event loop, providing a kind of real-time capability to the client.

'use strict';
 
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var redisSync = require('redis-sync');
 
function Monitor(keys) {
  var that = this;
  var sync = new redisSync.Sync();
  sync.on('command', function(command, args) {
    var key = Buffer.concat(args[0]).toString();
    if (keys.indexOf(key) !== -1) {
      console.log('key %s changed: %s', key, command);
      that.emit('changed', key, command, args);
    }
  });
  sync.on('error', function(err) {
    console.error(err);
  });
  that.connect = function(p, h) {
    sync.connect(p, h);
  };
  // addKeys() and disconnect(), used by the StreamAssembler above,
  // are not shown here
}
 
util.inherits(Monitor, EventEmitter);
exports.Monitor = Monitor;
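
Example usage, matching the way the StreamAssembler drives it (assuming the module above is saved as monitor.js; the key names are illustrative):

var m = require('./monitor');
 
var monitor = new m.Monitor(['perf:host1', 'perf:host2']);
monitor.on('changed', function(key, command, args) {
  console.log('%s was modified by %s', key, command);
});
monitor.connect();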

My first Strata Conference

This year was the first time the Strata Conference reached Europe, and thanks to Igalia I could be there to evaluate the investment we have been making in BigData technologies.

This trip is part of the roadmap of the Distributed Computing team we created at Igalia with the aim of exploring a field where Open Source is key, and of seeing how our more than ten years of experience as Open Source consultants fit into such a competitive area.

We have lately been collaborating with the company Perceptive Constructs to increase our data modelling and machine learning capabilities. Both companies were present at the Strata Conference to showcase our work on distributed and smart components for Social Media stream analysis. We will unveil our achievements in future posts, but first I’ll share my impressions about the conference and the future of the BigData area.

O’Reilly Strata Conferences

These conferences used to be US events, held on both coasts (New York, San Francisco and Santa Clara), but this was the first EU edition, so it was very important for us to attend. There is a great deal of BigData activity in the UK, and the Open Data commitment is very strong there, which is allowing a lot of start-ups to grow in that area.

The conference is what you could expect from a big event like this: quite expensive, but very well organized and fully oriented towards networking and business. There were some very interesting events, like the Start-up Competition, connecting young companies and independent developers with investors and entrepreneurs. The Office Hours gave us the possibility of face-to-face meetings with some of the speakers, which was a great thing in my opinion.

I’ll comment on the talks I considered most relevant, but let me first mention that I think the contents were very well structured, with a good mix of technical and inspiring material. The keynotes were a great warm-up for a conference that tries to show the social aspects behind the BigData field and how it could help us acquire better knowledge in an era of access to information we have never seen before.

The Talks

First of all, I think it’s worth sharing all the keynote videos, but I would like to comment on the ones I found most remarkable.

Good Data, Good Values

It was a really inspiring talk, describing how BigData can help to make a better world by supporting not-so-big companies and organizations in making sense of the data they are generating. “No need to have big data for getting big insights”.

The Great Railway Caper: Big Data in 1955

The talk was interesting because it explained very well what BigData is and what the actual challenges are:

  • Partitioning.
  • Slow storage media.
  • Algorithms.
  • Lots of storage.

The situation hasn’t changed much since 1955:

  • Not enough RAM to solve the entire problem.
  • The algorithm doesn’t exist yet.
  • Machines busy with other tasks.
  • Secondary storage is slow.
  • Having to deal with tight deadlines.

Keynote by Liam Maxwell

The UK Government is really pushing for BigData and is committed to the Open Data initiative. I would like to see the Spanish government continue its efforts to increase transparency and openness regarding public data.

They really want to work with SMEs, avoiding big players and vendor lock-in issues, which I personally think is the right approach. As was stated many times during the conference:

  • Open Source + Open Data = Democratization of BigData.

BigData in retail

This talk was an excellent example of a domain where BigData fits very well. On-line retail providers manage huge volumes of data from many countries, and statistical models apply quite well to consumer habits and depot stock trends.

They basically use MATLAB, so I guess real-time analysis is not crucial for them. They focus on different angles:

  • Predicting how weather affects sales.
  • Reducing depot stock holding.
  • Improving promotions.

Transparency transformed

They have developed a very cool system, a kind of expert system for detecting, classifying and generating new knowledge on different topics: news and media channels, sports, real estate, financial services, … They are now approaching regular companies to analyse their business processes.

  • Scheme: data – facts – angles – structure – narrative.
  • Fully automated process: meta-journalism.

There are some case studies: financial analysis and on-line education.

  • Generating financial reports from isolated insights.
  • Interpretation of financial charts.
  • Giving advice to students and teachers.
  • Social networks are another example.
  • Challenge of dealing with unstructured data.

The core system is based on AI techniques (expert systems, probably) using pattern-matching rules.

  • They don’t predict, but it’s on the roadmap (long term).
  • They don’t expose an API.
  • They don’t do machine learning.