Source code for sirepo.file_lock
"""file locking
:copyright: Copyright (c) 2023 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdlog, pkdp
import asyncio
import fcntl
import os
import pykern.pkconfig
import pykern.pkio
import time
_LOOP_SLEEP = None
_LOOP_COUNT = None
class _Base:
"""Lock a file for global mutex
Args:
path (py.path.local): base name for lock
"""
def __init__(self, path):
self._lock = None
p = pykern.pkio.py_path(path)
if p.check(dir=True):
p = p.join("lock")
else:
p += ".lock"
self._path = str(p)
def _enter(self):
for _ in range(_LOOP_COUNT):
try:
f = None
f = os.open(self._path, os.O_RDWR | os.O_CREAT | os.O_TRUNC)
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
if self._verify_lock_path(f):
self._lock = f
return
except (IOError, OSError, FileNotFoundError):
pass
self._unchecked_close(f)
# Not asyncio.sleep: not in coroutine, probably should to be (simulation_db.user_lock)
yield
raise RuntimeError(f"fail to flock path={self._path} timeout={_cfg.timeout}")
def _exit(self, *args, **kwargs):
if self._lock:
os.unlink(self._path)
os.close(self._lock)
self._lock = None
return False
def _unchecked_close(self, handle):
"""Close lock path ignoring errors
We want to ensure the loop continues, and there's
nothing we can do at this point so close ignoring
exceptions.
Args:
handle (IO): possibly opened file
"""
if not handle:
return
try:
os.close(handle)
except Exception:
pass
def _verify_lock_path(self, handle):
"""Verify open file and path on disk are same file
There's a race condition between the open and the flock.
Two processes might not lock the same file. It's a
complicated race condition that requires process A to
speed through all of `__enter__` and `__exit__` during the
time that process B opens the file before it opens the
lock. Since our locking is for very small operations
(typically), this could happen. See `<https://stackoverflow.com/a/18745264>`_
Args:
handle (IO): handle to open, locked file
Return:
bool: True if `handle` and `self._path` are same inode
"""
return os.stat(handle).st_ino == os.stat(self._path).st_ino
[docs]
class AsyncFileLock(_Base):
"""Lock a file for global mutex
Args:
path (py.path.local): base name for lock
qcall (sirepo.quest.API): to ensure not re-entrant
"""
def __init__(self, path, qcall):
super().__init__(path)
# Do here so simpler code below
if qcall.bucket_unchecked_get(self._path):
raise AssertionError(f"attempt to relock path={self._path}")
qcall.bucket_set(self._path, True)
async def __aenter__(self):
for _ in self._enter():
await asyncio.sleep(_LOOP_SLEEP)
async def __aexit__(self, *args, **kwargs):
return self._exit(*args, **kwargs)
[docs]
class FileLock(_Base):
def __enter__(self):
for _ in self._enter():
time.sleep(_LOOP_SLEEP)
def __exit__(self, *args, **kwargs):
return self._exit(*args, **kwargs)
def _init():
global _cfg, _LOOP_SLEEP, _LOOP_COUNT
_cfg = pykern.pkconfig.init(
timeout=(60, pykern.pkconfig.parse_seconds, "how long to wait on flock"),
)
ms = 50
_LOOP_COUNT = _cfg.timeout * (1000 // ms)
_LOOP_SLEEP = ms / 1000.0
_init()