This guide is also available as a flow, which you can import directly into the Flow Editor:
flows_qibb_aws_sqs.json
Using the function node and AWS SDK
The examples use the function node to integrate with AWS SQS through the AWS SDK. As a prerequisite, each of these function nodes must have the @aws-sdk/client-sqs module installed and imported as awsSdkClientSqs.
Module in “Setup” tab of function node
In addition, the function node expects the following secrets to be stored in Space Secrets:
AWS_SQS_QUEUE_URL
AWS_SQS_API_KEY
AWS_SQS_API_SECRET
AWS_SQS_QUEUE_REGION
Sending SQS Messages
This example demonstrates how to send a message to an AWS SQS queue.
Function Node Code Example for “Send SQS message”
Function
Code
“On Start” Tab
Configures the client whenever the flow starts or restarts.
JS
// Take the SQS client constructor from the imported AWS SDK v3 module
const { SQSClient: SqsClient } = awsSdkClientSqs;
// Read each connection setting from the global context under SECRETS.*
const queueUrl = global.get("SECRETS.AWS_SQS_QUEUE_URL");
const accessKey = global.get("SECRETS.AWS_SQS_API_KEY");
const accessSecret = global.get("SECRETS.AWS_SQS_API_SECRET");
const region = global.get("SECRETS.AWS_SQS_QUEUE_REGION");
// Keep the settings together on the node (this.config) for the "On Message" tab
this.config = { queueUrl, accessKey, accessSecret, region };
// Build the SQS client once and keep it on the node (this.client)
this.client = new SqsClient({
    region: this.config.region,
    apiVersion: '2012-11-05',
    credentials: {
        accessKeyId: this.config.accessKey,
        secretAccessKey: this.config.accessSecret
    }
});
// Define a helper that returns a JSON.stringify replacer which drops circular references.
// Only values that point back to one of their own ancestors are removed (replaced by
// undefined); an object that is merely referenced from several places is serialised
// every time it appears, so no data is lost for shared, non-circular references.
// A fresh replacer must be created for every JSON.stringify call, because it keeps state.
this.removeCircular = function () {
    // Chain of objects from the root down to the object currently being serialised
    const ancestors = [];
    // Must be a regular function (not an arrow function): JSON.stringify invokes the
    // replacer with `this` bound to the object that holds the current key.
    return function (key, value) {
        if (typeof value !== 'object' || value === null) {
            return value;
        }
        // Unwind the chain until its tip is the current holder, i.e. forget
        // branches of the object tree that have already been fully serialised
        while (ancestors.length > 0 && ancestors[ancestors.length - 1] !== this) {
            ancestors.pop();
        }
        // The value is one of its own ancestors: a real cycle, so omit it
        if (ancestors.includes(value)) {
            return undefined;
        }
        ancestors.push(value);
        return value;
    };
};
“On Message” Tab
Uses the client to send the message and passes the response to the output.
JS
// Import the SendMessageCommand from the AWS SDK
const { SendMessageCommand } = awsSdkClientSqs;
// Retrieve the client and config from the node context (varaibles defined in "On Start" tab)
const client = this.client;
const config = this.config;
// Define the parameters for sending a message
const params = {
MessageBody: JSON.stringify(msg, this.removeCircular()),
QueueUrl: config.queueUrl
};
// Create the command for sending a message
const command = new SendMessageCommand(params);
async function sendMessage(){
try {
// Send the message and wait for the response
const data = await client.send(command);
node.log("Message sent successfully. Message ID: " + data.MessageId);
// Attach the response to msg.payload and send the message
msg.payload = data;
node.send(msg);
} catch (error) {
node.error("Error sending message: " + error);
}
};
// Execute the sendMessage function asynchronously
sendMessage();
// Return null since output is handled asynchronously via node.send() method
return null;
Receiving SQS Messages
This example shows how to retrieve messages from an AWS SQS queue and immediately remove them to prevent reprocessing. To continuously poll for messages, the function node should be triggered at regular intervals, for instance, by using an inject node that runs repeatedly.
Function Node Code Example for “Receive SQS message”
Function
Code
“On Start” Tab
Configures the client whenever the flow starts or restarts.
JS
// Take the SQS client class from the imported AWS SDK v3 module
const { SQSClient } = awsSdkClientSqs;
// Collect the connection settings from the global context (SECRETS.*)
const settings = {
    queueUrl: global.get("SECRETS.AWS_SQS_QUEUE_URL"),
    accessKey: global.get("SECRETS.AWS_SQS_API_KEY"),
    accessSecret: global.get("SECRETS.AWS_SQS_API_SECRET"),
    region: global.get("SECRETS.AWS_SQS_QUEUE_REGION")
};
// Expose the settings on the node so the "On Message" tab can read this.config
this.config = settings;
// Credentials object in the shape expected by the client constructor
const credentials = {
    accessKeyId: settings.accessKey,
    secretAccessKey: settings.accessSecret
};
// Create the client once and expose it on the node as this.client
this.client = new SQSClient({
    region: settings.region,
    apiVersion: '2012-11-05',
    credentials: credentials
});
node.log("SQS client initialized for receiving messages.");
“On Message” Tab
Uses the stored client to receive messages from the SQS queue with long polling enabled.
For each received message, attempts to parse the message body (if it is JSON), forwards the message, and then deletes it from the queue to prevent duplicate processing.
JS
// Pull the receive command class out of the imported AWS SDK v3 module
const ReceiveMessageCommand = awsSdkClientSqs.ReceiveMessageCommand;
// Reuse the client and configuration that the "On Start" tab stored on the node
const { client, config } = this;
// Request options handed to ReceiveMessageCommand
const params = {
    // Your SQS queue's URL
    QueueUrl: config.queueUrl,
    // Maximum messages to receive (max 10)
    MaxNumberOfMessages: 10,
    // Enable long polling (in seconds)
    WaitTimeSeconds: 10
};
/**
 * Fetches one batch of messages from the queue, forwards each message to the
 * node output and then deletes it from the queue.
 * Any error raised while receiving or forwarding is reported via node.error().
 */
