Meeting Messaging Demands in Integration Scenarios: Weighing Options

In today's complex integration landscape, reliable messaging is a must-have. With a multitude of messaging frameworks and technologies at our disposal, choosing the right fit can be overwhelming. From traditional message queues (MQ) to modern open-source solutions like Kafka, RabbitMQ, and ActiveMQ, each framework has evolved to address specific needs. As microservices continue to gain popularity, engineers are seeking more agile, deployable, and cost-effective solutions. However, every messaging framework comes with its own set of infrastructure and maintenance challenges.

In a recent project, I came across a proposal to leverage MongoDB's capped collection feature, combined with its tailable cursor, as an alternative to traditional messaging infrastructure. This raises several questions:

  • Is this approach suitable for all messaging needs?
  • Can it replace established messaging frameworks like Kafka and RabbitMQ?
  • What are the potential pitfalls?

While MongoDB's capped collection feature is well-known and has been extensively discussed, most articles only provide a superficial overview, focusing on basic implementation without exploring deeper. A comprehensive messaging framework must address a range of challenges beyond mere asynchronous message delivery. In this series of articles, we will delve into these challenges and examine the feasibility of building a messaging infrastructure using MongoDB. For instance, you can explore the potential of MongoDB in building a scalable AMQP-based messaging framework on https://carsnewstoday.com.

Unlocking the Power of Capped Collections and Tailable Cursors in MongoDB

To address the above questions, it's crucial to understand how capped collections and tailable cursors function in MongoDB. A capped collection is a collection with a specified limit, either in terms of document count or total size. This limit enables the collection to behave like a fixed-size circular linked list, maintaining insertion order and providing high throughput for insertions. You can create a capped collection using a MongoDB command. Note that entries in a capped collection cannot be deleted or updated in a way that alters their initial size.

db.createCollection( "log", { capped: true, size: 100000 } )

In stark contrast to a conventional cursor, a tailable cursor offers a more adaptive approach to interacting with a collection, akin to the "tail -f" command. It reads documents in their inherent order. Unlike other cursor types, a tailable cursor remains open even after the client has read all current documents that match the filter. When a new document is inserted and matches the filter, the client receives the new document. If the connection is lost, the implementation driver re-establishes it.

Upon closer examination of this behavior, we can see that it bears a striking resemblance to a FIFO (First-In-First-Out) list, which can be leveraged to build a messaging framework. In this scenario, producers would insert data into capped collections, and consumers would receive the data as it becomes available.

Constructing a Messaging Protocol

In any messaging framework, protocols are essential to facilitate message exchange between different parties. These protocols can vary across messaging frameworks. For instance, RabbitMQ employs the AMQP protocol, where messages pass through an exchange. Publishers send messages to an exchange, and subscribers bind a queue to the exchange using binding rules to receive the messages. The consumer can either fetch or pull messages from the exchange, or the broker can push them automatically. In this article, we will delve into how to implement the AMQP 0-9-1 protocol using MongoDB's tailable cursor feature.

To commence, we need to create a Broker interface that will manage this process. The broker should have two primary functions:

  • publish: This function publishes a message to a specific channel or exchange.
  • subscribe: This function subscribes to a message at a specific exchange.

Our broker will encapsulate the MongoDB-based messaging service under this interface. We have two options to implement this interface: as a standalone microservice or as a library. For simplicity, let's take the library approach for now. With this approach, our architecture will resemble the following.

In this example, we've taken key considerations into account to implement the above interface effectively.

  • A single capped collection implements one exchange.
  • Every message published to the exchange must be linked to a specific routing key.
  • Multiple subscribers can be connected to a single exchange.
  • Subscribers can listen to all messages published to the exchange, filtered by a specific routing key. The routing key is a pivotal concept in RabbitMQ, defining the binding between a subscriber and the exchange through a queue. In our example, a tailable cursor acts as a queue for each subscriber, created based on the filter criteria set by the routing key.

If you have experience with the AMQP paradigm, you may be aware that AMQP 0-9-1 brokers provide four distinct exchange categories:

  • Point-to-point exchange
  • Broadcast exchange
  • Attribute-based exchange
  • Pattern-matching exchange

