Message queue in php

Implementing an asynchronous message queue in PHP with React

A fun exercise in programming. Implementing a asynchronous, non-blocking message queue in PHP.

Unlike most other programming and scripting languages PHP does not support threads (there is a third-party library called pthreads) and threads would be required to implement a truly asynchronous, non-blocking message queue. Challenge accepted.

In this article I am going to explain how you can implement a message queue in PHP. I do this mostly for fun and pleasure, because in most production scenarios you probably want to use a real message queue like RabbitMQ.

The message queue typically consists of a server that accepts messages. These messages are sent by a producer and are received by a consumer.

The server

The server is a program that runs in the background and listens to a specific port. Whenever a message arrives at the given port it invokes a consumer with the given message.

Running in the background, listening to a port, reacting when a message arrives. That sounds like a job for ReactPHP. In our case we only need the socket server of React and fortunately there is a subtree split of reacts socket component.

Читайте также:  Python sqlite select all

If you are using Composer (if not you really should) you can add react/socket to your project.

// composer.json  "require":  "react/socket": "0.3.*" > >

With React we can implement a simple server that listens to a socket and executes code whenever new data arrives at the socket.

// server.php require_once __DIR__.'/vendor/autoload.php'; // Create loop and socket $loop = React\EventLoop\Factory::create(); $socket = new React\Socket\Server($loop); // New connection is established $socket->on('connection', function (\React\Socket\ConnectionInterface $conn)  // New data arrives at the socket $conn->on('data', function ($data) use ($conn)  // TODO: Handle message echo "$data"; // Close the connection when we consumed the message $conn->close(); >); >); // The socket should listen to port 4000 $socket->listen(4000); $loop->run();

In the above script I added a simple echo to output the received data. We could now run php server.php to start our server and send messages to it by connecting to it via telnet localhost 4000 .

React server that echoes messages

Let’s assume that our message queue receives messages and needs to perform some time-consuming task with them. We will simulate this with the following code.

function consume($data, \React\Socket\ConnectionInterface $conn)  for ($i = 0; $i  5; $i++)  echo sprintf("%s: Do something with %s\n", date('H:i:s'), $data); sleep(1); > >

When we run two telnet simultaneously we will receive the following output:

19:18:07: Do something with foo 19:18:08: Do something with foo 19:18:09: Do something with foo 19:18:10: Do something with foo 19:18:11: Do something with foo 19:18:12: Do something with bar 19:18:13: Do something with bar 19:18:14: Do something with bar 19:18:15: Do something with bar 19:18:16: Do something with bar

Client 1 has an open connection to the server for five seconds and client 2 has an open connection to the server for ten seconds. We want that the clients transfer a message to the server and immediately close the connection while the server accepts new messages and works on those tasks in the background.

Now is a good time to remember the last posting I wrote here: Running background processes in PHP.

We can use BcBackgroundProcess to create a background process and consume the message there.

When using the factory provided by BcBackgroundProcess it is quite easy to create a new process and execute it in the background.

// server.php . $processFactory = new \Bc\BackgroundProcess\Factory('\Bc\BackgroundProcess\BackgroundProcess'); $socket->on('connection', function (\React\Socket\ConnectionInterface $conn) use ($processFactory) < $conn->on('data', function ($data) use ($conn, $processFactory) < $command = sprintf('php consumer.php "%s"', addslashes($data)); $processFactory->newProcess($command)->run(); $conn->close(); >); >); . 

The command executed in the above code is php consumer.php with the message as first (and only) argument.

The consumer

The consumer is another script which is executed by the server as background process. Its first and only argument is the message. The biggest disadvantage of executing the code in a background process (compared to what we could do if PHP would support threads) is that we can no longer communicate with the server and thus can’t output anything there. Instead we will write log messages to a file.

// consumer.php function consume($message, $filename) < for ($i = 0; $i < 5; $i++) < $data = sprintf("%s: Do something with %s\n", date('H:i:s'), $message); file_put_contents($filename, $data, FILE_APPEND); sleep(1); >> $message = stripslashes($_SERVER['argv'][1]); consume($message, "message.log");

Ok, let’s try this out. In my experiment I will open four Terminal windows in parallel. The first one will run the server, the second one will watch the message.log log (with tail -f ) and the third and forth will be used to write to the message server.

Screenshot of a Terminal window running server.php, reading the log file and running two clients

You can see that both messages are consumed in parallel and that the client is started and closes within a second.

The producer

In reality you will probably never send messages to a MQ server using the telnet command line utility, but rather send messages from another script.

When you are using PHP it is extremely simple to write a message to a socket.

// produce.php function produce($message) < $fp = @stream_socket_client('tcp://localhost:4000', $errno, $errstr, 30); if ($fp) < fwrite($fp, $message); fclose($fp); >> produce("Hello World!");

Use Cases

As mentioned above this code is not really useful in a production environment. However, I use such a message queue to send real time notifications from a task that runs once a week for a few hours. The task is used only internal and not critical and therefore it would be a huge overkill to install and maintain a real message queue.

BcMq

