Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

GridFS

GridFS is a way to store and retrieve files using MongoDB.
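As background, GridFS stores a file as one metadata document plus a sequence of fixed-size chunk documents. The following is a minimal sketch of that chunking arithmetic, assuming the default chunk size of 255 KiB described in the MongoDB GridFS documentation; the names `DefaultChunkSize`, `chunkCount` and `chunkRanges` are illustrative and not part of the ReactiveMongo API.

```scala
// Default GridFS chunk size (255 KiB), as described in the MongoDB GridFS documentation
val DefaultChunkSize: Int = 255 * 1024

// Number of chunk documents needed for a file of `length` bytes (hypothetical helper)
def chunkCount(length: Long, chunkSize: Int = DefaultChunkSize): Long = {
  require(length >= 0 && chunkSize > 0, "length must be >= 0 and chunkSize > 0")
  (length + chunkSize - 1) / chunkSize // ceiling division
}

// Byte ranges (start inclusive, end exclusive) covered by each chunk index (hypothetical helper)
def chunkRanges(length: Long, chunkSize: Int = DefaultChunkSize): Iterator[(Long, Long)] =
  Iterator.range(0, chunkCount(length, chunkSize).toInt).map { n =>
    val start = n.toLong * chunkSize
    (start, math.min(start + chunkSize, length))
  }
```

Only the last chunk can be shorter than the chunk size; an empty file has no chunk at all.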

ReactiveMongo provides an API for MongoDB GridFS; a reference to it can be resolved as shown below.

import reactivemongo.api.DB
import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.GridFS

type BSONGridFS = GridFS[BSONSerializationPack.type]
def resolveGridFS(db: DB): BSONGridFS = db.gridfs

Save files to GridFS

Once a reference to GridFS is obtained, it can be used to push a file in a streaming way (here from a java.io.InputStream).

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.GridFS
import reactivemongo.api.bson.BSONValue

def saveToGridFS(
  gfs: GridFS[BSONSerializationPack.type],
  filename: String, 
  contentType: Option[String], 
  data: java.io.InputStream
)(implicit ec: ExecutionContext): Future[gfs.ReadFile[BSONValue]] = {
  // Prepare the GridFS object for the file to be pushed
  val gridfsObj = gfs.fileToSave(Some(filename), contentType)

  gfs.writeFromInputStream(gridfsObj, data)
}
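A function such as saveToGridFS above takes an already opened InputStream, so the caller has to manage the stream's lifetime. The sketch below shows one possible way to do that with the standard library only. It assumes the driver does not necessarily close the stream itself (closing a standard stream twice is harmless), and `withFileStream` is a hypothetical helper, not part of ReactiveMongo.

```scala
import java.io.InputStream
import java.nio.file.{ Files, Path }

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal

// Hypothetical helper: opens `path`, passes the stream to `upload`,
// and closes the stream once the returned Future completes (success or failure).
def withFileStream[T](path: Path)(upload: InputStream => Future[T])(
  implicit ec: ExecutionContext): Future[T] = {
  val in = Files.newInputStream(path)

  // Turn a synchronous failure of `upload` into a failed Future,
  // so the stream is closed in that case too
  val result =
    try upload(in) catch { case NonFatal(e) => Future.failed[T](e) }

  result.andThen { case _ => in.close() }
}
```

With the function defined above, a call could then look like `withFileStream(path)(saveToGridFS(gfs, "report.pdf", Some("application/pdf"), _))`, where the file name and content type are placeholder values.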

An update function is provided to update the file metadata.

import scala.concurrent.ExecutionContext

import reactivemongo.api.bson.{ BSONDocument, BSONObjectID }

import reactivemongo.api.DB
import reactivemongo.api.gridfs.GridFS

def updateFile(db: DB, fileId: BSONObjectID)(implicit ec: ExecutionContext) =
  db.gridfs.update(fileId, BSONDocument(f"$$set" ->
    BSONDocument("meta" -> "data")))

The GridFS writeFromInputStream operation returns a reference to the stored object, represented by the ReadFile type.

The reference for a file saved in this way has its MD5 property defined (Some).
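A stored MD5 can be used to check that the upload matches the local data. The following is a minimal sketch of computing an MD5 digest locally with the JDK; it assumes the MD5 property of the stored file is exposed as a lowercase hexadecimal string, and `md5Hex` is an illustrative helper rather than a ReactiveMongo function.

```scala
import java.security.MessageDigest

// Hypothetical helper: MD5 digest of `bytes`, as a lowercase hexadecimal string
def md5Hex(bytes: Array[Byte]): String =
  MessageDigest.getInstance("MD5").
    digest(bytes).
    map(b => f"${b & 0xff}%02x"). // mask to an unsigned value, two hex digits per byte
    mkString

// md5Hex("hello".getBytes("UTF-8")) returns "5d41402abc4b2a76b9719d911017c592"
```

Under the assumption above, the local digest could then be compared with the optional MD5 value of the returned file reference.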

The Akka Stream module provides GridFSStreams.sinkWithMD5, which allows streaming data to a GridFS file.

import scala.concurrent.Future

import akka.NotUsed
import akka.util.ByteString

import akka.stream.Materializer
import akka.stream.scaladsl.Source

import reactivemongo.api.bson.BSONValue
import reactivemongo.api.bson.collection.BSONSerializationPack

import reactivemongo.api.gridfs.GridFS

import reactivemongo.akkastream.GridFSStreams

def saveWithComputedMD5(
  gfs: GridFS[BSONSerializationPack.type],
  filename: String, 
  contentType: Option[String], 
  data: Source[ByteString, NotUsed]
)(implicit m: Materializer): Future[gfs.ReadFile[BSONValue]] = {
  implicit def ec = m.executionContext

  // Prepare the GridFS object for the file to be pushed
  val gridfsObj = gfs.fileToSave(Some(filename), contentType)

  val streams = GridFSStreams[BSONSerializationPack.type](gfs)
  val upload = streams.sinkWithMD5(gridfsObj)

  data.runWith(upload)
}
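The function above expects a Source[ByteString, NotUsed]. As a minimal sketch of where such a source could come from, the snippet below reads a local file with Akka Streams FileIO and discards its materialized value so that the types line up; `fileSource` is a hypothetical helper name.

```scala
import java.nio.file.Path

import akka.NotUsed
import akka.util.ByteString

import akka.stream.scaladsl.{ FileIO, Source }

// Hypothetical helper: streams the content of a local file as ByteString elements.
// FileIO.fromPath materializes a Future[IOResult], which is replaced by NotUsed here.
def fileSource(path: Path): Source[ByteString, NotUsed] =
  FileIO.fromPath(path).mapMaterializedValue(_ => NotUsed)
```

The resulting source could be passed as the data argument, for example `saveWithComputedMD5(gfs, "report.pdf", Some("application/pdf"), fileSource(path))`, with placeholder file name and content type.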

Find a file from GridFS

A file previously stored in GridFS can be retrieved like any MongoDB document, using a find operation.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.{ GridFS, ReadFile }
import reactivemongo.api.bson.{ BSONDocument, BSONValue }

def gridfsByFilename(
  gridfs: GridFS[BSONSerializationPack.type],
  filename: String
)(implicit ec: ExecutionContext): Future[ReadFile[BSONValue, BSONDocument]] = {
  def cursor = gridfs.find(BSONDocument("filename" -> filename))
  cursor.head
}
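With head, the resulting Future fails when no file matches the selector. When a missing file is an expected case, a variant based on the cursor's headOption may be more convenient. The sketch below mirrors the function above; `gridfsByFilenameOption` is an illustrative name, and the types are assumed to be the same as in the previous snippet.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.{ GridFS, ReadFile }
import reactivemongo.api.bson.{ BSONDocument, BSONValue }

// Hypothetical variant: None is returned when no file has the given name
def gridfsByFilenameOption(
  gridfs: GridFS[BSONSerializationPack.type],
  filename: String
)(implicit ec: ExecutionContext): Future[Option[ReadFile[BSONValue, BSONDocument]]] = {
  def cursor = gridfs.find(BSONDocument("filename" -> filename))
  cursor.headOption
}
```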

The Akka Stream module provides GridFSStreams.source to stream data from a GridFS file.

import scala.concurrent.Future

import akka.util.ByteString

import akka.stream.Materializer
import akka.stream.scaladsl.Source

import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.GridFS
import reactivemongo.api.bson.BSONDocument

import reactivemongo.akkastream.{ GridFSStreams, State }

def downloadGridFSFile(
  gridfs: GridFS[BSONSerializationPack.type],
  filename: String
)(implicit m: Materializer): Source[ByteString, Future[State]] = {
  implicit def ec = m.executionContext

  val src = gridfs.find(BSONDocument("filename" -> filename)).head.map { file =>
    val streams = GridFSStreams(gridfs)

    streams.source(file)
  }

  Source.fromFutureSource(src).mapMaterializedValue(_.flatten)
}
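A source like the one returned above only produces data once it is run. As a minimal sketch, the snippet below writes any ByteString source to a local file using Akka Streams FileIO; `saveTo` is a hypothetical helper, and the target path is supplied by the caller.

```scala
import java.nio.file.Path

import scala.concurrent.Future

import akka.util.ByteString

import akka.stream.{ IOResult, Materializer }
import akka.stream.scaladsl.{ FileIO, Source }

// Hypothetical helper: runs `src` and writes every chunk to `target`.
// The returned Future completes with the number of bytes written (IOResult.count).
def saveTo(src: Source[ByteString, Any], target: Path)(
  implicit m: Materializer): Future[IOResult] =
  src.runWith(FileIO.toPath(target))
```

For example, `saveTo(downloadGridFSFile(gridfs, "report.pdf"), target)` would copy the stored file to the local path, assuming a file with that placeholder name exists.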

Delete a file

A file can be removed from GridFS using the remove operation.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.GridFS
import reactivemongo.api.bson.BSONValue

def removeFrom(
  gridfs: GridFS[BSONSerializationPack.type],
  id: BSONValue // see ReadFile.id
)(implicit ec: ExecutionContext): Future[Unit] =
  gridfs.remove(id).map(_ => ())
