Очередь сообщений на основе PHP и MemcacheQ
В предыдущей статье мы рассмотрели принцип работы систем очередей сообщений.
В этой статье мы рассотрим пример реализации такого решения на основе PHP и системы MemcacheQ. В качестве приложения выберем распространенную задачу по отправке email сообщений.
Сначала место memcacheQ должен был занять продукт Dropr, но более приближенный взгляд на него заставил пересмотреть все за и против. Dropr является стремительно развивающимся решением, у которого еще нет стабильной версии. Поэтому гарантировать то, что его получится установить, никто пока не может (установка сводится к чекауту из транка). Следовательно, я остановился на более простом решении, которое порекомендовал один из читателей данного блога - MemcacheQ.
Пример приложения
Для начала, что представляет собой наш пример на текущем этапе. Это страница с формой из одного текстового поля и кнопки. После нажатия кнопки происходит отправка email сообщений на 10 различных email адресов. После этого, пользователь системы получает страницу с текстом “Сообщения отправлены”. Исходный код на php:
$list = array(
‘test1@email.com’,
‘test2@email.com’,
‘test3@email.com’,
‘test4@email.com’,
‘test5@email.com’,
‘test6@email.com’,
‘test7@email.com’,
‘test8@email.com’,
‘test9@email.com’,
‘test10@email.com’
);if ( $_POST['submit'] )
{
foreach ( $list as $email )
{
mail($email, ‘Test’, $_POST['body']);
}$success = true;
}if ( $success )
{
echo ‘10 сообщений успешно отправлены’;
} else
{
// Выводим форму
}
После небольшого анализа, понятно, что самой затратной операцией на сервере является отправка email сообщений, поэтому вынесем эту операцию в асинхронный поток. Организуем очередь, куда будем складывать новые задачи по отправке email’ов.
В качестве решения для организации очереди, мы выбрали MemcacheQ, теперь по порядку:
Установка MemcacheQ
Для работы memcacheQ Вам потребуется PHP с расширением memcache, т.к. система работает на основе данного протокола. Система построена над memcacheDB, поэтому хранилище очередей организовано, как “постоянное” (другими словами, все хорошо и надежно).
Для начала Вам необходимо поставить BerkleyDB и убедиться, что на системе стоит библиотека libevent >= 1.4. Как все это ставить описано в статье Твиттер на основе MemcacheDB и PHP. После этого качаем тут и ставим memcacheq:
tar xvzf memcacheq-0.1.x.tar.gz cd memcacheq-0.1.x ./configure --enable-threads make sudo make install
Теперь необходимо создать папку данных, например: /var/data/queue и запускаем демон очереди:
memcacheq -d -r -H /var/data/queue -N -v -L 1024 -B 1024 > /var/log/queue_error.log 2>&1
По умолчанию демон будет слушать порт 22201. Следует отметить, что размер сообщений в очереди всегда фиксирован и ограничивается параметор демона -B. Если Ваше сообщение имеет меньшую длинну, то оно дополнится до требуемого размера символами 0×20. Мы выбрали длинну в 1024 байта.
Тестируем очередь сообщений
Для тестирования очереди напишем простенький скрипт:
# Подключаемся к серверу очереди
$mc = memcache_connect('localhost', 22201);
# Создаем от 10 до 20 сообщений в очереди
for ( $i = 0; $i < rand(10, 20); $i++ )
{
# Название нашей очереди - "test"
memcache_set($mc, 'test', 'message ' . date('H:i', time()), 0, 0);
}
# Выбираем все сообщения из очереди "test"
while ( $m = memcache_get($mc, 'test') )
{
echo $m . "\n";
}
В этом просто примере мы записываем несколько сообщений в очередь, после чего читаем и выводим их. Исполнение его из командной строки выдаст приблизительно следующий результат:
message 15:38 message 15:38 message 15:38 message 15:38 message 15:38 message 15:38 message 15:38 message 15:38 message 15:38 message 15:38 message 15:38
Решение задачи с отправкой email’ов
Вернемся к нашей задаче. Мы определили отправку почты, как ресурсоемкую операцию, которую стоит сделать асинхронной посредством очереди сообщений. Следовательно, алгоритм интерфейса следующий:
- Получаем запрос от клиента
- Добавляем в очередь соощение клиента (и больше ничего не делаем)
- Выводим сообщение об успешной отправке
Параллельно будет работать обработчик очереди по такому принципу:
- Проверить, есть ли новые сообщения в очереди
- Получить следующее сообщение и разослать email’ы
Реализация интерфейса:
if ( $_POST['submit'] )
{
# Добавляем сообщение в очередь "mails"
$mc = memcache_connect('localhost', 22201);
memcache_set($mc, 'mails', $_POST['body'], 0, 0);
$success = true;
}
if ( $success )
{
echo '10 сообщений успешно отправлены';
}
else
{
// Выводим форму
}
После того, как происходит сабмит формы, мы просто добавим сообщение пользователя в очередь “mails” и выведем сообщение об успешной отправке. Ресурсов на это уйдет минимум и время отдачи от сервера станет намного меньшим.
Реализация обработчика сообщений:
Этот скрипт необходимо будет запустить один только раз, т.к. он должен работать постоянно (постоянно проверять, нет ли новых сообщений). Соответственно, его необходимо будет запускать из CLI:
$list = array(
'test1@email.com',
'test2@email.com',
'test3@email.com',
'test4@email.com',
'test5@email.com',
'test6@email.com',
'test7@email.com',
'test8@email.com',
'test9@email.com',
'test10@email.com'
);
$memcache_obj = memcache_connect('localhost', 22201);
while ( true )
{
if ( $m = memcache_get($memcache_obj, 'mails') )
{
foreach ( $list as $email )
{
mail($email, 'Test', $m);
}
}
}
Что мы получили в результате:
- Быстрый интерфейс
- Асинхронный обработчик, который можно вынести на отдельный узел
- Балансирование нагрузки по времени, т.к. интерфейс теперь не будет подвержен “пиковым тормозам”


