tempStore.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. // ##Temporary Storage
  2. //
  3. // Temporary storage is used for chunked uploads until all chunks are received
  4. // and all copies have been made or given up. In some cases, the original file
  5. // is stored only in temporary storage (for example, if all copies do some
  6. // manipulation in beforeSave). This is why we use the temporary file as the
  7. // basis for each saved copy, and then remove it after all copies are saved.
  8. //
  9. // Every chunk is saved as an individual temporary file. This is safer than
  10. // attempting to write multiple incoming chunks to different positions in a
  11. // single temporary file, which can lead to write conflicts.
  12. //
  13. // Using temp files also allows us to easily resume uploads, even if the server
  14. // restarts, and to keep the working memory clear.
  15. // The FS.TempStore emits events that others are able to listen to
  16. var EventEmitter = Npm.require('events').EventEmitter;
// We use a special stream that concatenates all chunk files into one readable stream
  18. var CombinedStream = Npm.require('combined-stream');
  19. /** @namespace FS.TempStore
  20. * @property FS.TempStore
  21. * @type {object}
  22. * @public
  23. * @summary An event emitter
  24. */
  25. FS.TempStore = new EventEmitter();
  26. // Create a tracker collection for keeping track of all chunks for any files that are currently in the temp store
  27. var tracker = FS.TempStore.Tracker = new Mongo.Collection('cfs._tempstore.chunks');
/**
 * @property FS.TempStore.Storage
 * @type {StorageAdapter}
 * @namespace FS.TempStore
 * @private
 * @summary Storage adapter backing the temp store. It is `null` until it is
 * mounted by `mountStorage()` or set manually.
 *
 * __When and why:__
 * `mountStorage()` picks the adapter lazily, the first time it is needed:
 * * `FS.Store.GridFS` if `wekan-cfs-gridfs` is installed and either
 *   `wekan-cfs-worker` is installed or `wekan-cfs-filesystem` is not
 * * otherwise `FS.Store.FileSystem` if `wekan-cfs-filesystem` is installed
 * * otherwise an error is thrown
 *
 * A value that is already set is never replaced, so users can set
 * `FS.TempStore.Storage` themselves, e.g.:
 * ```js
 * // It's important to set `internal: true`; this lets the SA know that we
 * // are using it internally and it will give us the direct SA API
 * FS.TempStore.Storage = new FS.Store.GridFS('_tempstore', { internal: true });
 * ```
 *
 * > Note: This is considered `advanced` use; it's not a common pattern.
 */
