AWS SQS Library

Image result for aws

Interface Contract

<?php

namespace David\App\Models;

/**
 * Contract for a message-queue client.
 *
 * NOTE(review): this snippet declares namespace David\App\Models, while the
 * QueueService snippet imports Mapi\App\Models\Queue -- confirm which
 * namespace is the intended one.
 */
interface Queue
{
    /**
     * Push a message onto the queue.
     *
     * @param array $message Message data; the expected shape is decided by the implementation.
     */
    public function push(array $message);
    /**
     * Take a message from the queue.
     *
     * @return array Shape is decided by the implementation.
     */
    public function pop() : array;
    /**
     * Presumably empties the queue identified by $queueUrl -- no implementation
     * is visible here, confirm against the implementing class.
     *
     * @param string $queueUrl URL of the queue to act on.
     */
    public function flush(string $queueUrl);
    /**
     * Presumably consumes and handles queued messages -- no implementation is
     * visible here, confirm against the implementing class.
     */
    public function process();
}

 

QueueService Class

<?php

namespace App\Core\Services;

use App\Core\Prelude;
use Aws\Sqs\SqsClient;
use Mapi\App\Models\Queue;

/**
 * Class QueueService
 *
 * @package App\Core\Services
 */
class QueueService extends \App\Core\Classes\Service implements Queue
{
    /**
     * Cached queue configuration; false until queueConfigs() has loaded it.
     *
     * Holds whatever yaml_parse_file() returned. The constructor indexes it
     * as an array (the 'queues' key).
     *
     * @var array|bool
     */
    private $queueConfigs = false;
    /**
     * DbConnections
     *
     * NOTE(review): not referenced anywhere in the visible part of this
     * class -- confirm whether it is still needed.
     *
     * @var bool
     */
    private $dbConnections = false;
    /**
     * SQS client; false until the constructor or setSqsClient() assigns one.
     *
     * @var SqsClient|bool
     */
    private $sqsClient  = false;

    /**
     * Return the SQS client currently held by this service.
     *
     * @return SqsClient|bool The client, or false when none has been assigned.
     */
    public function getSqsClient()
    {
        return $this->sqsClient;
    }

    /**
     * Replace the SQS client held by this service.
     *
     * The value is stored as given; no validation is performed.
     *
     * @param SqsClient|bool $sqsClient Client to use for subsequent queue calls.
     */
    public function setSqsClient($sqsClient)
    {
        $this->sqsClient = $sqsClient;
    }

    /**
     * QueueService constructor.
     *
     * Looks up the named queue in the configuration returned by
     * queueConfigs(), stores its URL and the client settings derived from it,
     * and creates the SQS client through queueConnect().
     *
     * @param string  $queue_name Key of the queue under the 'queues' section of the configuration.
     * @param Prelude $prelude    Application prelude; its log is used by this service.
     *
     * @throws \InvalidArgumentException When $queue_name is null or is not defined in the configuration.
     */
    public function __construct($queue_name, Prelude $prelude)
    {
        if ($queue_name === null) {
            // Throw instead of die(): a library class must not terminate the
            // whole process, and callers can now handle the failure.
            throw new \InvalidArgumentException(
                'QueueService::constructor queue_name does not exist'
            );
        }
        $this->prelude = $prelude;

        // yaml_parse_file() yields false on failure, so guard before indexing.
        $configs = $this->queueConfigs();
        $this->queues = (is_array($configs) && isset($configs['queues']) && is_array($configs['queues']))
            ? $configs['queues']
            : array();

        // Without this check an unknown queue name silently produced null
        // region/credentials and only failed later inside the AWS client.
        if (!isset($this->queues[$queue_name])) {
            throw new \InvalidArgumentException(
                'QueueService::constructor queue "'.$queue_name.'" is not configured'
            );
        }

        $this->selectedQueue = $this->queues[$queue_name];
        $this->queueUrl = $this->selectedQueue['queue_url'];
        $this->sqsCredentials = array(
            'region' => $this->selectedQueue['region'],
            'version' => $this->selectedQueue['version'],
            'credentials' => array(
                'key' => $this->selectedQueue['key'],
                'secret' => $this->selectedQueue['secret'],
            ),
        );

        // queueConnect() takes no parameters; it reads $this->sqsCredentials.
        $this->sqsClient = $this->queueConnect();
    }

