Skip to main content
Skip table of contents

Send and receive messages with AWS SQS

This guide explains how to integrate AWS SQS with a qibb flow using the function node and the official AWS SDK (https://www.npmjs.com/package/@aws-sdk/client-sqs ) imported as external module.

qibb_flow_aws_sqs.png

Using the function node and AWS SDK

The examples will use the function node to integrate with AWS SQS using the AWS SDK. As a prerequisite, according function nodes should have the @aws-sdk/client-sqs module installed and imported as awsSdkClientSqs.

module_properties_in_function_node.png

Module in “Setup” tab of function node

In addition, the function node is expecting to have the following secrets stored in Space Secrets:

  • AWS_SQS_QUEUE_URL

  • AWS_SQS_API_KEY

  • AWS_SQS_API_SECRET

  • AWS_SQS_QUEUE_REGION


Sending SQS Messages

This example demonstrates how to send a message to an AWS SQS queue.

send_sqs.png

Function Node Code Example for “Send SQS message”

Function

Code

“On Start” Tab

Handles configuration of client on whenever the flow starts or restarts.

JS
// Import required classes from the AWS SDK v3 package
const { SQSClient } = awsSdkClientSqs;

// Group configuration values in an object
this.config = {
    queueUrl: global.get("SECRETS.AWS_SQS_QUEUE_URL"),
    accessKey: global.get("SECRETS.AWS_SQS_API_KEY"),
    accessSecret: global.get("SECRETS.AWS_SQS_API_SECRET"),
    region: global.get("SECRETS.AWS_SQS_QUEUE_REGION")
};

// Create an SQS client and store it as a property of the node (this.client)
this.client = new SQSClient({
    region: this.config.region,
    apiVersion: '2012-11-05',
    credentials: {
        accessKeyId: this.config.accessKey,
        secretAccessKey: this.config.accessSecret
    }
});

// Define a helper function for safely stringifying the msg (avoiding circular references)
this.removeCircular = function () {
    const seen = new WeakSet();
    return (key, value) => {
        if (typeof value === 'object' && value !== null) {
            if (seen.has(value)) {
                return undefined;
            }
            seen.add(value);
        }
        return value;
    };
};

“On Message” Tab

Uses the client to send message and passes response to output.

JS
// Import the SendMessageCommand from the AWS SDK
const { SendMessageCommand } = awsSdkClientSqs;

// Retrieve the client and config from the node context (varaibles defined in "On Start" tab)
const client = this.client;
const config = this.config;
// Define the parameters for sending a message
const params = {
    MessageBody: JSON.stringify(msg, this.removeCircular()),
    QueueUrl: config.queueUrl
};
// Create the command for sending a message
const command = new SendMessageCommand(params);

async function sendMessage(){
    try {
        // Send the message and wait for the response
        const data = await client.send(command);
        node.log("Message sent successfully. Message ID: " + data.MessageId);

        // Attach the response to msg.payload and send the message
        msg.payload = data;
        node.send(msg);
    } catch (error) {
        node.error("Error sending message: " + error);
    }
};


// Execute the sendMessage function asynchronously
sendMessage();

// Return null since output is handled asynchronously via node.send() method
return null;


Receiving SQS Messages

This example shows how to retrieve messages from an AWS SQS queue and immediately remove them to prevent reprocessing. To continuously poll for messages, the function node should be triggered at regular intervals, for instance, by using an inject node that runs repeatedly.

receive_sqs.png

Function Node Code Example for “Receive SQS message”

Function

Code

“On Start” Tab

Handles configuration of client on whenever the flow starts or restarts.

JS
// Import required classes from the AWS SDK v3 package
const { SQSClient } = awsSdkClientSqs;

// Group configuration values in an object
this.config = {
    queueUrl: global.get("SECRETS.AWS_SQS_QUEUE_URL"),
    accessKey: global.get("SECRETS.AWS_SQS_API_KEY"),
    accessSecret: global.get("SECRETS.AWS_SQS_API_SECRET"),
    region: global.get("SECRETS.AWS_SQS_QUEUE_REGION")
};

// Create an SQS client and store it as a property of the node (this.client)
this.client = new SQSClient({
    region: this.config.region,
    apiVersion: '2012-11-05',
    credentials: {
        accessKeyId: this.config.accessKey,
        secretAccessKey: this.config.accessSecret
    }
});

node.log("SQS client initialized for receiving messages.");

“On Message” Tab

  1. Uses the stored client to receive messages from the SQS queue with long polling enabled.

  2. For each received message, attempts to parse the message body (if it is JSON), forwards the message, and then deletes it from the queue to prevent duplicate processing.

JS
// Import required commands from the AWS SDK v3 package
const { ReceiveMessageCommand } = awsSdkClientSqs;

// Retrieve the client and config from the node context (set in "On Start")
const client = this.client;
const config = this.config;

// Define the parameters for receiving messages
const params = {
    QueueUrl: config.queueUrl,        // Your SQS queue's URL
    MaxNumberOfMessages: 10,            // Maximum messages to receive (max 10)
    WaitTimeSeconds: 10                 // Enable long polling (in seconds)
};

async function receiveMessages() {
    try {
        // Create the command for receiving messages
        const command = new ReceiveMessageCommand(params);

        // Send the command and wait for the response
        const data = await client.send(command);

        if (data.Messages && data.Messages.length > 0) {
            for (const message of data.Messages) {
                // Safely parse the message body (if JSON, otherwise leave it as a string)
                message.Body = safeJSONParse(message.Body);

                // Process the message by sending it along the flow
                node.send(message);

                // Delete the message from the queue to prevent reprocessing
                await deleteMessageFromQueue(message, client, config);
            }
        }
    } catch (error) {
        node.error("Error receiving messages: " + error);
    }
}

// Helper function to safely parse JSON; returns original string if parsing fails
function safeJSONParse(input) {
    try {
        return JSON.parse(input);
    } catch (err) {
        return input;
    }
}

// Helper function to delete a message from the SQS queue
async function deleteMessageFromQueue(message, client, config) {
    try {
        const { DeleteMessageCommand } = awsSdkClientSqs;
        const deleteParams = {
            QueueUrl: config.queueUrl,
            ReceiptHandle: message.ReceiptHandle
        };
        const deleteCommand = new DeleteMessageCommand(deleteParams);
        await client.send(deleteCommand);
    } catch (deleteError) {
        node.error("Error deleting message: " + deleteError);
    }
}

// Execute the receiveMessages function
receiveMessages();

// Return null since output is handled asynchronously via node.send()
return null;


JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.