Git Product home page Git Product logo

Comments (2)

gentlegiantJGC avatar gentlegiantJGC commented on June 13, 2024

I still need to work out how this will tie into the history system but this is the implementation so far.
It adds a lock that can be held by multiple threads at the same time, provided they all use the same token.

from __future__ import annotations

from functools import cached_property
from weakref import proxy, WeakValueDictionary
from threading import Condition, RLock, Lock
from contextlib import contextmanager
from collections import deque
from copy import deepcopy
from contextvars import ContextVar
from typing import Optional
from uuid import uuid4


# Should level objects be editable by default.
# Used as the default value of the per-level "edit_mode" context variable.
DefaultEditMode = True
# Can edit operations run in parallel.
# When True, Level.edit ignores the caller's token and uses one shared token for every edit.
ParallelEditing = False
# Is the history system enabled.
# NOTE(review): not read anywhere in this snippet yet.
HistoryEnabled = True


class Chunk:
    """Placeholder for the chunk data type; it carries no state or behaviour yet."""


class LockNotAcquired(RuntimeError):
    """Raised when a lock could not be acquired."""


class TokenLock:
    """
    A custom lock that allows all threads with the same token to run in parallel.
    This is useful to support serial operations that can have parallel threads.

    The lock is either free (no token) or held under exactly one token.
    Every acquisition under that token increments a counter and every release decrements it.
    The lock becomes free again when the counter returns to zero.
    """

    def __init__(self):
        # Guards _token and _count.
        self._lock = Lock()
        # Used to park threads that hold a different token.
        self._condition = Condition(self._lock)
        # The token the lock is currently held under. None means the lock is free.
        self._token: Optional[str] = None
        # The number of outstanding acquisitions under the current token.
        self._count: int = 0

    def _acquire_shared(self, token: str):
        """Block until the lock is free or held under ``token`` then register one more holder."""
        with self._lock:
            while not (self._token is None or self._token == token):
                # Wait until the lock is not locked or is locked with this token.
                self._condition.wait()

            self._token = token
            self._count += 1

    def _release_shared(self, token: str):
        """
        Drop one holder registered under ``token``.

        A token that does not match the current one leaves the lock state untouched.
        """
        with self._lock:
            if self._token == token:
                self._count -= 1
                if self._count <= 0:
                    self._token = None
                    self._count = 0
            # Compare against None rather than relying on truthiness.
            # The empty string is a valid token and must not look like "unlocked".
            if self._token is None:
                # Wake up any threads waiting to acquire the lock
                self._condition.notify_all()

    @contextmanager
    def lock(self, token: Optional[str] = None):
        """
        Acquire the lock with a context manager.
        If the lock is not locked it will be locked with the given token.
        If the lock is locked with the same token the thread will continue.
        If the lock is locked with a different token this will block until the lock can be acquired with the given token.

        Note that a fresh random token is generated on every call that omits ``token``.
        Nesting two token-less calls in the same thread therefore blocks forever.
        Pass the same token explicitly when nesting.

        :param token: The token to use. Defaults to a random UUID.
        """
        if token is None:
            token = str(uuid4())
        self._acquire_shared(token)
        try:
            yield
        finally:
            self._release_shared(token)


class PrivateLevel:
    """
    Storage of private level data.

    Holds the state shared between a level and its helper objects:
    the lock that serialises edit operations and the context variable recording
    whether the level is editable in the current context.
    """

    def __init__(self):
        # Serialises edit operations. Threads sharing a token may hold it together.
        self.edit_lock = TokenLock()
        # Per-context editable flag.
        # NOTE(review): the contextvars docs recommend creating ContextVar objects at
        # module level; this one is created per instance - confirm that is intended.
        self.editable_var = ContextVar("edit_mode", default=DefaultEditMode)

    @property
    def editable(self) -> bool:
        """Is the level editable in the current context."""
        flag = self.editable_var
        return flag.get()


