Hi!

me

Sascha-Oliver Prolic

Prooph components maintainer

Prooph Event Store v7.0.0

Changes & New Features

Requires PHP 7.1 (scalar type hints, return types, ...)

Event Store Interfaces

Different Event Store Implementations - no adapters

Implementation details of PDO-EventStore

Queries & Projections

namespace Prooph\EventStore;

interface EventStore
{
    public function create(Stream $stream): void;

    public function appendTo(
        StreamName $streamName,
        Iterator $streamEvents
    ): void;

    /* more methods */
}
namespace Prooph\EventStore;

interface EventStore
{
    /* more methods */

    public function fetchStreamMetadata(
        StreamName $streamName
    ): array;

    public function hasStream(StreamName $streamName): bool;

    public function delete(StreamName $streamName): void;

    /* more methods */
}
namespace Prooph\EventStore;

interface EventStore
{
    /* more methods */

    public function load(
        StreamName $streamName,
        int $fromNumber = 1,
        int $count = null,
        MetadataMatcher $metadataMatcher = null
    ): Stream;

    public function loadReverse(
        StreamName $streamName,
        int $fromNumber = PHP_INT_MAX,
        int $count = null,
        MetadataMatcher $metadataMatcher = null
    ): Stream;
}

MetadataMatcher

$event = UserCreated::with(['name' => 'John'], 1);
$event = $event->withAddedMetadata('foo', 'bar');
$event = $event->withAddedMetadata('int', 5);

var_dump($event->metadata());

// output

[
    'foo' => 'bar',
    'int' => 5
]

MetadataMatcher

$metadataMatcher = new MetadataMatcher();
$metadataMatcher = $metadataMatcher->withMetadataMatch(
    'foo',
    Operator::EQUALS(),
    'bar'
);
$metadataMatcher = $metadataMatcher->withMetadataMatch(
    'int',
    Operator::GREATER_THAN(),
    4
);

$stream = $eventStore->load(
    $streamName,
    1,
    null,
    $metadataMatcher
);
namespace Prooph\EventStore;

interface TransactionalEventStore extends EventStore
{
    public function beginTransaction(): void;

    public function commit(): void;

    public function rollback(): void;

    public function isInTransaction(): bool;
}
namespace Prooph\EventStore;

interface ActionEventEmitterEventStore extends EventStore
{
    const EVENT_APPEND_TO = 'appendTo';
    const EVENT_CREATE = 'create';
    const EVENT_LOAD = 'load';
    const EVENT_LOAD_REVERSE = 'loadReverse';
    const EVENT_DELETE = 'delete';
    const EVENT_HAS_STREAM = 'hasStream';
    const EVENT_FETCH_STREAM_METADATA = 'fetchStreamMetadata';

    public function getActionEventEmitter(): ActionEventEmitter;
}
namespace Prooph\EventStore;

interface TransactionalActionEventEmitterEventStore extends
    ActionEventEmitterEventStore,
    TransactionalEventStore
{
    const EVENT_BEGIN_TRANSACTION = 'beginTransaction';
    const EVENT_COMMIT = 'commit';
    const EVENT_IS_IN_TRANSACTION = 'isInTransaction';
    const EVENT_ROLLBACK = 'rollback';
}

Provides abstract classes

AbstractActionEventEmitterEventStore

AbstractTransactionalActionEventEmitterEventStore

New implementations

InMemoryEventStore (extends AbstractTransactionalActionEventEmitterEventStore)

MySQLEventStore (extends AbstractActionEventEmitterEventStore)

PostgresEventStore (extends AbstractTransactionalActionEventEmitterEventStore)

Details of PDOEventStore

namespace Prooph\EventStore\PDO;

interface TableNameGeneratorStrategy
{
    public function __invoke(StreamName $streamName): string;
}
namespace Prooph\EventStore\PDO\TableNameGeneratorStrategy;

final class Sha1 implements TableNameGeneratorStrategy
{
    public function __invoke(StreamName $streamName): string
    {
        return '_' . sha1($streamName->toString());
    }
}
namespace Prooph\EventStore\PDO;

interface IndexingStrategy
{
    /**
     * @return string[]
     */
    public function createSchema(string $tableName): array;

    public function oneStreamPerAggregate(): bool;

