Mailing List Archive

another lock module
i got tired of worrying about whether or not a system does
flock, of thinking that posix fcntl might not work on all
systems due to the differences in the struct flock, and
certainly not nfs, so i took the file locking locking module
from mailman (locks with link() so it _should_ work on NFS,
i think), added shared vs exclusive locking, and made the
interface something i like.

hope someone else finds it handy, and thanx josh for adding
the timeouts!

file lock.py:
----------------------------------------------------------------------
"""
unix portable file locking, reportedly works even on NFS file
systems.

usage: shared and exclusive locking (read and write locks)

import lock
shl = lock.shlock(<resource name>)
shl.release()

exl = lock.exlock(<resource name>)
exl.release()

try:
shl = lock.shlock(<resource name>, <timeout in seconds>)
except lock.TimeoutError:
# SOL, no lock

try:
exl = lock.exlock(<resource name>, <timeout in seconds>)
except lock.TimeoutError:
# no lock

#
# By default, all locks will be stored in files in
# the /tmp directory. this may not be desirable.
# the user can change the lock directory from the
# default '/tmp' using the setlockdir() function.
#
lock.setlockdir("/tmp/lock")

#
# it is recommended that users of this module
# surround code between obtaining and releasing
# a lock with a try: ... finally:. This will
# help guarantee that an application using this
# module will never deadlock. system crashes may
# cause deadlock, but that is fairly unlikely.
#
shl = lock.shlock(<resource name>)
try:
# do stuff with the lock
finally:
shl.release()

"""
#
# system mods
#
import os
from stat import ST_NLINK
import string
import socket
import time

#
# how frequently to retry obtaining locks in seconds
#
retry_intv = 0.3

#
# the default directory in which locks are
# housed.
#
lockdir = "/tmp"

class TimeoutError(StandardError):
pass

def setlockdir(newlockdir):
global lockdir
lockdir = newlockdir

class _lock:

def __init__(self, resource, lockdir):

self.resource = resource
self.lockpath = os.path.join(lockdir,
"%s.lock" % (self.resource))
self.tmpfname = os.path.join(lockdir,
"%s.lock.%s.%d" % (self.resource,
socket.gethostname(),
os.getpid()))
if not os.path.exists(self.lockpath):
open(self.lockpath, "a+").close()
self.type = "unknown"

def release(self):
if not self.locked():
return
if self.type == "ex" \
or os.stat(self.lockpath)[ST_NLINK] == 2:
open(self.lockpath, "w").close() # remove the info in the lock file
os.unlink(self.tmpfname)

def __del__(self):
self.release()

def locked(self):
return os.path.exists(self.tmpfname)

#
# can raise ValueError or IOError, ValueError is handled further up.
#
def _getlockdata(self):
fp = open(self.lockpath, 'r')
try:
raw = fp.read()
ltype, host, pid = string.split(string.strip(raw))
pid = string.atoi(pid)
finally:
fp.close()
return ltype, host, pid

def _setlockdata(self):
s = "%s %s %d\n" % (self.type, socket.gethostname(), os.getpid())
fp = open(self.lockpath, "w")
try:
fp.write(s)
finally:
fp.close()


class exlock(_lock):

def __init__(self, resource, timeout = 0):
_lock.__init__(self, resource, lockdir)
self.type = "ex"
starttime = time.time()
while 1:
os.link(self.lockpath, self.tmpfname)
if os.stat(self.lockpath)[ST_NLINK] == 2:
# we have the exclusive lock
self._setlockdata()
break
os.unlink(self.tmpfname)
time.sleep(retry_intv)
time_elapsed = time.time() - starttime
if (timeout) and (time_elapsed > timeout):
raise TimeoutError("Timeout limit of %d seconds"
" has been reached" % timeout)


class shlock(_lock):

#
# in order to allow a single process
# to acquire multiple shared locks
# on a single resource, we must supplant
# the pid part of the tmpfname with a counter.
# this information in stored in the dict shared_ct
# keyed by resource name and valued by counter.
# this is not thread safe.
shared_ct = {}

def __init__(self, resource, timeout = 0):
_lock.__init__(self, resource, lockdir)
self.type = "sh"
if not self.shared_ct.has_key(resource):
self.shared_ct[resource] = 1
else:
self.shared_ct[resource] = self.shared_ct[resource] + 1
self.tmpfname = self.tmpfname + "-%d" % (self.shared_ct[resource])
starttime = time.time()
while 1:
os.link(self.lockpath, self.tmpfname)
nlink = os.stat(self.lockpath)[ST_NLINK]
if nlink == 2:
# first shared lock
self._setlockdata()
break
elif nlink > 2:
# is it shared or exclusive?
# we have to read from the lock file
# here, so if a read happens after
# another process has truncated the file,
# it'll raise ValueError, in which case we just
# keep on tryin'
try:
ltype, host, pid = self._getlockdata()
except ValueError:
# read after other lock erased lockinfo from file
ltype = "unknown"
if ltype == "sh":
break
os.unlink(self.tmpfname)
time.sleep(retry_intv)
time_elapsed = time.time() - starttime
if (timeout) and (time_elapsed > timeout):
raise TimeoutError("Timeout limit of %d seconds"
"has been reached" % timeout)