2
0

db.js 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. const _ = require('lodash')
  2. const autoload = require('auto-load')
  3. const path = require('path')
  4. const Knex = require('knex')
  5. const fs = require('fs')
  6. const Objection = require('objection')
  7. const PGPubSub = require('pg-pubsub')
  8. const migrationSource = require('../db/migrator-source')
  9. const migrateFromLegacy = require('../db/legacy')
  10. const { setTimeout } = require('timers/promises')
  11. /**
  12. * ORM DB module
  13. */
  14. module.exports = {
  15. Objection,
  16. knex: null,
  17. listener: null,
  18. config: null,
  19. /**
  20. * Initialize DB
  21. */
  22. init(workerMode = false) {
  23. let self = this
  24. WIKI.logger.info('Checking DB configuration...')
  25. // Fetch DB Config
  26. this.config = (!_.isEmpty(process.env.DATABASE_URL)) ? process.env.DATABASE_URL : {
  27. host: WIKI.config.db.host.toString(),
  28. user: WIKI.config.db.user.toString(),
  29. password: WIKI.config.db.pass.toString(),
  30. database: WIKI.config.db.db.toString(),
  31. port: WIKI.config.db.port
  32. }
  33. // Handle SSL Options
  34. let dbUseSSL = (WIKI.config.db.ssl === true || WIKI.config.db.ssl === 'true' || WIKI.config.db.ssl === 1 || WIKI.config.db.ssl === '1')
  35. let sslOptions = null
  36. if (dbUseSSL && _.isPlainObject(this.config) && _.get(WIKI.config.db, 'sslOptions.auto', null) === false) {
  37. sslOptions = WIKI.config.db.sslOptions
  38. sslOptions.rejectUnauthorized = sslOptions.rejectUnauthorized !== false
  39. if (sslOptions.ca && sslOptions.ca.indexOf('-----') !== 0) {
  40. sslOptions.ca = fs.readFileSync(path.resolve(WIKI.ROOTPATH, sslOptions.ca))
  41. }
  42. if (sslOptions.cert) {
  43. sslOptions.cert = fs.readFileSync(path.resolve(WIKI.ROOTPATH, sslOptions.cert))
  44. }
  45. if (sslOptions.key) {
  46. sslOptions.key = fs.readFileSync(path.resolve(WIKI.ROOTPATH, sslOptions.key))
  47. }
  48. if (sslOptions.pfx) {
  49. sslOptions.pfx = fs.readFileSync(path.resolve(WIKI.ROOTPATH, sslOptions.pfx))
  50. }
  51. } else {
  52. sslOptions = true
  53. }
  54. // Handle inline SSL CA Certificate mode
  55. if (!_.isEmpty(process.env.DB_SSL_CA)) {
  56. const chunks = []
  57. for (let i = 0, charsLength = process.env.DB_SSL_CA.length; i < charsLength; i += 64) {
  58. chunks.push(process.env.DB_SSL_CA.substring(i, i + 64))
  59. }
  60. dbUseSSL = true
  61. sslOptions = {
  62. rejectUnauthorized: true,
  63. ca: '-----BEGIN CERTIFICATE-----\n' + chunks.join('\n') + '\n-----END CERTIFICATE-----\n'
  64. }
  65. }
  66. if (dbUseSSL && _.isPlainObject(this.config)) {
  67. this.config.ssl = (sslOptions === true) ? { rejectUnauthorized: true } : sslOptions
  68. }
  69. // Initialize Knex
  70. this.knex = Knex({
  71. client: 'pg',
  72. useNullAsDefault: true,
  73. asyncStackTraces: WIKI.IS_DEBUG,
  74. connection: this.config,
  75. searchPath: [WIKI.config.db.schemas.wiki],
  76. pool: {
  77. ...workerMode ? { min: 0, max: 1 } : WIKI.config.pool,
  78. async afterCreate(conn, done) {
  79. // -> Set Connection App Name
  80. if (workerMode) {
  81. await conn.query(`set application_name = 'Wiki.js - ${WIKI.INSTANCE_ID}'`)
  82. } else {
  83. await conn.query(`set application_name = 'Wiki.js - ${WIKI.INSTANCE_ID}:MAIN'`)
  84. }
  85. done()
  86. }
  87. },
  88. debug: WIKI.IS_DEBUG
  89. })
  90. Objection.Model.knex(this.knex)
  91. // Load DB Models
  92. WIKI.logger.info('Loading DB models...')
  93. const models = autoload(path.join(WIKI.SERVERPATH, 'models'))
  94. // Set init tasks
  95. let conAttempts = 0
  96. let initTasks = {
  97. // -> Attempt initial connection
  98. async connect () {
  99. try {
  100. WIKI.logger.info('Connecting to database...')
  101. await self.knex.raw('SELECT 1 + 1;')
  102. WIKI.logger.info('Database Connection Successful [ OK ]')
  103. } catch (err) {
  104. if (conAttempts < 10) {
  105. if (err.code) {
  106. WIKI.logger.error(`Database Connection Error: ${err.code} ${err.address}:${err.port}`)
  107. } else {
  108. WIKI.logger.error(`Database Connection Error: ${err.message}`)
  109. }
  110. WIKI.logger.warn(`Will retry in 3 seconds... [Attempt ${++conAttempts} of 10]`)
  111. await setTimeout(3000)
  112. await initTasks.connect()
  113. } else {
  114. throw err
  115. }
  116. }
  117. },
  118. // -> Migrate DB Schemas
  119. async syncSchemas () {
  120. WIKI.logger.info('Ensuring DB schema exists...')
  121. await self.knex.raw(`CREATE SCHEMA IF NOT EXISTS ${WIKI.config.db.schemas.wiki}`)
  122. WIKI.logger.info('Ensuring DB migrations have been applied...')
  123. return self.knex.migrate.latest({
  124. tableName: 'migrations',
  125. migrationSource,
  126. schemaName: WIKI.config.db.schemas.wiki
  127. })
  128. },
  129. // -> Migrate DB Schemas from 2.x
  130. async migrateFromLegacy () {
  131. return migrateFromLegacy.migrate(self.knex)
  132. }
  133. }
  134. // Perform init tasks
  135. this.onReady = workerMode ? Promise.resolve() : (async () => {
  136. await initTasks.connect()
  137. await initTasks.migrateFromLegacy()
  138. await initTasks.syncSchemas()
  139. })()
  140. return {
  141. ...this,
  142. ...models
  143. }
  144. },
  145. /**
  146. * Subscribe to database LISTEN / NOTIFY for multi-instances events
  147. */
  148. async subscribeToNotifications () {
  149. let connSettings = this.knex.client.connectionSettings
  150. if (typeof connSettings === 'string') {
  151. const encodedName = encodeURIComponent(`Wiki.js - ${WIKI.INSTANCE_ID}:PSUB`)
  152. if (connSettings.indexOf('?') > 0) {
  153. connSettings = `${connSettings}&ApplicationName=${encodedName}`
  154. } else {
  155. connSettings = `${connSettings}?ApplicationName=${encodedName}`
  156. }
  157. } else {
  158. connSettings.application_name = `Wiki.js - ${WIKI.INSTANCE_ID}:PSUB`
  159. }
  160. this.listener = new PGPubSub(connSettings, {
  161. log (ev) {
  162. WIKI.logger.debug(ev)
  163. }
  164. })
  165. // -> Outbound events handling
  166. this.listener.addChannel('wiki', payload => {
  167. if (_.has(payload, 'event') && payload.source !== WIKI.INSTANCE_ID) {
  168. WIKI.logger.info(`Received event ${payload.event} from instance ${payload.source}: [ OK ]`)
  169. WIKI.events.inbound.emit(payload.event, payload.value)
  170. }
  171. })
  172. WIKI.events.outbound.onAny(this.notifyViaDB)
  173. // -> Listen to inbound events
  174. WIKI.auth.subscribeToEvents()
  175. WIKI.configSvc.subscribeToEvents()
  176. WIKI.db.pages.subscribeToEvents()
  177. WIKI.logger.info(`PG PubSub Listener initialized successfully: [ OK ]`)
  178. },
  179. /**
  180. * Unsubscribe from database LISTEN / NOTIFY
  181. */
  182. async unsubscribeToNotifications () {
  183. if (this.listener) {
  184. WIKI.events.outbound.offAny(this.notifyViaDB)
  185. WIKI.events.inbound.removeAllListeners()
  186. this.listener.close()
  187. }
  188. },
  189. /**
  190. * Publish event via database NOTIFY
  191. *
  192. * @param {string} event Event fired
  193. * @param {object} value Payload of the event
  194. */
  195. notifyViaDB (event, value) {
  196. WIKI.db.listener.publish('wiki', {
  197. source: WIKI.INSTANCE_ID,
  198. event,
  199. value
  200. })
  201. }
  202. }