I created a library from the code described in this article. The principles are the same, but it has a nicer architecture and is tested. You can find it on Github: BcMq.

BcMqBundle

If you want to use the code in a Symfony2 application I made things even easier by creating a bundle that encapsulates BcMq. The bundle uses services to consume messages which makes it quite easy and elegant. Detailed instructions on how to install and use the bundle can be found on Github: BcMqBundle.

Subscribe to new posts

Made with ♥ by Florian Eckerstorfer in Vienna, Europe. 2023 . Imprint. Privacy Policy.

Источник

Saved searches

Use saved searches to filter your results more quickly

You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session.

ingresse/message-queue-php

This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?

Sign In Required

Please sign in to use Codespaces.

Launching GitHub Desktop

If nothing happens, download GitHub Desktop and try again.

Launching GitHub Desktop

If nothing happens, download GitHub Desktop and try again.

Launching Xcode

If nothing happens, download Xcode and try again.

Launching Visual Studio Code

Your codespace will open once ready.

There was a problem preparing your codespace, please try again.

Latest commit

Git stats

Files

Failed to load latest commit information.

readme.md

##Message Queue php library to publish and subscribe to queues with diferent types of adapters.

##Current supported adapters:

With Composer Add ingresse/message-queue-php as dependency in composer.json

"require":  . "ingresse/message-queue-php" : "1.*" ... >

Now the message-queue-php will be autoloaded into your project.

require 'vendor/autoload.php'; $configData = [ 'connection' => [ 'host' => 'localhost', 'port' => 5672, 'user' => 'guest', 'pass' => 'guest', 'vhost' => '/' ], 'queues' => [ 'worker.test' => [ 'passive' => false, 'durable' => true, 'exclusive' => false, 'autoDelete' => false, 'delivery_mode' => 2 ] ], 'exchanges' => [ 'exchange.test' => [ 'type' => 'fanout' 'passive' => false, 'durable' => true, 'auto_delete' => false, 'internal' => false, 'nowait' => false, 'arguments' => false, 'ticket' => false, 'delivery_mode' => 2 ] ], 'consume' => [ 'Simpler' => [ 'noLocal' => false, 'noAck' => false, 'exclusive' => false, 'noWait' => false ] ], 'logger' => [ 'host' => 'localhost', 'port' => 6379, 'key' => 'logstash', 'channel' => 'message-queue-php' path' => '/var/log/message-queue-php.log' ] ]; $config = new MessageQueuePHP\Config\AMQPConfig($configData); $amqpAdapter = new MessageQueuePHP\Adapter\AMQPAdapter($config); $myPublisher = new MessageQueuePHP\Publisher\Publisher($amqpAdapter, 'worker.test'); $myPublisher ->setMessage($myData) ->send(); $subscriber = new MessageQueuePHP\Subscriber\Subscriber($amqpAdapter); $simplerConsumer = new MessageQueuePHP\Subscriber\Consumer\SimplerConsumer; $subscriber ->setConsumer($simplerConsumer) ->subscribe('worker.test') ->consume();

Источник

Saved searches

Use saved searches to filter your results more quickly

You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session.

message stack on php using local files

License

bozerkins/php-message-queue

This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?

Sign In Required

Please sign in to use Codespaces.

Launching GitHub Desktop

If nothing happens, download GitHub Desktop and try again.

Launching GitHub Desktop

If nothing happens, download GitHub Desktop and try again.

Launching Xcode

If nothing happens, download Xcode and try again.

Launching Visual Studio Code

Your codespace will open once ready.

There was a problem preparing your codespace, please try again.

Latest commit

Git stats

Files

Failed to load latest commit information.

README.md

A simple php implementation of message queue (or message queue) which can write messages and read messages in a FIFO manner.

composer require bozerkins/php-message-queue

To define a queue you first need to define some environment configurations. An environment of a queue messaging system defined what folders will be used for operations.

$queue = new \MessageQueue\Queue( new \MessageQueue\Environment( [ 'dir' => '/var/my-queue-folder', 'queue' => 'my-queue' ] ) );

You can write to queue and read from queue. These are the basic operations.

NOTE that you can use this with multiple processes. The Message Queue uses flock(), so there shouldn’t be a problem.

# write two messages $queue->write( [ 'my message', 'my second message' ] ); # read two messages print_r($queue->read(2));

For several optimization reasons the queue does not delete messages by itself. To free some disk space from already read messages just run this command from time to time. Please note that this operation is disk write heavy (when lot’s of messages are being passed)

To optimize reads from the queue the library uses a caching construction. Messages are added into file system based cache in chunks, by default single chunk size is 100 messages. If you intend to read more than 100 messages from the queue at once please change the configuration option ‘rotate_amount’ to a bigger number.

$queue = new \MessageQueue\Queue( new \MessageQueue\Environment( [ 'dir' => '/var/my-queue-folder', 'queue' => 'my-queue', 'rotate_amount' => 200 ] ) );

If you wish to improve the library, feel free to submit merge request or contact me at b.ozerkins@gmail.com

Источник

Оцените статью