db.mjs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. import { get, has, isEmpty, isPlainObject } from 'lodash-es'
  2. import path from 'node:path'
  3. import knex from 'knex'
  4. import fs from 'node:fs/promises'
  5. import Objection from 'objection'
  6. import PGPubSub from 'pg-pubsub'
  7. import migrationSource from '../db/migrator-source.mjs'
  8. // const migrateFromLegacy = require('../db/legacy')
  9. import { setTimeout } from 'node:timers/promises'
  10. /**
  11. * ORM DB module
  12. */
  13. export default {
  14. Objection,
  15. knex: null,
  16. listener: null,
  17. config: null,
  18. /**
  19. * Initialize DB
  20. */
  21. async init (workerMode = false) {
  22. let self = this
  23. WIKI.logger.info('Checking DB configuration...')
  24. // Fetch DB Config
  25. this.config = (!isEmpty(process.env.DATABASE_URL)) ? process.env.DATABASE_URL : {
  26. host: WIKI.config.db.host.toString(),
  27. user: WIKI.config.db.user.toString(),
  28. password: WIKI.config.db.pass.toString(),
  29. database: WIKI.config.db.db.toString(),
  30. port: WIKI.config.db.port
  31. }
  32. // Handle SSL Options
  33. let dbUseSSL = (WIKI.config.db.ssl === true || WIKI.config.db.ssl === 'true' || WIKI.config.db.ssl === 1 || WIKI.config.db.ssl === '1')
  34. let sslOptions = null
  35. if (dbUseSSL && isPlainObject(this.config) && get(WIKI.config.db, 'sslOptions.auto', null) === false) {
  36. sslOptions = WIKI.config.db.sslOptions
  37. sslOptions.rejectUnauthorized = sslOptions.rejectUnauthorized !== false
  38. if (sslOptions.ca && sslOptions.ca.indexOf('-----') !== 0) {
  39. sslOptions.ca = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.ca), 'utf-8')
  40. }
  41. if (sslOptions.cert) {
  42. sslOptions.cert = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.cert), 'utf-8')
  43. }
  44. if (sslOptions.key) {
  45. sslOptions.key = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.key), 'utf-8')
  46. }
  47. if (sslOptions.pfx) {
  48. sslOptions.pfx = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.pfx), 'utf-8')
  49. }
  50. } else {
  51. sslOptions = true
  52. }
  53. // Handle inline SSL CA Certificate mode
  54. if (!isEmpty(process.env.DB_SSL_CA)) {
  55. const chunks = []
  56. for (let i = 0, charsLength = process.env.DB_SSL_CA.length; i < charsLength; i += 64) {
  57. chunks.push(process.env.DB_SSL_CA.substring(i, i + 64))
  58. }
  59. dbUseSSL = true
  60. sslOptions = {
  61. rejectUnauthorized: true,
  62. ca: '-----BEGIN CERTIFICATE-----\n' + chunks.join('\n') + '\n-----END CERTIFICATE-----\n'
  63. }
  64. }
  65. if (dbUseSSL && isPlainObject(this.config)) {
  66. this.config.ssl = (sslOptions === true) ? { rejectUnauthorized: true } : sslOptions
  67. }
  68. // Initialize Knex
  69. this.knex = knex({
  70. client: 'pg',
  71. useNullAsDefault: true,
  72. asyncStackTraces: WIKI.IS_DEBUG,
  73. connection: this.config,
  74. searchPath: [WIKI.config.db.schemas.wiki],
  75. pool: {
  76. ...workerMode ? { min: 0, max: 1 } : WIKI.config.pool,
  77. async afterCreate(conn, done) {
  78. // -> Set Connection App Name
  79. if (workerMode) {
  80. await conn.query(`set application_name = 'Wiki.js - ${WIKI.INSTANCE_ID}'`)
  81. } else {
  82. await conn.query(`set application_name = 'Wiki.js - ${WIKI.INSTANCE_ID}:MAIN'`)
  83. }
  84. done()
  85. }
  86. },
  87. debug: WIKI.IS_DEBUG
  88. })
  89. Objection.Model.knex(this.knex)
  90. // Load DB Models
  91. WIKI.logger.info('Loading DB models...')
  92. const models = (await import(path.join(WIKI.SERVERPATH, 'models/index.mjs'))).default
  93. // Set init tasks
  94. let conAttempts = 0
  95. const initTasks = {
  96. // -> Attempt initial connection
  97. async connect () {
  98. try {
  99. WIKI.logger.info('Connecting to database...')
  100. await self.knex.raw('SELECT 1 + 1;')
  101. WIKI.logger.info('Database Connection Successful [ OK ]')
  102. } catch (err) {
  103. if (conAttempts < 10) {
  104. if (err.code) {
  105. WIKI.logger.error(`Database Connection Error: ${err.code} ${err.address}:${err.port}`)
  106. } else {
  107. WIKI.logger.error(`Database Connection Error: ${err.message}`)
  108. }
  109. WIKI.logger.warn(`Will retry in 3 seconds... [Attempt ${++conAttempts} of 10]`)
  110. await setTimeout(3000)
  111. await initTasks.connect()
  112. } else {
  113. throw err
  114. }
  115. }
  116. },
  117. // -> Migrate DB Schemas
  118. async syncSchemas () {
  119. WIKI.logger.info('Ensuring DB schema exists...')
  120. await self.knex.raw(`CREATE SCHEMA IF NOT EXISTS ${WIKI.config.db.schemas.wiki}`)
  121. WIKI.logger.info('Ensuring DB migrations have been applied...')
  122. return self.knex.migrate.latest({
  123. tableName: 'migrations',
  124. migrationSource,
  125. schemaName: WIKI.config.db.schemas.wiki
  126. })
  127. },
  128. // -> Migrate DB Schemas from 2.x
  129. async migrateFromLegacy () {
  130. // return migrateFromLegacy.migrate(self.knex)
  131. }
  132. }
  133. // Perform init tasks
  134. this.onReady = workerMode ? Promise.resolve() : (async () => {
  135. await initTasks.connect()
  136. await initTasks.migrateFromLegacy()
  137. await initTasks.syncSchemas()
  138. })()
  139. return {
  140. ...this,
  141. ...models
  142. }
  143. },
  144. /**
  145. * Subscribe to database LISTEN / NOTIFY for multi-instances events
  146. */
  147. async subscribeToNotifications () {
  148. let connSettings = this.knex.client.connectionSettings
  149. if (typeof connSettings === 'string') {
  150. const encodedName = encodeURIComponent(`Wiki.js - ${WIKI.INSTANCE_ID}:PSUB`)
  151. if (connSettings.indexOf('?') > 0) {
  152. connSettings = `${connSettings}&ApplicationName=${encodedName}`
  153. } else {
  154. connSettings = `${connSettings}?ApplicationName=${encodedName}`
  155. }
  156. } else {
  157. connSettings.application_name = `Wiki.js - ${WIKI.INSTANCE_ID}:PSUB`
  158. }
  159. this.listener = new PGPubSub(connSettings, {
  160. log (ev) {
  161. WIKI.logger.debug(ev)
  162. }
  163. })
  164. // -> Outbound events handling
  165. this.listener.addChannel('wiki', payload => {
  166. if (has(payload, 'event') && payload.source !== WIKI.INSTANCE_ID) {
  167. WIKI.logger.info(`Received event ${payload.event} from instance ${payload.source}: [ OK ]`)
  168. WIKI.events.inbound.emit(payload.event, payload.value)
  169. }
  170. })
  171. WIKI.events.outbound.onAny(this.notifyViaDB)
  172. // -> Listen to inbound events
  173. WIKI.auth.subscribeToEvents()
  174. WIKI.configSvc.subscribeToEvents()
  175. WIKI.db.pages.subscribeToEvents()
  176. WIKI.logger.info(`PG PubSub Listener initialized successfully: [ OK ]`)
  177. },
  178. /**
  179. * Unsubscribe from database LISTEN / NOTIFY
  180. */
  181. async unsubscribeToNotifications () {
  182. if (this.listener) {
  183. WIKI.events.outbound.offAny(this.notifyViaDB)
  184. WIKI.events.inbound.removeAllListeners()
  185. this.listener.close()
  186. }
  187. },
  188. /**
  189. * Publish event via database NOTIFY
  190. *
  191. * @param {string} event Event fired
  192. * @param {object} value Payload of the event
  193. */
  194. notifyViaDB (event, value) {
  195. WIKI.db.listener.publish('wiki', {
  196. source: WIKI.INSTANCE_ID,
  197. event,
  198. value
  199. })
  200. }
  201. }