    /**
     * @return string[]
     */
    public function uniqueViolationErrorCodes(): array;
}

Different indexing strategies

MySQLAggregateStreamStrategy

MySQLSingleStreamStrategy

MySQLSimpleStreamStrategy

Postgres...

AggregateStreamStrategy

Used for event sourcing

One stream per aggregate

SingleStreamStrategy

Used for event sourcing

One stream per aggregate type

or one stream for all aggregates

SimpleStreamStrategy

Used outside of event sourcing

Useful especially for projections

Queries &
Projections

Projections in the past

namespace Prooph\ProophessorDo\Projection\User;

final class UserProjector
{
    public function onUserWasRegistered(UserWasRegistered $event)
    {
        $this->connection->insert(Table::USER, [
            'id' => $event->userId()->toString(),
            'name' => $event->name(),
            'email' => $event->emailAddress()->toString()
        ]);
    }

    public function onTodoWasPosted(TodoWasPosted $event)
    {
        // ...
    }
}

Queries

namespace Prooph\EventStore\Projection;

interface Query
{
    public function init(Closure $callback): Query;
    public function fromStream(string $streamName): Query;
    public function fromStreams(string ...$streamNames): Query;
    public function fromCategory(string $name): Query;
    public function fromCategories(string ...$names): Query;
    public function fromAll(): Query;
    public function when(array $handlers): Query;
    public function whenAny(Closure $closure): Query;
    public function reset(): void;
    public function run(): void;
    public function stop(): void;
    public function getState(): array;
}
function prepareEventStream(string $name, EventStore $eventStore)
{
    $events = [];
    $events[] = UserCreated::with([
        'name' => 'Alex'
    ], 1);
    for ($i = 2; $i < 50; $i++) {
        $events[] = UsernameChanged::with([
            'name' => uniqid('name_')
        ], $i);
    }
    $events[] = UsernameChanged::with([
        'name' => 'Sascha'
    ], 50);

    $eventStore->create(new Stream(
        new StreamName($name),
        new ArrayIterator($events)
    ));
}
prepareEventStream('user-123', $eventStore);
$query = new InMemoryEventStoreQuery($eventStore);
$query
    ->init(function (): array {
        return ['count' => 0];
    })
    ->fromStream('user-123')
    ->when([
        'user-name-changed' => function (
            array $state, UsernameChanged $event
        ): array {
            $state['count']++;
            return $state;
        }
    ])
    ->run();

echo $query->getState()['count']; // 49
$query->reset();
$query->run();
prepareEventStream('user-123', $eventStore);
prepareEventStream('user-234', $eventStore);

$query = new PostgresEventStoreQuery($eventStore, $connection, 'event_streams');

$query
    ->init(function (): array {
        return ['count' => 0];
    })
    ->fromStreams('user-123', 'user-234')
    ->whenAny(
        function (array $state, Message $event): array {
            $state['count']++;
            return $state;
        }
    )
    ->run();

echo $query->getState()['count']; // 100
prepareEventStream('user-123', $eventStore);

$query = new PostgresEventStoreQuery($eventStore, $connection, 'event_streams');
$query
    ->init(function (): array {
        return ['count' => 0];
    })
    ->fromCategory('user')
    ->whenAny(
        function (array $state, Message $event): array {
            $state['count']++;
            return $state;
        }
    )
    ->run();

// more code
// more code

$events = [];
for ($i = 51; $i <= 100; $i++) {
    $events[] = UsernameChanged::with([
        'name' => uniqid('name_')
    ], $i);
}
$eventStore->appendTo(
    new StreamName('user-123'),
    new ArrayIterator($events)
);

$query->run();
echo $query->getState()['count']; // 100

Projections in the past

namespace Prooph\ProophessorDo\Projection\User;

final class UserProjector
{
    public function onUserWasRegistered(UserWasRegistered $event)
    {
        $this->connection->insert(Table::USER, [
            'id' => $event->userId()->toString(),
            'name' => $event->name(),
            'email' => $event->emailAddress()->toString()
        ]);
    }

    public function onTodoWasPosted(TodoWasPosted $event)
    {
        // ...
    }
}

Projections

Projections are like queries, but persistent

Projections have special additional methods:

linkTo(string $streamName, Message $event)

emit(Message $event)

namespace Prooph\EventStore\Projection;

