VM Connector and Queue Operations in MuleSoft
2/11/2025
The VM Connector is a powerful tool in MuleSoft that facilitates both intra-app (within the same application) and inter-app (between applications) communication using asynchronous queues. These queues can be either Transient or Persistent, each catering to different use cases.
Types of VM Queues
1. Transient Queues
o Characteristics:
§ Volatile in nature.
§ Data is lost in case of a system crash or restart.
o Performance: Faster compared to Persistent queues.
o Use Case: Suitable for scenarios where speed is a priority, and data loss is acceptable.
2. Persistent Queues
o Characteristics:
§ Data is stored persistently, ensuring recovery in case of a crash or restart.
o Performance: Slower than Transient queues due to persistence overhead.
o Use Case: Ideal for scenarios requiring reliability and data retention.
Common Use Cases for the VM Connector
· Message Routing: Transferring messages between different flows within an application.
· Load Balancing: Distributing workloads across a cluster for efficient resource utilization.
· Inter-App Communication: Facilitating communication between apps in the same Mule domain.
· Queue Management: Simple queueing mechanisms for asynchronous processing.
VM Connector Operations
1. Publish
o Adds messages to a queue.
o Typically used to enqueue data for processing in another flow.
2. Consume
o Retrieves messages from a queue.
o If no message is available, it waits for a specified timeout period. If no message is received within this time, a VM:EMPTY_QUEUE error is thrown.
3. Listener
o Continuously listens to a queue and picks up messages as they arrive.
4. Publish-Consume
o Publishes a message and waits for a response from the consumer.
o Includes a timeout parameter to handle delays, throwing a VM:QUEUE_TIMEOUT error if no response is received within the specified time.
Implementation Guide
Let's try to understand these operations and how VM queues work with a demo.
1. Publish the Message Into the VM Queue and Listen to the Message From the Queue
We will build the Mule flows depicted in the image below. The Publish-Flow publishes messages to the VM queue. It begins with an HTTP Listener, which receives a message whenever the HTTP endpoint is called. The received message is logged, and a transformation step then adds a new field named "Time" containing the current timestamp. The message is published to the queue using the VM Publish operation, and finally the published message is logged.
In the Persistent-Queue-Flow, a VM Listener serves as the source, continuously monitoring the VM queue and retrieving any available messages. Once a message is picked, it is logged for further verification.
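In XML terms, the two flows described above might look roughly like the following sketch. The configuration names (HTTP_Listener_config, VM_Config), the /publish path, and the logger messages are assumptions made for illustration, and the enclosing <mule> element with its namespace declarations is omitted. Pqueue1 is the queue name used throughout this walkthrough.

```xml
<!-- Sketch only: config names, path, and log messages are assumed, not taken from the original project -->
<flow name="Publish-Flow">
    <!-- Receives the message when the HTTP endpoint is called -->
    <http:listener config-ref="HTTP_Listener_config" path="/publish"/>
    <logger level="INFO" message="#[payload]"/>

    <!-- Transform Message goes here: adds the "Time" field with the current timestamp -->

    <!-- Enqueues the transformed message -->
    <vm:publish config-ref="VM_Config" queueName="Pqueue1"/>
    <logger level="INFO" message="#[payload]"/>
</flow>

<flow name="Persistent-Queue-Flow">
    <!-- Source: picks up messages from the queue as they arrive -->
    <vm:listener config-ref="VM_Config" queueName="Pqueue1"/>
    <logger level="INFO" message="#[payload]"/>
</flow>
```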
Let’s explore the VM Queue configuration by dragging and dropping the VM Publish component from the Mule Palette into a Mule flow to set it up. Initially, an error will appear because the configuration is not yet defined. To resolve this, we’ll create a VM configuration by clicking the Plus (+) icon in the 'Connector Configuration' section, as shown in the image below.

Within the VM Connector configuration, navigate to the General section and select Edit inline from the drop-down menu for the Queue field. Next, click on the Plus (+) icon, as illustrated in the image below.
Upon clicking, a pop-up window will appear, as shown in the image below. This window displays three fields: Queue Name, Queue Type (Transient or Persistent), and Max Outstanding Messages. Enter the appropriate values in these fields and click Finish to complete the setup.
The newly created queue, Pqueue1, now appears in the Queue section, as shown in the image below.

To create additional queues, simply click the Plus (+) icon again and add the desired queues. These queues will then appear in the Queue section of the VM Configuration.
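Behind the Studio dialog, the queue definitions end up in the VM configuration element. A minimal sketch of what that element might contain is shown here; the configuration name VM_Config, the second queue Tqueue1, and the maxOutstandingMessages values are assumptions added for illustration, not values from the original project.

```xml
<!-- Sketch only: config name, second queue, and maxOutstandingMessages values are assumed -->
<vm:config name="VM_Config">
    <vm:queues>
        <!-- Persistent queue used throughout this demo -->
        <vm:queue queueName="Pqueue1" queueType="PERSISTENT" maxOutstandingMessages="0"/>
        <!-- Hypothetical additional queue, transient for comparison -->
        <vm:queue queueName="Tqueue1" queueType="TRANSIENT" maxOutstandingMessages="0"/>
    </vm:queues>
</vm:config>
```

Each queue declared here corresponds to one entry in the Queue section of the dialog, with the same three fields: name, type, and maximum outstanding messages.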
Once the connector configuration is complete, the queue names will become available as options in the drop-down menu within the Publish operation connector. For this setup, we will select the queue Pqueue1.

In the Transform Message component, we wrote the following DataWeave script to add a timestamp:
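The original script is not reproduced here. A minimal sketch of a Transform Message component that would produce the described result, assuming the incoming payload is a JSON object and that the timestamp comes from DataWeave's now() function, could look like this:

```xml
<!-- Sketch only: reconstructs the described behavior, not the author's exact script -->
<ee:transform>
    <ee:message>
        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
// Keep the incoming fields and append the current timestamp as "Time"
payload ++ { Time: now() }]]></ee:set-payload>
    </ee:message>
</ee:transform>
```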


The VM Listener in the Persistent-Queue-Flow is configured as shown below; it points to the same queue, Pqueue1:

Now, let's start the application in debug mode, trigger a request from Postman, and see how the VM queue behaves.
The request triggered by Postman is shown below:

We can see the same message in our Publish flow; notice that we have only one field named "Message" in the payload.
After the Transform Message component runs, the payload contains one more field, named "Time."
Now we can publish it. Once the Publish and Logger components of the Publish flow have run, we see the same message being picked up by the VM Listener.
We can get a clearer view of this by looking at the Mule logs below:


In this way, we can send messages to the VM queue and consume the same message from the queue.
2. The Publish Consume Operation
The Publish Consume operation publishes a message to the queue and then waits, for a specified duration, for a response from the flow that consumes it.
For the Publish Consume operation demonstration, we replace the Publish component in our Publish flow with the Publish Consume component. A VM Listener is then used to retrieve messages from the queue.
The message is published to the queue Pqueue1, with a timeout duration set to 15 seconds. This means the Publish Consume operation waits for up to 15 seconds to receive a response. If no response is received within this timeframe, a VM:QUEUE_TIMEOUT error is thrown.
The configuration for the VM Listener is shown below. It includes a section labeled Response, which has a default value of #[payload]. This setting defines the content that is sent back to the waiting Publish Consume operation once the listener flow has processed the message.
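As a rough XML equivalent of this setup, the sketch below pairs the Publish Consume operation with a listener that returns its payload as the response. The configuration name VM_Config and the logger are assumptions; the queue name, the 15-second timeout, and the #[payload] response come from the description above.

```xml
<!-- Sketch only: config name and logger are assumed -->

<!-- In Publish-Flow, replacing the Publish operation: waits up to 15 seconds for a response -->
<vm:publish-consume config-ref="VM_Config" queueName="Pqueue1" timeout="15" timeoutUnit="SECONDS"/>

<flow name="Persistent-Queue-Flow">
    <vm:listener config-ref="VM_Config" queueName="Pqueue1">
        <!-- Content returned to the waiting Publish Consume operation -->
        <vm:response>
            <vm:content>#[payload]</vm:content>
        </vm:response>
    </vm:listener>
    <logger level="INFO" message="#[payload]"/>
</flow>
```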
Let's run the application to trigger the same sample message from Postman and see how it works:


Upon examining the console logs in detail, we can observe that the first log entry corresponds to the message received from the HTTP listener in the publish flow. Following this, the logs for the Persistent-Queue-Flow are displayed, showing the message that was received. The final log entry indicates the processing of the publish consume operation within the publish flow.
3. The Consume Operation
For the consume operation, we will replace the publish-consume component with the publish component in the Publish flow. Additionally, we will create a new flow called the consume-flow, as depicted in the image below. This flow includes a scheduler set to run every 15 seconds.
Alongside this, we have the VM consume operation, which retrieves the message from the queue. In the logger, we display the message that was consumed from the queue.
In the consume operation, the configuration is set up as shown in the image below. We use the same VM configuration that was created for the publishing operation, and we are working with Pqueue1. The timeout is configured to 20 seconds, meaning that once the flow is triggered, the consume operation will wait for up to 20 seconds to receive a message. If no message is received within this period, the VM:EMPTY_QUEUE error will be triggered.
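A sketch of how this consume flow might be expressed in XML follows. The configuration name VM_Config and the logger message are assumptions; the 15-second scheduler frequency, the queue name, and the 20-second timeout are the values described above.

```xml
<!-- Sketch only: config name and logger message are assumed -->
<flow name="consume-Flow">
    <!-- Triggers the flow every 15 seconds -->
    <scheduler>
        <scheduling-strategy>
            <fixed-frequency frequency="15" timeUnit="SECONDS"/>
        </scheduling-strategy>
    </scheduler>
    <!-- Waits up to 20 seconds for a message; raises VM:EMPTY_QUEUE if none arrives -->
    <vm:consume config-ref="VM_Config" queueName="Pqueue1" timeout="20" timeoutUnit="SECONDS"/>
    <logger level="INFO" message="#[payload]"/>
</flow>
```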

Note: We have removed the Persistent-Queue-Flow because its VM Listener points to the same queue, Pqueue1, and would otherwise compete with the Consume operation for messages.
Now, let’s run the app and trigger a request from Postman to see how it works.
When we sent the same message from Postman, it was received by the publish flow and logged accordingly. After adding the time field, the message was published to the queue. The logs show that the published message included the time field.
The consumer then picks up the same message, as indicated in the logs. On a later run, when no other message is available in the queue, the consume operation waits for the defined 20-second timeout and then throws a VM:EMPTY_QUEUE error.

Element : consume-Flow/processors/0 @ vm-demo-app:vm-demo-app.xml:55 (Consume)

Earlier, we said that a Persistent queue retains its data even after a restart or crash. Let’s test this. To do so, make a small change to the existing Mule flow by setting the scheduler frequency to 60 seconds.

After implementing these changes and running the application, the scheduler initiates, and the consume operation waits for 20 seconds to retrieve a message from the queue. If no message is found within this time, an error is thrown. Once a minute passes from the start time, the scheduler restarts the consume operation and repeats the process, waiting for a message in the queue.
In this scenario, we wait for the timeout to occur before sending a message via Postman. Immediately after publishing the message, we stop the application. Upon restarting the application, we will verify whether the VM consumer retrieves the previously published message.
Based on the logs, it is evident that the application started and initiated the consume flow. However, since no messages were available in the queue within the configured timeout, an EMPTY_QUEUE error was logged. Subsequently, we sent a message using Postman, which was successfully published to the queue along with a timestamp, as reflected in the logs. After publishing the message, the application was stopped.
Upon restarting the application five minutes later, the scheduler reactivated the consume flow, which retrieved the message published before the shutdown. This was confirmed by examining the time field in the logs, which matched the timestamp of the previously published message.

This confirms that Persistent queues retain messages across a restart, unlike Transient queues, where data is lost upon restart.
Key Takeaways
· Transient Queues prioritize speed, suitable for non-critical data processing.
· Persistent Queues ensure reliability, suitable for critical workflows requiring data retention.
· VM Connector supports diverse operations like Publish, Consume, Listener, and Publish-Consume, enabling flexible message handling.
With these capabilities, the VM Connector is an essential component for asynchronous communication in MuleSoft applications.