The book begins with a quick look at the characteristics of systems (reliability, maintainability…) to set a common language to talk about. This part assumes a single machine.
Then it jumps into different data models from relational to graph. Although it’s brief, it’s an interesting look at the essence of them and the goals that each one pursues. For example, it focuses on the convenience of different models for representing many-to-many relationships as the basis for new needs.
Data models without persistence might be interesting from a philosophical point of view but not from a practical one, so storage must be studied. The third chapter explains OLTP vs OLAP systems. Within OLTP systems there’s a great analysis of log-like vs B-tree-like structures.
This part continues with an in-depth look at the lower detail: the encoding. It compares several approaches, from language-dependent serialization to textual formats (CSV, JSON…) or binary, schema-based formats (Protocol Buffers, Avro…). It ends with a summary of data flows, data protocols for communication (RPC, databases, REST, actor model…
The first step is replication, which is driven by the need for availability, improving the latency, scalability or disconnected operation. The main approaches are single-leader, multi-leader, and leaderless replication. Replication can be synchronous or asynchronous. Replication lag is a challenge for read-after-write consistency, monotonic reads, and consistent prefix reads.
For scalability, the main tool might be partitioning. The two main approaches are key range partitioning and hash partitioning. For secondary indexes, there are document-partitioned indexes and term-partitioned indexes. And you need a way to route requests, either client know, a routing tier, or the nodes themselves.
Transactions try to make concurrency easier. The transaction chapter describes the whole set of issues that can occur (dirty reads, dirty writes, read skew, lost updates, write skew, phantom reads), the different isolation levels with the guarantees that they provide (read committed, snapshot isolation, serializable). It ends with three different approaches to serialization (literally in order, two-phase locking, and serializable snapshot isolation).
In distributed environments, a whole range of problems may occur, from network failures to pauses. Systems must be prepared to detect and tolerate them for reliability.
Consistency and consensus begin learning about linearizability, appealing but slow. A looser guarantee is causality, and requires less coordination and has better tolerance for network issues. But not everything can be implemented with causal guarantees. Consensus algorithms lay in the middle. Consensus, which is coming to an agreement among several nodes, is very useful for many tasks and guarantees, but become really hard if you need fault tolerance. Anyway, it comes with a cost.
UNIX tools are a great example of batch processing tools because its simplicity and composability make them really powerful. They’re limited to single nodes, though. MapReduce can be seen as the UNIX tools within a cluster, with a distributed filesystem and small jobs that are used in workflows (but with an intermediate state). Distributed batch processing must solve partitioning and fault-tolerance, and implement join mechanisms.
Steam processing is similar to batching but unbounded. There are two types of message brokers: AMQP/JMS style (with producers/consumers and explicit message processing acknowledgment) and log-based (all messages in a partition are read by a consumer). Log-based has similarities with the DB replication log. You could even plug a change data capture system on top of it to turn a DB into an event stream.
The final chapter exposes the author’s vision about the future of data. As there’s no chance for a one-size-fits-all solution, we come to the realization that some data integration is mandatory, and batch processing and event streams help with that. Thinking about that and dataflows as transformations, the author proposes unbundling the components of a database and building an application by composing those loosely coupled components. Anyway, we must be aware of fault tolerance and performance balance. And, finally, there’s a section about data ethics.
Depending on your knowledge and expertise you’ll find more or less new stuff in this book. For example, once you have a little background with databases you’ll probably know most of Part I, but even in these foundation chapters, there are some gems, such as explaining serialization in depth, including Protocol Buffer, Thrift or Avro. I find these chapters useful even if they don’t provide a lot of new information, because they help in remembering details and tradeoffs. Comparing overlapping technologies and designs is always useful.
Despite the title, it’s also a must read if you’re interested in distributed systems, as a consequence of Internet-scale data needing distributed databases. The book spends many pages around distributed systems failures, consensus…
While it’s not a book about specific technologies, it mentions many of the state-of-the-art ones with context and explaining the differences. You can learn how come ZooKeeper seems omnipresent or the different guarantees provided by different distributed databases.
Although it’s mostly abstract and generic, it also exposes real scenarios and tooling best practices, such as “how do people load the output of a MapReduce job into a database”.
References! I mean… REFERENCES! There are chapters with +100 references. You might be interested in reading all of them if, let’s say, you’re preparing for a position in Google, but otherwise it’s overwhelming. I will certainly go back to the book from time to time to recall some topics, and that will be a great moment to go through the references :) In fact, the heavy use of internal references through the book makes me think that the author is aware of this usage pattern ;)
It does a remarkable job of capturing the essence of the underlying common characteristics. The climax of this might be the parallelism between creating an index in a DB and creating a new follower replica, but there are many cases. The last chapter about the future leverages that into a broader vision of data systems.
As a guide for future work and application design, the idea of unbundling the database is really powerful and feels like the natural consequence of the essence distillation effort that the author does. He goes to the underlying characteristics and the problem that features solve and propose a loosely coupled design for applications. Something worth exploring!
As a final closure, he writes down his thoughts about data ethics. There are many relevant thoughts and memorable sentences, I’ll just pick one:
A blind belief in the supremacy of data for making decisions is not only delusional, it is positively dangerous
IMHO this book is a modern classic, a must read for every software engineer and developer. I’m certain that it will be reread it from time.
Click to see my collection of notes. It’s not a summary at all but a mix of highlights, definitions, important stuff or things that I didn’t know.
(…) there are datastores that are also used as mes‐ sage queues (Redis), and there are message queues with database-like durability guarantees (Apache Kafka). The boundaries between the categories are becoming blurred.
Reliability: the system should continue to work correctly (performing the correct function at the desired level of performance) even in the face of adversity (hardware or software faults, and even human error).
Scalability: as the system grows (in data volume, traffic volume, or complexity), there should be reasonable ways of dealing with that growth.
Maintainability Over time, many different people will work on the system (engineering and operations, both maintaining current behavior and adapting the system to new use cases), and they should all be able to work on it productively.
Note that a fault is not the same as a failure. A fault is (…) one component of the system deviating from its spec, whereas a failure is when the system as a whole stops providing the required service to the user.
there is a move toward systems that can tolerate the loss of entire machines, by using software fault-tolerance techniques in preference or in addition to hardware redundancy.
we need to succinctly describe the current load on the system; only then can we discuss growth questions. Load can be described with a few numbers which we call load parameters. The best choice of parameters depends on the architecture of your system: it may be requests per second to a web server, the ratio of reads to writes in a database, the number of simultaneously active users in a chat room, the hit rate on a cache, or something else.
In a batch processing system such as Hadoop, we usually care about throughput —the number of records we can process per second, or the total time it takes to run a job on a dataset of a certain size. In online systems, what’s usually more important is the service’s response time—that is, the time between a client sending a request and receiving a response.
Latency and response time are often used synonymously, but they are not the same. The response time is what the client sees: besides the actual time to process the request (the service time), it includes network delays and queueing delays. Latency is the duration that a request is waiting to be handled—during which it is latent, awaiting service.
Scaling up vs. scaling out
It is conceivable that distributed data systems will become the default in the future, even for use cases that don’t handle large volumes of data or traffic.
Operability + Simplicity + Evolvability
The network, hierarchical and document models
Mostly equivalent to property graphs.
Related to the semantic web through RDF, which is a format to specify data combinable with other sources using the same namespaces and attributes.
SPARQL is a query language for triple stores using the RDF model.
Subset of Prolog, close to triple-store model but generalized.
Just like Prolog, it’s based on rules (that can be reusable among different queries).
Log: append-only data file.
Index: additional structure derived from the primary data for improving performance.
Log compaction: removal of old values of keys (by keeping just the last one).
Hash indexes: n-memory hash from key to a disk position. Example: Bitcask (default storage engine in Riak).
Tombstone: record to mark deletion in a log.
Append-only logs are really convenient:
… but have limitations:
Sorted String Table (SSTable): storage segments that are sorted by key , and each key appears only once within each merged segment. Advantages over log segments with hash indexes:
Constructing and maintaining SSTables:
Caveat: on crash, the most recent entries at the memtable would be lost, so they have to be written to a (unsorted) “emergency” log as well.
Standard index implementation in many databases.
Key-value pairs sorted by key, allowing efficient key-value lookups and range queries.
While previous log-structured indexes break the database down into variable-size segments, B-trees break the database down into fixed-size blocks and read one page at a time. This corresponds more closely to the underlying hardware.
Note: if you want to learn more about B-Trees, read SQL Performance Explained (slides).
As a rule of thumb, B-trees are faster for reads, and LSM-trees are faster for writes. But that’s often sensitive to workload, so you must test them with your particular workload.
Advantages of LSM-trees:
Downsides of LSM-trees:
OLTP (online transaction processing): access pattern of looking up a small number of records by key, with an index, and then insert or update based on user’s input.
OLAP (online analytic processing): scan a few columns over a huge number of records and calculate aggregate statistics.
Data warehouse: separate, read-only database for analytics purposes.
Most data warehouses are modeled in star schema: a large fact table with attributes and dimensions (FKs to other tables).
Analytics query often specify a subset of columns, making column-oriented storage more suitable.
Column values often are repetitive, making bitmap encoding compression very attractive (storing a value table and its indexes instead of the value itself), and even compressing the indexes themselves.
Using language-specific formats is not recommended (you get coupled to languages, and they’re usually not efficient), and widespread encoding formats such as XML, JSON or CSV are limited (ambiguous, incomplete and not always without proper schema specification support).
They’re binary encoding libraries that require an schema.libraries that require an schema.
Thrift has three different binary encoding formats. It uses aliases for fields. Protocol Buffers is quite similar.
Forward compatibility (old code can read records that are written by new code) is guaranteed:
Backwards compatibility (can new code read old records?):
Removing a field is just like adding a field, with backward/forward concerns reversed (you can only remove a field that is optional) and you can’t reuse the tag number.
Changing the type can lead to precision loss.
Protocol buffers doesn’t have array types but a repeated keyword. You could change from optional to repeated .
Thrift does have array types. You can’t change from single-valued to multi-valued as with Protocol Buffers, but you can have nested lists.
Thrift wasn’t a good fit for Hadoop, specially for dynamically generated schemas and dynamic languages, so Apache came out with Avro.
Schema-based. Both reader and writer must have compatible schemas, because the binary encoding is just a concatenation of the values, without type or field information.
Reader and writer schemas don’t have to be the same, though. The library compares both schemas and perform needed transformations, such as different order or filling missing values with default ones.
For schema evolution, you must ensure that new and removed fields have default values. null is supported through union types.
The schema can be bundled in the file if it has a lot of records (so it’s not a problem from storage point of view). A database could store a versioned list of schemas. Finally, for network communication, the schema can be negotiated in advance.
You want to distribute data because of scalability, fault tolerance/HA and/or latency.
A first approach for scaling to higher load is a bigger machine, but the cost grows faster than linearly and is limited to a single geographic location.
This part focuses on “shared-nothing architectures”.
Ways to distribute data: replication vs. partitioning.
Clients must send the writes to the leader, which propagates the changes to the followers through the replication log. Reads can be queried both to leaders and followers.
Followers can be synchronous or asynchronous. Synchronous guarantee that the change is propagated, but might cause writes to fail if they’re not available.
Follower failure: Catch-up recovery. As followers keep a log, if there’s been a crash or network interruption, they can request the data from the last one processed.
Leader failure: Failover. One of the followers needs to be promoted to leader, clients need to be reconfigured, and other followers need to start consuming data from the new leader.
Synchronous replication is not feasible in practice (a single node failure can bring the whole system down), so you must deal with asynchronous replication lag. That comes with problems.
If a user performs a change and immediately reads from a follower that doesn’t have the change yet the data won’t be read. Read-after-write consistency might be needed (“read-your-writes”). Techniques:
A user might do a query to one follower and then to another one that is behind the previous one (in terms of replication), so the user perception might be “going back in time”. Monotonic reads is having the guarantee that this won’t happen. You can guarantee this by having users asking to the same followers.
Causally relationships should be read in order. For example, a question-answer conversation must be read by people watching it in order. This is a problem at sharded systems. You might need to record causal relationships explicitly.
In multi-leader configurations each leader is also a follower to the other leaders.
A conflict resolution system is needed.
You probably need multi-leader replication in multi-datacenter setups. Otherwise the whole system might be slower (due to the need of routing all reads through the leader).
Offline devices support is another case of multi-leader replication, as they also support writes.
Real-time collaborative editing are essentially multi-leader replication cases.
There are several approaches. One of the most interesting one is converging towards a consistent state, because it avoids locks or simulating a single leader approach. The goal is allow conflicting writes but set up a conflict resolution strategy so all DBs end in the same state. Strategy example:
The most common approaches are circular, star and all-to-all topologies.
Star and all-to-all topologies make all data go through all nodes, one by one. Ids are assigned to avoid infinite loops. One failure can stop propagation. On the other hand, all-to-all strategies can have other issues, as ordering is not granted, potentially causing casualty errors. To avoid that you can use version vectors.
Clients can send writes to any DB. Dynamo, Riak, Cassandra and Voldemort use this approach.
In these setups, a client send both writes and reads to multiple nodes. That way you don’t even need to handle failovers.
Mechanisms to cope with failover:
Quorum reads and writes: given n replicas, if the write must be confirmed by w writes and r nodes must be queried in each read, w + r > n . As at least one of the node r nodes must be up to date, we’d get an up-to-date value. This also allows some tolerance to unavailable nodes:
Normally, reads and writes are sent to all nodes. w and r determine the number of nodes that we wait for.
Even with quourum, edge cases are likely to happen, because of hardware failures, failed reads, replication lag and other. This kind of system is suited for applications that can cope with eventual consistency. Anyway, monitoring replication lag and staleness is a key practice in a healthy system.
Sloppy quorum: in case of not reaching w nodes in a write, instead of failing, choose a different set of w nodes (“overriding the normal sharding policy”). This assumes that you could get an old value. When the original w nodes are back, push the write there.
In multi-datacenter replication, systems like Cassandra, Voldemort or Riak keep most of the communication local by waiting only for nodes within the same datacenter (although the writes are sent asynchronously to all replicas in all datacenters).
Last write wins (LWW): discarding concurrent writes. When there are concurrent writes, they get assigned a timestamp, and older keys are overwritten. This guarantees convergence at the cost of data loss. A recommended way of using Cassandra is using a UUID as the key, so each write is unique (and immutable), so concurrent updates are impossible.
Note that two operations are said to be concurrent if they don’t know about each other, no matter the time. That’s because time in distributed systems is complicated.
Clients can send the last known version number, and the server can return all the values that have not been overwritten, and the actual version number after the write. That way, the server can determine if operations are concurrent based on the version numbers.
Although previous algorithm ensures that there’s no data loss, clients must resolve the conflict, for example, with unions. But plain unions would drop deletions. To avoid that, instead of deleting items, items are marked as deleted, thus allowing unions.
In leaderless replication, instead of keeping a single version number, a different version number is tracked in each replica, allowing to differentiate between an overwrite and a concurrent write.
The main reason for partitioning (breaking up data into partitions) is scalability.
Partitioning is usually combined with replication.
Partitioning schema is important because if it were skewed one node might have to handle most of the load, making partitioning inefficient.
Given a key with a continuous range of values, we can split it among nodes. Boundaries could be chosen automatically or by an administrator. Within a partition we can keep the keys in order, making range scans easy. This can lead to hot spots. For example, if the key were the creation date, all writes would go to the same node. In these cases you could prepend a prefix that distributed the load among the nodes.
To distribute the load evenly you could apply a hash function to the key and then partition by the range of the hash of the key. Although this balances the load, it makes range searches by key very inefficient. Cassandra uses a compromise: a table can be declared with a compound PK, and only the first column is hashed. So, queries where the first column has a fixed values are efficient (as all data is at the same node, and sorted by the rest of columns).
Even hash partitioning can’t avoid hotspots if one key is used intensively. In those scenarios you could add random numbers in front of “hot keys” to balance the partitioning, at the cost of increasing the bookkeeping (you have to query all nodes for queries on hot keys. That tradeoff nowadays must be handled at application level.
While secondary indexes are the bread and butter to relational databases, distributed DBs have avoided them because they don’t match well with partitioning.
The index contains the keys to the documents within its partition that contain the value. With this system, each partition is completely separated and contains the secondary index for the documents at it. Also called “local indexes”. Using it requires querying all partitions. This schema is prone to tail latency amplification. Most vendors recommend a partitioning scheme where secondary indexes can be served from the same partition, but that’s not always possible.
Instead of having a local index for each partition, the index itself can be partitioned. For example, we might have a secondary index on a term and have the index of values a-m in one partition and the index of n-z in other. This way you only have to query one index.
It makes reads more efficient but writes are slower, as a single document write might have to update indexes at many partitions.
Rebalancing: moving data (partitions) from one node to another.
Don’t use hash mod N , because if N changes you have to move your data all around.
Using a fixed number of partitions, relatively larger than N, and assign them to the nodes, is a easy and good approach. The main problem is choosing a number that is not too low (because it’s an upper boundary for N) nor too high (managing partitions has a cost).
Specially for key-range partitioning, but also useful for hash partitioning, there’s dynamic partitioning: the DB starts with a single partition (or a preconfigured pre-splitting and as ranges are populated, partitions are split or merged. This optimizes the partition overhead.
Previous approaches were proportional to the size of the dataset. For hash partitioning you can use partitioning proportionally to nodes: each node has a fixed number of partitions. This keeps the size of each partition fairly stable.
Balancing is expensive, so automatic balancing can lead to surprises to avoid network or IO overload. Putting a human in the loop is often convenient.
In order to do a request, we need to know which node to connect to. This is a problem of service discovery. We could delegate on the client, on a routing tier, or in the nodes themselves. Be it the routing engine, the client or the nodes, we need a mechanism that learns about changes in partitions so the reqeusts are sent to the right nodes. Many technologies (HBase, Kafka…) use ZooKeeper for service discovery. It talks to the nodes and keep track of the IPs and partitions. Cassandra takes a different approach, with a gossip protocol among the nodes to disseminate the changes. If a request is sent to the wrong node, it’s redirected.
Transaction guarantees are often described as ACID: Atomicity, Consistency, Isolation and Durability (although definitions are vague and ACID has become a marketing term).
Atomicity: a transaction is atomic if when an error occurs in the middle of it, all the writes are discarded or undone and the transaction is aborted.
Consistency: if the transaction starts in a valid state (from the standpoint of the variants of your application) and writes preserve the validity, the invariants are always satisfied. Consistency actually belongs to the application, that must take advantage of Atomicity, Isolation and Durability to be consistent.
Isolation: if two transactions occur concurrently, they must run as if they were run in isolation (serially).
Durability: if the transaction ends successfully (written to a non-volatile store), the data won’t be lost. This often implies writing to a write-ahead log.
In order to change data in several object (rows, documents…) the DB must support a way to group operations in a transaction.
Single-object operations, such as updating one record or incrementing a counter (if the database provides that mechanism) are not considered transactions.
ACID has a performance cost. Knowing the isolation levels can help you choosing the right tool.
When you read you only read committed data. When you write, you only overwrite committed data.
Dirty writes are often prevented with row locks that transactions must get before modifying a value.
Dirty reads could be prevented with a lock as well, but that would cause long transactions to block reads for a long time. Instead, both the old value and the new one are remembered until the transaction ends. Queries outside the transaction read the old one, queries inside read the new one.
Nonrepeatable read or read skew: two consecutive queries return different values for the same field (because a transaction changed it). That’s considered acceptable under read committed isolation level, but it’s not for long running queries such as the ones under analytical queries or backups. Snapshot isolation fixes this: each transaction reads from a consistent snapshot of the DB.
It’s usually implemented with MVCC (multi-version concurrency control): each transaction gets a unique identifier, and when it modifies some data, the new one is tagged with that id. That way the DB know what data should be seen by each transaction.
Reads are based in visibility rules: a record is visible if the transaction that created that object was already committed and the object is not marked for deletion by a committed transaction. This is performed by remembering ongoing transactions when another transaction begins.
In concurrent read-modify-write cycles, you can lose updates (because the first transaction doesn’t know about the first one).
In replicated DBs you can’t use locks or compare-and-set, because there are several copies. In these scenarios, using special data structures that allow fixing the conflict at application level, or that take advantage of commutative properties of some operations, are the way to go. The alternative, last write wins, is prone to lost updates.
Write skew: two transactions read the same objects, and then update some of those objects. It’s hard to control. You might be able to do it with true serializable isolation, triggers and materialized views, or explicit locks.
The pattern is doing a select that then, depending on application logic, performs a write. This, having a write in one transaction changing the result of a search query in another, is called a phantom. You can’t use a lock to avoid them, because there’s no object to attach it to it.
A solution is materializing conflicts: adding rows that allow explicit locking. It’s not a great solution because it’s error-prone and leaks a concurrency control into the data model. Serializable isolation is preferable.
Serialization provides the strongest guarantees, but with a cost.
Single-threaded systems allow executing transactions actually in serial, and can perform well. They’re nevertheless limited to a CPU core, and should only be used if all data fits in memory.
To get rid of human interaction and reduce network communication, you can perform the complete transaction in a safe stored procedure.
In 2PL, if a transaction wants to read a object written by another one, it must wait. And if a transaction wants to write an object read by another, it must wait as well.
It’s implemented with locks in shared mode (for reads) and exclusive mode (for reads). Exclusive lock imply waiting.
The main problem is performance. Partly by management overhead but mostly because the loss of concurrency. They’re really slow at high percentiles.
For phantom prevention a DB must implement predicate locks, which are locks over objects that match some search condition (even if they don’t exist yet!).
Predicate locks are expensive to check, so index-range locks are a good compromise. Instead of locking the precise ranges, they leverage existing indexes to create the locks on them. They’re broader, but much faster.
SSI is optimistic in the sense that it allows transaction to operate as if nothing wrong happened, and performs the checks at commit. If everything was OK (if the transactions actually run in serial) it allows committing, otherwise transaction is aborted.
This performs good if contention (access to the same resource) is low and there are spare resources.
If the transaction makes decisions based upon a read in the beginning, on commit the premise must be checked again.
Detecting stale MVCC reads (uncommitted writes before the read).
Detecting writes that affect prior reads (write after the read).
In single-machine there’s no essential reason for things to go wrong. As a single computer is deterministic, the problems are always faulty software. In distributed systems that’s no longer true, because you must face real-world issues, making the system non-deterministic and prone to partial failures.
There’s a myriad of ways a network can fail, and some components need to detect failures to react (for example, a load balancer). Timeouts are the only reliable mechanism for detecting a fault.
Monotonic clocks are clocks that always move ahead, valid for measuring duration.
Spanner uses time of the day for generating transaction ids for snapshot isolation across datacenters. That’s normally hard to achieve, but in their case it’s based on a TrueTime API that returns the earliest possible time and the latest possible time. If given two pairs they don’t overlap, they have confidence that one transaction happened before another. That helps avoiding causality issues. In order to finish a commit it must wait until all possible transactions that might’ve read values also end. So, to reduce wait time it must improve clock synchronization.
One node own knowledge can’t be trusted because of individual issues (GC…), and because node faults can’t be reliably distinguished from network faults, so “the truth” is determined by a absolute majority of more than half the nodes.
Fencing tokens (that can be implemented returning a monotonic token when the lock is granted) fix the problem of granting a lock to a unresponsive client.
Byzantine faults are problems caused by nodes that “lie”, like sending corrupted messages or claiming something that is not the truth (from a protocol standpoint). This book assume that these kind of problems don’t appear because nodes are under your control and hardware failures are assisted at hardware level. Byzantine faults often need to be addressed by consensus of super majorities (two thirds).
As algorithms need to be defined independently from the hardware, we need to define which assumptions they can work with. This is done with system models:
Models in regard of timing assumptions:
Models in regard of node failures:
An algorithm is considered correct in a system model if it always satisfies its properties in all situations that can happen in that system model.
There are two kind of properties:
A good approach for building fault-tolerant systems is creating general-purpose abstractions with useful guarantees, and then let software rely on those guarantees.
Linearizability is making a system look like there’s only one copy of the data and all operations are atomic.
It’s not the same than serializability, which is an isolation property of transactions. Linearizability is all about the recency of individual records.
Important in many applications:
Causal order is not a total order. Total order allows any two elements to be compared. Linearizability has total order, and implies causality.
Causality can be preserved without linearizability, and it’s the strongest possible consistency model that doesn’t slow down because of network delays, and remains available in face of network failures.
In order to determine causality, the dependencies must be tracked, for example with version vectors. Nevertheless, storing all that is impractical and can be replaced by sequence number ordering. If we can just add a monotonic integer that’s enough to be consistent with causality. A replication log is also valid.
Without a single leader, generating sequence numbers is not easy, and approximations are often used, such as timestamps or pre-allocated ranges, which are inconsistent with causality.
There’s other simple method which is consistent: Lamport timestamps. Essentially, it’s keeping track of the number of operations performed to keep track of the causality. Each node has an identifier and a counter. The identifier provides uniqueness. Each operation goes with the counter, and if any node or client receives a bigger counter than the one stored, it’s updated.
Lamport timestamps can provide causally dependency at a lower storage cost than version vectors, but version vectors can detect concurrent operations.
Total ordering is not enough for guarantees such as uniqueness, because the order is only valid afterwards, not for taking decisions in the moment.
Total order broadcast is a protocol aimed to provide total order guarantees in distributed environments, without leaders. It requires reliable delivery and total ordered delivery.
Consensus is a hard topic required for many needs such as leader election and atomic commits. In the asynchronous model, it’s impossible to solve, but if nodes can use timeouts the problem becomes solvable.
In 2PC, applications write on several nodes as usual. For the commit, a prepare signal is first sent by the coordinator, and once both answers are received, the final commit is sent. A global transaction id is shared in all requests. The prepare signal is only returned by the nodes if they can guarantee that the commit can take place, so nodes actually perform all needed tasks, but without committing. When the coordinator receives all OKs from the prepare , it’s written to disk into a log and sent to the nodes.
It’s a blocking protocol because nodes might have to wait for a coordinator recovery after the prepare . In theory it could be done non-blocking, but it’s not practical.
Most algorithms decide upon a sequence of numbers, so they become total order broadcast algorithms.
In single-leader replication that is not manually solved, on failover a new leader must be chosen, which is closer to consensus.
Consensus algorithm only guarantee uniqueness of leader within the same epoch, which roughly is a monotonic number that is increased when a new leader is chosen. When a decision must be taken, the leader first asks whether it’s still the leader, and then the voting takes place. At least one of the nodes that confirmed the leadership must take place in the voting.
The biggest differences between fault-tolerant consensus algorithms and 2PC is that in 2PC the leader is always the same and it must wait for all followers.
ZooKeeper is seen as a distributed key-value store that implements fault-tolerant consensus and has other properties:
UNIX design philosophy (small tools, composability…) created a really powerful tooling for batch processing.
MapReduce processes are analogous to UNIX tools, but instead of operating on stdin and stdout they use distributed filesystems such as HDFS.
HDFS is based on shared-nothing principle. Consists of a daemon in every node, and a central NameNode that keeps information of which file blocks are stored on which machine. Conceptually it’s one big filesystem over a machine network.
MapReduce: 1.- Split files in records. 2.- Call the mapper function, extract a key from each record, leave the value empty. 3.- Sort all key-value pairs by key. 4.- Call the reducer function to iterate over sorted key-value pairs.
To create a MapReduce job you need to implement the mapper and the reducer functions:
The advantage over UNIX tools is that it can be parallelized.
The scheduler tries to run the mapper where there’s a replica of the input file, putting the computation near the data.
As the number of problems that you can solve with a single MapReduce job is limited, you usually work with worflows, chained jobs, which materialized state among them.
There’s no support for “traditional joins”, so specific algorithms must be developed. For example, sort-merge joins, where the mapper outputs different data for a given key (for example, user birth date followed by activity) and then the reducer processes the information by user.
If some keys have much more data than others (such as celebrities in a social network), called hot spots, reducers job might have to be split among several nodes.
Hadoop encourages storing a lot of raw data to be processed later.
MapReduce jobs need to finish before the next can start, and that often makes mappers redundant (as they just read the output of the previous jobs). Also forces to write a lot of temporary data.
To overcome that, some dataflow engines such as Spark, Tez and Flink appeared, handling the workflow as a single job. There’s no role (map/reduce) separation, just “operators”.
Graphs require iterative processing, which can’t be expressed in MapReduce terms, because it only performs a single pass over the data. To overcome this, the Pregel model, in each iteration a function is called for each vertex, and the node remembers its state (similar to an actor model, but with fault-tolerance).
UDP multicast, ZeroMQ, direct RPC/HTTP calls… They require application level code to be aware of possible data loss.
Intermediate server similar to a database but optimized for streaming processing. Makes consumers asynchronous. Often provides unbounded queuing. Differences from databases:
Multiple consumers: load balancing vs. fan-out.
Acknowledges and redelivery: customers must send a confirmation when they process the message. This can cause message reordering if load-balancing is used.
They combine DB durability with message brokers low-latency notifications.
Logs vs. traditional messaging:
Conclusion: if message processing is slow and you need to parallelize processing on a message-by-message basis, and ordering is not important, a traditional message broker is preferable. If ordering is important and processing is fast, logs-based works well.
Reading the “internal” DB replication log can be seen as a way to detect data changes and react to them (such as writing to a search index).
Event Sourcing and CQRS are techniques closely related to that. For example, we can use the immutable log for the events and derive read-only views.
It also affects concurrency control. On one hand, it has the problem that consumers are asynchronous, so providing read-your-own-writes guarantee might become harder. On the other, it allows addressing concurrency differently: you could store an atomic event that completely resembles the user action.
Uses of stream processing:
Reasoning about time is often based on windows of time. Types:
Reasoning about data flows is hard, and you probably need to derive data. Using total order (based on event sourcing logs or change data capture) is a way to handle that easily. It’s also a way to avoid distributed transactions for data integration in some cases.
Thinking about what a database provides (secondary indexes, materialized views, replication logs…) and how a index is created (similar to setting up a new follower replica), you can begin seeing the dataflow across an entire organization as a big database. Assuming that there’s no single storage system that can fit all needs, there might be two approaches for providing a cohesive system:
Designing Applications Around Dataflow: application code might be seen as a derivation function, listening to changes of the data, creating derived datasets as needed and removing the need for querying outside the local machine (as the local data would always be updated).
The derived dataset can be seen as the meeting point between the read and write paths, and shifting it is interesting. Depending on the needs, the two paths might do more work.
Taking this dataflow idea to the limit, and given the improved features of web clients, maybe we should consider designing all applications like messaging applications are, pushing changes to the clients all the time.
Even if the application uses a data store with strong guarantees, or even immutability, it’s not free of errors. Generally speaking, end-to-end safety measurements are needed in order to guarantee correctness.
The so called “eventual consistency” conflates two topics: timeliness and integrity. While integrity is a must, timeliness and other constraints can often be loosely interpreted as part of the normal business rules.
Either to protect us from hardware failures or from software bugs, we should consider designing for auditability.
This section must be read, can’t be summarized.