interface Projection extends Query
{
    public function getName(): string;

    public function emit(Message $event): void;

    public function linkTo(string $streamName, Message $event): void;

    public function delete(bool $deleteEmittedEvents): void;
}
prepareEventStream('user-123', $eventStore);

$eventStore->create(new Stream(
    new StreamName('foo'),
    new ArrayIterator()
));

$projection = new PostgresEventStoreProjection(
    $eventStore,
    $connection,
    'test_projection', // unique name of projection
    'event_streams', // event streams table
    'projections', // projections table
    1000, // timeout in ms
    true // enable emit
);

// more code
// more code

$projection
    ->fromStream('user-123')
    ->whenAny(
        function (array $state, Message $event): array {
            $this->linkTo('foo', $event);
            return $state;
        }
    )
    ->run();

echo iterator_count($eventStore->load(
    new StreamName('foo'))->streamEvents()
); // 50

// more code
// more code

$events = [];
for ($i = 51; $i < 100; $i++) {
    $events[] = UsernameChanged::with([
        'name' => uniqid('name_')
    ], $i);
}
$events[] = UsernameChanged::with([
    'name' => 'Oliver'
], 100);
$eventStore->appendTo(
    new StreamName('user-123'),
    new ArrayIterator($events)
);

// more code
// more code

$projection
    ->fromStream('user-123')
    ->whenAny(
    function (array $state, Message $event): array {
        $this->linkTo('foo', $event);
        return $state;
    })
    ->run();

echo iterator_count($eventStore->load(
    new StreamName('foo'))->streamEvents()
); // 100
$prepareEventStream('user-123', $eventStore);

$projection = new PostgresEventStoreProjection(
    $eventStore,
    $connection,
    'user_created_projection',
    'event_streams',
    'projections',
    1000,
    true
);

$projection
    ->fromStream('user-123')
    ->when([
        'user-created' => function ($state, $event): void {
            $this->emit($event);
        }
    ])
    ->run();

Projections, nice!
But for what?

Create state in minutes without writing a new model

Indexing events

Faster event store queries

Examples

Standard-Projections
https://github.com/prooph/standard-projections

AllStreamProjection


$query->fromStream('$all')
// equals $query->fromAll(), but faster

CategoryStreamProjection

$prepareEventStream('user-123', $eventStore);
$prepareEventStream('user-234', $eventStore);

$query->fromStream('$ct-user')
// equals $query->fromCategory('user'), but faster

MessageName-
StreamProjection

$query->fromStream('$mn-user-created')
$query->fromStream('$mn-user-changed-name')

Use case:

Finding all events coming from user Sascha

var_dump($message->metadata());

// output:
array(1) {
  ["username"]=>
  string(6) "Sascha"
}
$projection = new PostgresEventStoreProjection(
    $eventStore,
    $connection,
    'user_sorted_events',
    'event_streams',
    'projections',
    1000,
    true
);

$projection
    ->fromAll()
    ->whenAny(function ($state, Message $event): void {
        $this->linkTo($event->metadata()['username'], $event);
    })
    ->run();

ReadModelProjections

namespace Prooph\EventStore;

interface ReadModelProjection
{
    public function initProjection(): void;
    // f.e. create db table

    public function projectionIsInitialized(): bool;
    // return true if db table is created

    public function resetProjection(): void;
    // f.e. truncate db table

    public function deleteProjection(): void;
    // f.e. delete db table
}
$this->prepareEventStream('user-123', $eventStore);

$projection = new PostgresEventStoreReadModelProjection(
    $eventStore,
    $connection,
    'test_projection',
    $readModel,
    'event_streams',
    'projections',
    1000
);

// more code
// more code

$projection
    ->fromAll()
    ->when([
        'user-created' => function ($state, Message $event) {
            $this->readModelProjection()->insert(
                'name',
                $event->payload()['name']
            );
        },
        'username-changed' => function ($state, Message $event) {
            $this->readModelProjection()->update(
                'name',
                $event->payload()['name']
            );
        }
    ])
    ->run();

Before final release

Updating some other components

Write some more tests

Write documentation

Hire me!

Contact me

github.com/prolic

Twitter @sasaprolic

Prooph Gitter Chat (https://gitter.im/prooph/improoph)

Thank you!

Slides at: https://prolic.github.io/prooph-v7-talk/