engine.js 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. const _ = require('lodash')
  2. const stream = require('stream')
  3. const Promise = require('bluebird')
  4. const pipeline = Promise.promisify(stream.pipeline)
  5. /* global WIKI */
  6. module.exports = {
  7. async activate() {
  8. // not used
  9. },
  10. async deactivate() {
  11. // not used
  12. },
  13. /**
  14. * INIT
  15. */
  16. async init() {
  17. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initializing...`)
  18. switch (this.config.apiVersion) {
  19. case '7.x':
  20. const { Client: Client7 } = require('elasticsearch7')
  21. this.client = new Client7({
  22. nodes: this.config.hosts.split(',').map(_.trim),
  23. sniffOnStart: this.config.sniffOnStart,
  24. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  25. name: 'wiki-js'
  26. })
  27. break
  28. case '6.x':
  29. const { Client: Client6 } = require('elasticsearch6')
  30. this.client = new Client6({
  31. nodes: this.config.hosts.split(',').map(_.trim),
  32. sniffOnStart: this.config.sniffOnStart,
  33. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  34. name: 'wiki-js'
  35. })
  36. break
  37. default:
  38. throw new Error('Unsupported version of elasticsearch! Update your settings in the Administration Area.')
  39. }
  40. // -> Create Search Index
  41. await this.createIndex()
  42. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initialization completed.`)
  43. },
  44. /**
  45. * Create Index
  46. */
  47. async createIndex() {
  48. try {
  49. const indexExists = await this.client.indices.exists({ index: this.config.indexName })
  50. if (!indexExists.body) {
  51. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Creating index...`)
  52. try {
  53. const idxBody = {
  54. properties: {
  55. suggest: { type: 'completion' },
  56. title: { type: 'text', boost: 10.0 },
  57. description: { type: 'text', boost: 3.0 },
  58. content: { type: 'text', boost: 1.0 },
  59. locale: { type: 'keyword' },
  60. path: { type: 'text' },
  61. tags: { type: 'text', boost: 8.0 }
  62. }
  63. }
  64. await this.client.indices.create({
  65. index: this.config.indexName,
  66. body: {
  67. mappings: (this.config.apiVersion === '6.x') ? {
  68. _doc: idxBody
  69. } : idxBody
  70. }
  71. })
  72. } catch (err) {
  73. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Create Index Error: `, _.get(err, 'meta.body.error', err))
  74. }
  75. }
  76. } catch (err) {
  77. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Index Check Error: `, _.get(err, 'meta.body.error', err))
  78. }
  79. },
  80. /**
  81. * QUERY
  82. *
  83. * @param {String} q Query
  84. * @param {Object} opts Additional options
  85. */
  86. async query(q, opts) {
  87. try {
  88. const results = await this.client.search({
  89. index: this.config.indexName,
  90. body: {
  91. query: {
  92. simple_query_string: {
  93. query: `*${q}*`,
  94. fields: ['title^20', 'description^3', 'tags^8', 'content^1'],
  95. default_operator: 'and',
  96. analyze_wildcard: true
  97. }
  98. },
  99. from: 0,
  100. size: 50,
  101. _source: ['title', 'description', 'path', 'locale'],
  102. suggest: {
  103. suggestions: {
  104. text: q,
  105. completion: {
  106. field: 'suggest',
  107. size: 5,
  108. skip_duplicates: true,
  109. fuzzy: true
  110. }
  111. }
  112. }
  113. }
  114. })
  115. return {
  116. results: _.get(results, 'body.hits.hits', []).map(r => ({
  117. id: r._id,
  118. locale: r._source.locale,
  119. path: r._source.path,
  120. title: r._source.title,
  121. description: r._source.description
  122. })),
  123. suggestions: _.reject(_.get(results, 'suggest.suggestions', []).map(s => _.get(s, 'options[0].text', false)), s => !s),
  124. totalHits: _.get(results, 'body.hits.total.value', _.get(results, 'body.hits.total', 0))
  125. }
  126. } catch (err) {
  127. WIKI.logger.warn('Search Engine Error: ', _.get(err, 'meta.body.error', err))
  128. }
  129. },
  130. /**
  131. * Build tags field
  132. * @param id
  133. * @returns {Promise<*|*[]>}
  134. */
  135. async buildTags(id) {
  136. const tags = await WIKI.models.pages.query().findById(id).select('*').withGraphJoined('tags')
  137. return (tags.tags && tags.tags.length > 0) ? tags.tags.map(function (tag) {
  138. return tag.title
  139. }) : []
  140. },
  141. /**
  142. * Build suggest field
  143. */
  144. buildSuggest(page) {
  145. return _.reject(_.uniq(_.concat(
  146. page.title.split(' ').map(s => ({
  147. input: s,
  148. weight: 10
  149. })),
  150. page.description.split(' ').map(s => ({
  151. input: s,
  152. weight: 3
  153. })),
  154. page.safeContent.split(' ').map(s => ({
  155. input: s,
  156. weight: 1
  157. }))
  158. )), ['input', ''])
  159. },
  160. /**
  161. * CREATE
  162. *
  163. * @param {Object} page Page to create
  164. */
  165. async created(page) {
  166. await this.client.index({
  167. index: this.config.indexName,
  168. type: '_doc',
  169. id: page.hash,
  170. body: {
  171. suggest: this.buildSuggest(page),
  172. locale: page.localeCode,
  173. path: page.path,
  174. title: page.title,
  175. description: page.description,
  176. content: page.safeContent,
  177. tags: await this.buildTags(page.id)
  178. },
  179. refresh: true
  180. })
  181. },
  182. /**
  183. * UPDATE
  184. *
  185. * @param {Object} page Page to update
  186. */
  187. async updated(page) {
  188. await this.client.index({
  189. index: this.config.indexName,
  190. type: '_doc',
  191. id: page.hash,
  192. body: {
  193. suggest: this.buildSuggest(page),
  194. locale: page.localeCode,
  195. path: page.path,
  196. title: page.title,
  197. description: page.description,
  198. content: page.safeContent,
  199. tags: await this.buildTags(page.id)
  200. },
  201. refresh: true
  202. })
  203. },
  204. /**
  205. * DELETE
  206. *
  207. * @param {Object} page Page to delete
  208. */
  209. async deleted(page) {
  210. await this.client.delete({
  211. index: this.config.indexName,
  212. type: '_doc',
  213. id: page.hash,
  214. refresh: true
  215. })
  216. },
  217. /**
  218. * RENAME
  219. *
  220. * @param {Object} page Page to rename
  221. */
  222. async renamed(page) {
  223. await this.client.delete({
  224. index: this.config.indexName,
  225. type: '_doc',
  226. id: page.hash,
  227. refresh: true
  228. })
  229. await this.client.index({
  230. index: this.config.indexName,
  231. type: '_doc',
  232. id: page.destinationHash,
  233. body: {
  234. suggest: this.buildSuggest(page),
  235. locale: page.destinationLocaleCode,
  236. path: page.destinationPath,
  237. title: page.title,
  238. description: page.description,
  239. content: page.safeContent,
  240. tags: await this.buildTags(page.id)
  241. },
  242. refresh: true
  243. })
  244. },
  245. /**
  246. * REBUILD INDEX
  247. */
  248. async rebuild() {
  249. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Rebuilding Index...`)
  250. await this.client.indices.delete({ index: this.config.indexName })
  251. await this.createIndex()
  252. const MAX_INDEXING_BYTES = 10 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength // 10 MB
  253. const MAX_INDEXING_COUNT = 1000
  254. const COMMA_BYTES = Buffer.from(',').byteLength
  255. let chunks = []
  256. let bytes = 0
  257. const processDocument = async (cb, doc) => {
  258. try {
  259. if (doc) {
  260. const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
  261. doc['tags'] = await this.buildTags(doc.realId)
  262. // -> Current batch exceeds size limit, flush
  263. if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
  264. await flushBuffer()
  265. }
  266. if (chunks.length > 0) {
  267. bytes += COMMA_BYTES
  268. }
  269. bytes += docBytes
  270. chunks.push(doc)
  271. // -> Current batch exceeds count limit, flush
  272. if (chunks.length >= MAX_INDEXING_COUNT) {
  273. await flushBuffer()
  274. }
  275. } else {
  276. // -> End of stream, flush
  277. await flushBuffer()
  278. }
  279. cb()
  280. } catch (err) {
  281. cb(err)
  282. }
  283. }
  284. const flushBuffer = async () => {
  285. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Sending batch of ${chunks.length}...`)
  286. try {
  287. await this.client.bulk({
  288. index: this.config.indexName,
  289. body: _.reduce(chunks, (result, doc) => {
  290. result.push({
  291. index: {
  292. _index: this.config.indexName,
  293. _type: '_doc',
  294. _id: doc.id
  295. }
  296. })
  297. doc.safeContent = WIKI.models.pages.cleanHTML(doc.render)
  298. result.push({
  299. suggest: this.buildSuggest(doc),
  300. tags: doc.tags,
  301. locale: doc.locale,
  302. path: doc.path,
  303. title: doc.title,
  304. description: doc.description,
  305. content: doc.safeContent
  306. })
  307. return result
  308. }, []),
  309. refresh: true
  310. })
  311. } catch (err) {
  312. WIKI.logger.warn('(SEARCH/ELASTICSEARCH) Failed to send batch to elasticsearch: ', err)
  313. }
  314. chunks.length = 0
  315. bytes = 0
  316. }
  317. // Added real id in order to fetch page tags from the query
  318. await pipeline(
  319. WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'render', { realId: 'id' }).select().from('pages').where({
  320. isPublished: true,
  321. isPrivate: false
  322. }).stream(),
  323. new stream.Transform({
  324. objectMode: true,
  325. transform: async (chunk, enc, cb) => processDocument(cb, chunk),
  326. flush: async (cb) => processDocument(cb)
  327. })
  328. )
  329. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Index rebuilt successfully.`)
  330. }
  331. }