class ChunkStorage:
    """
    Chunk access for a level.

    Provides a re-entrant lock per chunk plus get, set and edit helpers.
    """

    def __init__(self, level: PrivateLevel):
        """
        :param level: The private level data. Only a weak proxy to it is stored.
        """
        # Weak pointer to the level to get raw and shared data
        self._level: PrivateLevel = proxy(level)
        # Mapping from chunk location to chunk object. Weakly stored so that we don't need to manually unload.
        self._chunks = WeakValueDictionary[tuple[str, int, int], Chunk]()
        # A deque to keep recently/frequently used chunks loaded
        self._chunk_cache = deque[Chunk](maxlen=100)
        # A lock per chunk. Weakly stored so an unused lock can be dropped.
        self._locks = WeakValueDictionary[tuple[str, int, int], RLock]()
        # A lock that must be acquired before touching _locks
        self._locks_lock = Lock()

    def __get_lock(self, key: tuple[str, int, int]) -> RLock:
        """Return the lock registered for ``key``, creating and registering one if there is none."""
        with self._locks_lock:
            existing = self._locks.get(key)
            if existing is not None:
                return existing
            created = RLock()
            self._locks[key] = created
            return created

    @contextmanager
    def lock(self, dimension: str, cx: int, cz: int, *, blocking: bool = True, timeout: float = -1):
        """
        Lock access to the chunk.

        >>> with level.chunk.lock(dimension, cx, cz):
        >>>     # Do what you need to with the chunk
        >>>     # No other threads are able to edit or set the chunk while in this with block.

        If you want to lock, get and set the chunk data :meth:`edit` is probably a better fit.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :param blocking: Should this block until the lock is acquired.
        :param timeout: The amount of time to wait for the lock.
        :raises:
            LockNotAcquired: If the lock was not acquired.
        """
        chunk_lock = self.__get_lock((dimension, cx, cz))
        acquired = chunk_lock.acquire(blocking, timeout)
        if not acquired:
            # The lock was not acquired
            raise LockNotAcquired("Lock was not acquired.")
        try:
            yield
        finally:
            chunk_lock.release()

    @contextmanager
    def edit(self, dimension: str, cx: int, cz: int, blocking: bool = True, timeout: float = -1):
        """
        Lock and edit a chunk.

        >>> with level.chunk.edit(dimension, cx, cz) as chunk:
        >>>     # Edit the chunk data
        >>>     # No other threads are able to edit the chunk while in this with block.
        >>>     # When the with block exits the edited chunk will be automatically set if no exception occurred.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :param blocking: Should this block until the lock is acquired.
        :param timeout: The amount of time to wait for the lock.
        :raises:
            RuntimeError: If the level is not editable in this context.
            LockNotAcquired: If the lock was not acquired.
        """
        if not self._level.editable:
            raise RuntimeError("The level is not editable in this context.")
        with self.lock(dimension, cx, cz, blocking=blocking, timeout=timeout):
            working_copy = self.get(dimension, cx, cz)
            yield working_copy
            # Only reached when the with block exited normally.
            # An exception in user code skips the write-back.
            self.set(dimension, cx, cz, working_copy)

    def get(self, dimension: str, cx: int, cz: int) -> Chunk:
        """
        Get a deep copy of the chunk data.
        If you want to edit the chunk, use :meth:`edit` instead.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :return: A unique copy of the chunk data.
        """
        # Placeholder: currently always returns a new empty chunk.
        return Chunk()

    def set(self, dimension: str, cx: int, cz: int, chunk: Chunk):
        """
        Overwrite the chunk data.
        You must lock access to the chunk before setting it otherwise an exception may be raised.
        If you want to edit the chunk, use :meth:`edit` instead.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :param chunk: The chunk data to set.
        :raises:
            RuntimeError: If the level is not editable in this context.
            LockNotAcquired: If the chunk is already locked by another thread.
        """
        if not self._level.editable:
            raise RuntimeError("The level is not editable in this context.")
        chunk_lock = self.__get_lock((dimension, cx, cz))
        # Non-blocking attempt. This succeeds if the lock is free or already held by this thread.
        if not chunk_lock.acquire(False):
            raise LockNotAcquired("Cannot set a chunk if it is locked by another thread.")
        try:
            # Copy so that later changes to the caller's object do not affect the stored data.
            chunk = deepcopy(chunk)
            # TODO set the chunk and notify listeners
        finally:
            chunk_lock.release()

    def on_change(self, callback):
        """A notification system for chunk changes. Not implemented yet."""
        raise NotImplementedError


