Streaming Ledger Quick Start

Developing against Ververica Streaming Ledger is simple. The APIs are designed to feel natural both to users who have worked with stream processing before and to users who are familiar with databases.

We illustrate this with an example (the full code is included with the API on GitHub). The use case has the following properties:
  • There are two tables: Accounts and Book Entries
  • We have two streams of events: Deposits and Transfers
  • Deposits put values into Accounts and the Book
  • Transfers atomically move values between accounts and book entries, under a precondition

[Figure: ledger tutorial overview]

2. Define the Transactional Scope and Tables

The next step is to define the scope and tables. For simplicity, our tables use strings as keys (account ID, entry ID) and longs for the current value/balance:

// start building the transactional streams
StreamLedger tradeLedger = StreamLedger.create("simple trade example");

// define the transactional states
StreamLedger.State<String, Long> accounts = tradeLedger.declareState("accounts")
        .withKeyType(String.class)
        .withValueType(Long.class);

StreamLedger.State<String, Long> books = tradeLedger.declareState("bookEntries")
        .withKeyType(String.class)
        .withValueType(Long.class);

3. Define the Transaction Functions and the Keys

Next, we specify which transaction function to apply to each stream, and for each table that will be used, how to select the keys for the rows that will be accessed.

The .apply(...) functions themselves contain the transactions’ business logic (see below).

For each row being accessed, we add a call that specifies the access: .on(table, key, name, type):
  • table indicates the table that is accessed
  • key is a function through which the key for that row is derived from the input event
  • name is a logical name for that particular row (used later)
  • type declares whether the row is accessed read-only, write-only, or read-write. Declaring a narrower access type is an optimization; READ_WRITE is the most generic option

// define transactors on states
tradeLedger.usingStream(deposits, "deposits")
        .apply(new DepositHandler())
        .on(accounts, DepositEvent::getAccountId, "account", READ_WRITE)
        .on(books, DepositEvent::getBookEntryId, "asset", READ_WRITE);

// define the transfers, which always update four rows on different keys.
// We store a handle to the result stream for later use.
OutputTag<TransactionResult> transactionResults = tradeLedger.usingStream(transfers, "transactions")
        .apply(new TxnHandler())
        .on(accounts, TransactionEvent::getSourceAccountId, "source-account", READ_WRITE)
        .on(accounts, TransactionEvent::getTargetAccountId, "target-account", READ_WRITE)
        .on(books, TransactionEvent::getSourceBookEntryId, "source-asset", READ_WRITE)
        .on(books, TransactionEvent::getTargetBookEntryId, "target-asset", READ_WRITE)
        .output();
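
To make the role of the key argument concrete, here is a minimal plain-Java sketch of how a method reference such as DepositEvent::getAccountId can serve as a function from an event to a row key. It does not use the Streaming Ledger API. The DepositEvent class below is a hypothetical stand-in that models only the two getters used above, and selectKey is an illustrative helper, not part of the library.

```java
import java.util.function.Function;

class KeySelectorSketch {

    // Hypothetical stand-in for the tutorial's DepositEvent; only the two
    // getters referenced in the .on(...) calls are modeled here.
    static final class DepositEvent {
        private final String accountId;
        private final String bookEntryId;

        DepositEvent(String accountId, String bookEntryId) {
            this.accountId = accountId;
            this.bookEntryId = bookEntryId;
        }

        String getAccountId() { return accountId; }
        String getBookEntryId() { return bookEntryId; }
    }

    // Illustrative helper: applies a key selector to an event and returns
    // the key of the row that the event wants to access.
    static <E> String selectKey(Function<E, String> selector, E event) {
        return selector.apply(event);
    }

    public static void main(String[] args) {
        DepositEvent event = new DepositEvent("account-42", "entry-7");

        // The same event yields a different key per table.
        System.out.println(selectKey(DepositEvent::getAccountId, event));   // account-42
        System.out.println(selectKey(DepositEvent::getBookEntryId, event)); // entry-7
    }
}
```

Each .on(...) call pairs one such selector with a table, so a single event can address several rows, possibly in different tables.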

4. Implement the Transaction Function

A transaction function contains the business logic that decides whether and how to update the table rows provided to it, and what to emit as a result.

Transaction functions are passed a state access object for each row that is read or updated. To correlate each state access with its row and key, the corresponding parameter is annotated with @State and the name defined in the prior step.

For simplicity, we only show the implementation of the TxnHandler:

private static final class TxnHandler extends TransactionProcessFunction<TransactionEvent, TransactionResult> {
    private static final Supplier<Long> ZERO = () -> 0L;

    @ProcessTransaction
    public void process(
            final TransactionEvent txn,
            final Context<TransactionResult> ctx,
            final @State("source-account") StateAccess<Long> sourceAccount,
            final @State("target-account") StateAccess<Long> targetAccount,
            final @State("source-asset") StateAccess<Long> sourceAsset,
            final @State("target-asset") StateAccess<Long> targetAsset) {

        final long sourceAccountBalance = sourceAccount.readOr(ZERO);
        final long sourceAssetValue = sourceAsset.readOr(ZERO);
        final long targetAccountBalance = targetAccount.readOr(ZERO);
        final long targetAssetValue = targetAsset.readOr(ZERO);

        // check the preconditions
        if (sourceAccountBalance > txn.getMinAccountBalance()
                && sourceAccountBalance > txn.getAccountTransfer()
                && sourceAssetValue > txn.getBookEntryTransfer()) {

            // compute the new balances
            final long newSourceBalance = sourceAccountBalance - txn.getAccountTransfer();
            final long newTargetBalance = targetAccountBalance + txn.getAccountTransfer();
            final long newSourceAssets = sourceAssetValue - txn.getBookEntryTransfer();
            final long newTargetAssets = targetAssetValue + txn.getBookEntryTransfer();

            // write back the updated values
            sourceAccount.write(newSourceBalance);
            targetAccount.write(newTargetBalance);
            sourceAsset.write(newSourceAssets);
            targetAsset.write(newTargetAssets);

            // emit result event with updated balances and flag to mark transaction as processed
            ctx.emit(new TransactionResult(txn, true, newSourceBalance, newTargetBalance));
        }
        else {
            // emit result with unchanged balances and a flag to mark transaction as rejected
            ctx.emit(new TransactionResult(txn, false, sourceAccountBalance, targetAccountBalance));
        }
    }
}
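
To see the effect of the precondition check on concrete numbers, the following plain-Java sketch replays the same logic on in-memory maps. It is only an illustration under stated assumptions: the maps stand in for the two tables, getOrDefault stands in for readOr(ZERO), the transfer method and its parameter names are hypothetical, and the four keys are assumed to be distinct. It says nothing about how the library executes transactions.

```java
import java.util.HashMap;
import java.util.Map;

class TransferSketch {

    /**
     * Applies the same preconditions and updates as the TxnHandler above,
     * but on plain maps. Returns true if the transfer was applied and
     * false if it was rejected.
     */
    static boolean transfer(
            Map<String, Long> accounts, Map<String, Long> books,
            String sourceAccount, String targetAccount,
            String sourceEntry, String targetEntry,
            long accountTransfer, long bookEntryTransfer, long minAccountBalance) {

        // Missing rows read as zero, like readOr(ZERO) in the handler.
        long sourceBalance = accounts.getOrDefault(sourceAccount, 0L);
        long targetBalance = accounts.getOrDefault(targetAccount, 0L);
        long sourceValue = books.getOrDefault(sourceEntry, 0L);
        long targetValue = books.getOrDefault(targetEntry, 0L);

        // Strict comparisons, as in the handler's precondition check.
        if (sourceBalance > minAccountBalance
                && sourceBalance > accountTransfer
                && sourceValue > bookEntryTransfer) {
            accounts.put(sourceAccount, sourceBalance - accountTransfer);
            accounts.put(targetAccount, targetBalance + accountTransfer);
            books.put(sourceEntry, sourceValue - bookEntryTransfer);
            books.put(targetEntry, targetValue + bookEntryTransfer);
            return true;
        }
        return false; // rejected: all four rows stay unchanged
    }

    public static void main(String[] args) {
        Map<String, Long> accounts = new HashMap<>();
        Map<String, Long> books = new HashMap<>();
        accounts.put("alice", 100L);
        books.put("alice-entry", 50L);

        // 100 > 10, 100 > 30 and 50 > 20, so this transfer is applied.
        System.out.println(transfer(accounts, books,
                "alice", "bob", "alice-entry", "bob-entry", 30L, 20L, 10L)); // true
        System.out.println(accounts.get("alice")); // 70
        System.out.println(accounts.get("bob"));   // 30

        // alice now holds 70, and 70 > 70 is false, so this one is rejected.
        System.out.println(transfer(accounts, books,
                "alice", "bob", "alice-entry", "bob-entry", 70L, 10L, 10L)); // false
        System.out.println(accounts.get("alice")); // 70
    }
}
```

Because the comparisons are strict, a transfer that would drain the source account to exactly zero is rejected, as the second call shows.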

5. Optionally obtain the resulting streams

As an optional step, we can grab the stream of result events produced by the TxnHandler. We use the output tag (transactionResults) that was created when we defined the transaction in step (3):

ResultStreams resultsStreams = tradeLedger.resultStreams();
DataStream<TransactionResult> output = resultsStreams.getResultStream(transactionResults);

That’s it!