Главная > Теория и практика > Очередь сообщений на основе PHP и MemcacheQ

Очередь сообщений на основе PHP и MemcacheQ

В предыдущей статье мы рассмотрели принцип работы систем очередей сообщений.
В этой статье мы рассотрим пример реализации такого решения на основе PHP и системы MemcacheQ. В качестве приложения выберем распространенную задачу по отправке email сообщений.

Сначала место memcacheQ должен был занять продукт Dropr, но более приближенный взгляд на него заставил пересмотреть все за и против. Dropr является стремительно развивающимся решением, у которого еще нет стабильной версии. Поэтому гарантировать то, что его получится установить, никто пока не может (установка сводится к чекауту из транка). Следовательно, я остановился на более простом решении, которое порекомендовал один из читателей данного блога - MemcacheQ.

Пример приложения

Для начала, что представляет собой наш пример на текущем этапе. Это страница с формой из одного текстового поля и кнопки. После нажатия кнопки происходит отправка email сообщений на 10 различных email адресов. После этого, пользователь системы получает страницу с текстом “Сообщения отправлены”. Исходный код на php:

$list = array(
‘test1@email.com’,
‘test2@email.com’,
‘test3@email.com’,
‘test4@email.com’,
‘test5@email.com’,
‘test6@email.com’,
‘test7@email.com’,
‘test8@email.com’,
‘test9@email.com’,
‘test10@email.com’
);

if ( $_POST['submit'] )
{
foreach ( $list as $email )
{
mail($email, ‘Test’, $_POST['body']);
}

$success = true;
}

if ( $success )
{
echo ‘10 сообщений успешно отправлены’;
} else
{
// Выводим форму
}

После небольшого анализа, понятно, что самой затратной операцией на сервере является отправка email сообщений, поэтому вынесем эту операцию в асинхронный поток. Организуем очередь, куда будем складывать новые задачи по отправке email’ов.

В качестве решения для организации очереди, мы выбрали MemcacheQ, теперь по порядку:

Установка MemcacheQ

Для работы memcacheQ Вам потребуется PHP с расширением memcache, т.к. система работает на основе данного протокола. Система построена над memcacheDB, поэтому хранилище очередей организовано, как “постоянное” (другими словами, все хорошо и надежно).

Для начала Вам необходимо поставить BerkleyDB и убедиться, что на системе стоит библиотека libevent >= 1.4. Как все это ставить описано в статье Твиттер на основе MemcacheDB и PHP. После этого качаем тут и ставим memcacheq:

tar xvzf memcacheq-0.1.x.tar.gz
cd memcacheq-0.1.x
./configure --enable-threads
make
sudo make install

Теперь необходимо создать папку данных, например: /var/data/queue и запускаем демон очереди:

memcacheq -d -r -H /var/data/queue -N -v -L 1024 -B 1024 > /var/log/queue_error.log 2>&1

По умолчанию демон будет слушать порт 22201. Следует отметить, что размер сообщений в очереди всегда фиксирован и ограничивается параметор демона -B. Если Ваше сообщение имеет меньшую длинну, то оно дополнится до требуемого размера символами 0×20. Мы выбрали длинну в 1024 байта.

Тестируем очередь сообщений

Для тестирования очереди напишем простенький скрипт:

# Подключаемся к серверу очереди
$mc = memcache_connect('localhost', 22201);

# Создаем от 10 до 20 сообщений в очереди
for ( $i = 0; $i < rand(10, 20); $i++ )
{
        # Название нашей очереди - "test"
        memcache_set($mc, 'test', 'message ' . date('H:i', time()), 0, 0);
}

# Выбираем все сообщения из очереди "test"
while ( $m = memcache_get($mc, 'test') )
{
        echo $m . "\n";
}

В этом просто примере мы записываем несколько сообщений в очередь, после чего читаем и выводим их. Исполнение его из командной строки выдаст приблизительно следующий результат:

message 15:38
message 15:38
message 15:38
message 15:38
message 15:38
message 15:38
message 15:38
message 15:38
message 15:38
message 15:38
message 15:38

Решение задачи с отправкой email’ов

Вернемся к нашей задаче. Мы определили отправку почты, как ресурсоемкую операцию, которую стоит сделать асинхронной посредством очереди сообщений. Следовательно, алгоритм интерфейса следующий:

  1. Получаем запрос от клиента
  2. Добавляем в очередь соощение клиента (и больше ничего не делаем)
  3. Выводим сообщение об успешной отправке

Параллельно будет работать обработчик очереди по такому принципу:

  1. Проверить, есть ли новые сообщения в очереди
  2. Получить следующее сообщение и разослать email’ы
Реализация интерфейса:
if ( $_POST['submit'] )
{
        # Добавляем сообщение в очередь "mails"
        $mc = memcache_connect('localhost', 22201);
        memcache_set($mc, 'mails', $_POST['body'], 0, 0);

        $success = true;
}

if ( $success )
{
        echo '10 сообщений успешно отправлены';
}
else
{
        // Выводим форму
}

После того, как происходит сабмит формы, мы просто добавим сообщение пользователя в очередь “mails” и выведем сообщение об успешной отправке. Ресурсов на это уйдет минимум и время отдачи от сервера станет намного меньшим.

Реализация обработчика сообщений:

Этот скрипт необходимо будет запустить один только раз, т.к. он должен работать постоянно (постоянно проверять, нет ли новых сообщений). Соответственно, его необходимо будет запускать из CLI:

