Prooph Event Store, Prooph Service-Bus, Prooph Snapshotter, Prooph Event Sourcing, Prooph PSR-7 middleware, Service-Bus ZFC-Rbac Bridge, HumusAmqp, PhpDisruptor
Commands are value objects
Commands do not return anything
final class ChangeUserName {
private $userId;
private $userName;
public function __construct($userId, $userName) {
$this->userId = $userId;
$this->userName = $userName;
}
public function userId() { return $this->userId; }
public function userName() { return $this->userName; }
}
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\Plugin\Router\CommandRouter;
$commandBus = new CommandBus();
$router = new CommandRouter();
$router->route(ChangeUserName::class)
->to(ChangeUserNameHandler::class);
$commandBus->utilize($router);
$command = new ChangeUsername($userId, $userName);
$commandBus->dispatch($command);
final class ChangeUserNameHandler {
public function __invoke(ChangeUserName $command) {
$user = $this->find($command->userId());
$user->changeUserName($command->userName());
}
private function find($userId) {
// code
}
}
Code with intent
Scale reads and writes independent from each other
Code with intent
More boilerplate
Adds complexity
TodoWasPosted
DeadlineWasAddedToTodo
TodoWasMarkedAsDone
TodoWasReopened
TodoWasMarkedAsDone
final class Todo extends AggregateRoot {
// properties
public static function post(
$text,
UserId $assigneeId,
TodoId $todoId
) {
$self = new self();
$self->assertText($text);
$self->recordThat(TodoWasPosted::byUser(
$assigneeId,
$text,
$todoId,
TodoStatus::open()
));
return $self;
}
// more methods
}
final class Todo extends AggregateRoot {
// ...
public function reopenTodo() {
if (!$this->status->isDone()) {
throw Exception\CannotReopenTodo::notMarkedDone($this);
}
$this->recordThat(TodoWasReopened::withStatus(
$this->todoId,
TodoStatus::fromString(TodoStatus::OPEN)
));
}
}
final class Todo extends AggregateRoot {
// ...
protected function whenTodoWasPosted(TodoWasPosted $event)
{
$this->todoId = $event->todoId();
$this->assigneeId = $event->assigneeId();
$this->text = $event->text();
$this->status = $event->todoStatus();
}
}
final class Todo extends AggregateRoot {
// ...
protected function whenTodoWasMarkedAsDone(
TodoWasMarkedAsDone $event
) {
$this->status = $event->newStatus();
}
}
abstract class AggregateRoot {
protected $version = 0;
protected $recordedEvents = [];
protected function recordThat(AggregateChanged $event) {
$this->version += 1;
$this->recordedEvents[] = $event->withVersion($this->version);
$this->apply($event);
}
// more code
}
abstract class AggregateRoot {
// ...
protected function apply(AggregateChanged $e) {
$handler = $this->determineEventHandlerMethodFor($e);
if (! method_exists($this, $handler)) {
throw new \RuntimeException(sprintf(
"Missing event handler method %s for aggregate root %s",
$handler,
get_class($this)
));
}
$this->{$handler}($e);
}
// more code
}
abstract class AggregateRoot {
// ...
protected static function reconstituteFromHistory(
\Iterator $historyEvents
) {
$instance = new static();
$instance->replay($historyEvents);
return $instance;
}
// more code
$streamName = new StreamName('todo');
$events = new ArrayIterator([
$todoWasPosted,
$deadlineWasAddedToTodo,
$todoWasMarkedAsDone
]);
$eventStore->beginTransaction();
$eventStore->create(new Stream($streamName, $events));
$eventStore->commit();
final class TodoProjector {
// ...
public function onTodoWasPosted(TodoWasPosted $event) {
$this->connection->insert(Table::TODO, [
'id' => $event->todoId()->toString(),
'assignee_id' => $event->assigneeId()->toString(),
'text' => $event->text(),
'status' => $event->todoStatus()->toString()
]);
}
// more methods
}
final class TodoProjector {
// ...
public function onTodoWasMarkedAsDone(TodoWasMarkedAsDone $event) {
$this->connection->update(Table::TODO,
[
'status' => $event->newStatus()->toString()
],
[
'id' => $event->todoId()->toString()
]
);
}
}
Append only (very fast)
Immutable
No joins
Complete history
Not really
Overkill for simple CRUD apps