In my forthcoming series of articles, I will delve into each of these exchange categories, commencing with the Point-to-Point Exchange. This exchange type routes messages based on a specified message key.

using-mongodb-capped-collection-as-messaging-frame_img_0.png

Configuring the Message Broker

The following code snippet implements the broker interface outlined above

//broker.js
const SIZE=1000000;
const MAX_QSIZE=1000
const {MongoClient}=require('mongodb')


class Broker{
    
    constructor(client,option){
        this.option=option;
        this.client=client;
        
        
    }
    /*
    * The factory function to create a Broker instance . The option takes following attributes.
    * url : connection string to mongodb instance
    * dbname: database name
    * name:  exchange name
    */

    static async create(option) {
        let client=null;
        try{
            client = await MongoClient.connect(option.url,{useUnifiedTopology: true });
            const db = client.db(option.dbname);
            option.qsize=option.qsize||MAX_QSIZE;
            //creating capped collection if it does not exist
            let exist=await db.listCollections({ name: option.name }).hasNext();
            if(!exist){
                let result=await db.createCollection(option.name, {capped: true, size: SIZE,max:option.qsize})
                console.log(" Broker  got created with max queue size ",option.qsize);
            }
            //creating the Broker instance
            let broker=new Broker(client,option);
            return broker;
        }catch(e){
            console.log('broker creation failed ',e)
            if(!!client){ 
                //close the connection if creation failed but connection exist
                client.close()
            }
            throw e
        }
        
    }
    /*
    * subscribe by routingkey
    */
    async subscribe(routingkey,next){

       
        var filter = {routingkey:routingkey};

        if('function' !== typeof next) throw('Callback function not defined');

        let db=this.client.db(this.option.dbname)
    
        let collection=await db.collection(this.option.name)  
        var cursorOptions = {
                    tailable: true,
                    awaitdata: true,
                    numberOfRetries: -1
        };
        const tailableCursor = collection.find(filter, cursorOptions);
        //create stream from tailable cursor
        var stream =tailableCursor.stream();
        console.log('queue is waiting for message ...')
        stream.on('data', next);

    }
    /* 
    * publish a message i.e. insert a message to the capped collection.
    * routingkey : the routingkey of the message 
    * message : message payload. This could be string or any data type or any vald javascript object.
    */
    async publish(routingkey,message){
        let data={};
        data.routingkey=routingkey;
        data.message=message;
      	data._id=new Date().getTime();
        let db= this.client.db(this.option.dbname);
        let result=await db.collection(this.option.name).insertOne(data);
        if(result.result.ok==1){
            console.log('message published to exchange ',this.option.name," with routing  key ",routingkey );
        }
        return result;
    }
   async destroy(){
       if(this.client)
       this.client.close();
   }

}
module.exports=Broker

The code snippet above embodies the comprehensive source code for a module dubbed broker.js, which exports a Broker class.

Initialization

To harness the module, the caller must instantiate a new instance of the Broker class by invoking the static create function. This function accepts an options object as an argument, which must encompass the obligatory attributes: url, dbname, and name. The create method establishes a database connection and creates a capped collection with the same name as the exchange name, defaulting to a maximum size of 1000. However, the caller can override this default size by providing a qsize attribute in the options object passed to the create function.

Message Publishing

The publish method necessitates two mandatory arguments: routingkey and message. Messages are disseminated using the routingkey, which involves inserting a record into the capped collection. The message and routingkey are combined into a JavaScript object to define the payload of the inserted record, allowing for message filtering based on the routingkey during consumption.

Subscription