понятно теперь как на php организована многопоточность. А что произойдет если memcache отвалится? По идее имеет обработать и эту ситуацию и отправить письма в текущем потоке. Или есть стандартное решение?
А что если отвалится MTA? Такие ситуации решаются только специфическими разработками. Например, для избежания проблем из-за отпавшего MemcacheQ Вы можете, на уровне приложения, сохранять данные во временную директорию на диск (например), а потом асинхронно, проверять эту директорию на наличие файлов и их последующую обработку. Т.е. выбирать Вам. Хотя такая ситуация достаточно маловероятна, т.к. memcache (и построенная на нем memcacheQ) достаточно прост и ситуацию, когда он “просто падает” крайне тяжело получить на практике.
ну падает, не из-за нестабильности продукта, а некометентности скажем хостера.
что такое МТА?
MTA - mail transfer agent
А понятия некомпетентный хостер и высоконагруженная система несовместимы
Супер статья. Наконец то нормальный и простой примерю.
Маленький вопрос, для каждой задачи надо вешать демон на какой то свой порт?
@IgorN
Демоны рабочих скриптов не должны слушать порты - не актуально в этом случае.
А кроме этого:
1. Если задача специфическая и для нее необходима отдельная платформа (например задачи транскодирования видео/аудио), то для них лучше создавать отдельные обработчики, которые работают только с задачами определенных типов. Еще лучше для них и отдельные очереди иметь.
2. Если же задачи выполнимы в пределах используемых технологий на бекендах, то лучше иметь обработчики, которые умеют работать со всеми типами задач - это поможет натурально балансировать нагрузку.
Решение простое но в в HighLoad так делать не стоит. Предположим что у вас не 20 а 200 000 сообщений (это же HL, не правда ли).
тогда
- хорошо бы использовать multiget (ага http://highload.com.ua/index.php/2009/08/05/memcache-multi-get-%D0%B7%D0%B0%D1%87%D0%B5%D0%BC/)
- предположим что вызов mail() завершился не удачей, что при таком почтовом трафике обычная ситуация, что тогда ? потеря сообщения вот что. надо продумать логику с возвратом сообщения в очередь в случае не удачи. а если теперь в момент отправки сообщения наш сервер аварийно завершит работу. опять получаем потерю сообщений.
@hostmaster
Спасибо! Вы очень верно подметили.
Приведенный пример - это пример, не следует его использовать в продуктивных средах.
Хотелось бы отметить один момент. Если в период между моментами запуска рабочего процесса, накапливается 200 тыс. сообщений - значит имеем проблемы с настройкой либо железом. Нужно добавлять больше серверов для обработки очередей, либо больше процессоров для запуска новых процессов обработки.
Раз это демон, я б добавил в foreach usleep(100), а в while sleep(1) - это как минимум. Ну и обработку отлупа - возврат в очередь задачи. Плюс, я бы очередь строил не так, что каждая задача - отправка N сообщений, а так, чтобы каждая задача - отправка одного сообщения. Тогда это все легко параллелится. А где тут мультигет - я вообще не понимаю, вроде memcacheq его не поддерживает.
@kaa
Мультигет в memcacheQ в принципе не актуален. Мне кажется, hostmaster имел ввиду работу с Memcache/MemcacheDB для вытаскивания дополнительных данных к каждой задаче. А это делать понадобится, т.к. записи в очередях имеют фиксированную длину.
>>Соответственно, его необходимо будет запускать из CLI:
Это значит по Cron что ли?
Как я понимаю организовав эти очереди, можно выполнение ресурсоемких операций отложить на потом (скажем на то время когда нагрузка на проект снизилась)?
@Андрей
Как удобнее - не обязательно кроном. Можно разработать демон.