Событийные циклы и асинхронная обработка
Event Loop (событийный цикл) является ядром асинхронной архитектуры Localzet Server. Он обеспечивает неблокирующую обработку тысяч одновременных соединений в одном процессе.
Концепция Event Loop
Event Loop реализует паттерн Reactor, где один поток (процесс) управляет множественными I/O операциями через единую точку входа.
Принцип работы
┌─────────────────────────────────────┐
│ Event Loop │
│ │
│ while (hasActiveEvents) { │
│ ├── Check Timers │
│ ├── Check I/O Events │
│ │ ├── Readable streams │
│ │ └── Writable streams │
│ ├── Check Signals │
│ └── Execute Callbacks │
│ } │
└─────────────────────────────────────┘
Архитектура Event Loop в Localzet Server
Интерфейс EventInterface
Все реализации событийного цикла реализуют единый интерфейс:
interface EventInterface {
// Таймеры
delay(float $delay, callable $func, array $args): int;
repeat(float $interval, callable $func, array $args): int;
offDelay(int $timerId): bool;
offRepeat(int $timerId): bool;
// Потоки
onReadable($stream, callable $func): void;
onWritable($stream, callable $func): void;
offReadable($stream): bool;
offWritable($stream): bool;
// Сигналы
onSignal(int $signal, callable $func): void;
offSignal(int $signal): bool;
// Управление
run(): void;
stop(): void;
deleteAllTimer(): void;
getTimerCount(): int;
}
Реализации Event Loop
1. Linux Event Loop (libuv / libev / libevent)
Приоритет выбора драйвера:
-
libuv (ext-uv) — наиболее производительный
- Кроссплатформенная библиотека событийного цикла
- Используется в Node.js
- Оптимальная производительность для сетевых операций
-
libev (ext-ev) — высокопроизводительный
- Легковесная библиотека
- Отличная производительность для Unix систем
- Минимальные накладные расходы
-
libevent (ext-event) — стабильный
- Проверенная временем библиотека
- Хорошая производительность
- Широкая поддержка
-
Stream Select (fallback) — универсальный
- Встроенный в PHP
- Работает везде, но менее производителен
- Используется при отсутствии других расширений
Механизм выбора:
DriverFactory::create()
├── Проверка ext-uv → UvDriver
├── Проверка ext-ev → EvDriver
├── Проверка ext-event → EventDriver
└── Fallback → StreamSelectDriver
2. Windows Event Loop
Специальная реализация для Windows, использующая:
stream_select()для I/O мультиплексирования- Windows-специфичные API для сигналов
- Адаптированная обработка процессов
3. Swoole Event Loop
Использует встроенный событийный цикл расширения Swoole:
- Максимальная производительность
- Нативная поддержка coroutines
- Интеграция с Swoole Runtime
4. Swow Event Loop
Современная реализация на базе Swow:
- Поддержка PHP Fibers
- Асинхронные операции
- Современные возможности PHP 8.1+
Цикл обработки событий
Основной цикл (run)
while (true) {
// 1. Обработка таймеров
$now = $this->now();
while (($timer = $this->timerQueue->extract($now)) !== null) {
$timer->invoke();
}
// 2. Ожидание I/O событий
$this->dispatch($blocking = true);
// Где dispatch:
// - Для libuv: uv_run()
// - Для libev: ev_run()
// - Для libevent: event_base_loop()
// - Для select: stream_select()
// 3. Проверка сигналов
if (function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
// 4. Проверка условий выхода
if ($this->isEmpty()) {
break;
}
}
Обработка I/O событий
Readable Events (Чтение данных):
1. Регистрация потока:
eventLoop->onReadable($socket, $callback)
2. Event Loop мониторит поток
3. При появлении данных:
├── Операционная система сигнализирует через epoll/kqueue
├── Event Loop вызывает callback
├── Callback читает данные (fread)
└── Обработка прочитанных данных
4. Пример из TcpConnection:
baseRead() {
$buffer = fread($socket, READ_BUFFER_SIZE);
$this->recvBuffer .= $buffer;
$this->doProtocol();
}
Writable Events (Запись данных):
1. Регистрация потока:
eventLoop->onWritable($socket, $callback)
2. Event Loop мониторит готовность к записи
3. Когда сокет готов к записи:
├── Event Loop вызывает callback
├── Callback пишет данные (fwrite)
├── Если данные записаны полностью → offWritable
└── Если частично → продолжает мониторинг
4. Пример из TcpConnection:
baseWrite() {
$len = fwrite($socket, $sendBuffer);
if ($len === strlen($sendBuffer)) {
// Все записано
$this->eventLoop->offWritable($socket);
} else {
// Частично, продолжает мониторинг
$this->sendBuffer = substr($sendBuffer, $len);
}
}
Управление таймерами
Типы таймеров
1. Delay Timer (Одноразовый таймер):
$timerId = Timer::delay(2.0, function() {
echo "Выполнится через 2 секунды";
});
// Таймер автоматически удаляется после выполнения
Механизм работы:
delay($delay, $func, $args) {
$callbackId = $this->driver->defer($delay, function() use ($func, $args) {
$func(...$args);
});
return $callbackId;
}
2. Repeat Timer (Повторяющийся таймер):
$timerId = Timer::repeat(1.0, function() {
echo "Выполняется каждую секунду";
});
// Таймер продолжает выполняться
// Для остановки: Timer::del($timerId)
Механизм работы:
repeat($interval, $func, $args) {
$callbackId = $this->timerId++;
$this->timerQueue->insert($callbackId, $interval, $func, $args);
return $callbackId;
}
// При обработке:
while (($timer = $timerQueue->extract($now)) !== null) {
$timer->invoke();
// Если repeat - перевставляем в очередь
if ($timer->isRepeating()) {
$timerQueue->insert($timer->getId(), $interval, ...);
}
}
Оптимизация таймеров
Timer Queue:
Localzet Server использует эффективную структуру данных для таймеров:
- Min-Heap: Минимальная куча для быстрого поиска ближайшего таймера
- O(log n): Сложность вставки и удаления
- O(1): Получение следующего таймера
Обработка сигналов
Регистрация обработчиков сигналов
onSignal($signal, $func) {
// Регистрация в Event Loop
$callbackId = $this->driver->onSignal($signal, function() use ($func) {
$func($signal);
});
// Регистрация в PHP
pcntl_signal($signal, function($sig) {
// Запускается при вызове pcntl_signal_dispatch()
});
}
Механизм обработки
1. Система отправляет сигнал процессу
2. Event Loop получает уведомление:
- libuv/libev/libevent имеют встроенную поддержку сигналов
- stream_select требует дополнительной обработки через pcntl
3. Вызов callback функции:
onSignal(SIGUSR1, function($signal) {
// Обработка перезагрузки
})
Suspension API (для Fibers)
Принцип работы
Suspension позволяет приостановить выполнение Fiber и возобновить его позже:
// В Fiber
$suspension = $eventLoop->getSuspension();
Timer::delay(1.0, fn() => $suspension->resume());
$suspension->suspend(); // Приостановка до вызова resume()
Реализация:
class Suspension {
private ?Fiber $fiber = null;
public function suspend(): mixed {
$this->fiber = Fiber::getCurrent();
return Fiber::suspend();
}
public function resume(mixed $value = null): void {
if ($this->fiber !== null) {
Fiber::resume($this->fiber, $value);
}
}
}
Оптимизации производительности
1. Эффективное мультиплексирование
- Использование epoll/kqueue вместо select для больших количеств дескрипторов
- O(1) сложность для большого количества файловых дескрипторов
- Минимальные системные вызовы
2. Буферизация
- Чтение большими блоками для минимизации системных вызовов
- Запись только когда сокет готов (backpressure)
- Управление буферами для предотвращения переполнения
3. Оптимизация таймеров
- Min-heap для эффективного управления таймерами
- Батчинг проверок таймеров
- Ленивое удаление завершенных таймеров
4. Управление памятью
- Переиспользование объектов где возможно
- Минимизация аллокаций в горячих путях
- Сборка мусора только в безопасных точках
Сравнение реализаций
| Характеристика | libuv | libev | libevent | stream_select |
|---|---|---|---|---|
| Производительность | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Масштабируемость | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ |
| Кроссплатформенность | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Размер библиотеки | Большой | Средний | Средний | Встроен |
| Поддержка Windows | ✅ | ❌ | ✅ | ✅ |
Практические примеры
Пример 1: Асинхронное чтение файла
$file = fopen('large.txt', 'r');
$eventLoop->onReadable($file, function($file) {
$data = fread($file, 8192);
if ($data === false || feof($file)) {
$eventLoop->offReadable($file);
fclose($file);
} else {
processData($data);
}
});
Пример 2: Таймаут для операции
$timeoutId = Timer::delay(5.0, function() use ($connection) {
$connection->close();
});
// При успешном завершении
onSuccess(function() use ($timeoutId) {
Timer::del($timeoutId);
});
Пример 3: Heartbeat через таймер
Timer::repeat(30.0, function() use ($connection) {
if (!$connection->sendPing()) {
Timer::del($timerId);
$connection->close();
}
});

