|
@@ -1,6 +1,5 @@
|
|
|
from configparser import RawConfigParser
|
|
|
from binascii import hexlify
|
|
|
-import fcntl
|
|
|
import os
|
|
|
import re
|
|
|
import shutil
|
|
@@ -8,7 +7,7 @@ import struct
|
|
|
from zlib import crc32
|
|
|
|
|
|
from .hashindex import NSIndex
|
|
|
-from .helpers import IntegrityError, read_msgpack, write_msgpack, unhexlify
|
|
|
+from .helpers import IntegrityError, read_msgpack, write_msgpack, unhexlify, UpgradableLock
|
|
|
from .lrucache import LRUCache
|
|
|
|
|
|
MAX_OBJECT_SIZE = 20 * 1024 * 1024
|
|
@@ -39,7 +38,7 @@ class Repository(object):
|
|
|
|
|
|
def __init__(self, path, create=False):
|
|
|
self.io = None
|
|
|
- self.lock_fd = None
|
|
|
+ self.lock = None
|
|
|
if create:
|
|
|
self.create(path)
|
|
|
self.open(path)
|
|
@@ -71,8 +70,7 @@ class Repository(object):
|
|
|
self.path = path
|
|
|
if not os.path.isdir(path):
|
|
|
raise self.DoesNotExist(path)
|
|
|
- self.lock_fd = open(os.path.join(path, 'config'), 'r')
|
|
|
- fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
|
|
|
+ self.lock = UpgradableLock(os.path.join(path, 'config'))
|
|
|
self.config = RawConfigParser()
|
|
|
self.config.read(os.path.join(self.path, 'config'))
|
|
|
if self.config.getint('repository', 'version') != 1:
|
|
@@ -83,10 +81,10 @@ class Repository(object):
|
|
|
self.rollback()
|
|
|
|
|
|
def close(self):
|
|
|
- if self.lock_fd:
|
|
|
+ if self.lock:
|
|
|
self.rollback()
|
|
|
- self.lock_fd.close()
|
|
|
- self.lock_fd = None
|
|
|
+ self.lock.release()
|
|
|
+ self.lock = None
|
|
|
|
|
|
def commit(self, rollback=True):
|
|
|
"""Commit transaction
|
|
@@ -202,6 +200,7 @@ class Repository(object):
|
|
|
self.io.close()
|
|
|
self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
|
|
|
if self.io.head is not None and not os.path.exists(os.path.join(self.path, 'index.%d' % self.io.head)):
|
|
|
+ self.lock.upgrade()
|
|
|
self.recover(self.path)
|
|
|
self.open_index(self.io.head, read_only=True)
|
|
|
|
|
@@ -222,6 +221,7 @@ class Repository(object):
|
|
|
def put(self, id, data, wait=True):
|
|
|
if not self._active_txn:
|
|
|
self._active_txn = True
|
|
|
+ self.lock.upgrade()
|
|
|
self.open_index(self.io.head)
|
|
|
try:
|
|
|
segment, _ = self.index[id]
|
|
@@ -240,6 +240,7 @@ class Repository(object):
|
|
|
def delete(self, id, wait=True):
|
|
|
if not self._active_txn:
|
|
|
self._active_txn = True
|
|
|
+ self.lock.upgrade()
|
|
|
self.open_index(self.io.head)
|
|
|
try:
|
|
|
segment, offset = self.index.pop(id)
|