engine.js 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. const _ = require('lodash')
  2. const AWS = require('aws-sdk')
  3. const { pipeline, Transform } = require('stream')
  4. module.exports = {
  5. async activate() {
  6. // not used
  7. },
  8. async deactivate() {
  9. // not used
  10. },
  11. /**
  12. * INIT
  13. */
  14. async init() {
  15. WIKI.logger.info(`(SEARCH/AWS) Initializing...`)
  16. this.client = new AWS.CloudSearch({
  17. apiVersion: '2013-01-01',
  18. accessKeyId: this.config.accessKeyId,
  19. secretAccessKey: this.config.secretAccessKey,
  20. region: this.config.region
  21. })
  22. this.clientDomain = new AWS.CloudSearchDomain({
  23. apiVersion: '2013-01-01',
  24. endpoint: this.config.endpoint,
  25. accessKeyId: this.config.accessKeyId,
  26. secretAccessKey: this.config.secretAccessKey,
  27. region: this.config.region
  28. })
  29. let rebuildIndex = false
  30. // -> Define Analysis Schemes
  31. const schemes = await this.client.describeAnalysisSchemes({
  32. DomainName: this.config.domain,
  33. AnalysisSchemeNames: ['default_anlscheme']
  34. }).promise()
  35. if (_.get(schemes, 'AnalysisSchemes', []).length < 1) {
  36. WIKI.logger.info(`(SEARCH/AWS) Defining Analysis Scheme...`)
  37. await this.client.defineAnalysisScheme({
  38. DomainName: this.config.domain,
  39. AnalysisScheme: {
  40. AnalysisSchemeLanguage: this.config.AnalysisSchemeLang,
  41. AnalysisSchemeName: 'default_anlscheme'
  42. }
  43. }).promise()
  44. rebuildIndex = true
  45. }
  46. // -> Define Index Fields
  47. const fields = await this.client.describeIndexFields({
  48. DomainName: this.config.domain
  49. }).promise()
  50. if (_.get(fields, 'IndexFields', []).length < 1) {
  51. WIKI.logger.info(`(SEARCH/AWS) Defining Index Fields...`)
  52. await this.client.defineIndexField({
  53. DomainName: this.config.domain,
  54. IndexField: {
  55. IndexFieldName: 'id',
  56. IndexFieldType: 'literal'
  57. }
  58. }).promise()
  59. await this.client.defineIndexField({
  60. DomainName: this.config.domain,
  61. IndexField: {
  62. IndexFieldName: 'path',
  63. IndexFieldType: 'literal'
  64. }
  65. }).promise()
  66. await this.client.defineIndexField({
  67. DomainName: this.config.domain,
  68. IndexField: {
  69. IndexFieldName: 'locale',
  70. IndexFieldType: 'literal'
  71. }
  72. }).promise()
  73. await this.client.defineIndexField({
  74. DomainName: this.config.domain,
  75. IndexField: {
  76. IndexFieldName: 'title',
  77. IndexFieldType: 'text',
  78. TextOptions: {
  79. ReturnEnabled: true,
  80. AnalysisScheme: 'default_anlscheme'
  81. }
  82. }
  83. }).promise()
  84. await this.client.defineIndexField({
  85. DomainName: this.config.domain,
  86. IndexField: {
  87. IndexFieldName: 'description',
  88. IndexFieldType: 'text',
  89. TextOptions: {
  90. ReturnEnabled: true,
  91. AnalysisScheme: 'default_anlscheme'
  92. }
  93. }
  94. }).promise()
  95. await this.client.defineIndexField({
  96. DomainName: this.config.domain,
  97. IndexField: {
  98. IndexFieldName: 'content',
  99. IndexFieldType: 'text',
  100. TextOptions: {
  101. ReturnEnabled: false,
  102. AnalysisScheme: 'default_anlscheme'
  103. }
  104. }
  105. }).promise()
  106. rebuildIndex = true
  107. }
  108. //-> Define suggester
  109. const suggesters = await this.client.describeSuggesters({
  110. DomainName: this.config.domain,
  111. SuggesterNames: ['default_suggester']
  112. }).promise()
  113. if(_.get(suggesters, 'Suggesters', []).length < 1) {
  114. WIKI.logger.info(`(SEARCH/AWS) Defining Suggester...`)
  115. await this.client.defineSuggester({
  116. DomainName: this.config.domain,
  117. Suggester: {
  118. SuggesterName: 'default_suggester',
  119. DocumentSuggesterOptions: {
  120. SourceField: 'title',
  121. FuzzyMatching: 'high'
  122. }
  123. }
  124. }).promise()
  125. rebuildIndex = true
  126. }
  127. // -> Rebuild Index
  128. if (rebuildIndex) {
  129. WIKI.logger.info(`(SEARCH/AWS) Requesting Index Rebuild...`)
  130. await this.client.indexDocuments({
  131. DomainName: this.config.domain
  132. }).promise()
  133. }
  134. WIKI.logger.info(`(SEARCH/AWS) Initialization completed.`)
  135. },
  136. /**
  137. * QUERY
  138. *
  139. * @param {String} q Query
  140. * @param {Object} opts Additional options
  141. */
  142. async query(q, opts) {
  143. try {
  144. let suggestions = []
  145. const results = await this.clientDomain.search({
  146. query: q,
  147. partial: true,
  148. size: 50
  149. }).promise()
  150. if (results.hits.found < 5) {
  151. const suggestResults = await this.clientDomain.suggest({
  152. query: q,
  153. suggester: 'default_suggester',
  154. size: 5
  155. }).promise()
  156. suggestions = suggestResults.suggest.suggestions.map(s => s.suggestion)
  157. }
  158. return {
  159. results: _.map(results.hits.hit, r => ({
  160. id: r.id,
  161. path: _.head(r.fields.path),
  162. locale: _.head(r.fields.locale),
  163. title: _.head(r.fields.title) || '',
  164. description: _.head(r.fields.description) || ''
  165. })),
  166. suggestions: suggestions,
  167. totalHits: results.hits.found
  168. }
  169. } catch (err) {
  170. WIKI.logger.warn('Search Engine Error:')
  171. WIKI.logger.warn(err)
  172. }
  173. },
  174. /**
  175. * CREATE
  176. *
  177. * @param {Object} page Page to create
  178. */
  179. async created(page) {
  180. await this.clientDomain.uploadDocuments({
  181. contentType: 'application/json',
  182. documents: JSON.stringify([
  183. {
  184. type: 'add',
  185. id: page.hash,
  186. fields: {
  187. locale: page.localeCode,
  188. path: page.path,
  189. title: page.title,
  190. description: page.description,
  191. content: page.content
  192. }
  193. }
  194. ])
  195. }).promise()
  196. },
  197. /**
  198. * UPDATE
  199. *
  200. * @param {Object} page Page to update
  201. */
  202. async updated(page) {
  203. await this.clientDomain.uploadDocuments({
  204. contentType: 'application/json',
  205. documents: JSON.stringify([
  206. {
  207. type: 'add',
  208. id: page.hash,
  209. fields: {
  210. locale: page.localeCode,
  211. path: page.path,
  212. title: page.title,
  213. description: page.description,
  214. content: page.content
  215. }
  216. }
  217. ])
  218. }).promise()
  219. },
  220. /**
  221. * DELETE
  222. *
  223. * @param {Object} page Page to delete
  224. */
  225. async deleted(page) {
  226. await this.clientDomain.uploadDocuments({
  227. contentType: 'application/json',
  228. documents: JSON.stringify([
  229. {
  230. type: 'delete',
  231. id: page.hash
  232. }
  233. ])
  234. }).promise()
  235. },
  236. /**
  237. * RENAME
  238. *
  239. * @param {Object} page Page to rename
  240. */
  241. async renamed(page) {
  242. await this.clientDomain.uploadDocuments({
  243. contentType: 'application/json',
  244. documents: JSON.stringify([
  245. {
  246. type: 'delete',
  247. id: page.sourceHash
  248. }
  249. ])
  250. }).promise()
  251. await this.clientDomain.uploadDocuments({
  252. contentType: 'application/json',
  253. documents: JSON.stringify([
  254. {
  255. type: 'add',
  256. id: page.destinationHash,
  257. fields: {
  258. locale: page.localeCode,
  259. path: page.destinationPath,
  260. title: page.title,
  261. description: page.description,
  262. content: page.content
  263. }
  264. }
  265. ])
  266. }).promise()
  267. },
  268. /**
  269. * REBUILD INDEX
  270. */
  271. async rebuild() {
  272. WIKI.logger.info(`(SEARCH/AWS) Rebuilding Index...`)
  273. const MAX_DOCUMENT_BYTES = Math.pow(2, 20)
  274. const MAX_INDEXING_BYTES = 5 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength
  275. const MAX_INDEXING_COUNT = 1000
  276. const COMMA_BYTES = Buffer.from(',').byteLength
  277. let chunks = []
  278. let bytes = 0
  279. const processDocument = async (cb, doc) => {
  280. try {
  281. if (doc) {
  282. const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
  283. // -> Document too large
  284. if (docBytes >= MAX_DOCUMENT_BYTES) {
  285. throw new Error('Document exceeds maximum size allowed by AWS CloudSearch.')
  286. }
  287. // -> Current batch exceeds size hard limit, flush
  288. if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
  289. await flushBuffer()
  290. }
  291. if (chunks.length > 0) {
  292. bytes += COMMA_BYTES
  293. }
  294. bytes += docBytes
  295. chunks.push(doc)
  296. // -> Current batch exceeds count soft limit, flush
  297. if (chunks.length >= MAX_INDEXING_COUNT) {
  298. await flushBuffer()
  299. }
  300. } else {
  301. // -> End of stream, flush
  302. await flushBuffer()
  303. }
  304. cb()
  305. } catch (err) {
  306. cb(err)
  307. }
  308. }
  309. const flushBuffer = async () => {
  310. WIKI.logger.info(`(SEARCH/AWS) Sending batch of ${chunks.length}...`)
  311. try {
  312. const resp = await this.clientDomain.uploadDocuments({
  313. contentType: 'application/json',
  314. documents: JSON.stringify(_.map(chunks, doc => ({
  315. type: 'add',
  316. id: doc.id,
  317. fields: {
  318. locale: doc.locale,
  319. path: doc.path,
  320. title: doc.title,
  321. description: doc.description,
  322. content: doc.content
  323. }
  324. })))
  325. }).promise()
  326. } catch (err) {
  327. WIKI.logger.warn('(SEARCH/AWS) Failed to send batch to AWS CloudSearch: ', err)
  328. }
  329. chunks.length = 0
  330. bytes = 0
  331. }
  332. await pipeline(
  333. WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'content').select().from('pages').where({
  334. isPublished: true,
  335. isPrivate: false
  336. }).stream(),
  337. new Transform({
  338. objectMode: true,
  339. transform: async (chunk, enc, cb) => await processDocument(cb, chunk),
  340. flush: async (cb) => await processDocument(cb)
  341. })
  342. )
  343. WIKI.logger.info(`(SEARCH/AWS) Requesting Index Rebuild...`)
  344. await this.client.indexDocuments({
  345. DomainName: this.config.domain
  346. }).promise()
  347. WIKI.logger.info(`(SEARCH/AWS) Index rebuilt successfully.`)
  348. }
  349. }