From 63c0cf521f8097b0dadaf1228176dc38c7d897f6 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 15 May 2014 18:28:24 +0200 Subject: Working on config.tgz composition through config modules --- inc/taskmanager.inc.php | 86 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 inc/taskmanager.inc.php (limited to 'inc/taskmanager.inc.php') diff --git a/inc/taskmanager.inc.php b/inc/taskmanager.inc.php new file mode 100644 index 00000000..f2f337be --- /dev/null +++ b/inc/taskmanager.inc.php @@ -0,0 +1,86 @@ + 0, 'usec' => 100000)); + socket_set_option(self::$sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => 0, 'usec' => 100000)); + socket_connect(self::$sock, '127.0.0.1', 9215); + } + + public static function submit($task, $data, $async) + { + self::init(); + $seq = (string)mt_rand(); + $data = json_encode($data); + $message = "$seq, $task, $data"; + $sent = socket_send(self::$sock, $message, strlen($message), 0); + if ($async) return true; + $reply = self::readReply($seq); + if (!is_array($reply)) return false; + return $reply; + } + + public static function status($taskId) + { + self::init(); + $seq = (string)mt_rand(); + $message = "$seq, status, $taskId"; + $sent = socket_send(self::$sock, $message, strlen($message), 0); + $reply = self::readReply($seq); + if (!is_array($reply)) return false; + return $reply; + } + + public static function waitComplete($taskId) + { + for ($i = 0; $i < 10; ++$i) { + $status = self::status($taskId); + if (!isset($status['statusCode'])) break; + if ($status['statusCode'] != TASK_PROCESSING && $status['statusCode'] != TASK_WAITING) break; + usleep(150000); + } + return $status; + } + + public static function release($taskId) + { + self::init(); + $seq = (string)mt_rand(); + $message = "$seq, release, $taskId"; + socket_send(self::$sock, $message, strlen($message), 0); + } + + /** + * + * @param type $seq + * @return mixed the decoded json data for that message as an array, or null on error + */ + private static function readReply($seq) + { + $tries = 0; + while (($bytes = socket_recvfrom(self::$sock, $buf, 90000, 0, $bla1, $bla2)) !== false) { + $parts = explode(',', $buf, 2); + if (count($parts) == 2 && $parts[0] == $seq) { + return json_decode($parts[1], true); + } + if (++$tries > 10) return false; + } + //error_log(socket_strerror(socket_last_error(self::$sock))); + return false; + } + +} + +foreach (array('TASK_FINISHED', 'TASK_ERROR', 'TASK_WAITING', 'NO_SUCH_TASK', 'TASK_PROCESSING') as $i) { + define($i, $i); +} -- cgit v1.2.3-55-g7522