Requires PHP 7.1 (scalar type hints, return types, ...)
Event Store Interfaces
Different Event Store Implementations - no adapters
Implementation details of PDO-EventStore
Queries & Projections
namespace Prooph\EventStore;
/**
 * Event store contract (excerpt): the methods that write to a stream.
 *
 * Only signatures are shown; the actual behaviour is defined by the
 * implementing classes.
 */
interface EventStore
{
    /**
     * @param Stream $stream the stream to create
     */
    public function create(Stream $stream): void;

    /**
     * @param StreamName $streamName   name of the stream to append to
     * @param Iterator   $streamEvents the events to append
     */
    public function appendTo(StreamName $streamName, Iterator $streamEvents): void;

    /* more methods */
}
namespace Prooph\EventStore;
/**
 * Event store contract (excerpt): stream metadata, existence check and
 * deletion. All three methods address the stream by its StreamName.
 */
interface EventStore
{
    /* more methods */

    /**
     * @return array metadata of the named stream (array shape not shown here)
     */
    public function fetchStreamMetadata(StreamName $streamName): array;

    public function hasStream(StreamName $streamName): bool;

    public function delete(StreamName $streamName): void;

    /* more methods */
}
namespace Prooph\EventStore;
/**
 * Event store contract (excerpt): the methods that read a stream.
 *
 * Both methods take the same arguments and return a Stream; they differ in
 * name and in the default value of $fromNumber.
 */
interface EventStore
{
    /* more methods */

    /**
     * @param StreamName           $streamName      stream to read
     * @param int                  $fromNumber      starting event number, defaults to 1
     * @param int|null             $count           null by default (presumably "no limit" - confirm)
     * @param MetadataMatcher|null $metadataMatcher optional metadata filter
     */
    public function load(
        StreamName $streamName,
        int $fromNumber = 1,
        ?int $count = null,
        ?MetadataMatcher $metadataMatcher = null
    ): Stream;

    /**
     * @param StreamName           $streamName      stream to read
     * @param int                  $fromNumber      starting event number, defaults to PHP_INT_MAX
     * @param int|null             $count           null by default (presumably "no limit" - confirm)
     * @param MetadataMatcher|null $metadataMatcher optional metadata filter
     */
    public function loadReverse(
        StreamName $streamName,
        int $fromNumber = PHP_INT_MAX,
        ?int $count = null,
        ?MetadataMatcher $metadataMatcher = null
    ): Stream;
}
// Build the event and attach two metadata entries. Each call's return value
// replaces the previous instance, exactly as in a step-by-step version.
$event = UserCreated::with(['name' => 'John'], 1)
    ->withAddedMetadata('foo', 'bar')
    ->withAddedMetadata('int', 5);

var_dump($event->metadata());
// output
[
'foo' => 'bar',
'int' => 5
]
// Each withMetadataMatch() call returns the matcher to use from then on, so
// the two conditions can be chained.
$metadataMatcher = (new MetadataMatcher())
    ->withMetadataMatch('foo', Operator::EQUALS(), 'bar')
    ->withMetadataMatch('int', Operator::GREATER_THAN(), 4);

// Load from event number 1, no count given, filtered by the matcher.
$stream = $eventStore->load($streamName, 1, null, $metadataMatcher);
namespace Prooph\EventStore;
/**
 * An EventStore that additionally exposes transaction control.
 */
interface TransactionalEventStore extends EventStore
{
    public function beginTransaction(): void;

    public function commit(): void;

    public function rollback(): void;

    /**
     * @return bool whether a transaction is currently open (judging by the name)
     */
    public function isInTransaction(): bool;
}
namespace Prooph\EventStore;
/**
 * An EventStore that exposes an ActionEventEmitter.
 *
 * Each constant's value is the name of one EventStore method.
 */
interface ActionEventEmitterEventStore extends EventStore
{
    public const EVENT_APPEND_TO = 'appendTo';
    public const EVENT_CREATE = 'create';
    public const EVENT_LOAD = 'load';
    public const EVENT_LOAD_REVERSE = 'loadReverse';
    public const EVENT_DELETE = 'delete';
    public const EVENT_HAS_STREAM = 'hasStream';
    public const EVENT_FETCH_STREAM_METADATA = 'fetchStreamMetadata';

    public function getActionEventEmitter(): ActionEventEmitter;
}
namespace Prooph\EventStore;
/**
 * Combines ActionEventEmitterEventStore and TransactionalEventStore.
 *
 * Each constant's value is the name of one TransactionalEventStore method.
 */
