Система обработки соединений
Localzet Server использует объектно-ориентированную модель для представления сетевых соединений, обеспечивая высокую производительность и гибкость.
Иерархия классов соединений
ConnectionInterface (абстрактный)
├── TcpConnection (TCP соединения)
├── UdpConnection (UDP соединения)
├── AsyncTcpConnection (асинхронные TCP соединения)
└── AsyncUdpConnection (асинхронные UDP соединения)
TcpConnection: Детальный анализ
Жизненный цикл соединения
1. Создание (__construct)
   ├── Инициализация сокета
   ├── Настройка неблокирующего режима
   ├── Регистрация в Event Loop
   └── Установка начальных параметров
2. Установка соединения (STATUS_CONNECTING → STATUS_ESTABLISHED)
   ├── Вызов onConnect callback
   ├── Инициализация протокола
   └── Начало приема данных
3. Обработка данных (STATUS_ESTABLISHED)
   ├── Чтение данных (baseRead)
   ├── Парсинг протоколом (input/decode)
   ├── Обработка бизнес-логикой (onMessage)
   └── Отправка ответа (send)
4. Закрытие (STATUS_ESTABLISHED → STATUS_CLOSING → STATUS_CLOSED)
   ├── Прекращение приема новых данных
   ├── Отправка оставшихся данных
   ├── Вызов onClose callback
   └── Освобождение ресурсов
