瀏覽代碼

feat: AWS CloudSearch engine

Nick 6 年之前
父節點
當前提交
6eb6eecfa2
共有 1 個文件被更改,包括 178 次插入47 次删除
  1. 178 47
      server/modules/search/aws/engine.js

+ 178 - 47
server/modules/search/aws/engine.js

@@ -1,6 +1,6 @@
 const _ = require('lodash')
 const AWS = require('aws-sdk')
-const { pipeline } = require('stream')
+const { pipeline, Transform } = require('stream')
 
 module.exports = {
   async activate() {
@@ -20,6 +20,13 @@ module.exports = {
       secretAccessKey: this.config.secretAccessKey,
       region: this.config.region
     })
+    this.clientDomain = new AWS.CloudSearchDomain({
+      apiVersion: '2013-01-01',
+      endpoint: this.config.endpoint,
+      accessKeyId: this.config.accessKeyId,
+      secretAccessKey: this.config.secretAccessKey,
+      region: this.config.region
+    })
 
     let rebuildIndex = false
 
@@ -141,10 +148,30 @@ module.exports = {
    */
   async query(q, opts) {
     try {
+      let suggestions = []
+      const results = await this.clientDomain.search({
+        query: q,
+        partial: true,
+        size: 50
+      }).promise()
+      if (results.hits.found < 5) {
+        const suggestResults = await this.clientDomain.suggest({
+          query: q,
+          suggester: 'default_suggester',
+          size: 5
+        }).promise()
+        suggestions = suggestResults.suggest.suggestions.map(s => s.suggestion)
+      }
       return {
-        results: [],
-        suggestions: [],
-        totalHits: 0
+        results: _.map(results.hits.hit, r => ({
+          id: r.id,
+          path: _.head(r.fields.path),
+          locale: _.head(r.fields.locale),
+          title: _.head(r.fields.title),
+          description: _.head(r.fields.description)
+        })),
+        suggestions: suggestions,
+        totalHits: results.hits.found
       }
     } catch (err) {
       WIKI.logger.warn('Search Engine Error:')
@@ -157,16 +184,22 @@ module.exports = {
    * @param {Object} page Page to create
    */
   async created(page) {
-    await this.client.indexes.use(this.config.indexName).index([
-      {
-        id: page.hash,
-        locale: page.localeCode,
-        path: page.path,
-        title: page.title,
-        description: page.description,
-        content: page.content
-      }
-    ])
+    await this.clientDomain.uploadDocuments({
+      contentType: 'application/json',
+      documents: JSON.stringify([
+        {
+          type: 'add',
+          id: page.hash,
+          fields: {
+            locale: page.localeCode,
+            path: page.path,
+            title: page.title,
+            description: page.description,
+            content: page.content
+          }
+        }
+      ])
+    }).promise()
   },
   /**
    * UPDATE
@@ -174,16 +207,22 @@ module.exports = {
    * @param {Object} page Page to update
    */
   async updated(page) {
-    await this.client.indexes.use(this.config.indexName).index([
-      {
-        id: page.hash,
-        locale: page.localeCode,
-        path: page.path,
-        title: page.title,
-        description: page.description,
-        content: page.content
-      }
-    ])
+    await this.clientDomain.uploadDocuments({
+      contentType: 'application/json',
+      documents: JSON.stringify([
+        {
+          type: 'add',
+          id: page.hash,
+          fields: {
+            locale: page.localeCode,
+            path: page.path,
+            title: page.title,
+            description: page.description,
+            content: page.content
+          }
+        }
+      ])
+    }).promise()
   },
   /**
    * DELETE
@@ -191,12 +230,15 @@ module.exports = {
    * @param {Object} page Page to delete
    */
   async deleted(page) {
-    await this.client.indexes.use(this.config.indexName).index([
-      {
-        '@search.action': 'delete',
-        id: page.hash
-      }
-    ])
+    await this.clientDomain.uploadDocuments({
+      contentType: 'application/json',
+      documents: JSON.stringify([
+        {
+          type: 'delete',
+          id: page.hash
+        }
+      ])
+    }).promise()
   },
   /**
    * RENAME
@@ -204,33 +246,122 @@ module.exports = {
    * @param {Object} page Page to rename
    */
   async renamed(page) {
-    await this.client.indexes.use(this.config.indexName).index([
-      {
-        '@search.action': 'delete',
-        id: page.sourceHash
-      }
-    ])
-    await this.client.indexes.use(this.config.indexName).index([
-      {
-        id: page.destinationHash,
-        locale: page.localeCode,
-        path: page.destinationPath,
-        title: page.title,
-        description: page.description,
-        content: page.content
-      }
-    ])
+    await this.clientDomain.uploadDocuments({
+      contentType: 'application/json',
+      documents: JSON.stringify([
+        {
+          type: 'delete',
+          id: page.sourceHash
+        }
+      ])
+    }).promise()
+    await this.clientDomain.uploadDocuments({
+      contentType: 'application/json',
+      documents: JSON.stringify([
+        {
+          type: 'add',
+          id: page.destinationHash,
+          fields: {
+            locale: page.localeCode,
+            path: page.destinationPath,
+            title: page.title,
+            description: page.description,
+            content: page.content
+          }
+        }
+      ])
+    }).promise()
   },
   /**
    * REBUILD INDEX
    */
   async rebuild() {
+    WIKI.logger.info(`(SEARCH/AWS) Rebuilding Index...`)
+
+    const MAX_DOCUMENT_BYTES = Math.pow(2, 20)
+    const MAX_INDEXING_BYTES = 5 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength
+    const MAX_INDEXING_COUNT = 1000
+    const COMMA_BYTES = Buffer.from(',').byteLength
+
+    let chunks = []
+    let bytes = 0
+
+    const processDocument = async (cb, doc) => {
+      try {
+        if (doc) {
+          const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
+          // -> Document too large
+          if (docBytes >= MAX_DOCUMENT_BYTES) {
+            throw new Error('Document exceeds maximum size allowed by AWS CloudSearch.')
+          }
+
+          // -> Current batch exceeds size hard limit, flush
+          if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
+            await flushBuffer()
+          }
+
+          if (chunks.length > 0) {
+            bytes += COMMA_BYTES
+          }
+          bytes += docBytes
+          chunks.push(doc)
+
+          // -> Current batch exceeds count soft limit, flush
+          if (chunks.length >= MAX_INDEXING_COUNT) {
+            await flushBuffer()
+          }
+        } else {
+          // -> End of stream, flush
+          await flushBuffer()
+        }
+        cb()
+      } catch (err) {
+        cb(err)
+      }
+    }
+
+    const flushBuffer = async () => {
+      WIKI.logger.info(`(SEARCH/AWS) Sending batch of ${chunks.length}...`)
+      try {
+        const resp = await this.clientDomain.uploadDocuments({
+          contentType: 'application/json',
+          documents: JSON.stringify(_.map(chunks, doc => ({
+            type: 'add',
+            id: doc.id,
+            fields: {
+              locale: doc.locale,
+              path: doc.path,
+              title: doc.title,
+              description: doc.description,
+              content: doc.content
+            }
+          })))
+        }).promise()
+      } catch (err) {
+        WIKI.logger.warn('(SEARCH/AWS) Failed to send batch to AWS CloudSearch: ', err)
+      }
+      chunks.length = 0
+      bytes = 0
+    }
+
     await pipeline(
       WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'content').select().from('pages').where({
         isPublished: true,
         isPrivate: false
       }).stream(),
-      this.client.indexes.use(this.config.indexName).createIndexingStream()
+      new Transform({
+        objectMode: true,
+        transform: async (chunk, enc, cb) => await processDocument(cb, chunk),
+        flush: async (cb) => await processDocument(cb)
+      })
     )
+
+    WIKI.logger.info(`(SEARCH/AWS) Requesting Index Rebuild...`)
+    await this.client.indexDocuments({
+      DomainName: this.config.domain
+    }).promise()
+
+    WIKI.logger.info(`(SEARCH/AWS) Index rebuilt successfully.`)
   }
 }
+