diff options
Diffstat (limited to 'inc/taskmanager.inc.php')
-rw-r--r-- | inc/taskmanager.inc.php | 170 |
1 files changed, 126 insertions, 44 deletions
diff --git a/inc/taskmanager.inc.php b/inc/taskmanager.inc.php index 547a75d4..d9396901 100644 --- a/inc/taskmanager.inc.php +++ b/inc/taskmanager.inc.php @@ -1,5 +1,7 @@ <?php +declare(strict_types=1); + /** * Interface to the external task manager. */ @@ -19,14 +21,27 @@ class Taskmanager */ private static $sock = false; - private static function init() + private static function init(): void { if (self::$sock !== false) return; - self::$sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); - socket_set_option(self::$sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => 0, 'usec' => 300000)); - socket_set_option(self::$sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => 0, 'usec' => 200000)); - socket_connect(self::$sock, '127.0.0.1', 9215); + self::$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); + if (self::$sock === false) + return; + socket_set_option(self::$sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => 1, 'usec' => 0)); + if (socket_connect(self::$sock, '127.0.0.1', 9215) === false) + return; + self::send(CONFIG_TM_PASSWORD); + } + + private static function send(string $message): bool + { + $len = strlen($message); + $sent = socket_send(self::$sock, pack('N', $len) . $message, $len + 4, 0); + if ($sent === $len + 4) + return true; + self::reset(); + return false; } /** @@ -35,10 +50,10 @@ class Taskmanager * @param string $task name of task to start * @param array $data data to pass to the task. the structure depends on the task. * @param boolean $async if true, the function will not wait for the reply of the taskmanager, which means - * the return value is just true (and you won't know if the task could acutally be started) - * @return array|false struct representing the task status (as a result of submit); false on communication error + * the return value is just true (and you won't know if the task could actually be started) + * @return array{id: string, statusCode: string, data: array}|bool struct representing the task status (as a result of submit); false on communication error */ - public static function submit($task, $data = false, $async = false) + public static function submit(string $task, array $data = null, bool $async = false) { self::init(); $seq = (string) mt_rand(); @@ -48,8 +63,7 @@ class Taskmanager $data = json_encode($data); } $message = "$seq, $task, $data"; - $sent = socket_send(self::$sock, $message, strlen($message), 0); - if ($sent != strlen($message)) { + if (!self::send($message)) { self::addErrorMessage(false); return false; } @@ -79,7 +93,8 @@ class Taskmanager self::init(); $seq = (string) mt_rand(); $message = "$seq, status, $task"; - socket_send(self::$sock, $message, strlen($message), 0); + if (!self::send($message)) + return false; $reply = self::readReply($seq); if (!is_array($reply)) return false; @@ -96,7 +111,7 @@ class Taskmanager * @param string|array $taskid a task id or a task array returned by ::status or ::submit * @return boolean true if taskid exists in taskmanager */ - public static function isTask($task) + public static function isTask($task): bool { if ($task === false) return false; @@ -114,7 +129,7 @@ class Taskmanager * @param int $timeout maximum time in ms to wait for completion of task * @return array|false result/status of task, or false if it couldn't be queried */ - public static function waitComplete($task, $timeout = 2500) + public static function waitComplete($task, int $timeout = 2500) { if (is_array($task) && isset($task['id'])) { if ($task['statusCode'] !== Taskmanager::TASK_PROCESSING && $task['statusCode'] !== Taskmanager::TASK_WAITING) { @@ -127,7 +142,9 @@ class Taskmanager return false; $done = false; $deadline = microtime(true) + $timeout / 1000; - do { + $status = false; + while (($remaining = $deadline - microtime(true)) > 0) { + usleep((int)min(100000, $remaining * 100000)); $status = self::status($task); if (!isset($status['statusCode'])) break; @@ -135,8 +152,7 @@ class Taskmanager $done = true; break; } - usleep(100000); - } while (microtime(true) < $deadline); + } if ($done) { // For now we do this unconditionally, but maybe we want to keep them longer some time? self::release($task); } @@ -150,7 +166,7 @@ class Taskmanager * @param array|false $task struct representing task, obtained by ::status * @return boolean true if task failed, false if finished successfully or still waiting/running */ - public static function isFailed($task) + public static function isFailed($task): bool { if (!is_array($task) || !isset($task['statusCode']) || !isset($task['id'])) return true; @@ -163,10 +179,10 @@ class Taskmanager * Check whether the given task is finished, i.e. either failed or succeeded, * but is not running, still waiting for execution or simply unknown. * - * @param array $task struct representing task, obtained by ::status + * @param mixed $task struct representing task, obtained by ::status * @return boolean true if task failed or finished, false if waiting for execution or currently executing, no valid task, etc. */ - public static function isFinished($task) + public static function isFinished($task): bool { if (!is_array($task) || !isset($task['statusCode']) || !isset($task['id'])) return false; @@ -179,10 +195,10 @@ class Taskmanager * Check whether the given task is running, that is either waiting for execution * or currently executing. * - * @param array $task struct representing task, obtained by ::status + * @param mixed $task struct representing task, obtained by ::status * @return boolean true if task is waiting or executing, false if waiting for execution or currently executing, no valid task, etc. */ - public static function isRunning($task) + public static function isRunning($task): bool { if (!is_array($task) || !isset($task['statusCode']) || !isset($task['id'])) return false; @@ -191,7 +207,7 @@ class Taskmanager return false; } - public static function addErrorMessage($task) + public static function addErrorMessage($task): void { static $failure = false; if ($task === false) { @@ -218,7 +234,7 @@ class Taskmanager * @param string|array $task task to release. can either be its id, or a struct representing the task, as returned * by ::submit() or ::status() */ - public static function release($task) + public static function release($task): void { if (is_array($task) && isset($task['id'])) { $task = $task['id']; @@ -228,43 +244,109 @@ class Taskmanager self::init(); $seq = (string) mt_rand(); $message = "$seq, release, $task"; - socket_send(self::$sock, $message, strlen($message), 0); + self::send($message); } /** * Read reply from socket for given sequence number. * - * @param string $seq - * @return mixed the decoded json data for that message as an array, or null on error + * @return mixed the decoded json data for that message as an array, or false on error */ - private static function readReply($seq) + private static function readReply(string $seq) { $tries = 0; - while (($bytes = @socket_recvfrom(self::$sock, $buf, 90000, 0, $bla1, $bla2)) !== false || socket_last_error() === 11) { - $parts = explode(',', $buf, 2); - // Do we have compressed data? - if (substr($parts[0], 0, 3) === '+z:') { - $parts[0] = substr($parts[0], 3); - $gz = true; - } else { - $gz = false; + $deadline = microtime(true) + 2; + self::updateRecvTimeout($deadline); + while (($bytes = socket_recv(self::$sock, $buf, 4, MSG_WAITALL)) !== false) { + if ($bytes !== 4) { + error_log('TM: Short read'); + self::reset(); + return false; + } + $len = unpack('Nx', $buf)['x']; + if ($len < 0 || $len > 1024 * 1024) { + error_log('TM: Invalid payload length: ' . $len); + self::reset(); + return false; + } + $message = ''; + while ($len > 0) { + self::updateRecvTimeout($deadline); + $ret = socket_recv(self::$sock, $buf, $len, 0); + if ($ret === false) { + error_log('TM: Error reading payload'); + self::reset(); + return false; + } + if ($ret <= 0) { + error_log('TM: Taskmanager closed connection'); + self::reset(); + return false; + } + $message .= $buf; + $len -= $ret; } - // See if it's our message - if (count($parts) === 2 && $parts[0] === $seq) { - if ($gz) { - $parts[1] = gzinflate($parts[1]); - if ($parts[1] === false) { - error_log('Taskmanager: Invalid deflate data received'); - continue; + $parts = explode(',', $message, 2); + if (count($parts) !== 2) { + error_log('TM: Invalid reply, no "," in payload'); + } elseif ($parts[0] === 'ERROR') { + ErrorHandler::traceError('Taskmanager remote error: ' . $parts[1]); + } elseif ($parts[0] === 'WARNING') { + Message::addWarning('main.taskmanager-warning', $parts[1]); + } else { + // Do we have compressed data? + if (substr($parts[0], 0, 3) === '+z:') { + $parts[0] = substr($parts[0], 3); + $gz = true; + } else { + $gz = false; + } + // See if it's our message + if ($parts[0] === $seq) { + if ($gz) { + $parts[1] = gzinflate($parts[1]); + if ($parts[1] === false) { + error_log('TM: Invalid deflate data received'); + continue; + } } + return json_decode($parts[1], true); } - return json_decode($parts[1], true); } if (++$tries > 10) return false; } - error_log('Reading taskmanager reply failed, socket error ' . socket_last_error()); + error_log('TM: Reading reply failed, socket error ' . socket_last_error()); return false; } + /** + * Closes connection and resets the variable. + * Should be called if something goes wrong when + * sending or receiving and the send or receive + * buffer might be in an undefined state. + */ + private static function reset(): void + { + if (self::$sock === false) + return; + socket_close(self::$sock); + self::$sock = false; + } + + /** + * @param float $deadline end time + */ + private static function updateRecvTimeout(float $deadline): void + { + $to = $deadline - microtime(true); + if ($to <= 0) { + $to = ['sec' => 0, 'usec' => 1]; + } else { + $s = (int)$to; + $to = ['sec' => $s, 'usec' => (int)(($to - $s) * 1000000)]; + } + socket_set_option(self::$sock, SOL_SOCKET, SO_RCVTIMEO, $to); + } + } |