Working with queues is quite a neat feature. It used to be Commercial only, but since release 2.3 this is also available for the Open Source version of Magento 2.
Now i hear you thinking: why would i need a queue? Well, to execute actions in the background. This has a few advantages:
Expensive operations are executed in a separate process, allowing to execute the current process quickly.
When an action fails, for example a api call, you can retry this.
When the action fails the enduser won’t notice this as it executes in the background.
You can move the processing to an external server if needed.
Now the downside is that timing can be hard. If you need the data right now you can’t use queues.
Big frameworks like Laravel and Symfony come with support for queues out of the box. If you want to get familliar with them i can recommend to play with them as it is easier then with Magento.
Prerequisites
I have tested this dispatching using RabbitMQ. For this i have installed RabbitMQ locally and added this to my env.php:
'queue' => [ 'amqp' => [ 'host' => 'localhost', 'port' => '5672', 'user' => 'rabbitmquser', 'password' => 'rabbitmqpassword', 'virtualhost' => 'rabbitmqvirtualhost', 'ssl' => false ] ],
There is also an options to use MySql to run the queue, but i have no experience with that.
Hey, I'm running a developer focussed newsletter called MageDispatch.com, which is for the community and by the community. Here you can share links that you think that the community should know about. If you like this kind of technical content you will like this newsletter too.
Dispatching tasks
When i needed to implement a queue driven task, i started to Google around and found this and this blogs. They basically boil down to a set of 5 XML files to configure everything. After some digging around in the Magento code, i bumped at this method:
\Magento\AsynchronousOperations\Model\MassSchedule::publishMass
You need to feed it two things:
A topic name.
An entities array
Let's see how you should use this:
use Magento\AsynchronousOperations\Model\ConfigInterface; use Magento\AsynchronousOperations\Model\MassSchedule; use Magento\Framework\Event\Observer; use Magento\Framework\Event\ObserverInterface; use Magento\Sales\Api\Data\OrderInterface; class PublishExpensiveOperationTask implements ObserverInterface { /** * @var MassSchedule */ private $massSchedule; /** * @var ConfigInterface */ private $asyncConfig; public function __construct( MassSchedule $massSchedule, ConfigInterface $asyncConfig ) { $this->massSchedule = $massSchedule; $this->asyncConfig = $asyncConfig; } public function execute(Observer $observer) { /** @var OrderInterface $order */ $order = $observer->getData('order'); $topicName = $this->asyncConfig->getTopicName('/V1/expensive-operation/calculate', 'POST'); $this->massSchedule->publishMass($topicName, [[(int)$order->getCustomerId()]]); } }
The topic name is just an API endpoint which will be called. You need to pass in the endpoint url, plus the type of request you want to make (GET, POST, etc). The entities array is an array with arrays that contains an ID. That's why it's wrapped in two [[ $id ]]
.
Your webapi.xml
would look something like this:
<?xml version="1.0"?> <routes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Webapi:etc/webapi.xsd"> <route url="/V1/expensive-operation/calculate" method="POST"> <service class="<VENDOR>\<MODULE>\Api\ExpensiveOperationInterface" method="calculate"/> <resources> <resource ref="Admin"/> </resources> </route> </routes>
Off cource you would need to declare the implementation of the ExpensiveOperationInterface
in your di.xml.
Running the consumer
Now jobs are placed in the queue, but how do you make sure they are executed? This is where the consumer comes in. This will pick up the jobs and execute them one by one. Just run this simple command:
bin/magento queue:consumers:start async.operations.all
Only want to run a limited amount of jobs to debug your script? Call the command with --max-messages=1
and it will stop when it has processed exactly 1 job.
How to make sure this process keeps running?
Well, that is the tricky part. You can start this command, but if it for some reason fails your queue grinds to a halt and thing will go bad quickly. There is a simple solution for that: Use Supervisor. Supervisor is a tool that is widely use to watch specific programs and make sure they keep running. If it fails for whatever reason it will start the process again for your. There is also a bonus: Supervisor can start the same process multiple time, to increase the queue throughput. A typical Supervisor configuration would look like something like this:
[program:consumers-start] process_name=%(program_name)s_%(process_num)02d command=/usr/bin/php /home/magento/current/bin/magento queue:consumers:start async.operations.all autostart=true autorestart=true user=YOUR_USER numprocs=8 redirect_stderr=true stdout_logfile=/home/magento/current/var/log/worker.log
After this load the Supervisor configuration and start the process:
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start consumers-start:*
Other ways to leverage the queue
I've used this way because it's relatively easy to use the queue. But Magento offers other ways too. There is a pretty extensive tutorial on this subject from Inviqa, which you can find here. I'm pretty sure that method is better if you have a lot of different background jobs, but that method requires you to create a few xml files and a few classes before you can use a queue.
That's why i like the method described above, it's relatively simple to get a queue up and running quickly.
Want to respond?