
Checkpoint-based recovery design proposal

    2010-12 (based on an earlier design from 2010-06)

    Jochen Bekmann (jochen@google.com), Alex North (anorth@google.com)

    Note: this design was originally proposed for Google Wave. Wave in a Box may have different requirements and this design may need modification.

    Objective

    This design describes a library and service enabling checkpoint-based recovery for:
    • listeners on the wave bus
    • outbound federation commit notifications

    Wave bus listeners and federation gateways are henceforth referred to as clients of the checkpoint library.

    The checkpoint library provides periodic notification of remote events (checkpoints) and a means for clients to acknowledge successful processing of those events, and persist those acknowledgements. After a failure, the library provides a way to query for missed events.

    The library only provides notification of missed events; it is the client's responsibility to take appropriate recovery action. Such notification is adequate for clients which can perform their actions based only on the current state of a wave, rather than knowledge of the path to reach that state.

    Background

    The wave server does not necessarily commit received deltas as soon as they are applied. Instead, the server is free to commit to persistent storage some time after applying and broadcasting deltas. This freedom reduces latencies and supports higher throughput with write batching techniques. Wave in a Box does not currently make much use of this freedom, except for committing deltas after broadcasting them.

    The term "checkpoint" throughout this document refers to a message describing a set of changes committed to some set of wavelets over some time period. Checkpoints describe beginning and ending hashed versions for the wavelets in question but elide intervening versions. The Wave in a Box server doesn't currently generate checkpoints, but this design presupposes that such a mechanism is implemented. (This should be further explained in another document).

    The federation protocol requires reliable delivery of commit notifications to remote domains. This requires the WIAB server to send a notice to each affected remote domain for each wavelet in a checkpoint, and retransmission of said notice until each remote domain has acknowledged it. Affected remote domains are computed from a wavelet's participant list.

    Some wave bus clients such as the indexing module need to reliably process every change to every wavelet in order to produce correct results. After a server failure, where some indexing operations may not have been completed, the indexer needs a mechanism to recover and re-index waves that may have changed.

    Requirements and Scale

    The checkpoint library allows the creation of reliable clients. Reliable here means that after a server or network failure the clients can learn of any out-of-date data and take action to recover. Depending on how the clients work, this may not have the same result as if the failure never happened.

    The library provides:
    • A means for clients to acknowledge successful processing of a wavelet version (a "receipt")
    • Notification to clients of wavelet versions as they are committed to persistent storage
    • A pluggable system for persisting receipts, with a (filesystem-backed?) implementation
    • A means for clients to learn of missed wavelet events.

    The client receives initial notification of changes via some other channel (such as the wave bus).

    Failure need not be transparent to the client. Client-specific recovery action must be taken when a failure is detected. In particular, the library will not initially provide "replay the world" transparent recovery, though such reliability could be implemented on top of this library. Notification of missed events will have "at least once" semantics; that is, a client is guaranteed notification of missed events, but may receive duplicate notification for the same events in some cases (such as when storing a receipt fails).

    Knowledge of which wavelets a client has potentially missed updates from is sufficient for clients which can recover with access to only the current state of a wave (i.e., are idempotent), including:
    • indexing and inboxing
    • federation updater

    The checkpoint library will make use of a pluggable storage service for persisting receipts. Receipts are to be stored with checkpoint granularity; a receipt is persisted only after all changes in a checkpoint are acknowledged by the client.

    Design Ideas

    The checkpoint recovery library is split into two parts: the checkpoint service and the storage service. The following sections describe the interfaces, then some key processes.

    Checkpoint service interfaces

    The checkpoint library interfaces with:
    • The wave bus for checkpoint streams and requests
    • A client, providing checkpoint events and receiving receipts
    • A storage service for storing receipts



    Client interface: the wavelet recovery service

    The client interacts directly with the checkpoint library to submit receipts for wavelets and query for unprocessed events.

    A client receives events from some channel such as the wave bus. A client must eventually provide a receipt for the events for recovery to be feasible. If a client continually experiences failure processing some wavelet it should make a note of the failure elsewhere and provide a receipt (later extensions to the service may support "negative receipts" for this).

    interface RecoveryServiceFactory {
      // Initializes a checkpoint service for a client.
      RecoveryService initializeRecoveryService(CheckpointSource source, ReceiptStore store);
    }

    A client submits a receipt for a single wavelet at some version. Per-wavelet receipts are not persisted; however, the recovery library detects when all wavelets in a checkpoint have been receipted and considers that a receipt for the checkpoint. Submitting a receipt is idempotent.

    Receipts are submitted asynchronously since many clients don't need to take explicit action after the receipt is persisted. The client may not submit receipts for versions not yet shared with it by the RecoveryService.

    interface RecoveryService {

      // Submits a receipt for processing a wavelet's events up to some version.
      // The returned future completes once the receipt is written to persistent storage,
      // which happens only after all other wavelets in the same checkpoint are receipted.
      // Fails if the version is not known to be committed (see subscribe()).
      Future<?> submitReceipt(WaveletName key, SignedVersion version) throws RecoveryException;

      // Continued...
    }
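    As a rough usage sketch (the client class, its processing method, and the wave bus callback are hypothetical; only the library interfaces come from this design), a wave bus client might wire the pieces together like this:

    // Sketch only: an indexing client that processes wave bus events and then
    // receipts them. All names other than the library interfaces are illustrative.
    class IndexingClient {
      private final RecoveryService recovery;

      IndexingClient(RecoveryServiceFactory factory, CheckpointSource source, ReceiptStore store) {
        this.recovery = factory.initializeRecoveryService(source, store);
      }

      // Called from the wave bus (or another event channel) when a wavelet changes.
      void onWaveletUpdate(WaveletName wavelet, SignedVersion committedVersion) {
        reindex(wavelet);  // client-specific processing; must succeed before receipting
        try {
          // The receipt is persisted only once every wavelet in the surrounding
          // checkpoint has been receipted; most clients need not block on the future.
          recovery.submitReceipt(wavelet, committedVersion);
        } catch (RecoveryException e) {
          // The version is not yet known to be committed, or storage failed; the
          // wavelet will show up again in a later recovery query.
        }
      }

      private void reindex(WaveletName wavelet) { /* client-specific */ }
    }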

    Clients are vulnerable to a repeated failure to process some wavelet preventing the persistence of receipts. Clients that detect such failures should eventually provide a receipt for the wavelet anyway, perhaps noting the failure in some other persistent store.

    TODO: possibly design support for storing negative receipts or supplementary receipt information

    Clients should periodically request recovery information to ensure that all changes are receipted. The recovery query requests checkpoint information from the wave server and compares it with the persisted receipts, providing a collection of mismatches. The service is configured with a flag providing a bound on how far back to examine checkpoints and receipts.

    Checkpoints received from the wave server since startup are included in this information, but those received very recently (configurable by a flag) are not included as needing recovery; they are likely still being processed by the client.

    If a client has no persisted receipts then no recovery information is returned. The service assumes that this is the first time the client has enabled the service.

    interface RecoveryService {
      // ...

      // Represents a wavelet version known to be committed to storage.
      // NB: the actual implementation will use accessor methods.
      interface CommittedVersion {
        public final WaveletName key;
        public final SignedVersion committedVersion;
      }

      interface CommittedVersionListener {
        void receive(CommittedVersion cv);
      }

      // Subscribes to a live stream of wavelet committed versions and
      // finds committed wavelet versions for which the client has submitted no receipt.
      // The returned iterator presents all wavelet versions which have already been
      // persisted but for which the client has not provided a receipt. Items are
      // returned in no particular order.
      // The listener is guaranteed to receive all version updates subsequent to
      // those returned via the iterator.
      ExceptionalIterator<CommittedVersion> subscribe(CommittedVersionListener listener);
    }
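    A minimal sketch of how a client might drive recovery at startup (recoverOnStartup is a hypothetical name; onWaveletUpdate is the client's normal processing path from the earlier sketch):

    // Sketch only: drain missed wavelet versions, re-process each from the
    // wavelet's current state, then receipt it. Live updates keep arriving via
    // the registered listener.
    void recoverOnStartup(RecoveryService recovery) throws Exception {
      ExceptionalIterator<CommittedVersion> missed =
          recovery.subscribe(cv -> onWaveletUpdate(cv.key, cv.committedVersion));
      while (missed.hasNext()) {
        CommittedVersion cv = missed.next();
        onWaveletUpdate(cv.key, cv.committedVersion);  // client-specific recovery action plus receipt
      }
    }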

    Checkpoint stream interface


    interface CheckpointSource {
      interface CheckpointListener {
        void receiveCheckpoint(Checkpoint cp);
      }

      // Registers a listener to receive checkpoints.
      void registerCheckpointListener(CheckpointListener l);

      // Requests a stream of checkpoints back until some timestamp.
      // Checkpoints are returned in reverse chronological order.
      Iterator<Checkpoint> requestCheckpoints(long earliestTimestampMicros);
    }

    Storage service

    The checkpoint service persists receipts and a low watermark via a storage service, which provides reliable persistent storage.

    Receipt store interface

    The storage service stores receipts for contiguous ranges of checkpoints along with a low-water mark to bound recovery.

    interface ReceiptStore {
      // Stores a low watermark.
      Future<?> storeLowWatermark(CheckpointId checkpoint);

      // Retrieves the low watermark.
      CheckpointId getLowWatermark();

      // Stores a receipt for all changes in a contiguous, inclusive range of checkpoints.
      // This does not imply receipt of changes in earlier checkpoints for the same wave range.
      // If an existing range receipt exists with the same beginCheckpoint and an earlier
      // endCheckpoint then this method may update that receipt rather than store a new one.
      Future<?> storeCheckpointRangeReceipt(CheckpointId beginCheckpoint, CheckpointId endCheckpoint);

      // Retrieves all checkpoint range receipts after (and excluding) some checkpoint id.
      // Results are provided in reverse chronological order by beginCheckpoint. The ranges may overlap.
      ExceptionalIterator<Pair<CheckpointId, CheckpointId>> getCheckpointRangeReceipts(CheckpointId antecedent);

      // Deletes checkpoint receipts (presumably because they are
      // obsoleted by a checkpoint range receipt).
      // NOTE: compression of checkpoint range receipts is a future optimisation.
      Future<?> purgeCheckpointReceipts(String clientId, long fromTimestamp, long toTimestamp);
    }

    Receipt storage module

    The receipt store writes checkpoint receipts to persistent storage. We envisage all clients sharing a single table.

    The store acts on behalf of a single client at a time, and this client id provides part of the row key.

    interface ReceiptStoreFactory {
      ReceiptStore initializeStore(String clientIdentifier);
    }

    Each checkpoint receipt record comprises three fields:
    • clientIdentifier: the client's task name, e.g. "indexer"
    • beginCheckpointId: the first receipted checkpoint id
    • endCheckpointId: the last receipted checkpoint id

    This gives about 24 bytes of raw data.

    TODO: write this up as a schema, address keys, indexing and sorting etc.

    Scale

    Each client stores a receipt for each checkpoint message on the wave bus.

    If checkpoints are generated every second then for each client, and with no sharding of waves, the raw receipt data rate is approximately:
    3600 checkpoints/hour * 24 bytes ≈ 86 KB/hour (about 720 MB/year)

    Storing range receipts reduces this data significantly. Running a compression job would bring it to an approximately constant size.

    Processes

    This design is written with the intention of running the checkpoint service and client in the same process; i.e. in-memory state held by the checkpoint service will not be lost independently of the client's state.

    Receiving and receipting checkpoints

    When the checkpoint service receives a checkpoint message from the checkpoint source it records it in memory. The service then waits for the client to provide a receipt for wavelets named in the checkpoint.

    As the client provides receipts for individual wavelets the checkpoint service records these in memory until the client has provided a receipt for all wavelets named in the checkpoint. It then initiates a write to the receipt store. The client is notified of the successful receipt storage via the future provided when the wavelet receipt was submitted.

    A "low watermark" for each client is also persisted defining a lower bound on requests for receipted versions. The watermark is written concurrently with a receipt write when all checkpoints between the current watermark and the new receipt have been receipted. The checkpoint service exposes a flag which bounds the low watermark. Any persisted watermarks that precedes this window will be overwritten with a new value which lies within the maximum recovery window. The low watermark is advanced whenever a checkpoint is subsumed (i.e. all wavelet versions are subsumed in subsequent checkpoints and are thus not needed for recovery).

    Recovery

    Upon startup (and, for a paranoid client, periodically while executing) a client should query the checkpoint service for unreceipted wavelets. The service will in turn:
    • query the checkpoint source for the checkpoint stream
    • query the receipt store for range receipts and checkpoint receipts
    Both streams are provided in reverse chronological order. Changes older than the receipt low watermark may not be discovered.

    The checkpoint service returns the name and latest committed version number of each wavelet version discovered in the checkpoint stream that does not have a subsequent receipt. The client will not receive information about a wavelet more than once. The checkpoint service holds in memory those checkpoints corresponding to unreceipted wavelets, with an implicit receipt for wavelet versions subsumed by a receipt message for a later version already stored. It then processes receipt calls from the client as usual. The client need only provide receipts for those wavelets with unreceipted events.
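    A minimal sketch of this comparison (again using the illustrative Checkpoint interface from the Background section; the expansion of range receipts into a set of receipted checkpoint ids is assumed to happen elsewhere):

    // Sketch only: walk checkpoints newest-first and report the latest committed
    // version of each wavelet not covered by a stored or implicit receipt.
    Map<WaveletName, SignedVersion> findUnreceipted(
        Iterator<Checkpoint> checkpointsNewestFirst, Set<CheckpointId> receiptedCheckpoints) {
      Map<WaveletName, SignedVersion> unreceipted = new HashMap<>();
      Set<WaveletName> alreadyReceipted = new HashSet<>();
      while (checkpointsNewestFirst.hasNext()) {
        Checkpoint cp = checkpointsNewestFirst.next();
        if (receiptedCheckpoints.contains(cp.id())) {
          // A receipt for a later version implicitly covers earlier versions of these wavelets.
          alreadyReceipted.addAll(cp.wavelets());
          continue;
        }
        for (WaveletName wavelet : cp.wavelets()) {
          // Newest first: the first version seen for a wavelet is its latest committed
          // version, so each wavelet is reported at most once.
          if (!alreadyReceipted.contains(wavelet) && !unreceipted.containsKey(wavelet)) {
            unreceipted.put(wavelet, cp.endVersion(wavelet));
          }
        }
      }
      return unreceipted;
    }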

    The service may return wavelets in an order that facilitates efficient recovery, but it's unclear whether there's a better way than chronological or reverse chronological. For example, changes from very old checkpoints may be returned early in the stream so that the low watermark may be advanced.

    The mechanics of recovery from this list of wavelets are the client's responsibility.

    Bootstrapping

    The very first time a client starts up with the checkpoint service it will have no stored receipts, and recovery queries will return no results. Once the client has written its first receipt, subsequent registrations will initiate recovery. The recovery burden will be limited by a checkpoint service flag which bounds the earliest possible recovery point.

    Receipt store compression (optional)

    The storage service provides storage of both individual checkpoint receipts and checkpoint range receipts. A checkpoint range receipt represents a receipt for each checkpoint in a contiguous sequence of checkpoints. By periodically compressing receipts into range receipts the checkpoint service reduces the amount of storage required for receipts and the amount of data to be fetched and processed during recovery.

    The checkpoint service periodically queries the storage service for the checkpoints and ranges and writes a range receipt encapsulating significant contiguous ranges. It then purges the redundant individual receipts from storage.

    Caveats

    Future wave bus subscribers may receive a filtered view of the wave bus, restricted by wave id. Such a restricted view should implicitly acknowledge all changes from waves excluded by the filter.

    Alternatives Considered

    Explicit storage of per-wavelet receipts was considered as mitigation against a few wavelet failures preventing receipt of checkpoints. It was excluded as it added significant complexity for questionable gain. Strategies for dealing with failing wavelets may be client-specific. 