Событийные циклы и асинхронная обработка

Event Loop (событийный цикл) является ядром асинхронной архитектуры Localzet Server. Он обеспечивает неблокирующую обработку тысяч одновременных соединений в одном процессе.

Концепция Event Loop

Event Loop реализует паттерн Reactor, где один поток (процесс) управляет множественными I/O операциями через единую точку входа.

Принцип работы

┌─────────────────────────────────────┐
│         Event Loop                   │
│                                      │
│  while (hasActiveEvents) {          │
│    ├── Check Timers                 │
│    ├── Check I/O Events             │
│    │   ├── Readable streams         │
│    │   └── Writable streams         │
│    ├── Check Signals                │
│    └── Execute Callbacks            │
│  }                                   │
└─────────────────────────────────────┘

Архитектура Event Loop в Localzet Server

Интерфейс EventInterface

Все реализации событийного цикла реализуют единый интерфейс:

interface EventInterface {
    // Таймеры
    delay(float $delay, callable $func, array $args): int;
    repeat(float $interval, callable $func, array $args): int;
    offDelay(int $timerId): bool;
    offRepeat(int $timerId): bool;
    
    // Потоки
    onReadable($stream, callable $func): void;
    onWritable($stream, callable $func): void;
    offReadable($stream): bool;
    offWritable($stream): bool;
    
    // Сигналы
    onSignal(int $signal, callable $func): void;
    offSignal(int $signal): bool;
    
    // Управление
    run(): void;
    stop(): void;
    deleteAllTimer(): void;
    getTimerCount(): int;
}

Реализации Event Loop

1. Linux Event Loop (libuv / libev / libevent)

Приоритет выбора драйвера:

  1. libuv (ext-uv) — наиболее производительный

    • Кроссплатформенная библиотека событийного цикла
    • Используется в Node.js
    • Оптимальная производительность для сетевых операций
  2. libev (ext-ev) — высокопроизводительный

    • Легковесная библиотека
    • Отличная производительность для Unix систем
    • Минимальные накладные расходы
  3. libevent (ext-event) — стабильный

    • Проверенная временем библиотека
    • Хорошая производительность
    • Широкая поддержка
  4. Stream Select (fallback) — универсальный

    • Встроенный в PHP
    • Работает везде, но менее производителен
    • Используется при отсутствии других расширений

Механизм выбора:

DriverFactory::create()
├── Проверка ext-uv → UvDriver
├── Проверка ext-ev → EvDriver
├── Проверка ext-event → EventDriver
└── Fallback → StreamSelectDriver

2. Windows Event Loop

Специальная реализация для Windows, использующая:

  • stream_select() для I/O мультиплексирования
  • Windows-специфичные API для сигналов
  • Адаптированная обработка процессов

3. Swoole Event Loop

Использует встроенный событийный цикл расширения Swoole:

  • Максимальная производительность
  • Нативная поддержка coroutines
  • Интеграция с Swoole Runtime

4. Swow Event Loop

Современная реализация на базе Swow:

  • Поддержка PHP Fibers
  • Асинхронные операции
  • Современные возможности PHP 8.1+

Цикл обработки событий

Основной цикл (run)

while (true) {
    // 1. Обработка таймеров
    $now = $this->now();
    while (($timer = $this->timerQueue->extract($now)) !== null) {
        $timer->invoke();
    }
    
    // 2. Ожидание I/O событий
    $this->dispatch($blocking = true);
    // Где dispatch:
    // - Для libuv: uv_run()
    // - Для libev: ev_run()
    // - Для libevent: event_base_loop()
    // - Для select: stream_select()
    
    // 3. Проверка сигналов
    if (function_exists('pcntl_signal_dispatch')) {
        pcntl_signal_dispatch();
    }
    
    // 4. Проверка условий выхода
    if ($this->isEmpty()) {
        break;
    }
}

Обработка I/O событий

Readable Events (Чтение данных):

1. Регистрация потока:
   eventLoop->onReadable($socket, $callback)

2. Event Loop мониторит поток

3. При появлении данных:
   ├── Операционная система сигнализирует через epoll/kqueue
   ├── Event Loop вызывает callback
   ├── Callback читает данные (fread)
   └── Обработка прочитанных данных

4. Пример из TcpConnection:
   baseRead() {
       $buffer = fread($socket, READ_BUFFER_SIZE);
       $this->recvBuffer .= $buffer;
       $this->doProtocol();
   }

Writable Events (Запись данных):

1. Регистрация потока:
   eventLoop->onWritable($socket, $callback)

2. Event Loop мониторит готовность к записи

3. Когда сокет готов к записи:
   ├── Event Loop вызывает callback
   ├── Callback пишет данные (fwrite)
   ├── Если данные записаны полностью → offWritable
   └── Если частично → продолжает мониторинг

