Engineering

How to make MongoDB not suck for analytics

June 26, 2018

TLDR: Row stores are fast to write but slow to read. Column stores are fast to read but slow to write. Load data from Mongo into Parquet files for fast querying using AWS Athena.


MongoDB is a schema-less NoSQL document store that uses a JSON-like format for each document. Each document in Mongo WiredTiger is stored as a contiguous binary blob, which makes our MongoDB instance a row store. AWS Athena is a SaaS offering by Amazon that queries files on S3 using Presto, a distributed SQL query engine that can query high-performance columnar formats like Parquet and ORC.


Here at Scale we use MongoDB backed by WiredTiger as our primary data store. This blog post describes why and how we load data from MongoDB into AWS Athena using Parquet files in order to get fast ad-hoc analytics queries on our data.

Row stores and Column stores



Suppose you want to get the sum of a single column (field) over all the rows (documents) in a table (collection).
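For concreteness, here is roughly what that query looks like on each side; the "tasks" collection and "duration_ms" field are made up for illustration.

from pymongo import MongoClient

client = MongoClient()
tasks = client.analytics.tasks  # hypothetical database/collection

# In MongoDB, summing one field still forces a scan of whole documents
# (unless a covering index exists):
total = next(tasks.aggregate([
    {"$group": {"_id": None, "total": {"$sum": "$duration_ms"}}}
]))["total"]

# The equivalent query in Athena/Presto only has to read one column:
#   SELECT SUM(duration_ms) FROM tasks;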


With a Row Store, without an index, to get a field you need to fetch the document, and to get the same field for all documents you would have to essentially load the entire table from disk. All the columns in your table (even columns that do not appear in your query) get fetched as well, which is slow:

[Figure: Data Fields]



You can speed up this query by building a covering index. However, each index you add increases disk usage and slows down inserts. Also, when doing data exploration it can be difficult to predict what indices you need ahead of time.
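For comparison, a covering index for the sum above might look like the sketch below (again with the made-up "tasks" collection and field names); the query can then be answered from the index alone, at the cost of extra disk and slower writes.

from pymongo import MongoClient

tasks = MongoClient().analytics.tasks  # hypothetical collection

# Build an index that covers the fields the query touches.
tasks.create_index([("status", 1), ("duration_ms", 1)])

# A covered query: the filter and projection only touch indexed fields and
# exclude _id, so Mongo can answer it from the index without fetching
# documents.
total = sum(
    doc["duration_ms"]
    for doc in tasks.find(
        {"status": "completed"},
        {"_id": 0, "duration_ms": 1},
    )
)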


With a Column Store, the query is much more efficient. A column store puts data from the same column together, so a query only has to retrieve the columns it filters or aggregates over. This gives good performance on ad-hoc queries, without indices:

[Figure: Column Fields]



Column stores, however, make it very difficult to update a single record: due to the way data is arranged, you have to move all of the data to change one record. Column stores also require us to commit to a schema and define the columns ahead of time.


Trading off read and write performance


The slowness of column-store writes is related to the size of the table, because you need to rewrite the whole table to change a single record. By representing each row-store table as multiple column-store tables, we can get the column store to have faster writes at the expense of slowing down its reads.


For our implementation, this means we map each Mongo collection to multiple Parquet files (each file actually contains multiple row groups, with each row group being column-major, but we use files as a unit for convenience), and refresh all the documents in a Parquet file whenever any one of them changes. We use smaller file sizes than what would be optimal for reads, in order to make writes faster and reduce replication lag.
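Conceptually, refreshing one file means re-reading every document in that file's slice of the collection and rewriting the file from scratch. A minimal sketch, assuming a flat document schema; the function name, collection, and path are invented for illustration.

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pymongo import MongoClient

def refresh_file(min_id, max_id, path):
    """Rewrite the Parquet file covering the ObjectID range [min_id, max_id)."""
    tasks = MongoClient().analytics.tasks  # hypothetical collection
    docs = [
        {**doc, "_id": str(doc["_id"])}  # ObjectId -> string for Arrow
        for doc in tasks.find({"_id": {"$gte": min_id, "$lt": max_id}})
    ]
    # Assumes a flat schema; real documents would need flattening and an
    # explicit pyarrow schema.
    table = pa.Table.from_pandas(pd.DataFrame(docs))
    pq.write_table(table, path)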

Cache Invalidation


When updating the column store from the row store, we need to:

  1. Figure out which records are stale
  2. Update those records efficiently


To determine staleness, we need to know the identity of the document and the time of modification. This data is obtained by reading the Mongo oplog, a data structure that represents all inserts/updates/deletes to the database chronologically, available on Mongo replica sets for internal use in replication from primary to secondary nodes within the cluster.
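The oplog is just a capped collection (local.oplog.rs), so it can be tailed with a regular pymongo cursor. A rough sketch of pulling out the _id of every changed document; entry formats vary a bit across Mongo versions, so treat the field handling here as an approximation.

import pymongo
from pymongo import MongoClient

def tail_oplog(last_ts):
    """Yield (namespace, document _id) for each write after last_ts."""
    oplog = MongoClient().local["oplog.rs"]
    cursor = oplog.find(
        {"ts": {"$gt": last_ts}},
        cursor_type=pymongo.CursorType.TAILABLE_AWAIT,
        oplog_replay=True,
    )
    for entry in cursor:
        # 'i' = insert, 'u' = update, 'd' = delete
        if entry["op"] in ("i", "u", "d"):
            doc_id = entry["o"].get("_id") or entry.get("o2", {}).get("_id")
            yield entry["ns"], doc_id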


For efficiency, we want to ensure that documents to be modified belong in as few files as possible. We achieve this by grouping records by their Mongo ObjectIDs. We are making use of the fact that, like in Generational Garbage Collection, young documents tend to get modified more than old ones. We maintain an index recording the ObjectID ranges of each file, starting a new file whenever the newest bucket is filled with more than N documents:

import bisect

from bson import ObjectId

class Buckets(object):
    def __init__(self, size_limit):
        self.size_limit = size_limit
        # Bucket boundaries in ascending order; the first boundary is the
        # smallest possible ObjectID.
        self.buckets = [ObjectId("0" * 24)]
        # ObjectIDs seen so far in the newest (still-growing) bucket.
        self.max_bucket = set()

    def find_le(self, x):
        'Return the index of the rightmost bucket boundary less than x'
        i = bisect.bisect_left(self.buckets, x)
        if i:
            return i - 1
        raise ValueError

    def get_bucket(self, oid):
        i = self.find_le(oid)

        # oid is in one of lower buckets
        if i < len(self.buckets) - 1:
            return self.buckets[i], self.buckets[i + 1]

        # oid is in maximum bucket
        self.max_bucket.add(oid)

        if len(self.max_bucket) < self.size_limit:
            return self.buckets[i], max(self.max_bucket)

        # maximum bucket is oversized, start a new bucket
        self.buckets.append(max(self.max_bucket))
        self.max_bucket = set()
        return self.buckets[i], self.buckets[i + 1]
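Continuing from the class above, a quick usage sketch (the size limit here is arbitrary): each document's bucket determines which Parquet file it belongs to.

buckets = Buckets(size_limit=500_000)

doc_id = ObjectId()  # e.g. the _id of a freshly inserted document
low, high = buckets.get_bucket(doc_id)
print(f"document {doc_id} belongs to the file covering [{low}, {high}]")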

Implementation Details



  1. The scripts that read from mongo and create parquet files are written in
     Python and use the pyarrow library to write Parquet files. The first
     version implemented a filter-and-append strategy for updating Parquet
     files, which works faster than overwriting the entire file. This proved
     to be a mistake, as pyarrow (0.9.0) support for reading is much less
     mature than that for writing. We would occasionally lose data when
     reading rows for rewriting into the new parquet file, and eventually gave
     up and went to an implementation that never read from the Parquet files.

  2. Each Parquet file is made of row groups, and there is a size limit for
     each row group. We batch documents by size to avoid this problem:

import itertools

def islice_docs(docs, size):
    """Yield documents until roughly `size` MB of raw BSON has been consumed."""
    tot = 0
    for doc in docs:
        tot += len(doc.raw)
        yield doc
        if tot > size * 1024**2:
            return

def batch_docs(docs, size):
    """Split an iterator of RawBSONDocuments into size-bounded batches."""
    sourceiter = iter(docs)
    while True:
        batchiter = islice_docs(sourceiter, size)
        try:
            # Pull the first doc eagerly so we stop cleanly when the source
            # iterator is exhausted.
            yield itertools.chain([next(batchiter)], batchiter)
        except StopIteration:
            return
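Combined with a cursor of raw documents, each batch then becomes one write. The names below are stand-ins and 64 MB is just an example limit.

# Usage sketch: `raw_docs` is a cursor of RawBSONDocuments and
# `write_row_group` stands in for the actual Parquet-writing step.
for batch in batch_docs(raw_docs, size=64):  # ~64 MB of raw BSON per batch
    write_row_group(batch)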

  3. When using pymongo, use RawBSONDocument to defer parsing. In the case of
     multiple processes writing different Parquet files, this means you avoid
     having parsing happen in the serial part of your program. You can
     selectively parse sub-documents using bson.BSON:

import bson
from bson.raw_bson import RawBSONDocument

def realize_raw_bson(d):
    """Recursively decode RawBSONDocuments into plain dicts."""
    if isinstance(d, RawBSONDocument):
        return bson.BSON(d.raw).decode()
    if isinstance(d, list):
        return [realize_raw_bson(v) for v in d]
    return d
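To get RawBSONDocument results in the first place, pass it as the client's document_class (the collection name below is hypothetical):

from bson.raw_bson import RawBSONDocument
from pymongo import MongoClient

# Documents come back as undecoded RawBSONDocuments; decode lazily with
# realize_raw_bson only where the fields are actually needed.
client = MongoClient(document_class=RawBSONDocument)
raw_docs = client.analytics.tasks.find()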

  4. Use atomicwrites to make writing a new Parquet file all-or-nothing:

import os
from contextlib import contextmanager

import pyarrow.parquet as pq
from atomicwrites import atomic_write

@contextmanager
def atomic_parquet_writer(output, *args):
    os.makedirs(os.path.dirname(output), exist_ok=True)
    with atomic_write(output, mode="wb", overwrite=True) as f:
        with pq.ParquetWriter(f, *args) as writer:
            yield writer
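A call site then looks roughly like this (schema, path, and tables are placeholders):

# The temp file is renamed into place only if every batch was written
# successfully.
with atomic_parquet_writer(path, schema) as writer:
    for table in tables:
        writer.write_table(table)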

  5. On top of extracting explicitly enumerated fields, put the whole json in
     a column by itself for querying fields not in the schema.

  6. For reading the Parquet files locally using SQL,
     Apache Drill is very convenient - just untar and run drill-embedded.

  7. After updating a parquet file, it has to be copied to S3 (using sync, not
     cp). Using EC2 is essential here because we are constantly copying files
     from the server to S3, and the bandwidth costs would be prohibitive
     otherwise - traffic between EC2 and S3 in the same region is free. After
     copying the file to S3, use AWS Glue to discover the schema from the
     files - call Glue only when the schema changes, since AWS charges you
     each time you call it.

  8. The oplog contains all the information you need to update the Parquet
     files. We run an instance of mongo on the same machine, and maintain a
     one-node slave, as it were, by using the applyOps command on oplog
     entries read from the main cluster (see the sketch after this list).
     Beware, however - Mongo developers might remove the docs for applyOps
     completely in the future.
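For the last point, applying fetched oplog entries to the local mongod might look roughly like the sketch below. This is heavily simplified (no error handling, no batching), the hosts are hypothetical, and applyOps behavior differs across Mongo versions.

from pymongo import MongoClient

# `source` is the main cluster, `local_mongo` is the mongod running next to
# the Parquet-writing scripts.
source = MongoClient("mongodb://main-cluster:27017")
local_mongo = MongoClient("mongodb://localhost:27018")

def replay_oplog(entries):
    """Apply oplog entries from the main cluster to the local one-node copy."""
    for entry in entries:
        # applyOps takes a list of oplog-format operations.
        local_mongo.admin.command("applyOps", [entry])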

[Figure: Scale's flow]