FS.TempStore.Storage = null;
  50. // We will not mount a storage adapter until needed. This allows us to check for the
  51. // existance of FS.FileWorker, which is loaded after this package because it
  52. // depends on this package.
  53. function mountStorage() {
  54. if (FS.TempStore.Storage) return;
  55. // XXX: We could replace this test, testing the FS scope for grifFS etc.
  56. // This is on the todo later when we get "stable"
  57. if (Package["wekan-cfs-gridfs"] && (Package["wekan-cfs-worker"] || !Package["wekan-cfs-filesystem"])) {
  58. // If the file worker is installed we would prefer to use the gridfs sa
  59. // for scalability. We also default to gridfs if filesystem is not found
  60. // Use the gridfs
  61. FS.TempStore.Storage = new FS.Store.GridFS('_tempstore', { internal: true });
  62. } else if (Package["wekan-cfs-filesystem"]) {
  63. // use the Filesystem
  64. FS.TempStore.Storage = new FS.Store.FileSystem('_tempstore', { internal: true });
  65. } else {
  66. throw new Error('FS.TempStore.Storage is not set: Install wekan-cfs-filesystem or wekan-cfs-gridfs or set it manually');
  67. }
  68. FS.debug && console.log('TempStore is mounted on', FS.TempStore.Storage.typeName);
  69. }
  70. function mountFile(fileObj, name) {
  71. if (!fileObj.isMounted()) {
  72. throw new Error(name + ' cannot work with unmounted file');
  73. }
  74. }
  75. // We update the fileObj on progress
  76. FS.TempStore.on('progress', function(fileObj, chunkNum, count, total, result) {
  77. FS.debug && console.log('TempStore progress: Received ' + count + ' of ' + total + ' chunks for ' + fileObj.name());
  78. });
  79. // XXX: TODO
  80. // FS.TempStore.on('stored', function(fileObj, chunkCount, result) {
  81. // // This should work if we pass on result from the SA on stored event...
  82. // fileObj.update({ $set: { chunkSum: 1, chunkCount: chunkCount, size: result.size } });
  83. // });
  84. // Stream implementation
  85. /**
  86. * @method _chunkPath
  87. * @private
  88. * @param {Number} [n] Chunk number
  89. * @returns {String} Chunk naming convention
  90. */
  91. _chunkPath = function(n) {
  92. return (n || 0) + '.chunk';
  93. };
  94. /**
  95. * @method _fileReference
  96. * @param {FS.File} fileObj
  97. * @param {Number} chunk
  98. * @private
  99. * @returns {String} Generated SA specific fileKey for the chunk
  100. *
  101. * Note: Calling function should call mountStorage() first, and
  102. * make sure that fileObj is mounted.
  103. */
  104. _fileReference = function(fileObj, chunk, existing) {
  105. // Maybe it's a chunk we've already saved
  106. existing = existing || tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName});
  107. // Make a temporary fileObj just for fileKey generation
  108. var tempFileObj = new FS.File({
  109. collectionName: fileObj.collectionName,
  110. _id: fileObj._id,
  111. original: {
  112. name: _chunkPath(chunk)
  113. },
  114. copies: {
  115. _tempstore: {
  116. key: existing && existing.keys[chunk]
  117. }
  118. }
  119. });
  120. // Return a fitting fileKey SA specific
  121. return FS.TempStore.Storage.adapter.fileKey(tempFileObj);
  122. };
  123. /**
  124. * @method FS.TempStore.exists
  125. * @param {FS.File} File object
  126. * @returns {Boolean} Is this file, or parts of it, currently stored in the TempStore
  127. */
  128. FS.TempStore.exists = function(fileObj) {
  129. var existing = tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName});
  130. return !!existing;
  131. };
  132. /**
  133. * @method FS.TempStore.listParts
  134. * @param {FS.File} fileObj
  135. * @returns {Object} of parts already stored
  136. * @todo This is not yet implemented, milestone 1.1.0
  137. */
  138. FS.TempStore.listParts = function fsTempStoreListParts(fileObj) {
  139. var self = this;
  140. console.warn('This function is not correctly implemented using SA in TempStore');
  141. //XXX This function might be necessary for resume. Not currently supported.
  142. };
  143. /**
  144. * @method FS.TempStore.removeFile
  145. * @public
  146. * @param {FS.File} fileObj
  147. * This function removes the file from tempstorage - it cares not if file is
  148. * already removed or not found, goal is reached anyway.
  149. */
  150. FS.TempStore.removeFile = function fsTempStoreRemoveFile(fileObj) {
  151. var self = this;
  152. // Ensure that we have a storage adapter mounted; if not, throw an error.
  153. mountStorage();
  154. // If fileObj is not mounted or can't be, throw an error
  155. mountFile(fileObj, 'FS.TempStore.removeFile');
  156. // Emit event
  157. self.emit('remove', fileObj);
  158. var chunkInfo = tracker.findOne({
  159. fileId: fileObj._id,
  160. collectionName: fileObj.collectionName
  161. });
  162. if (chunkInfo) {
  163. // Unlink each file
  164. FS.Utility.each(chunkInfo.keys || {}, function (key, chunk) {
  165. var fileKey = _fileReference(fileObj, chunk, chunkInfo);
  166. FS.TempStore.Storage.adapter.remove(fileKey, FS.Utility.noop);
  167. });
  168. // Remove fileObj from tracker collection, too
  169. tracker.remove({_id: chunkInfo._id});
  170. }
  171. };
  172. /**
  173. * @method FS.TempStore.removeAll
  174. * @public
  175. * @summary This function removes all files from tempstorage - it cares not if file is
  176. * already removed or not found, goal is reached anyway.
  177. */
  178. FS.TempStore.removeAll = function fsTempStoreRemoveAll() {
  179. var self = this;
  180. // Ensure that we have a storage adapter mounted; if not, throw an error.
  181. mountStorage();
  182. tracker.find().forEach(function (chunkInfo) {
  183. // Unlink each file
  184. FS.Utility.each(chunkInfo.keys || {}, function (key, chunk) {
  185. var fileKey = _fileReference({_id: chunkInfo.fileId, collectionName: chunkInfo.collectionName}, chunk, chunkInfo);
  186. FS.TempStore.Storage.adapter.remove(fileKey, FS.Utility.noop);
  187. });
  188. // Remove from tracker collection, too
  189. tracker.remove({_id: chunkInfo._id});
  190. });
  191. };
  192. /**
  193. * @method FS.TempStore.createWriteStream
  194. * @public
  195. * @param {FS.File} fileObj File to store in temporary storage
  196. * @param {Number | String} [options]
  197. * @returns {Stream} Writeable stream
  198. *
  199. * `options` of different types mean differnt things:
  200. * * `undefined` We store the file in one part
  201. * *(Normal server-side api usage)*
  202. * * `Number` the number is the part number total
  203. * *(multipart uploads will use this api)*
  204. * * `String` the string is the name of the `store` that wants to store file data
  205. * *(stores that want to sync their data to the rest of the files stores will use this)*
  206. *
  207. * > Note: fileObj must be mounted on a `FS.Collection`, it makes no sense to store otherwise
  208. */
  209. FS.TempStore.createWriteStream = function(fileObj, options) {
  210. var self = this;
  211. // Ensure that we have a storage adapter mounted; if not, throw an error.
  212. mountStorage();
  213. // If fileObj is not mounted or can't be, throw an error
  214. mountFile(fileObj, 'FS.TempStore.createWriteStream');
  215. // Cache the selector for use multiple times below
  216. var selector = {fileId: fileObj._id, collectionName: fileObj.collectionName};
  217. // TODO, should pass in chunkSum so we don't need to use FS.File for it
  218. var chunkSum = fileObj.chunkSum || 1;
  219. // Add fileObj to tracker collection
  220. tracker.upsert(selector, {$setOnInsert: {keys: {}}});
  221. // Determine how we're using the writeStream
  222. var isOnePart = false, isMultiPart = false, isStoreSync = false, chunkNum = 0;
  223. if (options === +options) {
  224. isMultiPart = true;
  225. chunkNum = options;
  226. } else if (options === ''+options) {
  227. isStoreSync = true;
  228. } else {
  229. isOnePart = true;
  230. }
  231. // XXX: it should be possible for a store to sync by storing data into the
  232. // tempstore - this could be done nicely by setting the store name as string
  233. // in the chunk variable?
  234. // This store name could be passed on the the fileworker via the uploaded
  235. // event
  236. // So the uploaded event can return:
  237. // undefined - if data is stored into and should sync out to all storage adapters
  238. // number - if a chunk has been uploaded
  239. // string - if a storage adapter wants to sync its data to the other SA's
  240. // Find a nice location for the chunk data
  241. var fileKey = _fileReference(fileObj, chunkNum);
  242. // Create the stream as Meteor safe stream
  243. var writeStream = FS.TempStore.Storage.adapter.createWriteStream(fileKey);
  244. // When the stream closes we update the chunkCount
  245. writeStream.safeOn('stored', function(result) {
  246. // Save key in tracker document
  247. var setObj = {};
  248. setObj['keys.' + chunkNum] = result.fileKey;
  249. tracker.update(selector, {$set: setObj});
  250. var temp = tracker.findOne(selector);
  251. if (!temp) {
  252. FS.debug && console.log('NOT FOUND FROM TEMPSTORE => EXIT (REMOVED)');
  253. return;
  254. }
  255. // Get updated chunkCount
  256. var chunkCount = FS.Utility.size(temp.keys);
  257. // Progress
  258. self.emit('progress', fileObj, chunkNum, chunkCount, chunkSum, result);
  259. var modifier = { $set: {} };
  260. if (!fileObj.instance_id) {
  261. modifier.$set.instance_id = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID;
  262. }
  263. // If upload is completed
  264. if (chunkCount === chunkSum) {
  265. // We no longer need the chunk info
  266. modifier.$unset = {chunkCount: 1, chunkSum: 1, chunkSize: 1};
  267. // Check if the file has been uploaded before
  268. if (typeof fileObj.uploadedAt === 'undefined') {
  269. // We set the uploadedAt date
  270. modifier.$set.uploadedAt = new Date();
  271. } else {
  272. // We have been uploaded so an event were file data is updated is
  273. // called synchronizing - so this must be a synchronizedAt?
  274. modifier.$set.synchronizedAt = new Date();
  275. }
  276. // Update the fileObject
  277. fileObj.update(modifier);
  278. // Fire ending events
  279. var eventName = isStoreSync ? 'synchronized' : 'stored';
  280. self.emit(eventName, fileObj, result);
  281. // XXX is emitting "ready" necessary?
  282. self.emit('ready', fileObj, chunkCount, result);
  283. } else {
  284. // Update the chunkCount on the fileObject
  285. modifier.$set.chunkCount = chunkCount;
  286. fileObj.update(modifier);
  287. }
  288. });
  289. // Emit errors
  290. writeStream.on('error', function (error) {
  291. FS.debug && console.log('TempStore writeStream error:', error);
  292. self.emit('error', error, fileObj);
  293. });
  294. return writeStream;
  295. };
  296. /**
  297. * @method FS.TempStore.createReadStream
  298. * @public
  299. * @param {FS.File} fileObj The file to read
  300. * @return {Stream} Returns readable stream
  301. *
  302. */
  303. FS.TempStore.createReadStream = function(fileObj) {
  304. // Ensure that we have a storage adapter mounted; if not, throw an error.
  305. mountStorage();
  306. // If fileObj is not mounted or can't be, throw an error
  307. mountFile(fileObj, 'FS.TempStore.createReadStream');
  308. FS.debug && console.log('FS.TempStore creating read stream for ' + fileObj._id);
  309. // Determine how many total chunks there are from the tracker collection
  310. var chunkInfo = tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName}) || {};
  311. var totalChunks = FS.Utility.size(chunkInfo.keys);
  312. function getNextStreamFunc(chunk) {
  313. return Meteor.bindEnvironment(function(next) {
  314. var fileKey = _fileReference(fileObj, chunk);
  315. var chunkReadStream = FS.TempStore.Storage.adapter.createReadStream(fileKey);
  316. next(chunkReadStream);
  317. }, function (error) {
  318. throw error;
  319. });
  320. }
  321. // Make a combined stream
  322. var combinedStream = CombinedStream.create();
  323. // Add each chunk stream to the combined stream when the previous chunk stream ends
  324. var currentChunk = 0;
  325. for (var chunk = 0; chunk < totalChunks; chunk++) {
  326. combinedStream.append(getNextStreamFunc(chunk));
  327. }
  328. // Return the combined stream
  329. return combinedStream;
  330. };