4. Пример из TcpConnection:
   baseWrite() {
       $len = fwrite($socket, $sendBuffer);
       if ($len === strlen($sendBuffer)) {
           // Все записано
           $this->eventLoop->offWritable($socket);
       } else {
           // Частично, продолжает мониторинг
           $this->sendBuffer = substr($sendBuffer, $len);
       }
   }

Управление таймерами

Типы таймеров

1. Delay Timer (Одноразовый таймер):

$timerId = Timer::delay(2.0, function() {
    echo "Выполнится через 2 секунды";
});

// Таймер автоматически удаляется после выполнения

Механизм работы:

delay($delay, $func, $args) {
    $callbackId = $this->driver->defer($delay, function() use ($func, $args) {
        $func(...$args);
    });
    return $callbackId;
}

2. Repeat Timer (Повторяющийся таймер):

$timerId = Timer::repeat(1.0, function() {
    echo "Выполняется каждую секунду";
});

// Таймер продолжает выполняться
// Для остановки: Timer::del($timerId)

Механизм работы:

repeat($interval, $func, $args) {
    $callbackId = $this->timerId++;
    $this->timerQueue->insert($callbackId, $interval, $func, $args);
    return $callbackId;
}

// При обработке:
while (($timer = $timerQueue->extract($now)) !== null) {
    $timer->invoke();
    
    // Если repeat - перевставляем в очередь
    if ($timer->isRepeating()) {
        $timerQueue->insert($timer->getId(), $interval, ...);
    }
}

Оптимизация таймеров

Timer Queue:

Localzet Server использует эффективную структуру данных для таймеров:

  • Min-Heap: Минимальная куча для быстрого поиска ближайшего таймера
  • O(log n): Сложность вставки и удаления
  • O(1): Получение следующего таймера

Обработка сигналов

Регистрация обработчиков сигналов

onSignal($signal, $func) {
    // Регистрация в Event Loop
    $callbackId = $this->driver->onSignal($signal, function() use ($func) {
        $func($signal);
    });
    
    // Регистрация в PHP
    pcntl_signal($signal, function($sig) {
        // Запускается при вызове pcntl_signal_dispatch()
    });
}

Механизм обработки

1. Система отправляет сигнал процессу

2. Event Loop получает уведомление:
   - libuv/libev/libevent имеют встроенную поддержку сигналов
   - stream_select требует дополнительной обработки через pcntl

3. Вызов callback функции:
   onSignal(SIGUSR1, function($signal) {
       // Обработка перезагрузки
   })

Suspension API (для Fibers)

Принцип работы

Suspension позволяет приостановить выполнение Fiber и возобновить его позже:

// В Fiber
$suspension = $eventLoop->getSuspension();
Timer::delay(1.0, fn() => $suspension->resume());
$suspension->suspend(); // Приостановка до вызова resume()

Реализация:

class Suspension {
    private ?Fiber $fiber = null;
    
    public function suspend(): mixed {
        $this->fiber = Fiber::getCurrent();
        return Fiber::suspend();
    }
    
    public function resume(mixed $value = null): void {
        if ($this->fiber !== null) {
            Fiber::resume($this->fiber, $value);
        }
    }
}

Оптимизации производительности

1. Эффективное мультиплексирование

  • Использование epoll/kqueue вместо select для больших количеств дескрипторов
  • O(1) сложность для большого количества файловых дескрипторов
  • Минимальные системные вызовы

2. Буферизация

  • Чтение большими блоками для минимизации системных вызовов
  • Запись только когда сокет готов (backpressure)
  • Управление буферами для предотвращения переполнения

3. Оптимизация таймеров

  • Min-heap для эффективного управления таймерами
  • Батчинг проверок таймеров
  • Ленивое удаление завершенных таймеров

4. Управление памятью

  • Переиспользование объектов где возможно
  • Минимизация аллокаций в горячих путях
  • Сборка мусора только в безопасных точках

Сравнение реализаций

Характеристикаlibuvlibevlibeventstream_select
Производительность⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Масштабируемость⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Кроссплатформенность⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Размер библиотекиБольшойСреднийСреднийВстроен
Поддержка Windows

Практические примеры

Пример 1: Асинхронное чтение файла

$file = fopen('large.txt', 'r');
$eventLoop->onReadable($file, function($file) {
    $data = fread($file, 8192);
    if ($data === false || feof($file)) {
        $eventLoop->offReadable($file);
        fclose($file);
    } else {
        processData($data);
    }
});

Пример 2: Таймаут для операции

$timeoutId = Timer::delay(5.0, function() use ($connection) {
    $connection->close();
});

// При успешном завершении
onSuccess(function() use ($timeoutId) {
    Timer::del($timeoutId);
});

Пример 3: Heartbeat через таймер

Timer::repeat(30.0, function() use ($connection) {
    if (!$connection->sendPing()) {
        Timer::del($timerId);
        $connection->close();
    }
});

Система обработки соединений