interface TransactionalActionEventEmitterEventStore extends
    ActionEventEmitterEventStore,
    TransactionalEventStore
{
    public const EVENT_BEGIN_TRANSACTION = 'beginTransaction';
    public const EVENT_COMMIT = 'commit';
    public const EVENT_IS_IN_TRANSACTION = 'isInTransaction';
    public const EVENT_ROLLBACK = 'rollback';
}
AbstractActionEventEmitterEventStore
AbstractTransactionalActionEventEmitterEventStore
InMemoryEventStore (extends AbstractTransactionalActionEventEmitterEventStore)
MySQLEventStore (extends AbstractActionEventEmitterEventStore)
PostgresEventStore (extends AbstractTransactionalActionEventEmitterEventStore)
namespace Prooph\EventStore\PDO;
/**
 * Callable strategy that turns a stream name into a string; going by the
 * interface name, that string is used as a table name.
 */
interface TableNameGeneratorStrategy
{
    /**
     * @param StreamName $streamName stream to derive the name from
     */
    public function __invoke(StreamName $streamName): string;
}
namespace Prooph\EventStore\PDO\TableNameGeneratorStrategy;
/**
 * Table name strategy based on the SHA-1 hash of the stream name.
 */
final class Sha1 implements TableNameGeneratorStrategy
{
    /**
     * @return string an underscore followed by the SHA-1 hex digest of the
     *                stream name's string form
     */
    public function __invoke(StreamName $streamName): string
    {
        $digest = sha1($streamName->toString());

        // NOTE(review): the '_' prefix presumably keeps the name from starting
        // with a digit - confirm against the database's identifier rules.
        return '_' . $digest;
    }
}
namespace Prooph\EventStore\PDO;
/**
 * Strategy describing how a stream table is laid out.
 */
interface IndexingStrategy
{
    /**
     * @param string $tableName table the schema is created for
     *
     * @return string[]
     */
    public function createSchema(string $tableName): array;

    public function oneStreamPerAggregate(): bool;

    /**
     * @return string[]
     */
    public function uniqueViolationErrorCodes(): array;
}
MySQLAggregateStreamStrategy
MySQLSingleStreamStrategy
MySQLSimpleStreamStrategy
Postgres...
Used for event sourcing
One stream per aggregate
Used for event sourcing
One stream per aggregate type
or one stream for all aggregates
Used outside of event sourcing
Useful especially for projections
namespace Prooph\ProophessorDo\Projection\User;
/**
 * Hand-written projector: one handler method per event type.
 */
final class UserProjector
{
    /**
     * Inserts one row into the user table, built from the event's user id,
     * name and e-mail address.
     */
    public function onUserWasRegistered(UserWasRegistered $event)
    {
        $row = [
            'id' => $event->userId()->toString(),
            'name' => $event->name(),
            'email' => $event->emailAddress()->toString()
        ];

        $this->connection->insert(Table::USER, $row);
    }

    public function onTodoWasPosted(TodoWasPosted $event)
    {
        // ...
    }
}
namespace Prooph\EventStore\Projection;
/**
 * Fluent query over event streams.
 *
 * Every configuration method returns a Query so calls can be chained;
 * reset(), run() and stop() return nothing and getState() returns an array.
 */
interface Query
{
    /**
     * @param Closure $callback callback passed to init(); the examples in
     *                          this file pass one that returns the initial
     *                          state array
     */
    public function init(Closure $callback): Query;

    // Choose which stream(s) to read from.
    public function fromStream(string $streamName): Query;

    public function fromStreams(string ...$streamNames): Query;

    public function fromCategory(string $name): Query;

    public function fromCategories(string ...$names): Query;

    public function fromAll(): Query;

    /**
     * @param array $handlers handlers keyed by event name, as in the
     *                        examples in this file
     */
    public function when(array $handlers): Query;

    /**
     * @param Closure $closure single handler (the examples pass one taking
     *                         the state and the event)
     */
    public function whenAny(Closure $closure): Query;

    public function reset(): void;

    public function run(): void;

    public function stop(): void;

    public function getState(): array;
}
/**
 * Creates a stream holding 50 events: one UserCreated followed by 49
 * UsernameChanged events.
 *
 * The events are built with 1..50 as the second argument of with()
 * (presumably the event number/version - confirm). The first name is 'Alex',
 * the last is 'Sascha', and the ones in between are generated with uniqid().
 *
 * @param string     $name       name of the stream to create
 * @param EventStore $eventStore store the stream is created in
 */
