summaryrefslogtreecommitdiffstats
path: root/inc/taskmanager.inc.php
diff options
context:
space:
mode:
authorSimon Rettberg2014-05-15 18:28:24 +0200
committerSimon Rettberg2014-05-15 18:28:24 +0200
commit63c0cf521f8097b0dadaf1228176dc38c7d897f6 (patch)
tree83f5da6dc130ac7db575b0eee41ed6c7a2f994fb /inc/taskmanager.inc.php
parentFix handle leak in downloading, better error reporting on failed downloads, a... (diff)
downloadslx-admin-63c0cf521f8097b0dadaf1228176dc38c7d897f6.tar.gz
slx-admin-63c0cf521f8097b0dadaf1228176dc38c7d897f6.tar.xz
slx-admin-63c0cf521f8097b0dadaf1228176dc38c7d897f6.zip
Working on config.tgz composition through config modules
Diffstat (limited to 'inc/taskmanager.inc.php')
-rw-r--r--inc/taskmanager.inc.php86
1 files changed, 86 insertions, 0 deletions
diff --git a/inc/taskmanager.inc.php b/inc/taskmanager.inc.php
new file mode 100644
index 00000000..f2f337be
--- /dev/null
+++ b/inc/taskmanager.inc.php
@@ -0,0 +1,86 @@
+<?php
+
+/**
+ * Interface to the external task manager.
+ */
+class Taskmanager
+{
+
+ private static $sock = false;
+
+ private static function init()
+ {
+ if (self::$sock !== false) return;
+ self::$sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
+ socket_set_option(self::$sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => 0, 'usec' => 100000));
+ socket_set_option(self::$sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => 0, 'usec' => 100000));
+ socket_connect(self::$sock, '127.0.0.1', 9215);
+ }
+
+ public static function submit($task, $data, $async)
+ {
+ self::init();
+ $seq = (string)mt_rand();
+ $data = json_encode($data);
+ $message = "$seq, $task, $data";
+ $sent = socket_send(self::$sock, $message, strlen($message), 0);
+ if ($async) return true;
+ $reply = self::readReply($seq);
+ if (!is_array($reply)) return false;
+ return $reply;
+ }
+
+ public static function status($taskId)
+ {
+ self::init();
+ $seq = (string)mt_rand();
+ $message = "$seq, status, $taskId";
+ $sent = socket_send(self::$sock, $message, strlen($message), 0);
+ $reply = self::readReply($seq);
+ if (!is_array($reply)) return false;
+ return $reply;
+ }
+
+ public static function waitComplete($taskId)
+ {
+ for ($i = 0; $i < 10; ++$i) {
+ $status = self::status($taskId);
+ if (!isset($status['statusCode'])) break;
+ if ($status['statusCode'] != TASK_PROCESSING && $status['statusCode'] != TASK_WAITING) break;
+ usleep(150000);
+ }
+ return $status;
+ }
+
+ public static function release($taskId)
+ {
+ self::init();
+ $seq = (string)mt_rand();
+ $message = "$seq, release, $taskId";
+ socket_send(self::$sock, $message, strlen($message), 0);
+ }
+
+ /**
+ *
+ * @param type $seq
+ * @return mixed the decoded json data for that message as an array, or null on error
+ */
+ private static function readReply($seq)
+ {
+ $tries = 0;
+ while (($bytes = socket_recvfrom(self::$sock, $buf, 90000, 0, $bla1, $bla2)) !== false) {
+ $parts = explode(',', $buf, 2);
+ if (count($parts) == 2 && $parts[0] == $seq) {
+ return json_decode($parts[1], true);
+ }
+ if (++$tries > 10) return false;
+ }
+ //error_log(socket_strerror(socket_last_error(self::$sock)));
+ return false;
+ }
+
+}
+
+foreach (array('TASK_FINISHED', 'TASK_ERROR', 'TASK_WAITING', 'NO_SUCH_TASK', 'TASK_PROCESSING') as $i) {
+ define($i, $i);
+}