Структура данных соединения
TcpConnection {
    // Идентификаторы
    int $id                    // Уникальный ID соединения
    int $realId               // ID для связи с Server
    
    // Сокет и адреса
    resource $socket          // Файловый дескриптор сокета
    string $remoteAddress     // "IP:PORT" клиента
    string $localAddress      // "IP:PORT" сервера
    
    // Буферы
    string $sendBuffer        // Буфер для отправки
    string $recvBuffer        // Буфер для приема
    int $currentPackageLength // Длина ожидаемого пакета
    
    // Протоколы
    string $transport         // "tcp", "udp", "ssl"
    string $protocol          // "http", "websocket", etc.
    
    // Статус
    int $status               // Состояние соединения
    bool $isPaused            // Приостановлено ли чтение
    bool $sslHandshakeCompleted
    
    // События
    callable $onMessage
    callable $onClose
    callable $onError
    callable $onBufferFull
    callable $onBufferDrain
    
    // Производительность
    int $bytesRead           // Прочитано байт
    int $bytesWritten        // Записано байт
    int $maxSendBufferSize   // Максимальный размер буфера отправки
    int $maxPackageSize      // Максимальный размер пакета
}
Процесс чтения данных
Алгоритм baseRead()
baseRead() {
    // 1. Чтение данных из сокета
    $buffer = fread($socket, READ_BUFFER_SIZE);
    
    if ($buffer === '' || $buffer === false) {
        // Соединение закрыто
        destroy();
        return;
    }
    
    // 2. Добавление в буфер приема
    $this->recvBuffer .= $buffer;
    $this->bytesRead += strlen($buffer);
    
    // 3. Обработка SSL (если применимо)
    if (!$this->sslHandshakeCompleted) {
        $this->doSslHandshake();
        return;
    }
    
    // 4. Обработка протоколом прикладного уровня
    if ($this->protocol) {
        $this->doProtocol();
    } else {
        // Без протокола - передача сырых данных
        ($this->onMessage)($this, $this->recvBuffer);
        $this->recvBuffer = '';
    }
}
Обработка протоколом (doProtocol)
doProtocol() {
    while ($this->recvBuffer !== '') {
        // 1. Проверка целостности пакета
        $length = $this->protocol::input(
            $this->recvBuffer, 
            $this
        );
        
        if ($length === 0) {
            // Данных недостаточно, ждем еще
            return;
        }
        
        if ($length === false) {
            // Ошибка протокола - закрываем соединение
            $this->close();
            return;
        }
        
        // 2. Извлечение полного пакета
        $package = substr($this->recvBuffer, 0, $length);
        $this->recvBuffer = substr($this->recvBuffer, $length);
        $this->currentPackageLength = 0;
        
        // 3. Декодирование пакета
        try {
            $data = $this->protocol::decode($package, $this);
            
            // 4. Передача в бизнес-логику
            ($this->onMessage)($this, $data);
        } catch (Throwable $e) {
            $this->error($e);
        }
    }
}
Процесс отправки данных
Алгоритм send()
send($data, $raw = false) {
    // 1. Проверка состояния соединения
    if ($this->status === STATUS_CLOSING || STATUS_CLOSED) {
        return false;
    }
    
    // 2. Кодирование протоколом (если не raw)
    if (!$raw && $this->protocol) {
        $data = $this->protocol::encode($data, $this);
        
        if ($data === '') {
            return null; // Пустой ответ
        }
    }
    
    // 3. Проверка SSL рукопожатия
    if ($this->transport === 'ssl' && !$this->sslHandshakeCompleted) {
        // Добавляем в буфер, отправим после рукопожатия
        $this->sendBuffer .= $data;
        return null;
    }
    
    // 4. Попытка прямой отправки
    if ($this->sendBuffer === '') {
        $len = @fwrite($this->socket, $data);
        
        if ($len === strlen($data)) {
            // Все отправлено
            $this->bytesWritten += $len;
            return true;
        }
        
        if ($len > 0) {
            // Частично отправлено
            $this->sendBuffer = substr($data, $len);
            $this->bytesWritten += $len;
        } else {
            // Ошибка записи
            ++ConnectionInterface::$statistics['send_fail'];
            $this->destroy();
            return false;
        }
    } else {
        // Буфер не пуст - добавляем в очередь
        $this->sendBuffer .= $data;
    }
    
    // 5. Регистрация в Event Loop для отправки
    if ($this->sendBuffer !== '') {
        $this->eventLoop->onWritable($this->socket, $this->baseWrite(...));
    }
    
    // 6. Проверка переполнения буфера
    $this->checkBufferWillFull();
    
    return null; // Данные в буфере, отправка асинхронная
}
Алгоритм baseWrite()
baseWrite() {
    // 1. Попытка записи данных
    $len = @fwrite($this->socket, $this->sendBuffer);
    
    if ($len === false) {
        // Ошибка - закрываем соединение
        ++ConnectionInterface::$statistics['send_fail'];
        $this->destroy();
        return;
    }
    
    if ($len === strlen($this->sendBuffer)) {
        // 2. Все данные отправлены
        $this->bytesWritten += $len;
        $this->sendBuffer = '';
        
        // 3. Отключение мониторинга записи
        $this->eventLoop->offWritable($this->socket);
        
        // 4. Вызов onBufferDrain callback
        if ($this->onBufferDrain) {
            ($this->onBufferDrain)($this);
        }
        
        // 5. Если соединение закрывается - завершаем
        if ($this->status === STATUS_CLOSING) {
            $this->destroy();
        }
        
        return;
    }
    
    // 6. Частичная отправка
    if ($len > 0) {
        $this->bytesWritten += $len;
        $this->sendBuffer = substr($this->sendBuffer, $len);
        // Продолжаем мониторинг для оставшихся данных
    }
}
Управление буферами
Контроль переполнения буфера
checkBufferWillFull() {
    if (strlen($this->sendBuffer) >= $this->maxSendBufferSize) {
        // Буфер полон
        
        // 1. Вызов onBufferFull callback
        if ($this->onBufferFull) {
            ($this->onBufferFull)($this);
        }
        
        // 2. Приостановка приема данных
        $this->pauseRecv();
    }
}
Пауза и возобновление приема
pauseRecv() {
    if ($this->isPaused) {
        return;
    }
    
    $this->isPaused = true;
    $this->eventLoop->offReadable($this->socket);
}
resumeRecv() {
    if (!$this->isPaused) {
        return;
    }
    
    $this->isPaused = false;
    $this->eventLoop->onReadable($this->socket, $this->baseRead(...));
}
Сценарий использования:
1. Клиент отправляет данные быстрее, чем сервер может обработать
2. sendBuffer заполняется до maxSendBufferSize
3. Вызывается onBufferFull
4. Приостанавливается прием данных (pauseRecv)
5. Когда буфер отправки освобождается (onBufferDrain)
6. Возобновляется прием данных (resumeRecv)
SSL/TLS соединения
Процесс SSL рукопожатия
doSslHandshake() {
    if ($this->sslHandshakeCompleted) {
        return;
    }
    
    // 1. Инициализация SSL контекста
    $result = stream_socket_enable_crypto(
        $this->socket,
        true,
        STREAM_CRYPTO_METHOD_TLSv1_2_SERVER
    );
    
    if ($result === true) {
        // 2. Рукопожатие завершено
        $this->sslHandshakeCompleted = true;
        
        // 3. Отправка данных из буфера
        if ($this->sendBuffer !== '') {
            $this->eventLoop->onWritable($this->socket, $this->baseWrite(...));
        }
        
        // 4. Обработка данных из буфера приема
        if ($this->recvBuffer !== '') {
            $this->doProtocol();
        }
    } elseif ($result === false) {
        // Ошибка рукопожатия
        $this->close();
    }
    // Иначе (0) - рукопожатие в процессе, продолжаем ожидание
}
Безопасность SSL
- Поддержка TLS 1.2 и выше
 - Настраиваемые параметры через socket context
 - Валидация сертификатов
 - SNI (Server Name Indication)
 