async function receiveMessages() {
    try {
        // Issue the receive request built from the shared params and wait for the reply
        const data = await client.send(new ReceiveMessageCommand(params));
        const received = data.Messages;
        // Nothing came back: there is nothing to forward
        if (!(received && received.length > 0)) {
            return;
        }
        for (const entry of received) {
            // Replace the raw body with its parsed form (stays a string if it is not JSON)
            entry.Body = safeJSONParse(entry.Body);
            // Forward the message along the flow
            node.send(entry);
            // Remove it from the queue before handling the next one to prevent reprocessing
            await deleteMessageFromQueue(entry, client, config);
        }
    } catch (error) {
        node.error("Error receiving messages: " + error);
    }
}
/**
 * Tries to interpret the input as JSON.
 * @param {*} input - value to parse (normally the message body string)
 * @returns {*} the parsed value, or the untouched input when JSON.parse rejects it
 */
function safeJSONParse(input) {
    let result = input;
    try {
        result = JSON.parse(input);
    } catch (parseError) {
        // Not valid JSON: deliberately fall back to the original input
    }
    return result;
}
/**
 * Deletes one received message from the SQS queue, using its ReceiptHandle.
 * A failure is reported via node.error() and not rethrown, so the caller continues.
 * @param {object} message - received message; its ReceiptHandle is read
 * @param {object} client - client whose send() executes the delete command
 * @param {object} config - configuration; its queueUrl is read
 */
async function deleteMessageFromQueue(message, client, config) {
    try {
        const removal = new awsSdkClientSqs.DeleteMessageCommand({
            QueueUrl: config.queueUrl,
            ReceiptHandle: message.ReceiptHandle
        });
        await client.send(removal);
    } catch (deleteError) {
        node.error("Error deleting message: " + deleteError);
    }
}
// Start the receive cycle without awaiting it; receiveMessages() catches its own
// errors and reports them through node.error()
receiveMessages();
// Return null since output is handled asynchronously via node.send()
return null;
JavaScript errors detected
Please note, these errors can depend on your browser setup.
If this problem persists, please contact our support.