From 0f90ba5b4a382ffd923676316d641b68d17876ce Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 7 Jan 2020 15:44:14 +0100 Subject: [inc/Taskmanager] Switch to new TCP interface --- config.php.example | 2 + inc/taskmanager.inc.php | 111 ++++++++++++++++++++++++++++++++++++------------ 2 files changed, 87 insertions(+), 26 deletions(-) diff --git a/config.php.example b/config.php.example index cb54a05d..455840ff 100644 --- a/config.php.example +++ b/config.php.example @@ -13,6 +13,8 @@ define('CONFIG_SQL_PASS', '%MYSQL_OPENSLX_PASS%'); // Set this to true if you mysql server doesn't default to UTF-8 on new connections define('CONFIG_SQL_FORCE_UTF8', true); +define('CONFIG_TM_PASSWORD', '%TM_OPENSLX_PASS%'); + define('CONFIG_TGZ_LIST_DIR', '/opt/openslx/configs'); define('CONFIG_TFTP_DIR', '/srv/openslx/tftp'); diff --git a/inc/taskmanager.inc.php b/inc/taskmanager.inc.php index 6c8eb056..c8ea9d0c 100644 --- a/inc/taskmanager.inc.php +++ b/inc/taskmanager.inc.php @@ -23,10 +23,20 @@ class Taskmanager { if (self::$sock !== false) return; - self::$sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); - socket_set_option(self::$sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => 0, 'usec' => 300000)); - socket_set_option(self::$sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => 0, 'usec' => 200000)); - socket_connect(self::$sock, '127.0.0.1', 9215); + self::$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); + if (self::$sock === false) + return; + socket_set_option(self::$sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => 1, 'usec' => 0)); + if (socket_connect(self::$sock, '127.0.0.1', 9215) === false) + return; + self::send(CONFIG_TM_PASSWORD); + } + + private static function send($message) + { + $len = strlen($message); + $sent = socket_send(self::$sock, pack('N', $len) . $message, $len + 4, 0); + return $sent === $len + 4; } /** @@ -48,8 +58,7 @@ class Taskmanager $data = json_encode($data); } $message = "$seq, $task, $data"; - $sent = socket_send(self::$sock, $message, strlen($message), 0); - if ($sent != strlen($message)) { + if (!self::send($message)) { self::addErrorMessage(false); return false; } @@ -79,7 +88,8 @@ class Taskmanager self::init(); $seq = (string) mt_rand(); $message = "$seq, status, $task"; - socket_send(self::$sock, $message, strlen($message), 0); + if (!self::send($message)) + return false; $reply = self::readReply($seq); if (!is_array($reply)) return false; @@ -228,37 +238,71 @@ class Taskmanager self::init(); $seq = (string) mt_rand(); $message = "$seq, release, $task"; - socket_send(self::$sock, $message, strlen($message), 0); + self::send($message); } /** * Read reply from socket for given sequence number. * * @param string $seq - * @return mixed the decoded json data for that message as an array, or null on error + * @return mixed the decoded json data for that message as an array, or false on error */ - private static function readReply($seq) + private static function readReply(string $seq) { $tries = 0; - while (($bytes = @socket_recvfrom(self::$sock, $buf, 90000, 0, $bla1, $bla2)) !== false || socket_last_error() === 11) { - $parts = explode(',', $buf, 2); - // Do we have compressed data? - if (substr($parts[0], 0, 3) === '+z:') { - $parts[0] = substr($parts[0], 3); - $gz = true; - } else { - $gz = false; + $deadline = microtime(true) + 2; + self::updateRecvTimeout($deadline); + while (($bytes = socket_recv(self::$sock, $buf, 4, MSG_WAITALL)) !== false) { + if ($bytes !== 4) { + error_log('TM: Short read'); + return false; + } + $len = unpack('Nx', $buf)['x']; + if ($len < 0 || $len > 1024 * 1024) { + error_log('TM: Invalid payload length: ' . $len); + return false; + } + $message = ''; + while ($len > 0) { + self::updateRecvTimeout($deadline); + $ret = socket_recv(self::$sock, $buf, $len, 0); + if ($ret === false) { + error_log('TM: Error reading payload'); + return false; + } + if ($ret <= 0) { + error_log('TM: Taskmanager closed connection'); + return false; + } + $message .= $buf; + $len -= $ret; } - // See if it's our message - if (count($parts) === 2 && $parts[0] === $seq) { - if ($gz) { - $parts[1] = gzinflate($parts[1]); - if ($parts[1] === false) { - error_log('Taskmanager: Invalid deflate data received'); - continue; + $parts = explode(',', $message, 2); + if (count($parts) !== 2) { + error_log('TM: Invalid reply, no "," in payload'); + } elseif ($parts[0] === 'ERROR') { + Util::traceError('Taskmanager remote error: ' . $parts[1]); + } elseif ($parts[0] === 'WARNING') { + Message::addWarning('main.taskmanager-warning', $parts[1]); + } else { + // Do we have compressed data? + if (substr($parts[0], 0, 3) === '+z:') { + $parts[0] = substr($parts[0], 3); + $gz = true; + } else { + $gz = false; + } + // See if it's our message + if ($parts[0] === $seq) { + if ($gz) { + $parts[1] = gzinflate($parts[1]); + if ($parts[1] === false) { + error_log('TM: Invalid deflate data received'); + continue; + } } + return json_decode($parts[1], true); } - return json_decode($parts[1], true); } if (++$tries > 10) return false; @@ -267,4 +311,19 @@ class Taskmanager return false; } + /** + * @param float $deadline end time + */ + private static function updateRecvTimeout($deadline) + { + $to = $deadline - microtime(true); + if ($to <= 0) { + $to = ['sec' => 0, 'usec' => 1]; + } else { + $s = (int)$to; + $to = ['sec' => $s, 'usec' => (int)(($to - $s) * 1000000)]; + } + socket_set_option(self::$sock, SOL_SOCKET, SO_RCVTIMEO, $to); + } + } -- cgit v1.2.3-55-g7522