function prepareEventStream(string $name, EventStore $eventStore): void
{
    $events = [UserCreated::with(['name' => 'Alex'], 1)];

    // Numbers 2..49 get throw-away names.
    for ($number = 2; $number < 50; $number++) {
        $events[] = UsernameChanged::with(['name' => uniqid('name_')], $number);
    }

    // Number 50 carries the final, known name.
    $events[] = UsernameChanged::with(['name' => 'Sascha'], 50);

    $stream = new Stream(new StreamName($name), new ArrayIterator($events));
    $eventStore->create($stream);
}
prepareEventStream('user-123', $eventStore);

$query = new InMemoryEventStoreQuery($eventStore);

// Initial state: a counter starting at zero.
$initialState = function (): array {
    return ['count' => 0];
};

// Handlers keyed by event name; this one counts matching events.
$handlers = [
    'user-name-changed' => function (array $state, UsernameChanged $event): array {
        $state['count']++;
        return $state;
    }
];

$query
    ->init($initialState)
    ->fromStream('user-123')
    ->when($handlers)
    ->run();

echo $query->getState()['count']; // 49

// Reset the query, then run it again.
$query->reset();
$query->run();
prepareEventStream('user-123', $eventStore);
prepareEventStream('user-234', $eventStore);

$query = new PostgresEventStoreQuery($eventStore, $connection, 'event_streams');

// Initial state: a counter starting at zero.
$initialState = function (): array {
    return ['count' => 0];
};

// Catch-all handler: counts every event, whatever its type.
$countEvent = function (array $state, Message $event): array {
    $state['count']++;
    return $state;
};

$query
    ->init($initialState)
    ->fromStreams('user-123', 'user-234')
    ->whenAny($countEvent)
    ->run();

echo $query->getState()['count']; // 100
prepareEventStream('user-123', $eventStore);

$query = new PostgresEventStoreQuery($eventStore, $connection, 'event_streams');

// Initial state: a counter starting at zero.
$initialState = function (): array {
    return ['count' => 0];
};

// Catch-all handler: counts every event, whatever its type.
$countEvent = function (array $state, Message $event): array {
    $state['count']++;
    return $state;
};

$query
    ->init($initialState)
    ->fromCategory('user')
    ->whenAny($countEvent)
    ->run();

// more code
// more code

// Append 50 more events (numbers 51..100) to the same stream.
$events = [];
for ($number = 51; $number <= 100; $number++) {
    $events[] = UsernameChanged::with(['name' => uniqid('name_')], $number);
}

$eventStore->appendTo(new StreamName('user-123'), new ArrayIterator($events));

// Run the same query object again.
$query->run();

echo $query->getState()['count']; // 100
namespace Prooph\ProophessorDo\Projection\User;
/**
 * Hand-written projector: one handler method per event type.
 */
final class UserProjector
{
    /**
     * Inserts one row into the user table, built from the event's user id,
     * name and e-mail address.
     */
    public function onUserWasRegistered(UserWasRegistered $event)
    {
        $row = [
            'id' => $event->userId()->toString(),
            'name' => $event->name(),
            'email' => $event->emailAddress()->toString()
        ];

        $this->connection->insert(Table::USER, $row);
    }

    public function onTodoWasPosted(TodoWasPosted $event)
    {
        // ...
    }
}
Projections are like queries, but persistent
Projections have special additional methods:
linkTo(string $streamName, Message $event)
emit(Message $event)
namespace Prooph\EventStore\Projection;
/**
 * A Query with a name plus the ability to emit and link events.
 */
interface Projection extends Query
{
    public function getName(): string;

    /**
     * @param Message $event event to emit
     */
    public function emit(Message $event): void;

    /**
     * @param string  $streamName name of the stream to link the event to
     * @param Message $event      event to link
     */
    public function linkTo(string $streamName, Message $event): void;

    /**
     * @param bool $deleteEmittedEvents judging by the name, whether events
     *                                  emitted by this projection are
     *                                  deleted as well
     */
    public function delete(bool $deleteEmittedEvents): void;
}
prepareEventStream('user-123', $eventStore);

// Create an empty 'foo' stream to serve as the link target.
$fooStream = new Stream(new StreamName('foo'), new ArrayIterator());
$eventStore->create($fooStream);

$projection = new PostgresEventStoreProjection(
    $eventStore,
    $connection,
    'test_projection', // unique name of projection
    'event_streams', // event streams table
    'projections', // projections table
    1000, // timeout in ms
    true // enable emit
);

// more code
// more code

// The handler calls linkTo() on $this, so it is presumably bound to the
// projection when it runs - confirm against the projection implementation.
$linkToFoo = function (array $state, Message $event): array {
    $this->linkTo('foo', $event);
    return $state;
};

$projection
    ->fromStream('user-123')
    ->whenAny($linkToFoo)
    ->run();

