summaryrefslogblamecommitdiffstats
path: root/inc/taskmanager.inc.php
blob: f7c72e04d1ef0f7901cd783a7cb6d5cb6ca121fc (plain) (tree)
1
2
3
4
5
6
7
8






                                          
 






                                                    



                                                                  
                                     
 

                                      

                                          












                                                                                                        



                                       

         





                                                                                                                  
                                                                                                                
                                                                                                                        
           
                                                                           

                             
                                          




                                                   
                                                
                                            
                                                     

                                     

                                    
                                               
                                                                                                                                                                           
                                                      

                                     


                              



                                                    
                                                                                       

                                            
         




                                                            
                             
                                          
                                                     

                                          
                                               

                                      

                              
 
           

                                                                                           
                                                                 

                                                                                          
          
                                                                                                 

                                                               
                                            
         

                                     


                                                    

                                                                                                          


           

                                                
                                                     
                                                                                
                                                                                        
           
                                                                   
         
                                                            
                                                                                                                                        






                                                           
                              
                                                              

                                                                        
                                                      

                                                          
                                                                                                                                            


                                             
                 
                                                                                                                    
                                             
                 


                               
           

                                                                      
          
                                                                                  

                                                                                                       

                                              
                                                                                           
                                    
                                                                                                                                                                                    


                                    
 










                                                                                                                                      
                                                                                                                              



                                    















                                                                                                                                           




                                                     
                                                                            




                                                  
                                                                     


                                                    
                                                                                                                        

                               
                                                                          
         
 


                                                                                                                         
                                                                                                                          


                                             
         




                                                            
                             
                                          
                                                  
                                     
         
 
           


                                                            
                                                                                              
           
                                                      

                           




                                                                                             
                                              




                                                                                 
                                              







                                                                               
                                                      



                                                                                       
                                                      



                                                     
                         






















                                                                                                       
                                         
                                                                            
                                 
                         

                                             
                 
                                                                                           


                             
           













                                                        













                                                                                   
 
<?php

/**
 * Interface to the external task manager.
 */
class Taskmanager
{

	const NO_SUCH_TASK = 'NO_SUCH_TASK';
	const TASK_FINISHED = 'TASK_FINISHED';
	const TASK_ERROR = 'TASK_ERROR';
	const TASK_WAITING = 'TASK_WAITING';
	const NO_SUCH_INSTANCE = 'NO_SUCH_INSTANCE';
	const TASK_PROCESSING = 'TASK_PROCESSING';

	/**
	 * UDP socket used for communication with the task manager
	 * @var resource
	 */
	private static $sock = false;