$list = array(
        'test1@email.com',
        'test2@email.com',
        'test3@email.com',
        'test4@email.com',
        'test5@email.com',
        'test6@email.com',
        'test7@email.com',
        'test8@email.com',
        'test9@email.com',
        'test10@email.com'
);

$memcache_obj = memcache_connect('localhost', 22201);

while ( true )
{
if ( $m = memcache_get($memcache_obj, 'mails') )
{
        foreach ( $list as $email )
        {
                mail($email, 'Test', $m);
        }
}
}

Что мы получили в результате:

  • Быстрый интерфейс
  • Асинхронный обработчик, который можно вынести на отдельный узел
  • Балансирование нагрузки по времени, т.к. интерфейс теперь не будет подвержен “пиковым тормозам”
Google Bookmarks Digg I.ua Ru-marks Ruspace Zakladok.net Reddit delicious Technorati Yahoo My Web News2.ru БобрДобр.ru Memori.ru rucity.com

Статьи по теме

  1. Andy Hammer
    12 Июнь 2009 в 13:49 | #1

    понятно теперь как на php организована многопоточность. А что произойдет если memcache отвалится? По идее имеет обработать и эту ситуацию и отправить письма в текущем потоке. Или есть стандартное решение?

    • 12 Июнь 2009 в 13:56 | #2

      А что если отвалится MTA? Такие ситуации решаются только специфическими разработками. Например, для избежания проблем из-за отпавшего MemcacheQ Вы можете, на уровне приложения, сохранять данные во временную директорию на диск (например), а потом асинхронно, проверять эту директорию на наличие файлов и их последующую обработку. Т.е. выбирать Вам. Хотя такая ситуация достаточно маловероятна, т.к. memcache (и построенная на нем memcacheQ) достаточно прост и ситуацию, когда он “просто падает” крайне тяжело получить на практике.

  2. Andy Hammer
    12 Июнь 2009 в 17:51 | #3

    ну падает, не из-за нестабильности продукта, а некометентности скажем хостера.
    что такое МТА?

    • 12 Июнь 2009 в 17:53 | #4

      MTA - mail transfer agent

      А понятия некомпетентный хостер и высоконагруженная система несовместимы :)

  3. IgorN
    3 Июль 2009 в 16:58 | #5

    Супер статья. Наконец то нормальный и простой примерю.

  4. IgorN
    3 Июль 2009 в 17:00 | #6

    Маленький вопрос, для каждой задачи надо вешать демон на какой то свой порт?

  5. 3 Июль 2009 в 17:31 | #7

    @IgorN
    Демоны рабочих скриптов не должны слушать порты - не актуально в этом случае.
    А кроме этого:
    1. Если задача специфическая и для нее необходима отдельная платформа (например задачи транскодирования видео/аудио), то для них лучше создавать отдельные обработчики, которые работают только с задачами определенных типов. Еще лучше для них и отдельные очереди иметь.
    2. Если же задачи выполнимы в пределах используемых технологий на бекендах, то лучше иметь обработчики, которые умеют работать со всеми типами задач - это поможет натурально балансировать нагрузку.

  6. hostmaster
    11 Сентябрь 2009 в 15:38 | #8

    Решение простое но в в HighLoad так делать не стоит. Предположим что у вас не 20 а 200 000 сообщений (это же HL, не правда ли).
    тогда
    - хорошо бы использовать multiget (ага http://highload.com.ua/index.php/2009/08/05/memcache-multi-get-%D0%B7%D0%B0%D1%87%D0%B5%D0%BC/)
    - предположим что вызов mail() завершился не удачей, что при таком почтовом трафике обычная ситуация, что тогда ? потеря сообщения вот что. надо продумать логику с возвратом сообщения в очередь в случае не удачи. а если теперь в момент отправки сообщения наш сервер аварийно завершит работу. опять получаем потерю сообщений.

  7. 11 Сентябрь 2009 в 16:37 | #9

    @hostmaster
    Спасибо! Вы очень верно подметили.
    Приведенный пример - это пример, не следует его использовать в продуктивных средах.

    Хотелось бы отметить один момент. Если в период между моментами запуска рабочего процесса, накапливается 200 тыс. сообщений - значит имеем проблемы с настройкой либо железом. Нужно добавлять больше серверов для обработки очередей, либо больше процессоров для запуска новых процессов обработки.

  8. 4 Декабрь 2009 в 12:26 | #10

    Раз это демон, я б добавил в foreach usleep(100), а в while sleep(1) - это как минимум. Ну и обработку отлупа - возврат в очередь задачи. Плюс, я бы очередь строил не так, что каждая задача - отправка N сообщений, а так, чтобы каждая задача - отправка одного сообщения. Тогда это все легко параллелится. А где тут мультигет - я вообще не понимаю, вроде memcacheq его не поддерживает.

  9. 5 Декабрь 2009 в 08:42 | #11

    @kaa
    Мультигет в memcacheQ в принципе не актуален. Мне кажется, hostmaster имел ввиду работу с Memcache/MemcacheDB для вытаскивания дополнительных данных к каждой задаче. А это делать понадобится, т.к. записи в очередях имеют фиксированную длину.

  10. Андрей
    31 Март 2010 в 13:00 | #12

    >>Соответственно, его необходимо будет запускать из CLI:

    Это значит по Cron что ли?

    Как я понимаю организовав эти очереди, можно выполнение ресурсоемких операций отложить на потом (скажем на то время когда нагрузка на проект снизилась)?

  11. 31 Март 2010 в 13:04 | #13

    @Андрей
    Как удобнее - не обязательно кроном. Можно разработать демон.

  1. Пока что нет уведомлений.