summaryrefslogtreecommitdiffstats
path: root/inc/taskmanager.inc.php
diff options
context:
space:
mode:
Diffstat (limited to 'inc/taskmanager.inc.php')
-rw-r--r--inc/taskmanager.inc.php170
1 files changed, 126 insertions, 44 deletions
diff --git a/inc/taskmanager.inc.php b/inc/taskmanager.inc.php
index 547a75d4..d9396901 100644
--- a/inc/taskmanager.inc.php
+++ b/inc/taskmanager.inc.php
@@ -1,5 +1,7 @@
<?php
+declare(strict_types=1);
+
/**
* Interface to the external task manager.
*/
@@ -19,14 +21,27 @@ class Taskmanager
*/
private static $sock = false;
- private static function init()
+ private static function init(): void
{
if (self::$sock !== false)
return;
- self::$sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
- socket_set_option(self::$sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => 0, 'usec' => 300000));
- socket_set_option(self::$sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => 0, 'usec' => 200000));
- socket_connect(self::$sock, '127.0.0.1', 9215);
+ self::$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
+ if (self::$sock === false)
+ return;
+ socket_set_option(self::$sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => 1, 'usec' => 0));
+ if (socket_connect(self::$sock, '127.0.0.1', 9215) === false)
+ return;
+ self::send(CONFIG_TM_PASSWORD);
+ }
+
+ private static function send(string $message): bool
+ {
+ $len = strlen($message);
+ $sent = socket_send(self::$sock, pack('N', $len) . $message, $len + 4, 0);
+ if ($sent === $len + 4)
+ return true;
+ self::reset();
+ return false;
}
/**
@@ -35,10 +50,10 @@ class Taskmanager
* @param string $task name of task to start
* @param array $data data to pass to the task. the structure depends on the task.
* @param boolean $async if true, the function will not wait for the reply of the taskmanager, which means
- * the return value is just true (and you won't know if the task could acutally be started)
- * @return array|false struct representing the task status (as a result of submit); false on communication error
+ * the return value is just true (and you won't know if the task could actually be started)
+ * @return array{id: string, statusCode: string, data: array}|bool struct representing the task status (as a result of submit); false on communication error
*/
- public static function submit($task, $data = false, $async = false)
+ public static function submit(string $task, array $data = null, bool $async = false)
{
self::init();
$seq = (string) mt_rand();
@@ -48,8 +63,7 @@ class Taskmanager
$data = json_encode($data);
}
$message = "$seq, $task, $data";
- $sent = socket_send(self::$sock, $message, strlen($message), 0);
- if ($sent != strlen($message)) {
+ if (!self::send($message)) {
self::addErrorMessage(false);
return false;
}
@@ -79,7 +93,8 @@ class Taskmanager
self::init();
$seq = (string) mt_rand();
$message = "$seq, status, $task";
- socket_send(self::$sock, $message, strlen($message), 0);
+ if (!self::send($message))
+ return false;
$reply = self::readReply($seq);
if (!is_array($reply))
return false;
@@ -96,7 +111,7 @@ class Taskmanager
* @param string|array $taskid a task id or a task array returned by ::status or ::submit
* @return boolean true if taskid exists in taskmanager
*/
- public static function isTask($task)
+ public static function isTask($task): bool
{
if ($task === false)
return false;
@@ -114,7 +129,7 @@ class Taskmanager
* @param int $timeout maximum time in ms to wait for completion of task
* @return array|false result/status of task, or false if it couldn't be queried
*/
- public static function waitComplete($task, $timeout = 2500)
+ public static function waitComplete($task, int $timeout = 2500)
{
if (is_array($task) && isset($task['id'])) {
if ($task['statusCode'] !== Taskmanager::TASK_PROCESSING && $task['statusCode'] !== Taskmanager::TASK_WAITING) {
@@ -127,7 +142,9 @@ class Taskmanager
return false;
$done = false;
$deadline = microtime(true) + $timeout / 1000;
- do {
+ $status = false;
+ while (($remaining = $deadline - microtime(true)) > 0) {
+ usleep((int)min(100000, $remaining * 100000));
$status = self::status($task);
if (!isset($status['statusCode']))
break;
@@ -135,8 +152,7 @@ class Taskmanager
$done = true;
break;
}
- usleep(100000);
- } while (microtime(true) < $deadline);
+ }
if ($done) { // For now we do this unconditionally, but maybe we want to keep them longer some time?
self::release($task);
}
@@ -150,7 +166,7 @@ class Taskmanager
* @param array|false $task struct representing task, obtained by ::status
* @return boolean true if task failed, false if finished successfully or still waiting/running
*/
- public static function isFailed($task)
+ public static function isFailed($task): bool
{
if (!is_array($task) || !isset($task['statusCode']) || !isset($task['id']))
return true;
@@ -163,10 +179,10 @@ class Taskmanager
* Check whether the given task is finished, i.e. either failed or succeeded,
* but is not running, still waiting for execution or simply unknown.
*
- * @param array $task struct representing task, obtained by ::status
+ * @param mixed $task struct representing task, obtained by ::status
* @return boolean true if task failed or finished, false if waiting for execution or currently executing, no valid task, etc.
*/
- public static function isFinished($task)
+ public static function isFinished($task): bool
{
if (!is_array($task) || !isset($task['statusCode']) || !isset($task['id']))
return false;
@@ -179,10 +195,10 @@ class Taskmanager
* Check whether the given task is running, that is either waiting for execution
* or currently executing.
*
- * @param array $task struct representing task, obtained by ::status
+ * @param mixed $task struct representing task, obtained by ::status
* @return boolean true if task is waiting or executing, false if waiting for execution or currently executing, no valid task, etc.
*/
- public static function isRunning($task)
+ public static function isRunning($task): bool
{
if (!is_array($task) || !isset($task['statusCode']) || !isset($task['id']))
return false;
@@ -191,7 +207,7 @@ class Taskmanager
return false;
}
- public static function addErrorMessage($task)
+ public static function addErrorMessage($task): void
{
static $failure = false;
if ($task === false) {
@@ -218,7 +234,7 @@ class Taskmanager
* @param string|array $task task to release. can either be its id, or a struct representing the task, as returned
* by ::submit() or ::status()
*/
- public static function release($task)
+ public static function release($task): void
{
if (is_array($task) && isset($task['id'])) {
$task = $task['id'];
@@ -228,43 +244,109 @@ class Taskmanager
self::init();
$seq = (string) mt_rand();
$message = "$seq, release, $task";
- socket_send(self::$sock, $message, strlen($message), 0);
+ self::send($message);
}
/**
* Read reply from socket for given sequence number.
*
- * @param string $seq
- * @return mixed the decoded json data for that message as an array, or null on error
+ * @return mixed the decoded json data for that message as an array, or false on error
*/
- private static function readReply($seq)
+ private static function readReply(string $seq)
{
$tries = 0;
- while (($bytes = @socket_recvfrom(self::$sock, $buf, 90000, 0, $bla1, $bla2)) !== false || socket_last_error() === 11) {
- $parts = explode(',', $buf, 2);
- // Do we have compressed data?
- if (substr($parts[0], 0, 3) === '+z:') {
- $parts[0] = substr($parts[0], 3);
- $gz = true;
- } else {
- $gz = false;
+ $deadline = microtime(true) + 2;
+ self::updateRecvTimeout($deadline);
+ while (($bytes = socket_recv(self::$sock, $buf, 4, MSG_WAITALL)) !== false) {
+ if ($bytes !== 4) {
+ error_log('TM: Short read');
+ self::reset();
+ return false;
+ }
+ $len = unpack('Nx', $buf)['x'];
+ if ($len < 0 || $len > 1024 * 1024) {
+ error_log('TM: Invalid payload length: ' . $len);
+ self::reset();
+ return false;
+ }
+ $message = '';
+ while ($len > 0) {
+ self::updateRecvTimeout($deadline);
+ $ret = socket_recv(self::$sock, $buf, $len, 0);
+ if ($ret === false) {
+ error_log('TM: Error reading payload');
+ self::reset();
+ return false;
+ }
+ if ($ret <= 0) {
+ error_log('TM: Taskmanager closed connection');
+ self::reset();
+ return false;
+ }
+ $message .= $buf;
+ $len -= $ret;
}
- // See if it's our message
- if (count($parts) === 2 && $parts[0] === $seq) {
- if ($gz) {
- $parts[1] = gzinflate($parts[1]);
- if ($parts[1] === false) {
- error_log('Taskmanager: Invalid deflate data received');
- continue;
+ $parts = explode(',', $message, 2);
+ if (count($parts) !== 2) {
+ error_log('TM: Invalid reply, no "," in payload');
+ } elseif ($parts[0] === 'ERROR') {
+ ErrorHandler::traceError('Taskmanager remote error: ' . $parts[1]);
+ } elseif ($parts[0] === 'WARNING') {
+ Message::addWarning('main.taskmanager-warning', $parts[1]);
+ } else {
+ // Do we have compressed data?
+ if (substr($parts[0], 0, 3) === '+z:') {
+ $parts[0] = substr($parts[0], 3);
+ $gz = true;
+ } else {
+ $gz = false;
+ }
+ // See if it's our message
+ if ($parts[0] === $seq) {
+ if ($gz) {
+ $parts[1] = gzinflate($parts[1]);
+ if ($parts[1] === false) {
+ error_log('TM: Invalid deflate data received');
+ continue;
+ }
}
+ return json_decode($parts[1], true);
}
- return json_decode($parts[1], true);
}
if (++$tries > 10)
return false;
}
- error_log('Reading taskmanager reply failed, socket error ' . socket_last_error());
+ error_log('TM: Reading reply failed, socket error ' . socket_last_error());
return false;
}
+ /**
+ * Closes connection and resets the variable.
+ * Should be called if something goes wrong when
+ * sending or receiving and the send or receive
+ * buffer might be in an undefined state.
+ */
+ private static function reset(): void
+ {
+ if (self::$sock === false)
+ return;
+ socket_close(self::$sock);
+ self::$sock = false;
+ }
+
+ /**
+ * @param float $deadline end time
+ */
+ private static function updateRecvTimeout(float $deadline): void
+ {
+ $to = $deadline - microtime(true);
+ if ($to <= 0) {
+ $to = ['sec' => 0, 'usec' => 1];
+ } else {
+ $s = (int)$to;
+ $to = ['sec' => $s, 'usec' => (int)(($to - $s) * 1000000)];
+ }
+ socket_set_option(self::$sock, SOL_SOCKET, SO_RCVTIMEO, $to);
+ }
+
}