
Interface Contract
<?php
namespace David\App\Models;
interface Queue
{
public function push(array $message);
public function pop() : array;
public function flush(string $queueUrl);
public function process();
}
QueueService Class
<?php
namespace App\Core\Services;
use App\Core\Prelude;
use Aws\Sqs\SqsClient;
use Mapi\App\Models\Queue;
/**
* Class QueueService
*
* @package App\Core\Services
*/
class QueueService extends \App\Core\Classes\Service implements Queue
{
/**
* QueueConfigs
*
* @var bool
*/
private $queueConfigs = false;
/**
* DbConnections
*
* @var bool
*/
private $dbConnections = false;
/**
* SqsClient
*
* @var SqsClient|bool
*/
private $sqsClient = false;
/**
* GetSqsClient
*
* @return SqsClient|bool|static
*/
public function getSqsClient()
{
return $this->sqsClient;
}
/**
* SetSqsClient
*
* @param SqsClient|bool|static $sqsClient
*/
public function setSqsClient($sqsClient)
{
$this->sqsClient = $sqsClient;
}
/**
* StorageService constructor.
*/
public function __construct($queue_name, Prelude $prelude)
{
if ($queue_name === null) {
die(
'QueueService::constructor queue_name does not exist');
}
$this->prelude = $prelude;
$this->queues = $this->queueConfigs()['queues'];
$this->selectedQueue = $this->queues[$queue_name];
$this->queueUrl = $this->selectedQueue['queue_url'] ;
$this->sqsCredentials = array(
'region' => $this->selectedQueue['region'],
'version' => $this->selectedQueue['version'],
'credentials' => array(
'key' => $this->selectedQueue['key'],
'secret' =>$this->selectedQueue['secret'],
),
);
$this->sqsClient = $this->queueConnect($queue_name);
}
/**
* Storage Configs
*
* @return bool|mixed
*/
private function queueConfigs()
{
if ($this->queueConfigs === false) {
$this->queueConfigs = yaml_parse_file(__DIR__.'/../../config/storage.yaml');
}
return $this->queueConfigs;
}
/**
* Queue connection
*
* @param string $schema
* @param bool $allowShared
*/
public function queueConnect()
{
try {
// Instantiate the client
$sqsClient = SqsClient::factory($this->sqsCredentials);
} catch (Exception $e) {
die('Error Connecting to the Queue '.$e->getMessage());
}
return $sqsClient;
}
/**
* Push
*
* @param string $jsonMessage
*
* @return bool
*/
public function push(array $params) : bool
{
try {
// Send the message
$this->sqsClient->sendMessage(
$params
);
} catch (Exception $e) {
die('Error sending message to queue '.$e->getMessage());
return false;
}
return true;
}
public function deleteMessage($result)
{
$this->sqsClient->deleteMessage(
[
'QueueUrl' => $this->queueUrl, // REQUIRED
'ReceiptHandle' => $result->get('Messages')[0]['ReceiptHandle'] // REQUIRED
]
);
}
/**
* Pop Message
*
* @return array
*/
public function pop() : array
{
$resultMessage = array();
try {
$result = $this->sqsClient->receiveMessage(
array(
'AttributeNames' => ['SentTimestamp'],
'MaxNumberOfMessages' => 1,
'MessageAttributeNames' => ['All'],
'QueueUrl' => $this->queueUrl,
'WaitTimeSeconds' => 0,
)
);
//Logging Pending Queue Messages
$this->prelude->log->debug('QueueService:pop COUNT MESSAGES '
.count($result->getPath('Messages')));
if (is_array($result->get('Messages'))) {
if (count($result->get('Messages')) > 0) {
// $this->prelude->log->debug('QueueService:pop'
// .json_encode($result->get('Messages')));
$status = true;
$resultMessage = $result->get('Messages')[0]['Body'];
//Delete Message
$this->deleteMessage($result);
}
} else {
$status = false;
$resultMessage = "No messages in queue. \n";
}
} catch (Exception $e) {
die(' message to queue '.$e->getMessage());
}
// Add Debugging SQS QUEUE
$this->prelude->log->debug('WORKER BODY MESSAGE '.$resultMessage);
return array('status' => $status, 'result' => $resultMessage);
}