class Level:
    """Public level object. Wraps the private level data and exposes chunk access."""

    def __init__(self):
        self._data = PrivateLevel()

    @property
    def editable(self) -> bool:
        """Is the level editable in this context."""
        private = self._data
        return private.editable

    @contextmanager
    def edit(self, transaction_token: str = None):
        """
        Make the level editable in this context.

        Depending on the configuration this may block until other operations have completed.

        >>> with level.edit():
        >>>     # edit the level
        >>> # Level is no longer editable

        :param transaction_token: Optional UUID string. If an operation uses threads, using the same transaction_token will allow them to run in parallel.
        """
        # When operations are configured to run in parallel the caller's token is not used.
        # Every edit shares the empty token instead so none of them block each other.
        lock_token = "" if ParallelEditing else transaction_token
        private = self._data
        with private.edit_lock.lock(lock_token):
            editable_var = private.editable_var
            previous = editable_var.set(True)
            try:
                yield
            finally:
                # Restore whatever the editable state was before this context.
                editable_var.reset(previous)

    @cached_property
    def chunk(self) -> ChunkStorage:
        """The chunk storage for this level. Created on first access and then reused."""
        return ChunkStorage(self._data)

from amulet-core.

gentlegiantJGC avatar gentlegiantJGC commented on June 13, 2024

@Podshot raised the issue of deadlocking when two or more threads each try to acquire a lock that another of those threads has already acquired.

The only solution I can see is to implement a custom lock that checks for the deadlock condition and raises an exception.
This isn't exactly pretty but it is the best solution I can see.
We should also implement an edit_exclusive context manager in the level so that if an operation needs to do more complex locking it can ensure it is the only operation running.

Here is my current implementation of a deadlock-detecting RLock.

from __future__ import annotations
from threading import Thread, RLock, Lock, Condition, get_ident
import time
from typing import Optional
from contextlib import contextmanager


# Global lock guarding _threads and the bookkeeping attributes of every SafeRLock.
_lock = Lock()
# Map from thread to the locks it has acquired and is waiting on.
# Keyed by the thread identifier returned by get_ident().
_threads: dict[int, ThreadState] = {}


class DeadlockError(Exception):
    """Raised when acquiring a lock would lead to a deadlock."""


class ThreadState:
    """The locking state of one thread: the locks it holds and the lock it is blocked on."""

    # The locks this thread has acquired
    locked: set[SafeRLock]
    # The lock this thread is waiting for. Might be None.
    waiting: Optional[SafeRLock]

    def __init__(self):
        # A new thread state is not blocked on anything and holds nothing.
        self.waiting = None
        self.locked = set()