$fooEvents = $eventStore->load(new StreamName('foo'))->streamEvents();
echo iterator_count($fooEvents); // 50
// more code
// more code
// Build 50 more events: numbers 51..99 with generated names, then number 100
// named 'Oliver'.
$events = [];
for ($number = 51; $number < 100; $number++) {
    $events[] = UsernameChanged::with(['name' => uniqid('name_')], $number);
}
$events[] = UsernameChanged::with(['name' => 'Oliver'], 100);

$eventStore->appendTo(new StreamName('user-123'), new ArrayIterator($events));

// more code
// more code

// The handler calls linkTo() on $this, so it is presumably bound to the
// projection when it runs - confirm against the projection implementation.
$linkToFoo = function (array $state, Message $event): array {
    $this->linkTo('foo', $event);
    return $state;
};

$projection
    ->fromStream('user-123')
    ->whenAny($linkToFoo)
    ->run();

$fooEvents = $eventStore->load(new StreamName('foo'))->streamEvents();
echo iterator_count($fooEvents); // 100
// prepareEventStream is a plain function, so it is called without a leading
// '$' (which would try to call an undefined variable).
prepareEventStream('user-123', $eventStore);

$projection = new PostgresEventStoreProjection(
    $eventStore,
    $connection,
    'user_created_projection',
    'event_streams',
    'projections',
    1000,
    true
);

// Only 'user-created' events are handled; each one is passed to emit().
// The handler calls emit() on $this, so it is presumably bound to the
// projection when it runs - confirm against the projection implementation.
$projection
    ->fromStream('user-123')
    ->when([
        'user-created' => function ($state, $event): void {
            $this->emit($event);
        }
    ])
    ->run();
Create state in minutes without writing a new model
Indexing events
Faster event store queries
// The stream names are single-quoted so the leading '$' stays literal.
$query->fromStream('$all');
// equals $query->fromAll(), but faster

// prepareEventStream is a plain function, so it is called without a leading '$'.
prepareEventStream('user-123', $eventStore);
prepareEventStream('user-234', $eventStore);

$query->fromStream('$ct-user');
// equals $query->fromCategory('user'), but faster

// NOTE(review): other examples in this file use the event names
// 'user-name-changed' and 'username-changed'; confirm which spelling the
// second stream name below should use.
$query->fromStream('$mn-user-created');
$query->fromStream('$mn-user-changed-name');
// Dump the message's metadata; sample output is shown below.
var_dump($message->metadata());
// output:
array(1) {
["username"]=>
string(6) "Sascha"
}
$projection = new PostgresEventStoreProjection(
    $eventStore,
    $connection,
    'user_sorted_events',
    'event_streams',
    'projections',
    1000,
    true
);

// Link every event to a stream named after its 'username' metadata value.
// The handler calls linkTo() on $this, so it is presumably bound to the
// projection when it runs - confirm against the projection implementation.
$linkByUsername = function ($state, Message $event): void {
    $targetStream = $event->metadata()['username'];
    $this->linkTo($targetStream, $event);
};

$projection
    ->fromAll()
    ->whenAny($linkByUsername)
    ->run();
namespace Prooph\EventStore;
/**
 * Lifecycle hooks of a read model that is maintained by a projection.
 */
interface ReadModelProjection
{
    /**
     * Set up the read model, e.g. create the database table.
     */
    public function initProjection(): void;

    /**
     * @return bool true once set-up has happened, e.g. the database table exists
     */
    public function projectionIsInitialized(): bool;

    /**
     * Reset the read model, e.g. truncate the database table.
     */
    public function resetProjection(): void;

    /**
     * Remove the read model, e.g. delete the database table.
     */
    public function deleteProjection(): void;
}
// prepareEventStream is a plain function in the other examples, so it is
// called directly here rather than as a method on $this.
prepareEventStream('user-123', $eventStore);

$projection = new PostgresEventStoreReadModelProjection(
    $eventStore,
    $connection,
    'test_projection',
    $readModel,
    'event_streams',
    'projections',
    1000
);

// more code
// more code

// Both handlers write the event payload's 'name' into the read model: an
// insert for 'user-created', an update for 'username-changed'.
// The handlers call readModelProjection() on $this, so they are presumably
// bound to the projection when they run - confirm against the implementation.
$projection
    ->fromAll()
    ->when([
        'user-created' => function ($state, Message $event) {
            $this->readModelProjection()->insert(
                'name',
                $event->payload()['name']
            );
        },
        'username-changed' => function ($state, Message $event) {
            $this->readModelProjection()->update(
                'name',
                $event->payload()['name']
            );
        }
    ])
    ->run();
Update some other components
Write some more tests
Write documentation