engine.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. const _ = require('lodash')
  2. const stream = require('stream')
  3. const Promise = require('bluebird')
  4. const fs = require('fs')
  5. const pipeline = Promise.promisify(stream.pipeline)
  6. /* global WIKI */
  7. module.exports = {
  8. async activate() {
  9. // not used
  10. },
  11. async deactivate() {
  12. // not used
  13. },
  14. /**
  15. * INIT
  16. */
  17. async init() {
  18. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initializing...`)
  19. switch (this.config.apiVersion) {
  20. case '7.x':
  21. const { Client: Client7 } = require('elasticsearch7')
  22. this.client = new Client7({
  23. nodes: this.config.hosts.split(',').map(_.trim),
  24. sniffOnStart: this.config.sniffOnStart,
  25. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  26. ssl: getTlsOptions(this.config),
  27. name: 'wiki-js'
  28. })
  29. break
  30. case '6.x':
  31. const { Client: Client6 } = require('elasticsearch6')
  32. this.client = new Client6({
  33. nodes: this.config.hosts.split(',').map(_.trim),
  34. sniffOnStart: this.config.sniffOnStart,
  35. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  36. ssl: getTlsOptions(this.config),
  37. name: 'wiki-js'
  38. })
  39. break
  40. default:
  41. throw new Error('Unsupported version of elasticsearch! Update your settings in the Administration Area.')
  42. }
  43. // -> Create Search Index
  44. await this.createIndex()
  45. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initialization completed.`)
  46. },
  47. /**
  48. * Create Index
  49. */
  50. async createIndex() {
  51. try {
  52. const indexExists = await this.client.indices.exists({ index: this.config.indexName })
  53. if (!indexExists.body) {
  54. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Creating index...`)
  55. try {
  56. const idxBody = {
  57. properties: {
  58. suggest: { type: 'completion' },
  59. title: { type: 'text', boost: 10.0 },
  60. description: { type: 'text', boost: 3.0 },
  61. content: { type: 'text', boost: 1.0 },
  62. locale: { type: 'keyword' },
  63. path: { type: 'text' },
  64. tags: { type: 'text', boost: 8.0 }
  65. }
  66. }
  67. await this.client.indices.create({
  68. index: this.config.indexName,
  69. body: {
  70. mappings: (this.config.apiVersion === '6.x') ? {
  71. _doc: idxBody
  72. } : idxBody,
  73. settings: {
  74. analysis: {
  75. analyzer: {
  76. default: {
  77. type: this.config.analyzer
  78. }
  79. }
  80. }
  81. }
  82. }
  83. })
  84. } catch (err) {
  85. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Create Index Error: `, _.get(err, 'meta.body.error', err))
  86. }
  87. }
  88. } catch (err) {
  89. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Index Check Error: `, _.get(err, 'meta.body.error', err))
  90. }
  91. },
  92. /**
  93. * QUERY
  94. *
  95. * @param {String} q Query
  96. * @param {Object} opts Additional options
  97. */
  98. async query(q, opts) {
  99. try {
  100. const results = await this.client.search({
  101. index: this.config.indexName,
  102. body: {
  103. query: {
  104. simple_query_string: {
  105. query: `*${q}*`,
  106. fields: ['title^20', 'description^3', 'tags^8', 'content^1'],
  107. default_operator: 'and',
  108. analyze_wildcard: true
  109. }
  110. },
  111. from: 0,
  112. size: 50,
  113. _source: ['title', 'description', 'path', 'locale'],
  114. suggest: {
  115. suggestions: {
  116. text: q,
  117. completion: {
  118. field: 'suggest',
  119. size: 5,
  120. skip_duplicates: true,
  121. fuzzy: true
  122. }
  123. }
  124. }
  125. }
  126. })
  127. return {
  128. results: _.get(results, 'body.hits.hits', []).map(r => ({
  129. id: r._id,
  130. locale: r._source.locale,
  131. path: r._source.path,
  132. title: r._source.title,
  133. description: r._source.description
  134. })),
  135. suggestions: _.reject(_.get(results, 'suggest.suggestions', []).map(s => _.get(s, 'options[0].text', false)), s => !s),
  136. totalHits: _.get(results, 'body.hits.total.value', _.get(results, 'body.hits.total', 0))
  137. }
  138. } catch (err) {
  139. WIKI.logger.warn('Search Engine Error: ', _.get(err, 'meta.body.error', err))
  140. }
  141. },
  142. /**
  143. * Build tags field
  144. * @param id
  145. * @returns {Promise<*|*[]>}
  146. */
  147. async buildTags(id) {
  148. const tags = await WIKI.models.pages.query().findById(id).select('*').withGraphJoined('tags')
  149. return (tags.tags && tags.tags.length > 0) ? tags.tags.map(function (tag) {
  150. return tag.title
  151. }) : []
  152. },
  153. /**
  154. * Build suggest field
  155. */
  156. buildSuggest(page) {
  157. return _.reject(_.uniq(_.concat(
  158. page.title.split(' ').map(s => ({
  159. input: s,
  160. weight: 10
  161. })),
  162. page.description.split(' ').map(s => ({
  163. input: s,
  164. weight: 3
  165. })),
  166. page.safeContent.split(' ').map(s => ({
  167. input: s,
  168. weight: 1
  169. }))
  170. )), ['input', ''])
  171. },
  172. /**
  173. * CREATE
  174. *
  175. * @param {Object} page Page to create
  176. */
  177. async created(page) {
  178. await this.client.index({
  179. index: this.config.indexName,
  180. type: '_doc',
  181. id: page.hash,
  182. body: {
  183. suggest: this.buildSuggest(page),
  184. locale: page.localeCode,
  185. path: page.path,
  186. title: page.title,
  187. description: page.description,
  188. content: page.safeContent,
  189. tags: await this.buildTags(page.id)
  190. },
  191. refresh: true
  192. })
  193. },
  194. /**
  195. * UPDATE
  196. *
  197. * @param {Object} page Page to update
  198. */
  199. async updated(page) {
  200. await this.client.index({
  201. index: this.config.indexName,
  202. type: '_doc',
  203. id: page.hash,
  204. body: {
  205. suggest: this.buildSuggest(page),
  206. locale: page.localeCode,
  207. path: page.path,
  208. title: page.title,
  209. description: page.description,
  210. content: page.safeContent,
  211. tags: await this.buildTags(page.id)
  212. },
  213. refresh: true
  214. })
  215. },
  216. /**
  217. * DELETE
  218. *
  219. * @param {Object} page Page to delete
  220. */
  221. async deleted(page) {
  222. await this.client.delete({
  223. index: this.config.indexName,
  224. type: '_doc',
  225. id: page.hash,
  226. refresh: true
  227. })
  228. },
  229. /**
  230. * RENAME
  231. *
  232. * @param {Object} page Page to rename
  233. */
  234. async renamed(page) {
  235. await this.client.delete({
  236. index: this.config.indexName,
  237. type: '_doc',
  238. id: page.hash,
  239. refresh: true
  240. })
  241. await this.client.index({
  242. index: this.config.indexName,
  243. type: '_doc',
  244. id: page.destinationHash,
  245. body: {
  246. suggest: this.buildSuggest(page),
  247. locale: page.destinationLocaleCode,
  248. path: page.destinationPath,
  249. title: page.title,
  250. description: page.description,
  251. content: page.safeContent,
  252. tags: await this.buildTags(page.id)
  253. },
  254. refresh: true
  255. })
  256. },
  257. /**
  258. * REBUILD INDEX
  259. */
  260. async rebuild() {
  261. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Rebuilding Index...`)
  262. await this.client.indices.delete({ index: this.config.indexName })
  263. await this.createIndex()
  264. const MAX_INDEXING_BYTES = 10 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength // 10 MB
  265. const MAX_INDEXING_COUNT = 1000
  266. const COMMA_BYTES = Buffer.from(',').byteLength
  267. let chunks = []
  268. let bytes = 0
  269. const processDocument = async (cb, doc) => {
  270. try {
  271. if (doc) {
  272. const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
  273. doc['tags'] = await this.buildTags(doc.realId)
  274. // -> Current batch exceeds size limit, flush
  275. if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
  276. await flushBuffer()
  277. }
  278. if (chunks.length > 0) {
  279. bytes += COMMA_BYTES
  280. }
  281. bytes += docBytes
  282. chunks.push(doc)
  283. // -> Current batch exceeds count limit, flush
  284. if (chunks.length >= MAX_INDEXING_COUNT) {
  285. await flushBuffer()
  286. }
  287. } else {
  288. // -> End of stream, flush
  289. await flushBuffer()
  290. }
  291. cb()
  292. } catch (err) {
  293. cb(err)
  294. }
  295. }
  296. const flushBuffer = async () => {
  297. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Sending batch of ${chunks.length}...`)
  298. try {
  299. await this.client.bulk({
  300. index: this.config.indexName,
  301. body: _.reduce(chunks, (result, doc) => {
  302. result.push({
  303. index: {
  304. _index: this.config.indexName,
  305. _type: '_doc',
  306. _id: doc.id
  307. }
  308. })
  309. doc.safeContent = WIKI.models.pages.cleanHTML(doc.render)
  310. result.push({
  311. suggest: this.buildSuggest(doc),
  312. tags: doc.tags,
  313. locale: doc.locale,
  314. path: doc.path,
  315. title: doc.title,
  316. description: doc.description,
  317. content: doc.safeContent
  318. })
  319. return result
  320. }, []),
  321. refresh: true
  322. })
  323. } catch (err) {
  324. WIKI.logger.warn('(SEARCH/ELASTICSEARCH) Failed to send batch to elasticsearch: ', err)
  325. }
  326. chunks.length = 0
  327. bytes = 0
  328. }
  329. // Added real id in order to fetch page tags from the query
  330. await pipeline(
  331. WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'render', { realId: 'id' }).select().from('pages').where({
  332. isPublished: true,
  333. isPrivate: false
  334. }).stream(),
  335. new stream.Transform({
  336. objectMode: true,
  337. transform: async (chunk, enc, cb) => processDocument(cb, chunk),
  338. flush: async (cb) => processDocument(cb)
  339. })
  340. )
  341. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Index rebuilt successfully.`)
  342. }
  343. }
  /**
   * Builds the TLS options handed to the Elasticsearch client.
   *
   * @param {Object} conf Engine configuration; reads `tlsCertPath` and `verifyTLSCertificate`
   * @returns {Object} `{ rejectUnauthorized }`, plus a `ca` list when a certificate
   *   path is configured (the file is only read when verification is enabled)
   */
  function getTlsOptions(conf) {
    const tlsOptions = { rejectUnauthorized: conf.verifyTLSCertificate }

    if (conf.tlsCertPath) {
      // With verification disabled the CA list stays empty and the file is never read.
      tlsOptions.ca = conf.verifyTLSCertificate ? [fs.readFileSync(conf.tlsCertPath)] : []
    }

    return tlsOptions
  }