put.js 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. 'use strict'
  2. const index = require('./lib/entry-index')
  3. const memo = require('./lib/memoization')
  4. const write = require('./lib/content/write')
  5. const to = require('mississippi').to
  6. module.exports = putData
  7. function putData (cache, key, data, opts) {
  8. opts = opts || {}
  9. return write(cache, data, opts).then(res => {
  10. // TODO - stop modifying opts
  11. opts.size = res.size
  12. return index.insert(cache, key, res.integrity, opts).then(entry => {
  13. if (opts.memoize) {
  14. memo.put(cache, entry, data, opts)
  15. }
  16. return res.integrity
  17. })
  18. })
  19. }
  20. module.exports.stream = putStream
  21. function putStream (cache, key, opts) {
  22. opts = opts || {}
  23. let integrity
  24. let size
  25. const contentStream = write.stream(
  26. cache, opts
  27. ).on('integrity', int => {
  28. integrity = int
  29. }).on('size', s => {
  30. size = s
  31. })
  32. let memoData
  33. let memoTotal = 0
  34. const stream = to((chunk, enc, cb) => {
  35. contentStream.write(chunk, enc, () => {
  36. if (opts.memoize) {
  37. if (!memoData) { memoData = [] }
  38. memoData.push(chunk)
  39. memoTotal += chunk.length
  40. }
  41. cb()
  42. })
  43. }, cb => {
  44. contentStream.end(() => {
  45. // TODO - stop modifying `opts`
  46. opts.size = size
  47. index.insert(cache, key, integrity, opts).then(entry => {
  48. if (opts.memoize) {
  49. memo.put(cache, entry, Buffer.concat(memoData, memoTotal), opts)
  50. }
  51. stream.emit('integrity', integrity)
  52. cb()
  53. })
  54. })
  55. })
  56. let erred = false
  57. stream.once('error', err => {
  58. if (erred) { return }
  59. erred = true
  60. contentStream.emit('error', err)
  61. })
  62. contentStream.once('error', err => {
  63. if (erred) { return }
  64. erred = true
  65. stream.emit('error', err)
  66. })
  67. return stream
  68. }