engine.js 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. const _ = require('lodash')
  2. const stream = require('stream')
  3. const Promise = require('bluebird')
  4. const pipeline = Promise.promisify(stream.pipeline)
  5. /* global WIKI */
  6. module.exports = {
  7. async activate() {
  8. // not used
  9. },
  10. async deactivate() {
  11. // not used
  12. },
  13. /**
  14. * INIT
  15. */
  16. async init() {
  17. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initializing...`)
  18. switch (this.config.apiVersion) {
  19. case '7.x':
  20. const { Client: Client7 } = require('elasticsearch7')
  21. this.client = new Client7({
  22. nodes: this.config.hosts.split(',').map(_.trim),
  23. sniffOnStart: this.config.sniffOnStart,
  24. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  25. name: 'wiki-js'
  26. })
  27. break
  28. case '6.x':
  29. const { Client: Client6 } = require('elasticsearch6')
  30. this.client = new Client6({
  31. nodes: this.config.hosts.split(',').map(_.trim),
  32. sniffOnStart: this.config.sniffOnStart,
  33. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  34. name: 'wiki-js'
  35. })
  36. break
  37. default:
  38. throw new Error('Unsupported version of elasticsearch! Update your settings in the Administration Area.')
  39. }
  40. // -> Create Search Index
  41. await this.createIndex()
  42. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initialization completed.`)
  43. },
  44. /**
  45. * Create Index
  46. */
  47. async createIndex() {
  48. try {
  49. const indexExists = await this.client.indices.exists({ index: this.config.indexName })
  50. if (!indexExists.body) {
  51. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Creating index...`)
  52. try {
  53. const idxBody = {
  54. properties: {
  55. suggest: { type: 'completion' },
  56. title: { type: 'text', boost: 4.0 },
  57. description: { type: 'text', boost: 3.0 },
  58. content: { type: 'text', boost: 1.0 },
  59. locale: { type: 'keyword' },
  60. path: { type: 'text' }
  61. }
  62. }
  63. await this.client.indices.create({
  64. index: this.config.indexName,
  65. body: {
  66. mappings: (this.config.apiVersion === '6.x') ? {
  67. _doc: idxBody
  68. } : idxBody
  69. }
  70. })
  71. } catch (err) {
  72. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Create Index Error: `, _.get(err, 'meta.body.error', err))
  73. }
  74. }
  75. } catch (err) {
  76. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Index Check Error: `, _.get(err, 'meta.body.error', err))
  77. }
  78. },
  79. /**
  80. * QUERY
  81. *
  82. * @param {String} q Query
  83. * @param {Object} opts Additional options
  84. */
  85. async query(q, opts) {
  86. try {
  87. const results = await this.client.search({
  88. index: this.config.indexName,
  89. body: {
  90. query: {
  91. bool: {
  92. filter: [
  93. {
  94. bool: {
  95. should: [
  96. {
  97. simple_query_string: {
  98. query: q
  99. }
  100. },
  101. {
  102. query_string: {
  103. query: `*${q}*`
  104. }
  105. }
  106. ],
  107. minimum_should_match: 1
  108. }
  109. }
  110. ]
  111. }
  112. },
  113. from: 0,
  114. size: 50,
  115. _source: ['title', 'description', 'path', 'locale'],
  116. suggest: {
  117. suggestions: {
  118. text: q,
  119. completion: {
  120. field: 'suggest',
  121. size: 5,
  122. skip_duplicates: true,
  123. fuzzy: true
  124. }
  125. }
  126. }
  127. }
  128. })
  129. return {
  130. results: _.get(results, 'body.hits.hits', []).map(r => ({
  131. id: r._id,
  132. locale: r._source.locale,
  133. path: r._source.path,
  134. title: r._source.title,
  135. description: r._source.description
  136. })),
  137. suggestions: _.reject(_.get(results, 'suggest.suggestions', []).map(s => _.get(s, 'options[0].text', false)), s => !s),
  138. totalHits: _.get(results, 'body.hits.total.value', _.get(results, 'body.hits.total', 0))
  139. }
  140. } catch (err) {
  141. WIKI.logger.warn('Search Engine Error: ', _.get(err, 'meta.body.error', err))
  142. }
  143. },
  144. /**
  145. * Build suggest field
  146. */
  147. buildSuggest(page) {
  148. return _.uniq(_.concat(
  149. page.title.split(' ').map(s => ({
  150. input: s,
  151. weight: 4
  152. })),
  153. page.description.split(' ').map(s => ({
  154. input: s,
  155. weight: 3
  156. })),
  157. page.safeContent.split(' ').map(s => ({
  158. input: s,
  159. weight: 1
  160. }))
  161. ))
  162. },
  163. /**
  164. * CREATE
  165. *
  166. * @param {Object} page Page to create
  167. */
  168. async created(page) {
  169. await this.client.index({
  170. index: this.config.indexName,
  171. type: '_doc',
  172. id: page.hash,
  173. body: {
  174. suggest: this.buildSuggest(page),
  175. locale: page.localeCode,
  176. path: page.path,
  177. title: page.title,
  178. description: page.description,
  179. content: page.safeContent
  180. },
  181. refresh: true
  182. })
  183. },
  184. /**
  185. * UPDATE
  186. *
  187. * @param {Object} page Page to update
  188. */
  189. async updated(page) {
  190. await this.client.index({
  191. index: this.config.indexName,
  192. type: '_doc',
  193. id: page.hash,
  194. body: {
  195. suggest: this.buildSuggest(page),
  196. locale: page.localeCode,
  197. path: page.path,
  198. title: page.title,
  199. description: page.description,
  200. content: page.safeContent
  201. },
  202. refresh: true
  203. })
  204. },
  205. /**
  206. * DELETE
  207. *
  208. * @param {Object} page Page to delete
  209. */
  210. async deleted(page) {
  211. await this.client.delete({
  212. index: this.config.indexName,
  213. type: '_doc',
  214. id: page.hash,
  215. refresh: true
  216. })
  217. },
  218. /**
  219. * RENAME
  220. *
  221. * @param {Object} page Page to rename
  222. */
  223. async renamed(page) {
  224. await this.client.delete({
  225. index: this.config.indexName,
  226. type: '_doc',
  227. id: page.hash,
  228. refresh: true
  229. })
  230. await this.client.index({
  231. index: this.config.indexName,
  232. type: '_doc',
  233. id: page.destinationHash,
  234. body: {
  235. suggest: this.buildSuggest(page),
  236. locale: page.destinationLocaleCode,
  237. path: page.destinationPath,
  238. title: page.title,
  239. description: page.description,
  240. content: page.safeContent
  241. },
  242. refresh: true
  243. })
  244. },
  245. /**
  246. * REBUILD INDEX
  247. */
  248. async rebuild() {
  249. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Rebuilding Index...`)
  250. await this.client.indices.delete({ index: this.config.indexName })
  251. await this.createIndex()
  252. const MAX_INDEXING_BYTES = 10 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength // 10 MB
  253. const MAX_INDEXING_COUNT = 1000
  254. const COMMA_BYTES = Buffer.from(',').byteLength
  255. let chunks = []
  256. let bytes = 0
  257. const processDocument = async (cb, doc) => {
  258. try {
  259. if (doc) {
  260. const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
  261. // -> Current batch exceeds size limit, flush
  262. if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
  263. await flushBuffer()
  264. }
  265. if (chunks.length > 0) {
  266. bytes += COMMA_BYTES
  267. }
  268. bytes += docBytes
  269. chunks.push(doc)
  270. // -> Current batch exceeds count limit, flush
  271. if (chunks.length >= MAX_INDEXING_COUNT) {
  272. await flushBuffer()
  273. }
  274. } else {
  275. // -> End of stream, flush
  276. await flushBuffer()
  277. }
  278. cb()
  279. } catch (err) {
  280. cb(err)
  281. }
  282. }
  283. const flushBuffer = async () => {
  284. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Sending batch of ${chunks.length}...`)
  285. try {
  286. await this.client.bulk({
  287. index: this.config.indexName,
  288. body: _.reduce(chunks, (result, doc) => {
  289. result.push({
  290. index: {
  291. _index: this.config.indexName,
  292. _type: '_doc',
  293. _id: doc.id
  294. }
  295. })
  296. doc.safeContent = WIKI.models.pages.cleanHTML(doc.render)
  297. result.push({
  298. suggest: this.buildSuggest(doc),
  299. locale: doc.locale,
  300. path: doc.path,
  301. title: doc.title,
  302. description: doc.description,
  303. content: doc.safeContent
  304. })
  305. return result
  306. }, []),
  307. refresh: true
  308. })
  309. } catch (err) {
  310. WIKI.logger.warn('(SEARCH/ELASTICSEARCH) Failed to send batch to elasticsearch: ', err)
  311. }
  312. chunks.length = 0
  313. bytes = 0
  314. }
  315. await pipeline(
  316. WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'render').select().from('pages').where({
  317. isPublished: true,
  318. isPrivate: false
  319. }).stream(),
  320. new stream.Transform({
  321. objectMode: true,
  322. transform: async (chunk, enc, cb) => processDocument(cb, chunk),
  323. flush: async (cb) => processDocument(cb)
  324. })
  325. )
  326. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Index rebuilt successfully.`)
  327. }
  328. }