Sogo.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. import requests
  2. import urllib3
  3. import os
  4. from uuid import uuid4
  5. from collections import defaultdict
  6. class Sogo:
  7. def __init__(self, username, password=""):
  8. self.apiUrl = "/SOGo/so"
  9. self.davUrl = "/SOGo/dav"
  10. self.ignore_ssl_errors = True
  11. self.baseUrl = f"https://{os.getenv('IPv4_NETWORK', '172.22.1')}.247:{os.getenv('HTTPS_PORT', '443')}"
  12. self.host = os.getenv("MAILCOW_HOSTNAME", "")
  13. if self.ignore_ssl_errors:
  14. urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
  15. self.username = username
  16. self.password = password
  17. def addCalendar(self, calendar_name):
  18. """
  19. Add a new calendar to the sogo instance.
  20. :param calendar_name: Name of the calendar to be created
  21. :return: Response from the sogo API.
  22. """
  23. res = self.post(f"/{self.username}/Calendar/createFolder", {
  24. "name": calendar_name
  25. })
  26. try:
  27. return res.json()
  28. except ValueError:
  29. return res.text
  30. def getCalendarIdByName(self, calendar_name):
  31. """
  32. Get the calendar ID by its name.
  33. :param calendar_name: Name of the calendar to find
  34. :return: Calendar ID if found, otherwise None.
  35. """
  36. res = self.get(f"/{self.username}/Calendar/calendarslist")
  37. try:
  38. for calendar in res.json()["calendars"]:
  39. if calendar['name'] == calendar_name:
  40. return calendar['id']
  41. except ValueError:
  42. return None
  43. return None
  44. def getCalendar(self):
  45. """
  46. Get calendar list.
  47. :return: Response from SOGo API.
  48. """
  49. res = self.get(f"/{self.username}/Calendar/calendarslist")
  50. try:
  51. return res.json()
  52. except ValueError:
  53. return res.text
  54. def deleteCalendar(self, calendar_id):
  55. """
  56. Delete a calendar.
  57. :param calendar_id: ID of the calendar to be deleted
  58. :return: Response from SOGo API.
  59. """
  60. res = self.get(f"/{self.username}/Calendar/{calendar_id}/delete")
  61. return res.status_code == 204
  62. def importCalendar(self, calendar_name, ics_file):
  63. """
  64. Import a calendar from an ICS file.
  65. :param calendar_name: Name of the calendar to import into
  66. :param ics_file: Path to the ICS file to import
  67. :return: Response from SOGo API.
  68. """
  69. try:
  70. with open(ics_file, "rb") as f:
  71. pass
  72. except Exception as e:
  73. print(f"Could not open ICS file '{ics_file}': {e}")
  74. return {"status": "error", "message": str(e)}
  75. new_calendar = self.addCalendar(calendar_name)
  76. selected_calendar = new_calendar.json()["id"]
  77. url = f"{self.baseUrl}{self.apiUrl}/{self.username}/Calendar/{selected_calendar}/import"
  78. auth = (self.username, self.password)
  79. with open(ics_file, "rb") as f:
  80. files = {'icsFile': (ics_file, f, 'text/calendar')}
  81. res = requests.post(
  82. url,
  83. files=files,
  84. auth=auth,
  85. verify=not self.ignore_ssl_errors
  86. )
  87. try:
  88. return res.json()
  89. except ValueError:
  90. return res.text
  91. return None
  92. def setCalendarACL(self, calendar_id, sharee_email, acl="r", subscribe=False):
  93. """
  94. Set CalDAV calendar permissions for a user (sharee).
  95. :param calendar_id: ID of the calendar to share
  96. :param sharee_email: Email of the user to share with
  97. :param acl: "w" for write, "r" for read-only or combination "rw" for read-write
  98. :param subscribe: True will scubscribe the sharee to the calendar
  99. :return: None
  100. """
  101. # Access rights
  102. if acl == "" or len(acl) > 2:
  103. return "Invalid acl level specified. Use 'w', 'r' or combinations like 'rw'."
  104. rights = [{
  105. "c_email": sharee_email,
  106. "uid": sharee_email,
  107. "userClass": "normal-user",
  108. "rights": {
  109. "Public": "None",
  110. "Private": "None",
  111. "Confidential": "None",
  112. "canCreateObjects": 0,
  113. "canEraseObjects": 0
  114. }
  115. }]
  116. if "w" in acl:
  117. rights[0]["rights"]["canCreateObjects"] = 1
  118. rights[0]["rights"]["canEraseObjects"] = 1
  119. if "r" in acl:
  120. rights[0]["rights"]["Public"] = "Viewer"
  121. rights[0]["rights"]["Private"] = "Viewer"
  122. rights[0]["rights"]["Confidential"] = "Viewer"
  123. r_add = self.get(f"/{self.username}/Calendar/{calendar_id}/addUserInAcls?uid={sharee_email}")
  124. if r_add.status_code < 200 or r_add.status_code > 299:
  125. try:
  126. return r_add.json()
  127. except ValueError:
  128. return r_add.text
  129. r_save = self.post(f"/{self.username}/Calendar/{calendar_id}/saveUserRights", rights)
  130. if r_save.status_code < 200 or r_save.status_code > 299:
  131. try:
  132. return r_save.json()
  133. except ValueError:
  134. return r_save.text
  135. if subscribe:
  136. r_subscribe = self.get(f"/{self.username}/Calendar/{calendar_id}/subscribeUsers?uids={sharee_email}")
  137. if r_subscribe.status_code < 200 or r_subscribe.status_code > 299:
  138. try:
  139. return r_subscribe.json()
  140. except ValueError:
  141. return r_subscribe.text
  142. return r_save.status_code == 200
  143. def getCalendarACL(self, calendar_id):
  144. """
  145. Get CalDAV calendar permissions for a user (sharee).
  146. :param calendar_id: ID of the calendar to get ACL from
  147. :return: Response from SOGo API.
  148. """
  149. res = self.get(f"/{self.username}/Calendar/{calendar_id}/acls")
  150. try:
  151. return res.json()
  152. except ValueError:
  153. return res.text
  154. def deleteCalendarACL(self, calendar_id, sharee_email):
  155. """
  156. Delete a calendar ACL for a user (sharee).
  157. :param calendar_id: ID of the calendar to delete ACL from
  158. :param sharee_email: Email of the user whose ACL to delete
  159. :return: Response from SOGo API.
  160. """
  161. res = self.get(f"/{self.username}/Calendar/{calendar_id}/removeUserFromAcls?uid={sharee_email}")
  162. return res.status_code == 204
  163. def addAddressbook(self, addressbook_name):
  164. """
  165. Add a new addressbook to the sogo instance.
  166. :param addressbook_name: Name of the addressbook to be created
  167. :return: Response from the sogo API.
  168. """
  169. res = self.post(f"/{self.username}/Contacts/createFolder", {
  170. "name": addressbook_name
  171. })
  172. try:
  173. return res.json()
  174. except ValueError:
  175. return res.text
  176. def getAddressbookIdByName(self, addressbook_name):
  177. """
  178. Get the addressbook ID by its name.
  179. :param addressbook_name: Name of the addressbook to find
  180. :return: Addressbook ID if found, otherwise None.
  181. """
  182. res = self.get(f"/{self.username}/Contacts/addressbooksList")
  183. try:
  184. for addressbook in res.json()["addressbooks"]:
  185. if addressbook['name'] == addressbook_name:
  186. return addressbook['id']
  187. except ValueError:
  188. return None
  189. return None
  190. def deleteAddressbook(self, addressbook_id):
  191. """
  192. Delete an addressbook.
  193. :param addressbook_id: ID of the addressbook to be deleted
  194. :return: Response from SOGo API.
  195. """
  196. res = self.get(f"/{self.username}/Contacts/{addressbook_id}/delete")
  197. return res.status_code == 204
  198. def getAddressbookList(self):
  199. """
  200. Get addressbook list.
  201. :return: Response from SOGo API.
  202. """
  203. res = self.get(f"/{self.username}/Contacts/addressbooksList")
  204. try:
  205. return res.json()
  206. except ValueError:
  207. return res.text
  208. def setAddressbookACL(self, addressbook_id, sharee_email, acl="r", subscribe=False):
  209. """
  210. Set CalDAV addressbook permissions for a user (sharee).
  211. :param addressbook_id: ID of the addressbook to share
  212. :param sharee_email: Email of the user to share with
  213. :param acl: "w" for write, "r" for read-only or combination "rw" for read-write
  214. :param subscribe: True will subscribe the sharee to the addressbook
  215. :return: None
  216. """
  217. # Access rights
  218. if acl == "" or len(acl) > 2:
  219. print("Invalid acl level specified. Use 's', 'w', 'r' or combinations like 'rws'.")
  220. return "Invalid acl level specified. Use 'w', 'r' or combinations like 'rw'."
  221. rights = [{
  222. "c_email": sharee_email,
  223. "uid": sharee_email,
  224. "userClass": "normal-user",
  225. "rights": {
  226. "canCreateObjects": 0,
  227. "canEditObjects": 0,
  228. "canEraseObjects": 0,
  229. "canViewObjects": 0,
  230. }
  231. }]
  232. if "w" in acl:
  233. rights[0]["rights"]["canCreateObjects"] = 1
  234. rights[0]["rights"]["canEditObjects"] = 1
  235. rights[0]["rights"]["canEraseObjects"] = 1
  236. if "r" in acl:
  237. rights[0]["rights"]["canViewObjects"] = 1
  238. r_add = self.get(f"/{self.username}/Contacts/{addressbook_id}/addUserInAcls?uid={sharee_email}")
  239. if r_add.status_code < 200 or r_add.status_code > 299:
  240. try:
  241. return r_add.json()
  242. except ValueError:
  243. return r_add.text
  244. r_save = self.post(f"/{self.username}/Contacts/{addressbook_id}/saveUserRights", rights)
  245. if r_save.status_code < 200 or r_save.status_code > 299:
  246. try:
  247. return r_save.json()
  248. except ValueError:
  249. return r_save.text
  250. if subscribe:
  251. r_subscribe = self.get(f"/{self.username}/Contacts/{addressbook_id}/subscribeUsers?uids={sharee_email}")
  252. if r_subscribe.status_code < 200 or r_subscribe.status_code > 299:
  253. try:
  254. return r_subscribe.json()
  255. except ValueError:
  256. return r_subscribe.text
  257. return r_save.status_code == 200
  258. def getAddressbookACL(self, addressbook_id):
  259. """
  260. Get CalDAV addressbook permissions for a user (sharee).
  261. :param addressbook_id: ID of the addressbook to get ACL from
  262. :return: Response from SOGo API.
  263. """
  264. res = self.get(f"/{self.username}/Contacts/{addressbook_id}/acls")
  265. try:
  266. return res.json()
  267. except ValueError:
  268. return res.text
  269. def deleteAddressbookACL(self, addressbook_id, sharee_email):
  270. """
  271. Delete an addressbook ACL for a user (sharee).
  272. :param addressbook_id: ID of the addressbook to delete ACL from
  273. :param sharee_email: Email of the user whose ACL to delete
  274. :return: Response from SOGo API.
  275. """
  276. res = self.get(f"/{self.username}/Contacts/{addressbook_id}/removeUserFromAcls?uid={sharee_email}")
  277. return res.status_code == 204
  278. def getAddressbookNewGuid(self, addressbook_id):
  279. """
  280. Request a new GUID for a SOGo addressbook.
  281. :param addressbook_id: ID of the addressbook
  282. :return: JSON response from SOGo or None if not found
  283. """
  284. res = self.get(f"/{self.username}/Contacts/{addressbook_id}/newguid")
  285. try:
  286. return res.json()
  287. except ValueError:
  288. return res.text
  289. def addAddressbookContact(self, addressbook_id, contact_name, contact_email):
  290. """
  291. Save a vCard as a contact in the specified addressbook.
  292. :param addressbook_id: ID of the addressbook
  293. :param contact_name: Name of the contact
  294. :param contact_email: Email of the contact
  295. :return: JSON response from SOGo or None if not found
  296. """
  297. vcard_id = self.getAddressbookNewGuid(addressbook_id)
  298. contact_data = {
  299. "id": vcard_id["id"],
  300. "pid": vcard_id["pid"],
  301. "c_cn": contact_name,
  302. "emails": [{
  303. "type": "pref",
  304. "value": contact_email
  305. }],
  306. "isNew": True,
  307. "c_component": "vcard",
  308. }
  309. endpoint = f"/{self.username}/Contacts/{addressbook_id}/{vcard_id['id']}/saveAsContact"
  310. res = self.post(endpoint, contact_data)
  311. try:
  312. return res.json()
  313. except ValueError:
  314. return res.text
  315. def getAddressbookContacts(self, addressbook_id, contact_email=None):
  316. """
  317. Get all contacts from the specified addressbook.
  318. :param addressbook_id: ID of the addressbook
  319. :return: JSON response with contacts or None if not found
  320. """
  321. res = self.get(f"/{self.username}/Contacts/{addressbook_id}/view")
  322. try:
  323. res_json = res.json()
  324. headers = res_json.get("headers", [])
  325. if not headers or len(headers) < 2:
  326. return []
  327. field_names = headers[0]
  328. contacts = []
  329. for row in headers[1:]:
  330. contact = dict(zip(field_names, row))
  331. contacts.append(contact)
  332. if contact_email:
  333. contact = {}
  334. for c in contacts:
  335. if c["c_mail"] == contact_email or c["c_cn"] == contact_email:
  336. contact = c
  337. break
  338. return contact
  339. return contacts
  340. except ValueError:
  341. return res.text
  342. def addAddressbookContactList(self, addressbook_id, contact_name, contact_email=None):
  343. """
  344. Add a new contact list to the addressbook.
  345. :param addressbook_id: ID of the addressbook
  346. :param contact_name: Name of the contact list
  347. :param contact_email: Comma-separated emails to include in the list
  348. :return: Response from SOGo API.
  349. """
  350. gal_domain = self.username.split("@")[-1]
  351. vlist_id = self.getAddressbookNewGuid(addressbook_id)
  352. contact_emails = contact_email.split(",") if contact_email else []
  353. contacts = self.getAddressbookContacts(addressbook_id)
  354. refs = []
  355. for contact in contacts:
  356. if contact['c_mail'] in contact_emails:
  357. refs.append({
  358. "refs": [],
  359. "categories": [],
  360. "c_screenname": contact.get("c_screenname", ""),
  361. "pid": contact.get("pid", vlist_id["pid"]),
  362. "id": contact.get("id", ""),
  363. "notes": [""],
  364. "empty": " ",
  365. "hasphoto": contact.get("hasphoto", 0),
  366. "c_cn": contact.get("c_cn", ""),
  367. "c_uid": contact.get("c_uid", None),
  368. "containername": contact.get("containername", f"GAL {gal_domain}"), # or your addressbook name
  369. "sourceid": contact.get("sourceid", gal_domain),
  370. "c_component": contact.get("c_component", "vcard"),
  371. "c_sn": contact.get("c_sn", ""),
  372. "c_givenname": contact.get("c_givenname", ""),
  373. "c_name": contact.get("c_name", contact.get("id", "")),
  374. "c_telephonenumber": contact.get("c_telephonenumber", ""),
  375. "fn": contact.get("fn", ""),
  376. "c_mail": contact.get("c_mail", ""),
  377. "emails": contact.get("emails", []),
  378. "c_o": contact.get("c_o", ""),
  379. "reference": contact.get("id", ""),
  380. "birthday": contact.get("birthday", "")
  381. })
  382. contact_data = {
  383. "refs": refs,
  384. "categories": [],
  385. "c_screenname": None,
  386. "pid": vlist_id["pid"],
  387. "c_component": "vlist",
  388. "notes": [""],
  389. "empty": " ",
  390. "isNew": True,
  391. "id": vlist_id["id"],
  392. "c_cn": contact_name,
  393. "birthday": ""
  394. }
  395. endpoint = f"/{self.username}/Contacts/{addressbook_id}/{vlist_id['id']}/saveAsList"
  396. res = self.post(endpoint, contact_data)
  397. try:
  398. return res.json()
  399. except ValueError:
  400. return res.text
  401. def deleteAddressbookItem(self, addressbook_id, contact_name):
  402. """
  403. Delete an addressbook item by its ID.
  404. :param addressbook_id: ID of the addressbook item to delete
  405. :param contact_name: Name of the contact to delete
  406. :return: Response from SOGo API.
  407. """
  408. res = self.getAddressbookContacts(addressbook_id, contact_name)
  409. if "id" not in res:
  410. print(f"Contact '{contact_name}' not found in addressbook '{addressbook_id}'.")
  411. return None
  412. res = self.post(f"/{self.username}/Contacts/{addressbook_id}/batchDelete", {
  413. "uids": [res["id"]],
  414. })
  415. return res.status_code == 204
  416. def get(self, endpoint, params=None):
  417. """
  418. Make a GET request to the mailcow API.
  419. :param endpoint: The API endpoint to get.
  420. :param params: Optional parameters for the GET request.
  421. :return: Response from the mailcow API.
  422. """
  423. url = f"{self.baseUrl}{self.apiUrl}{endpoint}"
  424. auth = (self.username, self.password)
  425. headers = {"Host": self.host}
  426. response = requests.get(
  427. url,
  428. params=params,
  429. auth=auth,
  430. headers=headers,
  431. verify=not self.ignore_ssl_errors
  432. )
  433. return response
  434. def post(self, endpoint, data):
  435. """
  436. Make a POST request to the mailcow API.
  437. :param endpoint: The API endpoint to post to.
  438. :param data: Data to be sent in the POST request.
  439. :return: Response from the mailcow API.
  440. """
  441. url = f"{self.baseUrl}{self.apiUrl}{endpoint}"
  442. auth = (self.username, self.password)
  443. headers = {"Host": self.host}
  444. response = requests.post(
  445. url,
  446. json=data,
  447. auth=auth,
  448. headers=headers,
  449. verify=not self.ignore_ssl_errors
  450. )
  451. return response