v1.0.0 initial release
This commit is contained in:
+536
@@ -0,0 +1,536 @@
|
||||
<?php
|
||||
|
||||
namespace Illuminate\Bus;
|
||||
|
||||
use Aws\DynamoDb\DynamoDbClient;
|
||||
use Aws\DynamoDb\Marshaler;
|
||||
use Carbon\CarbonImmutable;
|
||||
use Closure;
|
||||
use Illuminate\Support\Str;
|
||||
|
||||
class DynamoBatchRepository implements BatchRepository
|
||||
{
|
||||
/**
|
||||
* The batch factory instance.
|
||||
*
|
||||
* @var \Illuminate\Bus\BatchFactory
|
||||
*/
|
||||
protected $factory;
|
||||
|
||||
/**
|
||||
* The database connection instance.
|
||||
*
|
||||
* @var \Aws\DynamoDb\DynamoDbClient
|
||||
*/
|
||||
protected $dynamoDbClient;
|
||||
|
||||
/**
|
||||
* The application name.
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
protected $applicationName;
|
||||
|
||||
/**
|
||||
* The table to use to store batch information.
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
protected $table;
|
||||
|
||||
/**
|
||||
* The time-to-live value for batch records.
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
protected $ttl;
|
||||
|
||||
/**
|
||||
* The name of the time-to-live attribute for batch records.
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
protected $ttlAttribute;
|
||||
|
||||
/**
|
||||
* The DynamoDB marshaler instance.
|
||||
*
|
||||
* @var \Aws\DynamoDb\Marshaler
|
||||
*/
|
||||
protected $marshaler;
|
||||
|
||||
/**
|
||||
* Create a new batch repository instance.
|
||||
*/
|
||||
public function __construct(
|
||||
BatchFactory $factory,
|
||||
DynamoDbClient $dynamoDbClient,
|
||||
string $applicationName,
|
||||
string $table,
|
||||
?int $ttl,
|
||||
?string $ttlAttribute,
|
||||
) {
|
||||
$this->factory = $factory;
|
||||
$this->dynamoDbClient = $dynamoDbClient;
|
||||
$this->applicationName = $applicationName;
|
||||
$this->table = $table;
|
||||
$this->ttl = $ttl;
|
||||
$this->ttlAttribute = $ttlAttribute;
|
||||
$this->marshaler = new Marshaler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve a list of batches.
|
||||
*
|
||||
* @param int $limit
|
||||
* @param mixed $before
|
||||
* @return \Illuminate\Bus\Batch[]
|
||||
*/
|
||||
public function get($limit = 50, $before = null)
|
||||
{
|
||||
$condition = 'application = :application';
|
||||
|
||||
if ($before) {
|
||||
$condition = 'application = :application AND id < :id';
|
||||
}
|
||||
|
||||
$result = $this->dynamoDbClient->query([
|
||||
'TableName' => $this->table,
|
||||
'KeyConditionExpression' => $condition,
|
||||
'ExpressionAttributeValues' => array_filter([
|
||||
':application' => ['S' => $this->applicationName],
|
||||
':id' => array_filter(['S' => $before]),
|
||||
]),
|
||||
'Limit' => $limit,
|
||||
'ScanIndexForward' => false,
|
||||
]);
|
||||
|
||||
return array_map(
|
||||
fn ($b) => $this->toBatch($this->marshaler->unmarshalItem($b, mapAsObject: true)),
|
||||
$result['Items']
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve information about an existing batch.
|
||||
*
|
||||
* @param string $batchId
|
||||
* @return \Illuminate\Bus\Batch|null
|
||||
*/
|
||||
public function find(string $batchId)
|
||||
{
|
||||
if ($batchId === '') {
|
||||
return null;
|
||||
}
|
||||
|
||||
$b = $this->dynamoDbClient->getItem([
|
||||
'TableName' => $this->table,
|
||||
'Key' => [
|
||||
'application' => ['S' => $this->applicationName],
|
||||
'id' => ['S' => $batchId],
|
||||
],
|
||||
]);
|
||||
|
||||
if (! isset($b['Item'])) {
|
||||
// If we didn't find it via a standard read, attempt consistent read...
|
||||
$b = $this->dynamoDbClient->getItem([
|
||||
'TableName' => $this->table,
|
||||
'Key' => [
|
||||
'application' => ['S' => $this->applicationName],
|
||||
'id' => ['S' => $batchId],
|
||||
],
|
||||
'ConsistentRead' => true,
|
||||
]);
|
||||
|
||||
if (! isset($b['Item'])) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
$batch = $this->marshaler->unmarshalItem($b['Item'], mapAsObject: true);
|
||||
|
||||
if ($batch) {
|
||||
return $this->toBatch($batch);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Store a new pending batch.
|
||||
*
|
||||
* @param \Illuminate\Bus\PendingBatch $batch
|
||||
* @return \Illuminate\Bus\Batch
|
||||
*/
|
||||
public function store(PendingBatch $batch)
|
||||
{
|
||||
$id = (string) Str::orderedUuid();
|
||||
|
||||
$batch = [
|
||||
'id' => $id,
|
||||
'name' => $batch->name,
|
||||
'total_jobs' => 0,
|
||||
'pending_jobs' => 0,
|
||||
'failed_jobs' => 0,
|
||||
'failed_job_ids' => [],
|
||||
'options' => $this->serialize($batch->options ?? []),
|
||||
'created_at' => time(),
|
||||
'cancelled_at' => null,
|
||||
'finished_at' => null,
|
||||
];
|
||||
|
||||
if (! is_null($this->ttl)) {
|
||||
$batch[$this->ttlAttribute] = time() + $this->ttl;
|
||||
}
|
||||
|
||||
$this->dynamoDbClient->putItem([
|
||||
'TableName' => $this->table,
|
||||
'Item' => $this->marshaler->marshalItem(
|
||||
array_merge(['application' => $this->applicationName], $batch)
|
||||
),
|
||||
]);
|
||||
|
||||
return $this->find($id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the total number of jobs within the batch.
|
||||
*
|
||||
* @param string $batchId
|
||||
* @param int $amount
|
||||
* @return void
|
||||
*/
|
||||
public function incrementTotalJobs(string $batchId, int $amount)
|
||||
{
|
||||
$update = 'SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val';
|
||||
|
||||
if ($this->ttl) {
|
||||
$update = "SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val, #{$this->ttlAttribute} = :ttl";
|
||||
}
|
||||
|
||||
$this->dynamoDbClient->updateItem(array_filter([
|
||||
'TableName' => $this->table,
|
||||
'Key' => [
|
||||
'application' => ['S' => $this->applicationName],
|
||||
'id' => ['S' => $batchId],
|
||||
],
|
||||
'UpdateExpression' => $update,
|
||||
'ExpressionAttributeValues' => array_filter([
|
||||
':val' => ['N' => "$amount"],
|
||||
':ttl' => array_filter(['N' => $this->getExpiryTime()]),
|
||||
]),
|
||||
'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
|
||||
'ReturnValues' => 'ALL_NEW',
|
||||
]));
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrement the total number of pending jobs for the batch.
|
||||
*
|
||||
* @param string $batchId
|
||||
* @param string $jobId
|
||||
* @return \Illuminate\Bus\UpdatedBatchJobCounts
|
||||
*/
|
||||
public function decrementPendingJobs(string $batchId, string $jobId)
|
||||
{
|
||||
$update = 'SET pending_jobs = pending_jobs - :inc';
|
||||
|
||||
if ($this->ttl !== null) {
|
||||
$update = "SET pending_jobs = pending_jobs - :inc, #{$this->ttlAttribute} = :ttl";
|
||||
}
|
||||
|
||||
$batch = $this->dynamoDbClient->updateItem(array_filter([
|
||||
'TableName' => $this->table,
|
||||
'Key' => [
|
||||
'application' => ['S' => $this->applicationName],
|
||||
'id' => ['S' => $batchId],
|
||||
],
|
||||
'UpdateExpression' => $update,
|
||||
'ExpressionAttributeValues' => array_filter([
|
||||
':inc' => ['N' => '1'],
|
||||
':ttl' => array_filter(['N' => $this->getExpiryTime()]),
|
||||
]),
|
||||
'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
|
||||
'ReturnValues' => 'ALL_NEW',
|
||||
]));
|
||||
|
||||
$values = $this->marshaler->unmarshalItem($batch['Attributes']);
|
||||
|
||||
return new UpdatedBatchJobCounts(
|
||||
$values['pending_jobs'],
|
||||
$values['failed_jobs']
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the total number of failed jobs for the batch.
|
||||
*
|
||||
* @param string $batchId
|
||||
* @param string $jobId
|
||||
* @return \Illuminate\Bus\UpdatedBatchJobCounts
|
||||
*/
|
||||
public function incrementFailedJobs(string $batchId, string $jobId)
|
||||
{
|
||||
$update = 'SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId)';
|
||||
|
||||
if ($this->ttl !== null) {
|
||||
$update = "SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId), #{$this->ttlAttribute} = :ttl";
|
||||
}
|
||||
|
||||
$batch = $this->dynamoDbClient->updateItem(array_filter([
|
||||
'TableName' => $this->table,
|
||||
'Key' => [
|
||||
'application' => ['S' => $this->applicationName],
|
||||
'id' => ['S' => $batchId],
|
||||
],
|
||||
'UpdateExpression' => $update,
|
||||
'ExpressionAttributeValues' => array_filter([
|
||||
':jobId' => $this->marshaler->marshalValue([$jobId]),
|
||||
':inc' => ['N' => '1'],
|
||||
':ttl' => array_filter(['N' => $this->getExpiryTime()]),
|
||||
]),
|
||||
'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
|
||||
'ReturnValues' => 'ALL_NEW',
|
||||
]));
|
||||
|
||||
$values = $this->marshaler->unmarshalItem($batch['Attributes']);
|
||||
|
||||
return new UpdatedBatchJobCounts(
|
||||
$values['pending_jobs'],
|
||||
$values['failed_jobs']
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark the batch that has the given ID as finished.
|
||||
*
|
||||
* @param string $batchId
|
||||
* @return void
|
||||
*/
|
||||
public function markAsFinished(string $batchId)
|
||||
{
|
||||
$update = 'SET finished_at = :timestamp';
|
||||
|
||||
if ($this->ttl !== null) {
|
||||
$update = "SET finished_at = :timestamp, #{$this->ttlAttribute} = :ttl";
|
||||
}
|
||||
|
||||
$this->dynamoDbClient->updateItem(array_filter([
|
||||
'TableName' => $this->table,
|
||||
'Key' => [
|
||||
'application' => ['S' => $this->applicationName],
|
||||
'id' => ['S' => $batchId],
|
||||
],
|
||||
'UpdateExpression' => $update,
|
||||
'ExpressionAttributeValues' => array_filter([
|
||||
':timestamp' => ['N' => (string) time()],
|
||||
':ttl' => array_filter(['N' => $this->getExpiryTime()]),
|
||||
]),
|
||||
'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
|
||||
]));
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel the batch that has the given ID.
|
||||
*
|
||||
* @param string $batchId
|
||||
* @return void
|
||||
*/
|
||||
public function cancel(string $batchId)
|
||||
{
|
||||
$update = 'SET cancelled_at = :timestamp, finished_at = :timestamp';
|
||||
|
||||
if ($this->ttl !== null) {
|
||||
$update = "SET cancelled_at = :timestamp, finished_at = :timestamp, #{$this->ttlAttribute} = :ttl";
|
||||
}
|
||||
|
||||
$this->dynamoDbClient->updateItem(array_filter([
|
||||
'TableName' => $this->table,
|
||||
'Key' => [
|
||||
'application' => ['S' => $this->applicationName],
|
||||
'id' => ['S' => $batchId],
|
||||
],
|
||||
'UpdateExpression' => $update,
|
||||
'ExpressionAttributeValues' => array_filter([
|
||||
':timestamp' => ['N' => (string) time()],
|
||||
':ttl' => array_filter(['N' => $this->getExpiryTime()]),
|
||||
]),
|
||||
'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
|
||||
]));
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the batch that has the given ID.
|
||||
*
|
||||
* @param string $batchId
|
||||
* @return void
|
||||
*/
|
||||
public function delete(string $batchId)
|
||||
{
|
||||
$this->dynamoDbClient->deleteItem([
|
||||
'TableName' => $this->table,
|
||||
'Key' => [
|
||||
'application' => ['S' => $this->applicationName],
|
||||
'id' => ['S' => $batchId],
|
||||
],
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given Closure within a storage specific transaction.
|
||||
*
|
||||
* @param \Closure $callback
|
||||
* @return mixed
|
||||
*/
|
||||
public function transaction(Closure $callback)
|
||||
{
|
||||
return $callback();
|
||||
}
|
||||
|
||||
/**
|
||||
* Rollback the last database transaction for the connection.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function rollBack()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the given raw batch to a Batch object.
|
||||
*
|
||||
* @param object $batch
|
||||
* @return \Illuminate\Bus\Batch
|
||||
*/
|
||||
protected function toBatch($batch)
|
||||
{
|
||||
return $this->factory->make(
|
||||
$this,
|
||||
$batch->id,
|
||||
$batch->name,
|
||||
(int) $batch->total_jobs,
|
||||
(int) $batch->pending_jobs,
|
||||
(int) $batch->failed_jobs,
|
||||
$batch->failed_job_ids,
|
||||
$this->unserialize($batch->options) ?? [],
|
||||
CarbonImmutable::createFromTimestamp($batch->created_at, date_default_timezone_get()),
|
||||
$batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at, date_default_timezone_get()) : $batch->cancelled_at,
|
||||
$batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at, date_default_timezone_get()) : $batch->finished_at
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the underlying DynamoDB table.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function createAwsDynamoTable(): void
|
||||
{
|
||||
$definition = [
|
||||
'TableName' => $this->table,
|
||||
'AttributeDefinitions' => [
|
||||
[
|
||||
'AttributeName' => 'application',
|
||||
'AttributeType' => 'S',
|
||||
],
|
||||
[
|
||||
'AttributeName' => 'id',
|
||||
'AttributeType' => 'S',
|
||||
],
|
||||
],
|
||||
'KeySchema' => [
|
||||
[
|
||||
'AttributeName' => 'application',
|
||||
'KeyType' => 'HASH',
|
||||
],
|
||||
[
|
||||
'AttributeName' => 'id',
|
||||
'KeyType' => 'RANGE',
|
||||
],
|
||||
],
|
||||
'BillingMode' => 'PAY_PER_REQUEST',
|
||||
];
|
||||
|
||||
$this->dynamoDbClient->createTable($definition);
|
||||
|
||||
if (! is_null($this->ttl)) {
|
||||
$this->dynamoDbClient->updateTimeToLive([
|
||||
'TableName' => $this->table,
|
||||
'TimeToLiveSpecification' => [
|
||||
'AttributeName' => $this->ttlAttribute,
|
||||
'Enabled' => true,
|
||||
],
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the underlying DynamoDB table.
|
||||
*/
|
||||
public function deleteAwsDynamoTable(): void
|
||||
{
|
||||
$this->dynamoDbClient->deleteTable([
|
||||
'TableName' => $this->table,
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the expiry time based on the configured time-to-live.
|
||||
*
|
||||
* @return string|null
|
||||
*/
|
||||
protected function getExpiryTime(): ?string
|
||||
{
|
||||
return is_null($this->ttl) ? null : (string) (time() + $this->ttl);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the expression attribute name for the time-to-live attribute.
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
protected function ttlExpressionAttributeName(): array
|
||||
{
|
||||
return is_null($this->ttl) ? [] : ["#{$this->ttlAttribute}" => $this->ttlAttribute];
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialize the given value.
|
||||
*
|
||||
* @param mixed $value
|
||||
* @return string
|
||||
*/
|
||||
protected function serialize($value)
|
||||
{
|
||||
return serialize($value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unserialize the given value.
|
||||
*
|
||||
* @param string $serialized
|
||||
* @return mixed
|
||||
*/
|
||||
protected function unserialize($serialized)
|
||||
{
|
||||
return unserialize($serialized);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the underlying DynamoDB client instance.
|
||||
*
|
||||
* @return \Aws\DynamoDb\DynamoDbClient
|
||||
*/
|
||||
public function getDynamoClient(): DynamoDbClient
|
||||
{
|
||||
return $this->dynamoDbClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of the table that contains the batch records.
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getTable(): string
|
||||
{
|
||||
return $this->table;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user