Large-scale Incremental Processing Using Distributed Transactions and Notifications
Daniel Peng and Frank Dabek
Google, Inc.
Updating an index of the web as documents are
crawled requires continuously transforming a large
repository of existing documents as new documents ar-
rive. This task is one example of a class of data pro-
cessing tasks that transform a large repository of data
via small, independent mutations. These tasks lie in a
gap between the capabilities of existing infrastructure.
Databases do not meet the storage or throughput require-
ments of these tasks: Google’s indexing system stores
tens of petabytes of data and processes billions of up-
dates per day on thousands of machines. MapReduce and
other batch-processing systems cannot process small up-
dates individually as they rely on creating large batches
for efficiency.
We have built Percolator, a system for incrementally
processing updates to a large data set, and deployed it
to create the Google web search index. By replacing a
batch-based indexing system with an indexing system
based on incremental processing using Percolator, we
process the same number of documents per day, while
reducing the average age of documents in Google search
results by 50%.
1 Introduction
Consider the task of building an index of the web that
can be used to answer search queries. The indexing sys-
tem starts by crawling every page on the web and pro-
cessing them while maintaining a set of invariants on the
index. For example, if the same content is crawled un-
der multiple URLs, only the URL with the highest Page-
Rank [28] appears in the index. Each link is also inverted
so that the anchor text from each outgoing link is at-
tached to the page the link points to. Link inversion must
work across duplicates: links to a duplicate of a page
should be forwarded to the highest PageRank duplicate
if necessary.
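The two invariants above can be illustrated with a small in-memory sketch. Everything in it is hypothetical: the `Page` record, its field names, and the use of a content hash to detect duplicates are assumptions made for illustration, not details of the indexing system.

```python
from dataclasses import dataclass, field


@dataclass
class Page:
    url: str
    content_hash: str          # assumed stand-in for "same content"
    pagerank: float
    # outgoing links as (target_url, anchor_text) pairs
    links: list = field(default_factory=list)


def build_index(pages):
    """Return {canonical_url: [anchor_text, ...]} for a list of Pages."""
    # Invariant 1: among duplicates, only the highest-PageRank URL is indexed.
    best = {}
    for p in pages:
        cur = best.get(p.content_hash)
        if cur is None or p.pagerank > cur.pagerank:
            best[p.content_hash] = p
    canonical = {p.url: best[p.content_hash].url for p in pages}

    # Invariant 2: invert links, forwarding anchors aimed at a duplicate
    # to the canonical (highest-PageRank) URL of its cluster.
    index = {url: [] for url in set(canonical.values())}
    for p in pages:
        for target, anchor in p.links:
            target = canonical.get(target)
            if target is not None:      # ignore links to uncrawled pages
                index[target].append(anchor)
    return index


pages = [
    Page("a.com/x", "h1", 0.9),
    Page("b.com/x", "h1", 0.2),        # duplicate of a.com/x
    Page("c.com", "h2", 0.5,
         [("b.com/x", "mirror"), ("a.com/x", "original")]),
]
index = build_index(pages)
```

Here the anchor text "mirror", which points at the lower-ranked duplicate, ends up attached to `a.com/x`, and `b.com/x` does not appear in the index at all.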
This is a bulk-processing task that can be expressed
as a series of MapReduce [13] operations: one for clus-
tering duplicates, one for link inversion, etc. It’s easy to
maintain invariants since MapReduce limits the paral-
lelism of the computation; all documents finish one pro-
cessing step before starting the next. For example, when
the indexing system is writing inverted links to the cur-
rent highest-PageRank URL, we need not worry about
its PageRank concurrently changing; a previous MapRe-
duce step has already determined its PageRank.
Now, consider how to update that index after recrawl-
ing some small portion of the web. It’s not sufficient to
run the MapReduces over just the new pages since, for
example, there are links between the new pages and the
rest of the web. The MapReduces must be run again over
the entire repository, that is, over both the new pages
and the old pages. Given enough computing resources,
MapReduce’s scalability makes this approach feasible,
and, in fact, Google’s web search index was produced
in this way prior to the work described here. However,
reprocessing the entire web discards the work done in
earlier runs and makes latency proportional to the size of
the repository, rather than the size of an update.
The indexing system could store the repository in a
DBMS and update individual documents while using
transactions to maintain invariants. However, existing
DBMSs can’t handle the sheer volume of data: Google’s
indexing system stores tens of petabytes across thou-
sands of machines [30]. Distributed storage systems like
Bigtable [9] can scale to the size of our repository but
don’t provide tools to help programmers maintain data
invariants in the face of concurrent updates.
An ideal data processing system for the task of main-
taining the web search index would be optimized for in-
cremental processing; that is, it would allow us to main-
tain a very large repository of documents and update it
efficiently as each new document was crawled. Given
that the system will be processing many small updates
concurrently, an ideal system would also provide mechanisms
for maintaining invariants despite concurrent updates and for
keeping track of which updates have been processed.
[Figure 1: Percolator and its dependencies. Labels: Percolator Library, Bigtable Tabletserver.]
The remainder of this paper describes a particular incremental
processing system: Percolator. Percolator provides the user with
random access to a multi-PB repository. Random access allows us to
process documents individually, avoiding the global scans of the repository
that MapReduce requires. To achieve high throughput,
many threads on many machines need to transform the
repository concurrently, so Percolator provides ACID-
compliant transactions to make it easier for programmers
to reason about the state of the repository; we currently
implement snapshot isolation semantics [5].
In addition to reasoning about concurrency, program-
mers of an incremental system need to keep track of the
state of the incremental computation. To assist them in
this task, Percolator provides observers: pieces of code
that are invoked by the system whenever a user-specified
column changes. Percolator applications are structured
as a series of observers; each observer completes a task
and creates more work for “downstream” observers by
writing to the table. An external process triggers the first
observer in the chain by writing initial data into the table.
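A toy model may help make the observer chain concrete. This is a single-process sketch with no transactions or distribution; the `Table` class and its `observe`, `write`, and `run_pending` methods are hypothetical names chosen for illustration and are not Percolator's API.

```python
from collections import defaultdict, deque


class Table:
    """Toy single-process table; not Percolator's actual API."""

    def __init__(self):
        self.cells = {}                      # (row, column) -> value
        self.observers = defaultdict(list)   # column -> [callback]
        self.pending = deque()               # changed (row, column) pairs

    def observe(self, column, callback):
        self.observers[column].append(callback)

    def write(self, row, column, value):
        self.cells[(row, column)] = value
        # Record a notification only if someone observes this column.
        if column in self.observers:
            self.pending.append((row, column))

    def run_pending(self):
        # Drain notifications; observers may enqueue more work by writing.
        while self.pending:
            row, column = self.pending.popleft()
            for callback in self.observers[column]:
                callback(self, row, self.cells[(row, column)])


def parse_document(table, row, raw):
    # First observer: derive a word list and hand it downstream.
    table.write(row, "words", raw.lower().split())


def count_words(table, row, words):
    # Downstream observer: triggered by the write to "words".
    table.write(row, "word_count", len(words))


table = Table()
table.observe("raw", parse_document)
table.observe("words", count_words)

# An external process starts the chain by writing initial data.
table.write("example.com", "raw", "Hello incremental world")
table.run_pending()
```

Each observer does one step and triggers the next purely by writing to an observed column, so adding a stage means registering one more observer without changing the existing ones.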
Percolator was built specifically for incremental pro-
cessing and is not intended to supplant existing solutions
for most data processing tasks. Computations where the
result can’t be broken down into small updates (sorting
a file, for example) are better handled by MapReduce.
Also, the computation should have strong consistency
requirements; otherwise, Bigtable is sufficient. Finally,
the computation should be very large in some dimen-
sion (total data size, CPU required for transformation,
etc.); smaller computations not suited to MapReduce or
Bigtable can be handled by traditional DBMSs.
Within Google, the primary application of Percola-
tor is preparing web pages for inclusion in the live web
search index. By converting the indexing system to an
incremental system, we are able to process individual
documents as they are crawled. This reduced the aver-
age document processing latency by a factor of 100, and
the average age of a document appearing in a search re-
sult dropped by nearly 50 percent (the age of a search re-
sult includes delays other than indexing such as the time
between a document being changed and being crawled).
The system has also been used to render pages into
images; Percolator tracks the relationship between web
pages and the resources they depend on, so pages can be
reprocessed when any depended-upon resources change.
2 Design
Percolator provides two main abstractions for per-
forming incremental processing at large scale: ACID
transactions over a random-access repository and ob-
servers, a way to organize an incremental computation.
A Percolator system consists of three binaries that run
on every machine in the cluster: a Percolator worker, a
Bigtable [9] tablet server, and a GFS [20] chunkserver.
All observers are linked into the Percolator worker,
which scans the Bigtable for changed columns (“noti-
fications”) and invokes the corresponding observers as
a function call in the worker process. The observers
perform transactions by sending read/write RPCs to
Bigtable tablet servers, which in turn send read/write
RPCs to GFS chunkservers. The system also depends
on two small services: the timestamp oracle and the
lightweight lock service. The timestamp oracle pro-
vides strictly increasing timestamps: a property required
for correct operation of the snapshot isolation protocol.
Workers use the lightweight lock service to make the
search for dirty notifications more efficient.
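The one property the text requires of the timestamp oracle, strictly increasing timestamps, can be sketched with a lock-protected counter. This is a local stand-in under stated assumptions: the class name, the `get_timestamp` method, and the use of plain integers are hypothetical, and the real oracle is a separate service rather than an in-process object.

```python
import threading


class TimestampOracle:
    """Hands out strictly increasing integers; a local stand-in only."""

    def __init__(self, start=0):
        self._last = start
        self._lock = threading.Lock()

    def get_timestamp(self):
        # The lock makes increment-and-read atomic across threads.
        with self._lock:
            self._last += 1
            return self._last


oracle = TimestampOracle()
issued = []
issued_lock = threading.Lock()


def worker(n):
    for _ in range(n):
        ts = oracle.get_timestamp()
        with issued_lock:
            issued.append(ts)


threads = [threading.Thread(target=worker, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because no two callers can ever receive the same value, timestamps give a total order over the events that request them, which is what a snapshot isolation protocol relies on.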
From the programmer’s perspective, a Percolator
repository consists of a small number of tables. Each
table is a collection of “cells” indexed by row and col-
umn. Each cell contains a value: an uninterpreted array of
bytes. (Internally, to support snapshot isolation, we represent
each cell as a series of values indexed by timestamp.)
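As a rough illustration of a cell stored as a series of timestamped values, the sketch below keeps versions sorted and serves a read as of a given snapshot timestamp. The `VersionedCell` class is hypothetical, and the choice to return the newest value written at or before the snapshot timestamp is an assumption made for this example.

```python
import bisect


class VersionedCell:
    """One cell holding values indexed by timestamp (illustrative only)."""

    def __init__(self):
        self._timestamps = []   # kept sorted ascending
        self._values = []       # _values[i] was written at _timestamps[i]

    def write(self, timestamp, value: bytes):
        i = bisect.bisect_left(self._timestamps, timestamp)
        if i < len(self._timestamps) and self._timestamps[i] == timestamp:
            self._values[i] = value          # overwrite the same version
        else:
            self._timestamps.insert(i, timestamp)
            self._values.insert(i, value)

    def read(self, snapshot_ts):
        """Return the newest value written at or before snapshot_ts."""
        i = bisect.bisect_right(self._timestamps, snapshot_ts)
        return self._values[i - 1] if i > 0 else None


cell = VersionedCell()
cell.write(5, b"v1")
cell.write(9, b"v2")
```

A reader holding snapshot timestamp 8 keeps seeing `b"v1"` even after the write at timestamp 9, so later writers do not disturb it.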
The design of Percolator was influenced by the re-
quirement to run at massive scales and the lack of a
requirement for extremely low latency. Relaxed latency
requirements let us take, for example, a lazy approach
to cleaning up locks left behind by transactions running
on failed machines. This lazy, simple-to-implement ap-
proach potentially delays transaction commit by tens of
seconds. This delay would not be acceptable in a DBMS
running OLTP tasks, but it is tolerable in an incremental
processing system building an index of the web. Percola-
tor has no central location for transaction management;
in particular, it lacks a global deadlock detector. This in-
creases the latency of conflicting transactions but allows
the system to scale to thousands of machines.
2.1 Bigtable overview
Percolator is built on top of the Bigtable distributed
storage system. Bigtable presents a multi-dimensional
sorted map to users: keys are (row, column, times-
tamp) tuples. Bigtable provides lookup and update oper-
ations on each row, and Bigtable row transactions enable
atomic read-modify-write operations on individual rows.
Bigtable handles petabytes of data and runs reliably on
large numbers of (unreliable) machines.
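The map abstraction and the row-level atomicity described above can be mimicked in memory. This is only a sketch: `ToyTable`, `row_transaction`, and `sorted_items` are hypothetical names, not Bigtable's client API, and a per-row lock stands in for whatever mechanism Bigtable actually uses to make row operations atomic.

```python
import threading
from collections import defaultdict


class ToyTable:
    """In-memory stand-in for a (row, column, timestamp) -> value map."""

    def __init__(self):
        self._rows = defaultdict(dict)       # row -> {(column, ts): value}
        self._row_locks = defaultdict(threading.Lock)
        self._meta_lock = threading.Lock()   # guards the two dicts above

    def row_transaction(self, row, fn):
        """Run fn(cells) atomically with respect to other calls on this row."""
        with self._meta_lock:
            lock = self._row_locks[row]
            cells = self._rows[row]
        with lock:
            return fn(cells)

    def sorted_items(self):
        """Return ((row, column, ts), value) pairs in sorted key order."""
        with self._meta_lock:
            rows = list(self._rows.items())
        items = [((row, col, ts), value)
                 for row, cells in rows
                 for (col, ts), value in cells.items()]
        return sorted(items, key=lambda kv: kv[0])


def increment(cells):
    # Read-modify-write of one column at a fixed, assumed timestamp 0.
    key = ("count", 0)
    cells[key] = cells.get(key, 0) + 1


table = ToyTable()


def worker():
    for _ in range(500):
        table.row_transaction("com.example", increment)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

No increments are lost because each read-modify-write holds the row's lock. Operations that span several rows get no such guarantee from this primitive alone.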
A running Bigtable consists of a collection of tablet
servers, each of which is responsible for serving several
tablets (contiguous regions of the key space). A master
coordinates the operation of tablet servers by, for exam-
ple, directing them to load or unload tablets. A tablet is
stored as a collection of read-only files in the Google