    /**
     * Return the parsed configuration, reading the YAML file on first use.
     *
     * The parsed value is kept in $this->queueConfigs. Because false doubles
     * as the "not loaded" marker, a failed parse is retried on the next call.
     *
     * @return bool|mixed Result of yaml_parse_file() for config/storage.yaml.
     */
    private function queueConfigs()
    {
        if ($this->queueConfigs !== false) {
            return $this->queueConfigs;
        }

        $configPath = __DIR__.'/../../config/storage.yaml';
        $this->queueConfigs = yaml_parse_file($configPath);

        return $this->queueConfigs;
    }

    /**
     * Create the SQS client from the settings prepared by the constructor.
     *
     * Reads $this->sqsCredentials; takes no parameters.
     *
     * @return SqsClient
     *
     * @throws \RuntimeException When the client cannot be created; the
     *                           original exception is attached as "previous".
     */
    public function queueConnect()
    {
        try {
            // Instantiate the client
            return SqsClient::factory($this->sqsCredentials);
        } catch (\Exception $e) {
            // The leading backslash matters: inside this namespace a bare
            // "Exception" resolves to App\Core\Services\Exception, which never
            // matches, so the handler was dead code. Rethrow with context
            // rather than die() so callers keep control of the process.
            throw new \RuntimeException(
                'Error Connecting to the Queue '.$e->getMessage(),
                0,
                $e
            );
        }
    }

    /**
     * Send a message to the queue.
     *
     * @param array $params Arguments handed unchanged to the SQS client's sendMessage().
     *
     * @return bool True when the message was sent, false when sending failed.
     */
    public function push(array $params) : bool
    {
        try {
            // Send the message
            $this->sqsClient->sendMessage($params);
        } catch (\Exception $e) {
            // "\Exception" is required: an unqualified "Exception" resolves to
            // the current namespace and never matches. The former die() made
            // the documented "return false" unreachable, so the failure is
            // logged and reported through the return value instead.
            $this->prelude->log->debug('Error sending message to queue '.$e->getMessage());

            return false;
        }

        return true;
    }

    /**
     * Delete the first message of a receive result from the queue.
     *
     * @param mixed $result Object exposing get('Messages'), such as the value
     *                      pop() obtains from receiveMessage().
     */
    public function deleteMessage($result)
    {
        $messages = $result->get('Messages');
        $receiptHandle = $messages[0]['ReceiptHandle'];

        // Both keys are required by the delete call.
        $request = array(
            'QueueUrl' => $this->queueUrl,
            'ReceiptHandle' => $receiptHandle,
        );

        $this->sqsClient->deleteMessage($request);
    }

    /**
     * Receive at most one message from the queue and delete it once read.
     *
     * @return array Array with two keys: 'status' (bool) and 'result' (string).
     *               When status is true, result is the message body; otherwise
     *               it describes why no message was returned.
     */
    public function pop() : array
    {
        try {
            $result = $this->sqsClient->receiveMessage(
                array(
                    'AttributeNames' => ['SentTimestamp'],
                    'MaxNumberOfMessages' => 1,
                    'MessageAttributeNames' => ['All'],
                    'QueueUrl' => $this->queueUrl,
                    'WaitTimeSeconds' => 0,
                )
            );

            // 'Messages' is absent (null) when the queue is empty. Normalise
            // it once so count() never receives null, which is a TypeError on
            // PHP 8 and a warning on PHP 7.2+.
            $messages = $result->get('Messages');
            if (!is_array($messages)) {
                $messages = array();
            }

            //Logging Pending Queue Messages
            $this->prelude->log->debug('QueueService:pop COUNT MESSAGES '.count($messages));

            if (count($messages) > 0) {
                $status = true;
                $resultMessage = $messages[0]['Body'];
                //Delete Message
                $this->deleteMessage($result);
            } else {
                // Covers both a missing key and an empty list, so $status is
                // always defined and 'result' is always a string.
                $status = false;
                $resultMessage = "No messages in queue. \n";
            }
        } catch (\Exception $e) {
            // "\Exception" is required: an unqualified "Exception" resolves to
            // the current namespace and never matches. Report the failure in
            // the return value instead of terminating the process.
            $status = false;
            $resultMessage = 'Error receiving message from queue '.$e->getMessage();
        }

        // Add Debugging SQS QUEUE
        $this->prelude->log->debug('WORKER BODY MESSAGE '.$resultMessage);

        return array('status' => $status, 'result' => $resultMessage);
    }

 

 

Leave a Reply