class SafeRLock:
    """
    RLock that raises a DeadlockError when acquiring if it would deadlock.

    All bookkeeping (the owner, the counts and the module-level ``_threads`` table) is
    guarded by the module-level ``_lock``. The condition of every instance is built on
    that same lock so that waiting for this lock releases the bookkeeping lock while
    the thread sleeps.
    """

    _lock: RLock  # The underlying re-entrant lock
    _condition: Condition  # Signalled when this lock is fully released
    _pending: set[int]  # The threads waiting for this lock
    _owner: Optional[int]  # The thread that owns this lock
    _lock_count: int  # How many times the lock has been acquired by its owner

    def __init__(self):
        self._lock = RLock()
        # Deliberately built on the global bookkeeping lock, not on self._lock.
        self._condition = Condition(_lock)
        self._owner = None
        self._pending = set()
        self._lock_count = 0

    def _would_deadlock(self) -> bool:
        """
        If this lock is acquired would it be a deadlock state.

        Follows the chain "owner of the lock -> lock that owner is waiting for" starting
        at this lock. Reaching the calling thread means the wait would close a cycle.
        Must be called with the global ``_lock`` held.
        """
        this_thread_id = get_ident()

        if self._owner == this_thread_id:
            # This lock is locked by this thread. Not a deadlock.
            return False

        lock = self
        while True:
            # Find the thread that owns the lock
            owner_thread_id = lock._owner
            if owner_thread_id is None:
                # This lock is not locked. Not a deadlock.
                return False
            if owner_thread_id == this_thread_id:
                return True

            # See which lock it is waiting for
            owner_state = _threads.get(owner_thread_id)
            lock = None if owner_state is None else owner_state.waiting
            if lock is None:
                # The thread is not waiting for a lock. Not a deadlock
                return False

    def _mark_acquired(self, ident: int) -> None:
        """Record that thread ``ident`` acquired the underlying lock once more."""
        self._owner = ident
        self._lock_count += 1
        _threads.setdefault(ident, ThreadState()).locked.add(self)

    def acquire(self, blocking=True, timeout=-1):
        """
        Acquire the lock.

        :param blocking: If False return immediately when the lock is held by another thread.
        :param timeout: Maximum number of seconds to wait. A negative value or None waits forever.
        :return: True if the lock was acquired, False if it was not (non-blocking call or timeout).
        :raises:
            DeadlockError: If waiting for this lock would lead to a deadlock.
        """
        ident = get_ident()
        # A negative timeout means "wait forever" for Lock.acquire but Condition.wait
        # treats it as "do not wait". Translate it into a deadline (None = no limit).
        if timeout is None or timeout < 0:
            deadline = None
        else:
            deadline = time.monotonic() + timeout

        with _lock:
            # Try and acquire the lock without blocking.
            # This succeeds if the lock is free or already owned by this thread.
            if self._lock.acquire(blocking=False):
                self._mark_acquired(ident)
                return True

            if not blocking:
                # We already tried to acquire without blocking and failed
                return False

            # Register this thread as waiting so other threads can detect cycles through it.
            thread_state = _threads.setdefault(ident, ThreadState())
            thread_state.waiting = self
            self._pending.add(ident)
            try:
                while True:
                    # Re-checked on every pass because the owner may have changed.
                    if self._would_deadlock():
                        raise DeadlockError("Acquiring this lock would lead to a deadlock.")

                    if deadline is None:
                        self._condition.wait()
                    else:
                        remaining = deadline - time.monotonic()
                        if remaining <= 0:
                            # Timed out
                            return False
                        self._condition.wait(remaining)

                    # Another thread may have taken the lock between the notification and
                    # this thread waking up. In that case go around and wait again.
                    if self._lock.acquire(blocking=False):
                        self._mark_acquired(ident)
                        return True
            finally:
                # Remove the pending state however the wait ended.
                self._pending.discard(ident)
                thread_state.waiting = None
                if not thread_state.locked:
                    _threads.pop(ident, None)

    def release(self):
        """
        Release the lock once.

        The lock only becomes available to other threads when every acquisition by the
        owning thread has been released.

        :raises:
            RuntimeError: If the calling thread does not own the lock.
        """
        ident = get_ident()
        with _lock:
            if self._owner != ident:
                raise RuntimeError("Lock not owned by thread that tried releasing it.")

            self._lock_count -= 1
            self._lock.release()
            if self._lock_count > 0:
                # Still held re-entrantly by this thread. Keep the ownership state.
                return

            # Fully released. Remove the locked state.
            self._owner = None
            thread_state = _threads[ident]
            thread_state.locked.discard(self)
            if not thread_state.locked and thread_state.waiting is None:
                del _threads[ident]

            # Wake one thread waiting for this lock.
            self._condition.notify()

    @contextmanager
    def lock(self, blocking=True, timeout=-1):
        """
        Acquire the lock with a context manager.

        Yields True if the lock was acquired and False if it was not.
        The lock is only released on exit if it was acquired.

        :param blocking: If False do not wait for the lock.
        :param timeout: Maximum number of seconds to wait. A negative value or None waits forever.
        :raises:
            DeadlockError: If waiting for this lock would lead to a deadlock.
        """
        locked = self.acquire(blocking, timeout)
        try:
            yield locked
        finally:
            if locked:
                self.release()


# Demonstration set-up: log the level, the thread name and the message.
import logging
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(threadName)s - %(message)s")

# Three locks which the demo workers below acquire in a cycle (l1 -> l2 -> l3 -> l1).
l1 = SafeRLock()
l2 = SafeRLock()
l3 = SafeRLock()


def op1():
    """Demo worker: take ``l1``, hold it for a second, then take ``l2`` as well."""
    logging.info("op1 getting l1")
    with l1.lock():
        logging.info("op1 got l1")
        time.sleep(1)
        logging.info("op1 getting l2")
        with l2.lock():
            # This message is logged once l2 is held.
            logging.info("op1 got l2")
            time.sleep(1)


def op2():
    """Demo worker: take ``l2``, hold it for a second, then take ``l3`` as well."""
    log = logging.info
    log("op2 getting l2")
    with l2.lock():
        log("op2 got l2")
        time.sleep(1)
        log("op2 getting l3")
        with l3.lock():
            log("op2 got l3")
            time.sleep(1)


def op3():
    """Demo worker: take ``l3``, hold it for a second, then take ``l1`` as well."""
    log = logging.info
    log("op3 getting l3")
    with l3.lock():
        log("op3 got l3")
        time.sleep(1)
        log("op3 getting l1")
        with l1.lock():
            log("op3 got l1")
            time.sleep(1)


def main():
    """Run the three demo workers on their own threads and wait for all of them to finish."""
    workers = [Thread(target=operation) for operation in (op1, op2, op3)]
    # Start every worker before joining any so that they run concurrently.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    logging.info("finished")


# Only run the demonstration when this file is executed as a script.
if __name__ == '__main__':
    main()

from amulet-core.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.