Exploting the Redis replication interface with Node.js

The Redis replication interface can be easily exploited for other purposes by creating a new TCP connection and issuing the SYNC command. The Redis server will use such connection to stream any writing command as soon as it’s executed.

'use strict';
 
var util = require('util');
var client = net.connect(6379);
 
client.on('connect', function(a) {
  console.log('syncing ...');
  client.write('sync\r\n');
});
client.on('data', function(data) {
  console.log('Streaming commands ...');
});
client.on('error', function(err) {
  console.log(err);
});
client.on('end', function() {
  console.log('end');
});

At Igalia, we’ve been working on building smart and distributed components for real-time data streams analysis in collaboration with Perceptive Constructs. We are using several of its Redis components to face our BigData and real-time challenges, but perhaps one of the most useful ones has been the redis-sync module.

The redis-sync component is a Node.js module for exploiting the Redis replication interface in order to monitor all the writing commands executed by the server. It emits different signals providing Node.js data structures for the command arguments, which might be handled by top level Javascript applications.

The redis-sync component might get advantage of the rdb-parser component, which helps to parse generic Redis RDB dumps, in order load all the changes in the database prior to the sync call.

The rdb-parser

The rdb-parser module generates Node.js data structures from Redis RDB dumps or commands replies, based on the Redis new Unified Request Protocol.  The current development status offers almost a complete parser for all the Redis entities:

  • REDIS_STRING
  • REDIS_LIST
  • REDIS_SET
  • REDIS_ZSET
  • REDIS_HASH

The rdb-parser emits the ‘entity’ signal for every Bulk Reply detected in the buffer, with the following structure:

that.emit(‘entity’, [REDIS_TYPE, key, data]);

The parsing process is triggered with the function write(data), assigning a new buffer to parse. The process is implemented using a simple states machine pattern and it ensues the data manipulation is binary safe, also according to the Redis unified protocol. The example provided in the repository is quite illustrative.  Just type:

node ./test.js < ./dump.rdb

The redis-sync module

The redis-sync module uses the Redis replication interface to monitor and stream Redis commands which modify the database state. This component is very useful for implementing real-time capabilities but also for data migration into larger databases, since Redis is a pure in-memory storage and data size is too precious.

As the rdb-parser, the redis-sync module is implemented using a simple states machine pattern and supports both, unified protocol and inline commands; it provides binary safeness as well. You can check the usage examples here.

We already commented that it might use the rdb-parser internally for dealing with the initial sync Bulk Reply:

case 'bulkReplyLenR':
  if(data[i] === 10) { // \n
    ++i;
    if((that.listeners('entity').length &gt; 0 || that.listeners('rdb').length &gt; 0) &amp;&amp; !readRDB) {
      if(!rdbParser) {
        rdbParser = new rdb.Parser();
        rdbParser.on('entity', function(e) {
          that.emit('entity', e);
        });
      if(that.listeners('rdb').length === 0) {
        rdbParser.on('error', function(err) {
          // stream is used internally, error handling is done at the outer level });
        }
      }
      that.emit('rdb', rdbParser);
      startReadingBytes(bulkReplyLen, false,
           function(buf) { rdbParser.write(buf); },
           function() { rdbParser.end(); readRDB = true; rdbParser = undefined; connectedOK(); state = 'ready';});
    } else {
      startReadingBytes(bulkReplyLen, false,
           function(buf) { that.emit('bulkReplyData', buf); },
           function() { that.emit('bulkReplyEnd'); readRDB = true; connectedOK(); state = 'ready';});
    }
  }
  break;

Once the initial sync is done and the corresponding Bulk Reply parsed by the rdb-parser, the readRDB variable determines whether a new sync (reconnection) or a regular command reply is being processed.

In order to receive new commands just listen to the “command” or “inlineCommand” events:

  • that.emit(‘command’, command, unifiedArgs.slice(1));
  • that.emit(‘inlineCommand’, inlineCommandBuffers);

Monitoring specific keys

A very basic use case for the redis-sync module would be to monitor individual keys and triggering specific actions. The listeners of such actions will be notified by the Node.js even loop, providing kind of real-time capabilities to the client.

'use strict';
 
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var redisSync = require('redis-sync');
 
function Monitor(keys) {
  var that = this;
  var sync = new redisSync.Sync();
  sync.on('command', function(command, args) {
    var key = Buffer.concat(args[0]).toString();
    if (keys.indexOf(key) !== -1) {
      console.log('key %s changed: %s', key, command);
      that.emit('changed', key, command, args);
    }
  });
  sync.on('error', function(err) {
    console.error(err);
  });
  that.connect = function(p, h) {
    sync.connect(p, h);
  };
}
 
util.inherits(Monitor, EventEmitter);
exports.Monitor = Monitor;