Управление жизненным циклом
Закрытие соединения
close($data = null, $raw = false) {
    // 1. Проверка состояния
    if ($this->status === STATUS_CLOSING || STATUS_CLOSED) {
        return;
    }
    
    // 2. Отправка данных перед закрытием (если есть)
    if ($data !== null) {
        $this->send($data, $raw);
    }
    
    // 3. Установка статуса закрытия
    $this->status = STATUS_CLOSING;
    
    // 4. Если буфер отправки пуст - закрываем сразу
    if ($this->sendBuffer === '') {
        $this->destroy();
    }
    // Иначе - destroy() вызовется после baseWrite()
}
Освобождение ресурсов (destroy)
destroy() {
    // 1. Проверка повторного вызова
    if ($this->status === STATUS_CLOSED) {
        return;
    }
    
    // 2. Отключение от Event Loop
    $this->eventLoop->offReadable($this->socket);
    $this->eventLoop->offWritable($this->socket);
    
    // 3. Закрытие сокета
    @fclose($this->socket);
    
    // 4. Установка статуса
    $this->status = STATUS_CLOSED;
    
    // 5. Вызов onClose callback
    if ($this->onClose) {
        ($this->onClose)($this);
    }
    
    // 6. Очистка данных
    $this->sendBuffer = '';
    $this->recvBuffer = '';
    $this->currentPackageLength = 0;
    
    // 7. Удаление из Server
    unset($this->server->connections[$this->realId]);
    unset(TcpConnection::$connections[$this->id]);
    
    // 8. Обнуление ссылок для GC
    $this->onMessage = null;
    $this->onClose = null;
    $this->eventLoop = null;
    $this->server = null;
}
Статистика соединений
Сбор метрик
ConnectionInterface::$statistics = [
    'connection_count' => 0,  // Общее количество соединений
    'total_request' => 0,     // Общее количество запросов
    'throw_exception' => 0,   // Количество исключений
    'send_fail' => 0,         // Неудачных отправок
];
Счетчики на соединение
TcpConnection {
    $bytesRead      // Байт прочитано
    $bytesWritten   // Байт записано
}
Оптимизации
1. Кеширование запросов
Для оптимизации парсинга повторяющихся запросов:
static $requests = [];
// Для запросов меньше MAX_CACHE_STRING_LENGTH
if (!isset($buffer[TcpConnection::MAX_CACHE_STRING_LENGTH])) {
    if (isset($requests[$buffer])) {
        // Используем кешированный запрос
        $request = clone $requests[$buffer];
    } else {
        // Парсим и кешируем
        $request = new Request($buffer);
        $requests[$buffer] = $request;
        
        // LRU: удаляем старые при переполнении
        if (count($requests) > MAX_CACHE_SIZE) {
            unset($requests[key($requests)]);
        }
    }
}
2. Оптимизация памяти
- Минимизация копирования данных
 - Переиспользование объектов
 - Эффективные структуры данных для буферов
 
3. Zero-copy операции
Где возможно, используются ссылки вместо копирования данных:
// Вместо копирования буфера
$package = &$this->recvBuffer; // Ссылка
// Обработка
$this->recvBuffer = substr($this->recvBuffer, $length); // Только при необходимости
Паттерны использования
1. Pipe соединений
Перенаправление данных между соединениями:
$clientConnection->pipe($targetConnection);
// Все данные от клиента автоматически отправляются в target
2. Graceful shutdown
Плавное закрытие соединений:
// Устанавливаем статус закрытия
$connection->status = STATUS_CLOSING;
// Отправляем оставшиеся данные
while ($connection->sendBuffer !== '') {
    // Event Loop обработает отправку через baseWrite
}
// После отправки всех данных - закрываем
$connection->destroy();
3. Таймауты
$timeoutId = Timer::delay(30.0, function() use ($connection) {
    $connection->close('Timeout');
});
$connection->onMessage = function($conn, $data) use ($timeoutId) {
    // Сброс таймаута при получении данных
    Timer::del($timeoutId);
    Timer::delay(30.0, function() use ($conn) {
        $conn->close('Timeout');
    });
};

