Interface Contract
<?php namespace David\App\Models; interface Queue { public function push(array $message); public function pop() : array; public function flush(string $queueUrl); public function process(); }
QueueService Class
<?php namespace App\Core\Services; use App\Core\Prelude; use Aws\Sqs\SqsClient; use Mapi\App\Models\Queue; /** * Class QueueService * * @package App\Core\Services */ class QueueService extends \App\Core\Classes\Service implements Queue { /** * QueueConfigs * * @var bool */ private $queueConfigs = false; /** * DbConnections * * @var bool */ private $dbConnections = false; /** * SqsClient * * @var SqsClient|bool */ private $sqsClient = false; /** * GetSqsClient * * @return SqsClient|bool|static */ public function getSqsClient() { return $this->sqsClient; } /** * SetSqsClient * * @param SqsClient|bool|static $sqsClient */ public function setSqsClient($sqsClient) { $this->sqsClient = $sqsClient; } /** * StorageService constructor. */ public function __construct($queue_name, Prelude $prelude) { if ($queue_name === null) { die( 'QueueService::constructor queue_name does not exist'); } $this->prelude = $prelude; $this->queues = $this->queueConfigs()['queues']; $this->selectedQueue = $this->queues[$queue_name]; $this->queueUrl = $this->selectedQueue['queue_url'] ; $this->sqsCredentials = array( 'region' => $this->selectedQueue['region'], 'version' => $this->selectedQueue['version'], 'credentials' => array( 'key' => $this->selectedQueue['key'], 'secret' =>$this->selectedQueue['secret'], ), ); $this->sqsClient = $this->queueConnect($queue_name); } /** * Storage Configs * * @return bool|mixed */ private function queueConfigs() { if ($this->queueConfigs === false) { $this->queueConfigs = yaml_parse_file(__DIR__.'/../../config/storage.yaml'); } return $this->queueConfigs; } /** * Queue connection * * @param string $schema * @param bool $allowShared */ public function queueConnect() { try { // Instantiate the client $sqsClient = SqsClient::factory($this->sqsCredentials); } catch (Exception $e) { die('Error Connecting to the Queue '.$e->getMessage()); } return $sqsClient; } /** * Push * * @param string $jsonMessage * * @return bool */ public function push(array $params) : bool { try { // Send the message $this->sqsClient->sendMessage( $params ); } catch (Exception $e) { die('Error sending message to queue '.$e->getMessage()); return false; } return true; } public function deleteMessage($result) { $this->sqsClient->deleteMessage( [ 'QueueUrl' => $this->queueUrl, // REQUIRED 'ReceiptHandle' => $result->get('Messages')[0]['ReceiptHandle'] // REQUIRED ] ); } /** * Pop Message * * @return array */ public function pop() : array { $resultMessage = array(); try { $result = $this->sqsClient->receiveMessage( array( 'AttributeNames' => ['SentTimestamp'], 'MaxNumberOfMessages' => 1, 'MessageAttributeNames' => ['All'], 'QueueUrl' => $this->queueUrl, 'WaitTimeSeconds' => 0, ) ); //Logging Pending Queue Messages $this->prelude->log->debug('QueueService:pop COUNT MESSAGES ' .count($result->getPath('Messages'))); if (is_array($result->get('Messages'))) { if (count($result->get('Messages')) > 0) { // $this->prelude->log->debug('QueueService:pop' // .json_encode($result->get('Messages'))); $status = true; $resultMessage = $result->get('Messages')[0]['Body']; //Delete Message $this->deleteMessage($result); } } else { $status = false; $resultMessage = "No messages in queue. \n"; } } catch (Exception $e) { die(' message to queue '.$e->getMessage()); } // Add Debugging SQS QUEUE $this->prelude->log->debug('WORKER BODY MESSAGE '.$resultMessage); return array('status' => $status, 'result' => $resultMessage); }