The subscribe method takes two mandatory arguments: routingkey and a callback function. It sets up a filter using the routingkey and creates a tailable cursor with the filter (line #71). Finally, it establishes a stream on the tailable cursor to create a continuous flow of data as it is inserted into the collection.

Our broker is now primed for use.

Let’s create a publisher and subscriber to exchange messages.

Configuring the Publisher

Assuming we have a broker for the stock market, the publisher can disseminate news from various stock exchanges using the routingkey value, which corresponds to the name of the stock exchange.

//publisher.js
const Broker=require('./broker');
const MONGO_URL='mongodb://localhost:27017?authSource=admin';
let options={
    url:MONGO_URL,
    dbname: "broker",
    name: "StockMarket"
}
Broker.create(options).then(async (broker)=>{
    await broker.publish("BSE","Index gone up by 5 %");
    broker.destroy();
}).catch(e=>{
    console.log('broker creation failed', e)
});

Create the Subscriber

Now let us create a subscriber for BSE stock exchange. The subscriber will subscribe to BSE related news.

//subscriber.js
const Broker=require('./broker');
const MONGO_URL='mongodb://localhost:27017?authSource=admin';
let options={
    url:MONGO_URL,
    dbname: "broker",
    name: "StockMarket"
}
Broker.create(options).then(async (broker)=>{
    broker.subscribe("BSE",(data)=>{
        let datetime=new Date();
        console.log(datetime, " data received from Stockmarket for BSE----->",data.message)
    })
   
}).catch(e=>{
    console.log('broker creation failed', e)
});

Launch Your npm Project Journey

With all the necessary components in place, it's time to create a new npm project and populate it with the required files. To set up a new npm project:

  • Create a new directory called broker and navigate into it.
  • Run npm init to bootstrap the project and provide the necessary information for your package.json file.

using-mongodb-capped-collection-as-messaging-frame_img_1.png

  • After completing these steps, your directory structure should resemble the following:

using-mongodb-capped-collection-as-messaging-frame_img_2.png

  • Next, execute npm install mongodb –save to install the required MongoDB driver package.
  • Before proceeding, ensure that your MongoDB server is up and running. By default, the program assumes that the MongoDB server is running on localhost at port 27017 with no authentication.
    • If your MongoDB URL and configuration differ, update the third line of both the subscriber and publisher files to point to the correct MongoDB instance.
  • Run node publisher to start the publisher, create the database, capped collection, and publish a message related to BSE news.

using-mongodb-capped-collection-as-messaging-frame_img_3.png

  • Open a new terminal and run node subscriber to trigger the subscription creation. Since the message is already present, it will print the message as well.

using-mongodb-capped-collection-as-messaging-frame_img_4.png

Look! The message has appeared in the console. The command is still running, waiting for further messages. Try running the publisher again in the previous console. Not surprisingly, the message appears in the other terminal again. We have successfully established message communication between the publisher and subscriber according to the AMQP direct exchange protocol.

Invert the Execution Sequence

Now, let's invert the order of execution.

  • Drop the capped collection and restart the subscriber. The subscriber will display the message “queue is waiting for message…”.
  • Run the publisher again to push some messages.

This time around, you'll observe that not a single message appears in the subscriber terminal.

It's probable that the stream hasn't been configured correctly, leading to aberrant behavior. I surmise that this might be attributed to a glitch in the Node MongoDB driver. Unless there's pre-existing data in the collection, the tailable cursor fails to initialize properly. As a result, any new data inserted subsequently won't be delivered to the callback function. To bypass this issue, let's devise a workaround.

If the subscription is the inaugural one on the exchange and there's no data, we can initialize the collection with dummy data devoid of a routing key. This data won't be delivered to any subscribers, but it will correctly initialize the cursor. To implement this, I've incorporated the necessary code subsequent to creating the collection in the create method. The revised method will resemble this:

 /*
    * The factory function to create a Broker instance . The option takes following attributes.
    * url : connection string to mongodb instance
    * dbname: database name
    * name:  exchange name
    */

    static async create(option) {
        let client=null;
        try{
            client = await MongoClient.connect(option.url,{useUnifiedTopology: true });
            const db = client.db(option.dbname);
            option.qsize=option.qsize||MAX_QSIZE;
            //creating capped collection if it does not exist
            let exist=await db.listCollections({ name: option.name }).hasNext();
            if(!exist){
                let collection=await db.createCollection(option.name, {capped: true, size: SIZE,max:option.qsize})
                console.log(" Broker  got created with max queue size ",option.qsize);
                await collection.insertOne({ init: true ,_id: new Date().getTime()});
            
                
            }
            //creating the Broker instance
            let broker=new Broker(client,option);
            return broker;
        }catch(e){
            console.log('broker creation failed ',e)
            if(!!client){ 
                //close the connection if creation failed but connection exist
                client.close()
            }
            throw e
        }
        
    }

Now, let's retrace our steps in reverse. This time, the message will resume printing.

Unraveling the Conundrum

But wait, is there anything else left to tackle?

  • Stop the subscriber by pressing Ctrl + C, and then restart it. Surprisingly, it prints two messages that were already consumed last time.

using-mongodb-capped-collection-as-messaging-frame_img_5.png

What's behind this phenomenon? Clearly, the program stopped and lost its memory. The cursor starts from the beginning again. In AMQP, how can we ensure that a consumer doesn't receive the same message again and that its state is sustained over restart?

The solution lies in acknowledgment. In our program, we overlooked this crucial step. The consumer needs to confirm that it has processed the messages. In Kafka, the consumer itself maintains the cursor position. However, in AMQP, the broker remembers it. Let's bring this memory into our broker. The revised architecture will look like this:

using-mongodb-capped-collection-as-messaging-frame_img_6.png

To build an acknowledgment mechanism, there are two possible scenarios: either the consumer handles acknowledgment itself or relies on auto-acknowledgment. Auto-acknowledgment occurs when the message is successfully delivered to the consumer. However, this may not work for all applications, so it's up to the consumer application to decide. To implement acknowledgment, we need a flag in the collection to confirm whether the consumer has received the message or not. The flag's behavior will be as follows:

  • If the flag is initialized to false.
  • If it's auto-acknowledgment, the broker sets this flag to true when it delivers the message to the consumer successfully.
  • If it's not auto-acknowledgment, the broker needs to provide a method for the consumer to set the flag when needed.

For the first two points, we need to modify the subscribe method. Previously, it only took a routing key as input; now, we need a way to pass the auto-acknowledgment flag as well. So, the input to the function is changed to an object with multiple properties instead of a single string. The revised subscribe method will look like this:

Subscription to a Specific Routing Key: A Refined Approach

A substantial overhaul has been implemented in the subscription mechanism, enabling it to pinpoint a specific routing key with precision. This refinement incorporates an additional parameter, processed, which is initialized as false. Consequently, the method will exclusively retrieve messages that have not undergone processing at least once.

The input to the subscribe method has undergone a transformation, thereby necessitating a revision to the subscriber.js file.

/*
 * Subscribe to a specific routing key
 * Suboption: declares the subscription options, i.e., routing key, autoAck
 */
async subscribe(suboption, next) {

    var filter = { routingKey: suboption.routingKey, processed: false };
    // Initializes autoAck default to true if not passed.
    suboption.autoAck = suboption.autoAck || true;
    if ('function' !== typeof next) throw ('Callback function not defined');

    let db = this.client.db(this.option.dbname);
    let collection = await db.collection(this.option.name);

    var cursorOptions = {
        tailable: true,
        awaitData: true,
        numberOfRetries: -1
    };
    const tailableCursor = collection.find(filter, cursorOptions);
    // Create stream from tailable cursor
    var stream = tailableCursor.stream();
    console.log('Queue is waiting for message ...');
    stream.on('data', async (data) => {
        next(data);
        // Set the auto-ack
        if (suboption.autoAck) {
            await collection.updateOne({ _id: data._id }, { $set: { processed: true } });
        }
    });
}

The revised subscriber.js will look like the above code after setting the auto-acknowledgment to true.

Getting Started with Message Queueing

To begin, we’ll create a broker and set up a connection to a MongoDB instance. We’ll use the following code to get started:

const broker = require('./broker');
const MONGO_URL = 'mongodb://localhost:27017?authSource=admin';
let options = {
    url: MONGO_URL,
    dbname: "broker",
    name: "StockMarket"
}
broker.create(options).then(async (broker) => {
    broker.subscribe({ routingKey: "BSE", autoAck: true }, (data) => {
        let datetime = new Date();
        console.log(datetime, " data received from Stockmarket for BSE----->", data.message)
       
    })
   
}).catch(e => {
    console.log('broker creation failed', e)
});

Next, proceed with the following steps:

  • Remove the collection
  • Execute the subscriber
  • Execute the publisher

After completing these steps, you should start noticing messages appearing in the subscriber terminal.

Now, attempt reinitializing the subscriber. This time, you won’t see any duplicate messages, as they were already processed previously.

Building upon this, you can also implement explicit acknowledgment by introducing a separate method in the broker. According to AMQP, a delivery tag is required when messages are delivered to the consumer, and this tag is sent back to acknowledge it later. As a result, the subscribe function needs to be modified again.

Subscribing to a Specific Routing Key

When working with message queues, it’s crucial to subscribe to a specific routing key to receive targeted messages. This process involves declaring subscription options, including the routing key and auto-acknowledgment.

The subscription options are defined by the suboption parameter, which includes the routing key and auto-acknowledgment settings.

async subscribe(suboption, next) {
    ...
}

In this code snippet, we initialize the auto-acknowledgment default to true if it’s not provided. We then create a filter to retrieve unprocessed messages with the specified routing key.

var filter = { routingkey: suboption.routingkey, processed: false };
suboption.autoAck = suboption.autoAck || true;

We also define the cursor options for the database query, including settings for a tailable cursor and retry mechanism.

let cursorOptions = {
    tailable: true,
    awaitdata: true,
    numberOfRetries: -1
};

Once the subscription is set up, we create a stream from the tailable cursor and listen for incoming messages.

const tailableCursor = collection.find(filter, cursorOptions);
var stream = tailableCursor.stream();
console.log('Queue is waiting for messages...');

When a message is received, we add a delivery tag to the message data and pass it to the callback function.

stream.on('data', async (data) => {
    data.deliveryTag = {};
    data.deliveryTag.id = data._id;
    data.deliveryTag.subscriptionName = subscriptionName;
    next(data);
    ...
});

Notably, at line 28, we added the delivery tag to the message data. The consumer is required to send back the same delivery tag to acknowledge the message.

Acknowledgment Method

Our acknowledgment method is designed to work in conjunction with the subscription process.

 /*
    * acknowledge a message delivery 
    */
   async ack(deliveryTag){
    let db=this.client.db(this.option.dbname);
    let collection=await db.collection(deliveryTag.subscriptionName)

    let result=await collection.updateOne(
        { _id:  deliveryTag.id },
        { $set: { processed:true  } },
    );
    if(result.result.ok==1){
        console.log('message acknowledged  ',deliveryTag.id,deliveryTag.subscriptionName)
    }
    
   }

And now the consumer needs to change again to utilize this explicit acknowledgment.

const Broker=require('./broker');
const MONGO_URL='mongodb://localhost:27017?authSource=admin';
let options={
    url:MONGO_URL,
    dbname: "broker",
    name: "StockMarket"
}
Broker.create(options).then(async (broker)=>{
    broker.subscribe({routingkey:"BSE",autoAck:false},(data)=>{
        let datetime=new Date();
        console.log(datetime, " data received from Stockmarket for BSE----->",data.message)
        broker.ack(data.deliveryTag)
    })
   
}).catch(e=>{
    console.log('broker creation failed', e)
});

Thus far, we have successfully established a Direct Exchange utilizing the AMQP 0-9-1 protocol. Now, let's delve into another pivotal concept that warrants attention.

Managing Multiple Consumers

Envision multiple workers from the same application reading from the same queue, processing it concurrently. In this scenario, all workers are interested in the same set of messages, but it's crucial that each message is delivered to only one worker at a time. In reality, workers are selected using a round-robin approach. To achieve this, we need to categorize consumers or workers. In AMQP, we accomplish this by utilizing the queue name. All workers listening to the same queue receive messages in a round-robin fashion.

Until now, we haven't had a means to identify the queue name or subscription. Let's add an additional input to our subscribe method, called subscriptionName. This enables us to segregate message delivery to different worker groups. However, we need to ensure that the same message isn't delivered to more than one worker within the same group. To do this, we require a way to track the status of each delivery and maintain an acknowledgment based on the delivery. The worker receiving the delivery should acknowledge or reject it, and the broker needs to verify if the delivery was successful. For this purpose, we'll introduce a new collection called delivery.

Whenever a message is delivered to a worker, an entry is created in the delivery collection, which retains the following information to identify the message delivery uniquely:

  • msgid: The message ID.
  • _id: The ID of the delivery record.
  • exchange: The name of the exchange.
  • subscriptionName: The name of the subscription.
  • state: This maintains three states:
    • delivered: When the message is delivered to a worker successfully.
    • acknowledged: When the worker acknowledges it.
    • rejected: When the worker rejects it.
  • deliverytimestamp: The timestamp when the delivery was attempted successfully.
  • requeue: If set to true during rejection, the message will be requeued to the exchange again.
  • acktimestamp: The timestamp when the acknowledgment was done.

We're introducing a new subscription method, which will take the following form:

Please let me know if you'd like me to continue with the rest of the text!
/*
    * subscribe to a specific routingkey
    * suboption : declares the subscription options . i.e. routingkey , autoAck
    */
    async subscribe(subscriptionName, suboption,next){

       
        var filter = {routingkey:suboption.routingkey};
        //initializes autoAck default to true if not passed.
        suboption.autoAck=suboption.autoAck||true;
        if('function' !== typeof next) throw('Callback function not defined');

        let db=this.client.db(this.option.dbname)
    
        let collection=db.collection(this.option.name)  
        let deliveryCol=db.collection("delivery");
        let seekCursor=deliveryCol.find({subscriptionName:subscriptionName, exchange:this.option.name}).sort({msgid:-1}).limit(1);
        let lastdelivery=await seekCursor.next();
        if(lastdelivery!=null){
            //Pick up the id of the last message and add it as filter.
            console.log('lastdelivery ',lastdelivery.msgid)
            filter['_id']={ $gt: lastdelivery.msgid};
        }
        var cursorOptions = {
                    tailable: true,
                    awaitdata: true,
                    numberOfRetries: -1
        };
        const tailableCursor = collection.find(filter, cursorOptions);
        //create stream from tailable cursor
        var stream =tailableCursor.stream();
        console.log('queue is waiting for message ...')
        stream.on('data', async (data)=>{
            
            // build delivery record
            data.deliveryTag={};
            data.deliveryTag.msgid=data._id; 
            //build the id of the record using exchange name, subscription name and _id of the message.
            //This is to ensure that one message is delivered to one consumer of the subscription.
            data.deliveryTag._id=this.option.name+"#"+subscriptionName+"#"+data._id; 
            data.deliveryTag.exchange=this.option.name
            data.deliveryTag.subscriptionName=subscriptionName;
            data.deliveryTag.deliverytimestamp=new Date().getTime();
            data.deliveryTag.requeue=false;
            data.deliveryTag.acktimestamp=null;
           
            if(suboption.autoAck){
                    
                //set the auto-ack
                data.deliveryTag.acktimestamp=new Date().getTime();
                data.deliveryTag.state='acknowledged';
            }else{
                data.deliveryTag.state='delivered'
            }
            //check if the msg is already delivered for the same subscription.
           
            let delivered=await deliveryCol.find(
                {
                _id: data.deliveryTag._id
                }).hasNext();
            if(!delivered){
                //insert the delivery record.
                deliveryCol.insertOne(data.deliveryTag,(err,result)=>{
                    if(!!err){
                        console.log("already delivered to other worker")
                    }else if(result.result.ok==1){
                        //only in case of successful insertion
                        next(data);
                        
                    }
                });
            }
            
        });

    }

A significant enhancement in our latest subscription method is the elimination of the processed flag for confirmation. Instead, we have introduced a seek cursor to identify the last delivery attempt for a subscription, enabling us to pinpoint messages that arrived subsequent to the last delivery. The unique identifier of each message is generated based on the current timestamp. To locate messages after the last delivery, we need to identify those with an _id value exceeding the msgid stored in the delivery.

When automatic acknowledgment is enabled, a message is deemed acknowledged immediately upon delivery completion. However, if automatic acknowledgment is disabled, the worker must callback the broker with the same delivery tag information. If the worker successfully processes the message, it will acknowledge it; otherwise, it will reject the message. Upon rejection, the worker may set a flag to requeue the message. If this flag is set, the message will be reinserted into the exchange collection. By default, this flag is set to false. To accommodate this, I added two new functions to the Broker class.

 /*
    * acknowledge a message delivery 
    */
   async ack(deliveryTag){
    let db=this.client.db(this.option.dbname);
    let collection=db.collection('delivery');

    let result=await collection.updateOne(
        {
            msgid: deliveryTag.msgid,
            exchange: this.option.name
        },
        {
            $set: {
                acktimestamp: new Date().getTime(),
                state: 'acknowledged'
            }
        }
    );
    if(result.result.ok==1){
        console.log('message acknowledged  ',deliveryTag.msgid,deliveryTag.subscriptionName)
    }
    
   }
   /*
    * reject a message delivery 
    */
   async nack(deliveryTag){
    let db=this.client.db(this.option.dbname);
    let collection=db.collection('delivery')

    let result=await collection.updateOne(
        { 
            msgid: deliveryTag.msgid, 
            exchange: this.option.name 
        },
        {
            $set: {
                acktimestamp: new Date().getTime(),
                state: 'rejected'
            }
        }
    );
    if(result.result.ok==1){
        console.log('message rejected  ',deliveryTag.msgid,deliveryTag.subscriptionName)
        if(deliveryTag.requeue){
            let exchange=db.collection(this.option.name);
            let msg=await exchange.find({_id:deliveryTag.msgid}).next();
            msg._id=new Date().getTime();
            exchange.insertOne(msg);
        }
    }
    
   }

Our revised architectural framework is outlined below.

using-mongodb-capped-collection-as-messaging-frame_img_7.png

A Retrospective of Our Journey Thus Far

What key takeaways have we gleaned so far?

We embarked on a simple yet intriguing idea: harnessing a unique MongoDB feature to cater to our messaging requirements. Initially, it seemed promising, but as we delved deeper into the use cases, we encountered more intricacies. We’ve only implemented one type of exchange, and it’s clear that we need to push the boundaries further to match other AMQP implementations available in the market. We’ve only scratched the surface, implementing basic functionalities. In the process, we’ve revised our initial architecture three times. Let’s summarize our progress:

  • A direct exchange protocol
  • Acknowledgement of messages
  • Segregation of message delivery based on subscription channel
  • Requeueing of messages
  • Durability of messages

However, these features are part of the client library, not a central service. The library provides reusable code, but it runs within the client runtime. As a result, managing these features falls to individual clients. As I mentioned earlier, these functionalities could be provided as a standalone service, a separate microservice. In this approach, the central service would manage internal resources and functionalities, freeing subscribers and publishers from added responsibilities. The revised architecture would look like this:

using-mongodb-capped-collection-as-messaging-frame_img_8.png

Now, let’s examine some key differences:

  • In our design, queues are logical and exclusive, existing only while connections are open. There are no physical queues, and thus no durability concerns. This approach eliminates queue maintenance overhead while achieving message segregation and delivery to target recipients.
  • Messages are always durable, backed by a database. In messaging systems, durability is a feature, but not a requirement. However, it comes at the cost of speed.
  • Since the exchange is implemented using a capped collection, we cannot delete messages, and the only way to control message lifetime is by the max number of messages or the total size of the capped collection. There is no TTL set on messages, so messages will be delivered to all consumers who join late as long as the messages are present.
    • We could enable TTL by implementing soft deletes after a certain time, but this would require additional overhead of managing soft deletes. The question is, who would manage this? It’s not ideal to leave it to subscribers or publishers, so a standalone microservice implementation is necessary.
  • In our design, we’ve introduced a delivery collection to maintain delivery state. Records in this collection must exist longer than the message lifetime in the exchange. Over time, data will grow, and maintenance overhead may be required to clean up periodically.
  • In this design, there is no need for a dead letter queue, as subscribers pull messages when they come online and establish a stable connection. This implementation is more similar to the Kafka protocol. However, if we run the broker as a separate microservice, the concept of dead letter queue may become relevant again, as the client runtime may not always be available to deliver messages.

Revisiting the fundamental question, under what circumstances does it make sense to develop a custom messaging framework rather than utilizing established market solutions? Are there any supplementary advantages to creating such a framework?

In my opinion, every messaging service or framework incurs infrastructure and maintenance expenses. If your application's messaging requirements are confined to a narrow set of use cases that adhere to a specific pattern, and you are already utilizing MongoDB, then exploring this feature may be worthwhile. Conversely, if you require support for diverse message routing and multiple use cases, which are typically supported by standard messaging frameworks, it is more prudent to adopt an existing solution rather than investing time and resources into rebuilding it.

The complete source code for the examples discussed above is available in the https://github.com/arijitr/amqp-on-mongo project.

In forthcoming articles in this series, we will explore the development of additional patterns and exchange types in greater depth.