{"id":225,"date":"2012-12-19T11:07:34","date_gmt":"2012-12-19T10:07:34","guid":{"rendered":"http:\/\/blogs.igalia.com\/jfernandez\/?p=225"},"modified":"2012-12-19T11:07:34","modified_gmt":"2012-12-19T10:07:34","slug":"node-js-socket-io-real-time-io","status":"publish","type":"post","link":"https:\/\/blogs.igalia.com\/jfernandez\/2012\/12\/19\/node-js-socket-io-real-time-io\/","title":{"rendered":"Node.js + Socket.io = Real-Time IO."},"content":{"rendered":"<p>Using JavaScript to implement server-side logic is not breaking news these days, but <a href=\"http:\/\/nodejs.org\/\">Node.js<\/a> is definitely gaining relevance as one of the hottest technologies in this area. Multiple reasons explain this trend, but one of them is clearly the asynchronous event-driven model and its advantages for dealing with <strong>BigData<\/strong> problems.<\/p>\n<p>When dealing with real-time requirements, <a href=\"http:\/\/socket.io\">Socket.io<\/a> can play an important role in the web application architecture, providing an abstraction layer for the communication between the browser and the server. 
The Node.js event-driven model combined with the Socket.io <strong>real-time<\/strong> capabilities offers a good solution to BigData challenges in domains where real-time capability is important.<\/p>\n<p>I&#8217;ll try to show some examples of combining these two technologies to implement real-time web operations.<\/p>\n<h2>Socket.io-based client-server communication<\/h2>\n<p>Let&#8217;s consider the basic, default configuration of Socket.io, described as follows:<\/p>\n<ul>\n<li>Client-side JavaScript<\/li>\n<\/ul>\n<pre lang=\"JAVASCRIPT\">\"use strict\";\r\n\r\njQuery(document).ready(function() {\r\n\r\n  \/\/ connect to the server that served this page\r\n  var socket = io.connect();\r\n\r\n  socket.on('connect', function() {\r\n    console.log('connected');\r\n  });\r\n  socket.on('disconnect', function() {\r\n    console.log('disconnected');\r\n  });\r\n  socket.on('error', function(err) {\r\n    if(err === 'handshake error') {\r\n      console.log('handshake error', err);\r\n    } else {\r\n      console.log('io error', err);\r\n    }\r\n  });\r\n  \/\/ initial data set, emitted once by the server's StreamAssembler\r\n  socket.on('updates', function(newUpdates) {\r\n    \/\/ update the DOM with the received data\r\n  });\r\n  \/\/ incremental updates, emitted whenever a monitored key changes\r\n  socket.on('dUpdates', function(newUpdates) {\r\n    \/\/ update the DOM with the new data\r\n  });\r\n  \/\/ the channel is bidirectional: the client can emit events too\r\n  $(\"#target\").click(function() {\r\n    socket.emit('myEvent');\r\n  });\r\n});<\/pre>\n<p>The script uses jQuery to support UI operations that manipulate the DOM in the &#8216;updates&#8217; and &#8216;dUpdates&#8217; event handlers. These events are emitted by the server&#8217;s StreamAssembler, which I&#8217;ll describe later. Socket.io does not require jQuery at all, and the client code could even be defined inside a &lt;script&gt; tag in the HTML page.<\/p>\n<p>The client script can also emit events through the socket, which will be handled by the server&#8217;s Node.js event loop. 
It&#8217;s a <strong>bidirectional<\/strong> communication channel.<\/p>\n<ul>\n<li>Server-side JavaScript<\/li>\n<\/ul>\n<pre lang=\"JAVASCRIPT\">'use strict';\r\n\r\n\/\/ written against the Express 3.x and Socket.io 0.9.x APIs\r\nvar express = require('express');\r\nvar fs = require('fs');\r\nvar http = require('http');\r\nvar io = require('socket.io');\r\nvar redis = require('redis');\r\n\r\nvar indexBuffer = fs.readFileSync('index.html').toString();\r\nvar app = express();\r\nvar server = http.createServer(app);\r\n\r\n\/\/ assumption: a node_redis client on the default host\/port; 'keys' (the\r\n\/\/ Redis keys holding the raw data) is assumed to be defined elsewhere,\r\n\/\/ and StreamAssembler is shown below\r\nvar redisClient = redis.createClient();\r\n\r\napp.use(express.bodyParser());\r\napp.use('\/scripts', express.static(__dirname + '\/scripts'));\r\n\r\napp.get('\/', function(req, res) {\r\n  console.log('Request to \"\/\" ...');\r\n  res.contentType('text\/html');\r\n  res.send(indexBuffer);\r\n});\r\n\r\nserver.listen(8080);\r\n\/\/ attach Socket.io to the same HTTP server used by express\r\nio = io.listen(server);\r\n\r\nio.configure(function() {\r\n  io.set('log level', 1);\r\n});\r\n\r\nio.sockets.on('connection', function(socket) {\r\n  console.log('got socket.io connection - id: %s', socket.id);\r\n  \/\/ one assembler per connected client\r\n  var assembler = new StreamAssembler(keys, socket, redisClient);\r\n\r\n  socket.on('myEvent', function() {\r\n    console.log('\"myEvent\" event received');\r\n  });\r\n\r\n  socket.on('disconnect', function() {\r\n    \/\/ needs to be stopped explicitly or it will continue\r\n    \/\/ listening to redis for updates.\r\n    if(assembler)\r\n      assembler.stop();\r\n  });\r\n});<\/pre>\n<p>This code implements a minimal HTTP server with Socket.io support. It creates the server using the express module and makes Socket.io listen on that same HTTP server. 
The Socket.io configuration here only sets the log level, but it can be used for other purposes, such as authentication.<\/p>\n<p>The server also sets up a <strong>StreamAssembler<\/strong> for each connection; it is responsible for collecting, aggregating and assembling the raw data retrieved from the database, and for emitting events to the client (the browser) through the Socket.io communication channel.<\/p>\n<h2>Stateful processing and keeping data handy in memory<\/h2>\n<p>The Node.js event-driven model eases the development of stateful client\/server logic, which is very important when implementing distributed systems devoted to online processing of stream data, and for assembling in one place all the context required to service a web request. It also helps to define <strong>async state-machine<\/strong> <strong>patterns<\/strong> thanks to the <a title=\"Node.js is not single-threaded\" href=\"http:\/\/rickgaribay.net\/archive\/2012\/01\/28\/node-is-not-single-threaded.aspx\">single-threaded<\/a> approach, so the implementation turns out simpler and easier to debug than typical multi-threaded solutions.<\/p>\n<p>Perhaps even more important when dealing with real-time requirements, in-memory data processing is essential to provide a truly real-time user experience, and Node.js provides a programming model well aligned with this approach.<\/p>\n<p>So, let&#8217;s assume we have access to a large storage system where huge data streams are stored and manipulated. 
We&#8217;ll use <strong>Redis<\/strong> as a cache for such a storage system, for real-time purposes.<\/p>\n<p style=\"text-align: center\"><a href=\"http:\/\/blogs.igalia.com\/jfernandez\/files\/2012\/12\/Screenshot11.png\"><img loading=\"lazy\" decoding=\"async\" class=\"size-medium wp-image-332 aligncenter\" src=\"http:\/\/blogs.igalia.com\/jfernandez\/files\/2012\/12\/Screenshot11-300x142.png\" alt=\"\" width=\"300\" height=\"142\" srcset=\"https:\/\/blogs.igalia.com\/jfernandez\/files\/2012\/12\/Screenshot11-300x142.png 300w, https:\/\/blogs.igalia.com\/jfernandez\/files\/2012\/12\/Screenshot11.png 721w\" sizes=\"auto, (max-width: 300px) 100vw, 300px\" \/><\/a><\/p>\n<p style=\"text-align: left\">The <strong>StreamAssembler<\/strong> component receives a constant stream of raw data and produces structured data, <strong>aggregating<\/strong> data from different sources, always within the limit of the window size in order to ensure that all operations are executed <strong>in memory<\/strong>, taking into account the server&#8217;s hardware specifications.<\/p>\n<p style=\"text-align: left\">It uses the <a title=\"redis-sync at github\" href=\"https:\/\/github.com\/pconstr\/redis-sync\">redis-sync<\/a> module for <a title=\"Additional info about redis-sync\" href=\"https:\/\/blogs.igalia.com\/jfernandez\/2012\/11\/22\/exploting-the-redis-replication-interface-with-node-js\/\">exploiting the Redis replication interface<\/a> and monitoring the Redis storage, looking for commands that alter the database state on specific keys. 
It might also use the redis-sync module to replicate specific keys from the Redis (cache) database to the main storage (sediment), which is larger and usually offers better write performance (Cassandra or HBase, for instance).<\/p>\n<pre lang=\"JAVASCRIPT\">\/\/ Assumptions (not shown in the original snippet): 'm' is a module wrapping\r\n\/\/ redis-sync that exposes a Monitor class emitting 'changed' events,\r\n\/\/ serverWindowLimit is the maximum number of updates kept in memory, and\r\n\/\/ keys is an array of Redis keys holding sorted sets of JSON updates.\r\nfunction StreamAssembler(keys, socket, redisClient) {\r\n  var that = this;\r\n\r\n  var updates = {}; \/\/ maps update id to {t: score, data: parsed update}\r\n  var monitor = null;\r\n\r\n  \/\/ keep only the newest serverWindowLimit updates in memory\r\n  function moveServerWindow() {\r\n    console.info('Moving server Window');\r\n    var serverList = [];\r\n    var k;\r\n    for (k in updates) { serverList.push([k, updates[k].t]); }\r\n    \/\/ newest first; Redis returns scores as strings\r\n    serverList.sort(function(a, b) {\r\n      return Number(b[1]) - Number(a[1]);\r\n    });\r\n    while (serverList.length &gt; serverWindowLimit) {\r\n      var toDelete = serverList.pop(); \/\/ oldest remaining entry\r\n      delete updates[toDelete[0]];\r\n    }\r\n  }\r\n\r\n  \/\/ results is a flat [member, score, member, score, ...] list\r\n  function addUpdates(results) {\r\n    var idList = [];\r\n    var i, update, t, uk;\r\n    for(i = 0; i &lt; results.length; i += 2) {\r\n      update = JSON.parse(results[i]);\r\n      t = results[i + 1];\r\n\r\n      uk = update.id;\r\n      idList.push(uk);\r\n\r\n      if(updates[uk] === undefined) {\r\n        updates[uk] = {t: t, data: update};\r\n      }\r\n    }\r\n    return idList;\r\n  }\r\n\r\n  \/\/ read the last 100 entries of the sorted set stored at key\r\n  function getRawData(key, cb) {\r\n    console.log('Getting raw data from: ' + key);\r\n    redisClient.zrange(key, '-100', '-1', 'withscores',\r\n                       function(err, results) {\r\n      if(err) return cb(err);\r\n      addUpdates(results);\r\n      cb(null);\r\n    });\r\n  }\r\n\r\n  function initialUpdate() {\r\n    console.log('initial update');\r\n    moveServerWindow();\r\n    socket.emit('updates', updates);\r\n  }\r\n\r\n  that.addRawDataKeys = function addRawDataKeys(keys) {\r\n    var rem = keys.length;\r\n    if(rem === 0) {\r\n      console.log('No update keys'); \/\/ no updates to retrieve\r\n      initialUpdate();\r\n      that.addMonitorKeys(keys);\r\n      return;\r\n    }\r\n    keys.forEach(function(key) {\r\n      getRawData(key, function(err) {\r\n        if(err) console.error(err);\r\n        \/\/ once every key has been read, send the initial state\r\n        \/\/ and start monitoring for incremental changes\r\n        if(--rem === 0) {\r\n          initialUpdate();\r\n          that.addMonitorKeys(keys);\r\n        }\r\n      });\r\n    });\r\n  };\r\n\r\n  that.addMonitorKeys = function addMonitorKeys(keys) {\r\n    if (monitor) {\r\n      monitor.addKeys(keys);\r\n    } else {\r\n      console.log('Creating new monitor');\r\n      monitor = new m.Monitor(keys);\r\n      monitor.on('changed', handleCommand);\r\n      monitor.connect();\r\n    }\r\n  };\r\n\r\n  that.stop = function() {\r\n    if (monitor) {\r\n      console.log('Stopping monitor');\r\n      monitor.disconnect(handleCommand);\r\n    }\r\n  };\r\n\r\n  \/\/ called by the monitor for each command altering a monitored key\r\n  function handleCommand(key, command, args) {\r\n    var i, t, u;\r\n    var values = [];\r\n    var newUpdates = [];\r\n    if(command === 'zadd') {\r\n      \/\/ ZADD arguments arrive as (score, member) pairs\r\n      for(i = 0; i &lt; args.length; i += 2) {\r\n        t = Buffer.concat(args[i]).toString();\r\n        u = Buffer.concat(args[i + 1]).toString();\r\n        values.push(u);\r\n        values.push(t);\r\n        newUpdates.push(JSON.parse(u));\r\n      }\r\n      addUpdates(values);\r\n      moveServerWindow();\r\n      socket.emit('dUpdates', newUpdates);\r\n    }\r\n  }\r\n\r\n  \/\/ start only once all the methods above have been defined\r\n  that.addRawDataKeys(keys);\r\n}<\/pre>\n<p>The StreamAssembler uses the socket passed as an argument (created by the Node.js server through the socket.io module) to emit two different events: <em>&#8220;updates&#8221;<\/em>, for the initial updates retrieved from the Redis database, and <em>&#8220;dUpdates&#8221;<\/em>, for incremental updates detected by the redis-sync monitor.<\/p>\n<h2>Some examples: system performance monitoring<\/h2>\n<p>With the architecture in the diagram above in mind, I&#8217;ve been playing with Node.js and Socket.io to 
implement some illustrative examples of how such an architecture works and how to implement real-time communication with browsers.<\/p>\n<p>We could define a simple retriever of system performance data (e.g., top, netstat, &#8230;) and feed a Redis database with raw data from several hosts.<\/p>\n<p>The StreamAssembler will transform the raw data into structured data, to be displayed by the browser using the <a href=\"http:\/\/d3js.org\/\">d3.js<\/a> library.<\/p>\n<p style=\"text-align: center\"><a href=\"http:\/\/blogs.igalia.com\/jfernandez\/files\/2012\/12\/Screenshot1.png\"><img loading=\"lazy\" decoding=\"async\" class=\"wp-image-331 aligncenter\" src=\"http:\/\/blogs.igalia.com\/jfernandez\/files\/2012\/12\/Screenshot1-300x225.png\" alt=\"\" width=\"300\" height=\"225\" srcset=\"https:\/\/blogs.igalia.com\/jfernandez\/files\/2012\/12\/Screenshot1-300x225.png 300w, https:\/\/blogs.igalia.com\/jfernandez\/files\/2012\/12\/Screenshot1.png 1024w\" sizes=\"auto, (max-width: 300px) 100vw, 300px\" \/><\/a><\/p>\n<p style=\"text-align: left\">A video of the code running is available <a title=\"Igalia's TV channel\" href=\"http:\/\/www.youtube.com\/watch?v=Lrg68-89l04&amp;list=UUIpArN21nIT2lvZX-nkjP7A&amp;index=1\">here<\/a>, and the source code is available <a title=\"Monitoring Demo at GitHub\" href=\"https:\/\/github.com\/javifernandez\/monitoring\">here<\/a>. It&#8217;s just a small example of using Node.js + Socket.io and the StreamAssembler pattern.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Using JavaScript to implement server-side logic is not breaking news these days, but Node.js is definitely gaining relevance as one of the hottest technologies in this area. Multiple reasons explain this trend, but one of them is clearly the asynchronous event-driven model and its advantages for dealing with BigData problems. 
&hellip; <a href=\"https:\/\/blogs.igalia.com\/jfernandez\/2012\/12\/19\/node-js-socket-io-real-time-io\/\" class=\"more-link\">Continue reading <span class=\"screen-reader-text\">Node.js + Socket.io = Real-Time IO.<\/span><\/a><\/p>\n","protected":false},"author":20,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[15],"tags":[],"class_list":["post-225","post","type-post","status-publish","format-standard","hentry","category-bigdata"],"_links":{"self":[{"href":"https:\/\/blogs.igalia.com\/jfernandez\/wp-json\/wp\/v2\/posts\/225","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/blogs.igalia.com\/jfernandez\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/blogs.igalia.com\/jfernandez\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/blogs.igalia.com\/jfernandez\/wp-json\/wp\/v2\/users\/20"}],"replies":[{"embeddable":true,"href":"https:\/\/blogs.igalia.com\/jfernandez\/wp-json\/wp\/v2\/comments?post=225"}],"version-history":[{"count":74,"href":"https:\/\/blogs.igalia.com\/jfernandez\/wp-json\/wp\/v2\/posts\/225\/revisions"}],"predecessor-version":[{"id":783,"href":"https:\/\/blogs.igalia.com\/jfernandez\/wp-json\/wp\/v2\/posts\/225\/revisions\/783"}],"wp:attachment":[{"href":"https:\/\/blogs.igalia.com\/jfernandez\/wp-json\/wp\/v2\/media?parent=225"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/blogs.igalia.com\/jfernandez\/wp-json\/wp\/v2\/categories?post=225"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/blogs.igalia.com\/jfernandez\/wp-json\/wp\/v2\/tags?post=225"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}