	private static function init()
	{
		if (self::$sock !== false)
			return;
		self::$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
		if (self::$sock === false)
			return;
		socket_set_option(self::$sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => 1, 'usec' => 0));
		if (socket_connect(self::$sock, '127.0.0.1', 9215) === false)
			return;
		self::send(CONFIG_TM_PASSWORD);
	}

	private static function send($message)
	{
		$len = strlen($message);
		$sent = socket_send(self::$sock, pack('N', $len) . $message, $len + 4, 0);
		if ($sent === $len + 4)
			return true;
		self::reset();
		return false;
	}

	/**
	 * Start a task via the task manager.
	 *
	 * @param string $task name of task to start
	 * @param array $data data to pass to the task. the structure depends on the task.
	 * @param boolean $async if true, the function will not wait for the reply of the taskmanager, which means
	 * 		the return value is just true (and you won't know if the task could actually be started)
	 * @return array|false struct representing the task status (as a result of submit); false on communication error
	 */
	public static function submit($task, $data = false, $async = false)
	{
		self::init();
		$seq = (string) mt_rand();
		if (empty($data)) {
			$data = '{}';
		} else {
			$data = json_encode($data);
		}
		$message = "$seq, $task, $data";
		if (!self::send($message)) {
			self::addErrorMessage(false);
			return false;
		}
		if ($async)
			return true;
		$reply = self::readReply($seq);
		if ($reply === false || !is_array($reply) || !isset($reply['id']) || (isset($reply['statusCode']) && $reply['statusCode'] === Taskmanager::NO_SUCH_TASK)) {
			self::addErrorMessage($reply);
			return false;
		}
		return $reply;
	}

	/**
	 * Query status of given task.
	 *
	 * @param mixed $task task id or task struct
	 * @return array|false status of task as array, or false on communication error
	 */
	public static function status($task)
	{
		if (is_array($task) && isset($task['id'])) {
			$task = $task['id'];
		}
		if (!is_string($task))
			return false;
		self::init();
		$seq = (string) mt_rand();
		$message = "$seq,     status, $task";
		if (!self::send($message))
			return false;
		$reply = self::readReply($seq);
		if (!is_array($reply))
			return false;
		return $reply;
	}

	/**
	 * Checks whether the given task id corresponds to a known task in the taskmanager.
	 * Returns true iff the taskmanager is reachable and the status of the task
	 * is different from Taskmanager::NO_SUCH_INSTANCE/_TASK.
	 * If you pass an array it is assumed that it was already queried and is evaluated
	 * directly.
	 *
	 * @param string|array $taskid a task id or a task array returned by ::status or ::submit
	 * @return boolean true if taskid exists in taskmanager
	 */
	public static function isTask($task)
	{
		if ($task === false)
			return false;
		if (is_string($task)) {
			$task = self::status($task);
		}
		return isset($task['statusCode']) && $task['statusCode'] !== Taskmanager::NO_SUCH_INSTANCE
			&& $task['statusCode'] !== Taskmanager::NO_SUCH_TASK;
	}

	/**
	 * Wait for the given task's completion.
	 *
	 * @param string|array $task task to wait for
	 * @param int $timeout maximum time in ms to wait for completion of task
	 * @return array|false result/status of task, or false if it couldn't be queried
	 */
	public static function waitComplete($task, $timeout = 2500)
	{
		if (is_array($task) && isset($task['id'])) {
			if ($task['statusCode'] !== Taskmanager::TASK_PROCESSING && $task['statusCode'] !== Taskmanager::TASK_WAITING) {
				self::release($task['id']);
				return $task;
			}
			$task = $task['id'];
		}
		if (!is_string($task))
			return false;
		$done = false;
		$deadline = microtime(true) + $timeout / 1000;
		while (($remaining = $deadline - microtime(true)) > 0) {
			usleep(min(100000, $remaining * 100000));
			$status = self::status($task);
			if (!isset($status['statusCode']))
				break;
			if ($status['statusCode'] !== Taskmanager::TASK_PROCESSING && $status['statusCode'] !== Taskmanager::TASK_WAITING) {
				$done = true;
				break;
			}
		}
		if ($done) { // For now we do this unconditionally, but maybe we want to keep them longer some time?
			self::release($task);
		}
		return $status;
	}

	/**
	 * Check whether the given task can be considered failed. This
	 * includes that the task id is invalid, etc.
	 *
	 * @param array|false $task struct representing task, obtained by ::status
	 * @return boolean true if task failed, false if finished successfully or still waiting/running
	 */
	public static function isFailed($task)
	{
		if (!is_array($task) || !isset($task['statusCode']) || !isset($task['id']))
			return true;
		if ($task['statusCode'] !== Taskmanager::TASK_WAITING && $task['statusCode'] !== Taskmanager::TASK_PROCESSING && $task['statusCode'] !== Taskmanager::TASK_FINISHED)
			return true;
		return false;
	}

	/**
	 * Check whether the given task is finished, i.e. either failed or succeeded,
	 * but is not running, still waiting for execution or simply unknown.
	 *
	 * @param array $task struct representing task, obtained by ::status
	 * @return boolean true if task failed or finished, false if waiting for execution or currently executing, no valid task, etc.
	 */
	public static function isFinished($task)
	{
		if (!is_array($task) || !isset($task['statusCode']) || !isset($task['id']))
			return false;
		if ($task['statusCode'] !== Taskmanager::TASK_WAITING && $task['statusCode'] !== Taskmanager::TASK_PROCESSING)
			return true;
		return false;
	}

	/**
	 * Check whether the given task is running, that is either waiting for execution
	 * or currently executing.
	 *
	 * @param array $task struct representing task, obtained by ::status
	 * @return boolean true if task is waiting or executing, false if waiting for execution or currently executing, no valid task, etc.
	 */
	public static function isRunning($task)
	{
		if (!is_array($task) || !isset($task['statusCode']) || !isset($task['id']))
			return false;
		if ($task['statusCode'] === Taskmanager::TASK_WAITING || $task['statusCode'] === Taskmanager::TASK_PROCESSING)
			return true;
		return false;
	}

	public static function addErrorMessage($task)
	{
		static $failure = false;
		if ($task === false) {
			if (!$failure) {
				Message::addError('main.taskmanager-error');
				$failure = true;
			}
			return;
		}
		if (!isset($task['statusCode'])) {
			Message::addError('main.taskmanager-format');
			return;
		}
		if (isset($task['data']['error'])) {
			Message::addError('main.task-error', $task['statusCode'] . ' (' . $task['data']['error'] . ')');
			return;
		}
		Message::addError('main.task-error', $task['statusCode']);
	}

	/**
	 * Release a given task from the task manager, so it won't keep the result anymore in case it's finished running.
	 *
	 * @param string|array $task task to release. can either be its id, or a struct representing the task, as returned
	 * by ::submit() or ::status()
	 */
	public static function release($task)
	{
		if (is_array($task) && isset($task['id'])) {
			$task = $task['id'];
		}
		if (!is_string($task))
			return;
		self::init();
		$seq = (string) mt_rand();
		$message = "$seq, release, $task";
		self::send($message);
	}

	/**
	 * Read reply from socket for given sequence number.
	 *
	 * @param string $seq
	 * @return mixed the decoded json data for that message as an array, or false on error
	 */
	private static function readReply(string $seq)
	{
		$tries = 0;
		$deadline = microtime(true) + 2;
		self::updateRecvTimeout($deadline);
		while (($bytes = socket_recv(self::$sock, $buf, 4, MSG_WAITALL)) !== false) {
			if ($bytes !== 4) {
				error_log('TM: Short read');
				self::reset();
				return false;
			}
			$len = unpack('Nx', $buf)['x'];
			if ($len < 0 || $len > 1024 * 1024) {
				error_log('TM: Invalid payload length: ' . $len);
				self::reset();
				return false;
			}
			$message = '';
			while ($len > 0) {
				self::updateRecvTimeout($deadline);
				$ret = socket_recv(self::$sock, $buf, $len, 0);
				if ($ret === false) {
					error_log('TM: Error reading payload');
					self::reset();
					return false;
				}
				if ($ret <= 0) {
					error_log('TM: Taskmanager closed connection');
					self::reset();
					return false;
				}
				$message .= $buf;
				$len -= $ret;
			}
			$parts = explode(',', $message, 2);
			if (count($parts) !== 2) {
				error_log('TM: Invalid reply, no "," in payload');
			} elseif ($parts[0] === 'ERROR') {
				Util::traceError('Taskmanager remote error: ' . $parts[1]);
			} elseif ($parts[0] === 'WARNING') {
				Message::addWarning('main.taskmanager-warning', $parts[1]);
			} else {
				// Do we have compressed data?
				if (substr($parts[0], 0, 3) === '+z:') {
					$parts[0] = substr($parts[0], 3);
					$gz = true;
				} else {
					$gz = false;
				}
				// See if it's our message
				if ($parts[0] === $seq) {
					if ($gz) {
						$parts[1] = gzinflate($parts[1]);
						if ($parts[1] === false) {
							error_log('TM: Invalid deflate data received');
							continue;
						}
					}
					return json_decode($parts[1], true);
				}
			}
			if (++$tries > 10)
				return false;
		}
		error_log('TM: Reading reply failed, socket error ' . socket_last_error());
		return false;
	}

	/**
	 * Closes connection and resets the variable.
	 * Should be called if something goes wrong when
	 * sending or receiving and the send or receive
	 * buffer might be in an undefined state.
	 */
	private static function reset()
	{
		if (self::$sock === false)
			return;
		socket_close(self::$sock);
		self::$sock = false;
	}

	/**
	 * @param float $deadline end time
	 */
	private static function updateRecvTimeout($deadline)
	{
		$to = $deadline - microtime(true);
		if ($to <= 0) {
			$to = ['sec' => 0, 'usec' => 1];
		} else {
			$s = (int)$to;
			$to = ['sec' => $s, 'usec' => (int)(($to - $s) * 1000000)];
		}
		socket_set_option(self::$sock, SOL_SOCKET, SO_RCVTIMEO, $to);
	}

}