engine.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. const _ = require('lodash')
  2. const stream = require('stream')
  3. const Promise = require('bluebird')
  4. const fs = require('fs')
  5. const pipeline = Promise.promisify(stream.pipeline)
  6. /* global WIKI */
  7. module.exports = {
  8. async activate() {
  9. // not used
  10. },
  11. async deactivate() {
  12. // not used
  13. },
  14. /**
  15. * INIT
  16. */
  17. async init() {
  18. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initializing...`)
  19. switch (this.config.apiVersion) {
  20. case '8.x':
  21. const { Client: Client8 } = require('elasticsearch8')
  22. this.client = new Client8({
  23. nodes: this.config.hosts.split(',').map(_.trim),
  24. sniffOnStart: this.config.sniffOnStart,
  25. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  26. tls: getTlsOptions(this.config),
  27. name: 'wiki-js'
  28. })
  29. break
  30. case '7.x':
  31. const { Client: Client7 } = require('elasticsearch7')
  32. this.client = new Client7({
  33. nodes: this.config.hosts.split(',').map(_.trim),
  34. sniffOnStart: this.config.sniffOnStart,
  35. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  36. ssl: getTlsOptions(this.config),
  37. name: 'wiki-js'
  38. })
  39. break
  40. case '6.x':
  41. const { Client: Client6 } = require('elasticsearch6')
  42. this.client = new Client6({
  43. nodes: this.config.hosts.split(',').map(_.trim),
  44. sniffOnStart: this.config.sniffOnStart,
  45. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  46. ssl: getTlsOptions(this.config),
  47. name: 'wiki-js'
  48. })
  49. break
  50. default:
  51. throw new Error('Unsupported version of elasticsearch! Update your settings in the Administration Area.')
  52. }
  53. // -> Create Search Index
  54. await this.createIndex()
  55. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initialization completed.`)
  56. },
  57. /**
  58. * Create Index
  59. */
  60. async createIndex() {
  61. try {
  62. const indexExists = await this.client.indices.exists({ index: this.config.indexName })
  63. // Elasticsearch 6.x / 7.x
  64. if (this.config.apiVersion !== '8.x' && !indexExists.body) {
  65. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Creating index...`)
  66. try {
  67. const idxBody = {
  68. properties: {
  69. suggest: { type: 'completion' },
  70. title: { type: 'text', boost: 10.0 },
  71. description: { type: 'text', boost: 3.0 },
  72. content: { type: 'text', boost: 1.0 },
  73. locale: { type: 'keyword' },
  74. path: { type: 'text' },
  75. tags: { type: 'text', boost: 8.0 }
  76. }
  77. }
  78. await this.client.indices.create({
  79. index: this.config.indexName,
  80. body: {
  81. mappings: (this.config.apiVersion === '6.x') ? {
  82. _doc: idxBody
  83. } : idxBody,
  84. settings: {
  85. analysis: {
  86. analyzer: {
  87. default: {
  88. type: this.config.analyzer
  89. }
  90. }
  91. }
  92. }
  93. }
  94. })
  95. } catch (err) {
  96. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Create Index Error: `, _.get(err, 'meta.body.error', err))
  97. }
  98. // Elasticsearch 8.x
  99. } else if (this.config.apiVersion === '8.x' && !indexExists) {
  100. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Creating index...`)
  101. try {
  102. // 8.x Doesn't support boost in mappings, so we will need to boost at query time.
  103. const idxBody = {
  104. properties: {
  105. suggest: { type: 'completion' },
  106. title: { type: 'text' },
  107. description: { type: 'text' },
  108. content: { type: 'text' },
  109. locale: { type: 'keyword' },
  110. path: { type: 'text' },
  111. tags: { type: 'text' }
  112. }
  113. }
  114. await this.client.indices.create({
  115. index: this.config.indexName,
  116. body: {
  117. mappings: idxBody,
  118. settings: {
  119. analysis: {
  120. analyzer: {
  121. default: {
  122. type: this.config.analyzer
  123. }
  124. }
  125. }
  126. }
  127. }
  128. })
  129. } catch (err) {
  130. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Create Index Error: `, _.get(err, 'meta.body.error', err))
  131. }
  132. }
  133. } catch (err) {
  134. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Index Check Error: `, _.get(err, 'meta.body.error', err))
  135. }
  136. },
  137. /**
  138. * QUERY
  139. *
  140. * @param {String} q Query
  141. * @param {Object} opts Additional options
  142. */
  143. async query(q, opts) {
  144. try {
  145. const results = await this.client.search({
  146. index: this.config.indexName,
  147. body: {
  148. query: {
  149. simple_query_string: {
  150. query: `*${q}*`,
  151. fields: ['title^20', 'description^3', 'tags^8', 'content^1'],
  152. default_operator: 'and',
  153. analyze_wildcard: true
  154. }
  155. },
  156. from: 0,
  157. size: 50,
  158. _source: ['title', 'description', 'path', 'locale'],
  159. suggest: {
  160. suggestions: {
  161. text: q,
  162. completion: {
  163. field: 'suggest',
  164. size: 5,
  165. skip_duplicates: true,
  166. fuzzy: true
  167. }
  168. }
  169. }
  170. }
  171. })
  172. return {
  173. results: _.get(results, this.config.apiVersion === '8.x' ? 'hits.hits' : 'body.hits.hits', []).map(r => ({
  174. id: r._id,
  175. locale: r._source.locale,
  176. path: r._source.path,
  177. title: r._source.title,
  178. description: r._source.description
  179. })),
  180. suggestions: _.reject(_.get(results, 'suggest.suggestions', []).map(s => _.get(s, 'options[0].text', false)), s => !s),
  181. totalHits: _.get(results, this.config.apiVersion === '8.x' ? 'hits.total.value' : 'body.hits.total.value', _.get(results, this.config.apiVersion === '8.x' ? 'hits.total' : 'body.hits.total', 0))
  182. }
  183. } catch (err) {
  184. WIKI.logger.warn('Search Engine Error: ', _.get(err, 'meta.body.error', err))
  185. }
  186. },
  187. /**
  188. * Build tags field
  189. * @param id
  190. * @returns {Promise<*|*[]>}
  191. */
  192. async buildTags(id) {
  193. const tags = await WIKI.models.pages.query().findById(id).select('*').withGraphJoined('tags')
  194. return (tags.tags && tags.tags.length > 0) ? tags.tags.map(function (tag) {
  195. return tag.title
  196. }) : []
  197. },
  198. /**
  199. * Build suggest field
  200. */
  201. buildSuggest(page) {
  202. return _.reject(_.uniq(_.concat(
  203. page.title.split(' ').map(s => ({
  204. input: s,
  205. weight: 10
  206. })),
  207. page.description.split(' ').map(s => ({
  208. input: s,
  209. weight: 3
  210. })),
  211. page.safeContent.split(' ').map(s => ({
  212. input: s,
  213. weight: 1
  214. }))
  215. )), ['input', ''])
  216. },
  217. /**
  218. * CREATE
  219. *
  220. * @param {Object} page Page to create
  221. */
  222. async created(page) {
  223. await this.client.index({
  224. index: this.config.indexName,
  225. ...(this.config.apiVersion !== '8.x' && { type: '_doc' }),
  226. id: page.hash,
  227. body: {
  228. suggest: this.buildSuggest(page),
  229. locale: page.localeCode,
  230. path: page.path,
  231. title: page.title,
  232. description: page.description,
  233. content: page.safeContent,
  234. tags: await this.buildTags(page.id)
  235. },
  236. refresh: true
  237. })
  238. },
  239. /**
  240. * UPDATE
  241. *
  242. * @param {Object} page Page to update
  243. */
  244. async updated(page) {
  245. await this.client.index({
  246. index: this.config.indexName,
  247. ...(this.config.apiVersion !== '8.x' && { type: '_doc' }),
  248. id: page.hash,
  249. body: {
  250. suggest: this.buildSuggest(page),
  251. locale: page.localeCode,
  252. path: page.path,
  253. title: page.title,
  254. description: page.description,
  255. content: page.safeContent,
  256. tags: await this.buildTags(page.id)
  257. },
  258. refresh: true
  259. })
  260. },
  261. /**
  262. * DELETE
  263. *
  264. * @param {Object} page Page to delete
  265. */
  266. async deleted(page) {
  267. await this.client.delete({
  268. index: this.config.indexName,
  269. ...(this.config.apiVersion !== '8.x' && { type: '_doc' }),
  270. id: page.hash,
  271. refresh: true
  272. })
  273. },
  274. /**
  275. * RENAME
  276. *
  277. * @param {Object} page Page to rename
  278. */
  279. async renamed(page) {
  280. await this.client.delete({
  281. index: this.config.indexName,
  282. ...(this.config.apiVersion !== '8.x' && { type: '_doc' }),
  283. id: page.hash,
  284. refresh: true
  285. })
  286. await this.client.index({
  287. index: this.config.indexName,
  288. ...(this.config.apiVersion !== '8.x' && { type: '_doc' }),
  289. id: page.destinationHash,
  290. body: {
  291. suggest: this.buildSuggest(page),
  292. locale: page.destinationLocaleCode,
  293. path: page.destinationPath,
  294. title: page.title,
  295. description: page.description,
  296. content: page.safeContent,
  297. tags: await this.buildTags(page.id)
  298. },
  299. refresh: true
  300. })
  301. },
  302. /**
  303. * REBUILD INDEX
  304. */
  305. async rebuild() {
  306. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Rebuilding Index...`)
  307. await this.client.indices.delete({ index: this.config.indexName })
  308. await this.createIndex()
  309. const MAX_INDEXING_BYTES = 10 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength // 10 MB
  310. const MAX_INDEXING_COUNT = 1000
  311. const COMMA_BYTES = Buffer.from(',').byteLength
  312. let chunks = []
  313. let bytes = 0
  314. const processDocument = async (cb, doc) => {
  315. try {
  316. if (doc) {
  317. const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
  318. doc['tags'] = await this.buildTags(doc.realId)
  319. // -> Current batch exceeds size limit, flush
  320. if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
  321. await flushBuffer()
  322. }
  323. if (chunks.length > 0) {
  324. bytes += COMMA_BYTES
  325. }
  326. bytes += docBytes
  327. chunks.push(doc)
  328. // -> Current batch exceeds count limit, flush
  329. if (chunks.length >= MAX_INDEXING_COUNT) {
  330. await flushBuffer()
  331. }
  332. } else {
  333. // -> End of stream, flush
  334. await flushBuffer()
  335. }
  336. cb()
  337. } catch (err) {
  338. cb(err)
  339. }
  340. }
  341. const flushBuffer = async () => {
  342. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Sending batch of ${chunks.length}...`)
  343. try {
  344. await this.client.bulk({
  345. index: this.config.indexName,
  346. body: _.reduce(chunks, (result, doc) => {
  347. result.push({
  348. index: {
  349. _index: this.config.indexName,
  350. _id: doc.id,
  351. ...(this.config.apiVersion !== '8.x' && { _type: '_doc' })
  352. }
  353. })
  354. doc.safeContent = WIKI.models.pages.cleanHTML(doc.render)
  355. result.push({
  356. suggest: this.buildSuggest(doc),
  357. tags: doc.tags,
  358. locale: doc.locale,
  359. path: doc.path,
  360. title: doc.title,
  361. description: doc.description,
  362. content: doc.safeContent
  363. })
  364. return result
  365. }, []),
  366. refresh: true
  367. })
  368. } catch (err) {
  369. WIKI.logger.warn('(SEARCH/ELASTICSEARCH) Failed to send batch to elasticsearch: ', err)
  370. }
  371. chunks.length = 0
  372. bytes = 0
  373. }
  374. // Added real id in order to fetch page tags from the query
  375. await pipeline(
  376. WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'render', { realId: 'id' }).select().from('pages').where({
  377. isPublished: true,
  378. isPrivate: false
  379. }).stream(),
  380. new stream.Transform({
  381. objectMode: true,
  382. transform: async (chunk, enc, cb) => processDocument(cb, chunk),
  383. flush: async (cb) => processDocument(cb)
  384. })
  385. )
  386. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Index rebuilt successfully.`)
  387. }
  388. }
  /**
   * Build the TLS options object handed to the Elasticsearch client.
   *
   * @param {Object} conf Search engine configuration
   * @returns {Object} `rejectUnauthorized` copied from `conf.verifyTLSCertificate`.
   *   When `conf.tlsCertPath` is set, a `ca` array is added too. It holds that file's
   *   contents when verification is enabled and is empty otherwise.
   */
  function getTlsOptions(conf) {
    const options = { rejectUnauthorized: conf.verifyTLSCertificate }
    if (conf.tlsCertPath) {
      options.ca = conf.verifyTLSCertificate ? [fs.readFileSync(conf.tlsCertPath)] : []
    }
    return options
  }