// D import file generated from 'src/core/sync/condition.d' module core.sync.condition; public import core.sync.exception; public import core.sync.mutex; public import core.time; version (Windows) { private import core.sync.semaphore; private import core.sys.windows.basetsd; private import core.sys.windows.winbase; private import core.sys.windows.windef; private import core.sys.windows.winerror; } else { version (Posix) { private import core.sync.config; private import core.stdc.errno; private import core.sys.posix.pthread; private import core.sys.posix.time; } else { static assert(false, "Platform not supported"); } } class Condition { nothrow @safe this(Mutex m) { this(m, true); } shared nothrow @safe this(shared Mutex m) { this(m, true); } private nothrow @trusted this(this Q, M)(M m, bool _unused_) if (is(Q == Condition) && is(M == Mutex) || is(Q == shared(Condition)) && is(M == shared(Mutex))) { version (Windows) { static if (is(Q == Condition)) { alias HANDLE_TYPE = void*; } else { alias HANDLE_TYPE = shared(void*); } m_blockLock = cast(HANDLE_TYPE)CreateSemaphoreA(null, 1, 1, null); if (m_blockLock == m_blockLock.init) throw new SyncError("Unable to initialize condition"); scope(failure) CloseHandle(cast(void*)m_blockLock); m_blockQueue = cast(HANDLE_TYPE)CreateSemaphoreA(null, 0, (int).max, null); if (m_blockQueue == m_blockQueue.init) throw new SyncError("Unable to initialize condition"); scope(failure) CloseHandle(cast(void*)m_blockQueue); InitializeCriticalSection(cast(RTL_CRITICAL_SECTION*)&m_unblockLock); m_assocMutex = m; } else { version (Posix) { m_assocMutex = m; static if (is(typeof(pthread_condattr_setclock))) { () @trusted { pthread_condattr_t attr = void; int rc = pthread_condattr_init(&attr); if (rc) throw new SyncError("Unable to initialize condition"); rc = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); if (rc) throw new SyncError("Unable to initialize condition"); rc = pthread_cond_init(cast(pthread_cond_t*)&m_hndl, &attr); if (rc) throw new SyncError("Unable to initialize condition"); rc = pthread_condattr_destroy(&attr); if (rc) throw new SyncError("Unable to initialize condition"); } (); } else { int rc = pthread_cond_init(cast(pthread_cond_t*)&m_hndl, null); if (rc) throw new SyncError("Unable to initialize condition"); } } } } ~this(); @property Mutex mutex(); shared @property shared(Mutex) mutex(); final pure nothrow @nogc @property @safe Mutex mutex_nothrow(); final shared pure nothrow @nogc @property @safe shared(Mutex) mutex_nothrow(); void wait(); shared void wait(); void wait(this Q)(bool _unused_) if (is(Q == Condition) || is(Q == shared(Condition))) { version (Windows) { timedWait(INFINITE); } else { version (Posix) { int rc = pthread_cond_wait(cast(pthread_cond_t*)&m_hndl, (cast(Mutex)m_assocMutex).handleAddr()); if (rc) throw new SyncError("Unable to wait for condition"); } } } bool wait(Duration val); shared bool wait(Duration val); bool wait(this Q)(Duration val, bool _unused_) if (is(Q == Condition) || is(Q == shared(Condition))) in { assert(!val.isNegative); } do { version (Windows) { auto maxWaitMillis = dur!"msecs"((uint).max - 1); while (val > maxWaitMillis) { if (timedWait(cast(uint)maxWaitMillis.total!"msecs")) return true; val -= maxWaitMillis; } return timedWait(cast(uint)val.total!"msecs"); } else { version (Posix) { timespec t = void; mktspec(t, val); int rc = pthread_cond_timedwait(cast(pthread_cond_t*)&m_hndl, (cast(Mutex)m_assocMutex).handleAddr(), &t); if (!rc) return true; if (rc == ETIMEDOUT) return false; throw new SyncError("Unable to wait for condition"); } } } void notify(); shared void notify(); void notify(this Q)(bool _unused_) if (is(Q == Condition) || is(Q == shared(Condition))) { version (Windows) { notify_(false); } else { version (Posix) { int rc; do { rc = pthread_cond_signal(cast(pthread_cond_t*)&m_hndl); } while (rc == EAGAIN); if (rc) throw new SyncError("Unable to notify condition"); } } } void notifyAll(); shared void notifyAll(); void notifyAll(this Q)(bool _unused_) if (is(Q == Condition) || is(Q == shared(Condition))) { version (Windows) { notify_(true); } else { version (Posix) { int rc; do { rc = pthread_cond_broadcast(cast(pthread_cond_t*)&m_hndl); } while (rc == EAGAIN); if (rc) throw new SyncError("Unable to notify condition"); } } } private version (Windows) { bool timedWait(this Q)(DWORD timeout) if (is(Q == Condition) || is(Q == shared(Condition))) { static if (is(Q == Condition)) { auto op(string o, T, V1)(ref T val, V1 mod) { return mixin("val " ~ o ~ "mod"); } } else { auto op(string o, T, V1)(ref shared T val, V1 mod) { import core.atomic : atomicOp; return atomicOp!o(val, mod); } } int numSignalsLeft; int numWaitersGone; DWORD rc; rc = WaitForSingleObject(cast(HANDLE)m_blockLock, INFINITE); assert(rc == WAIT_OBJECT_0); op!"+="(m_numWaitersBlocked, 1); rc = ReleaseSemaphore(cast(HANDLE)m_blockLock, 1, null); assert(rc); m_assocMutex.unlock(); scope(failure) m_assocMutex.lock(); rc = WaitForSingleObject(cast(HANDLE)m_blockQueue, timeout); assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT); bool timedOut = rc == WAIT_TIMEOUT; EnterCriticalSection(&m_unblockLock); scope(failure) LeaveCriticalSection(&m_unblockLock); if ((numSignalsLeft = m_numWaitersToUnblock) != 0) { if (timedOut) { if (m_numWaitersBlocked != 0) { op!"-="(m_numWaitersBlocked, 1); numSignalsLeft = 0; } else { m_numWaitersGone = 1; } } if (op!"-="(m_numWaitersToUnblock, 1) == 0) { if (m_numWaitersBlocked != 0) { rc = ReleaseSemaphore(cast(HANDLE)m_blockLock, 1, null); assert(rc); numSignalsLeft = 0; } else if ((numWaitersGone = m_numWaitersGone) != 0) { m_numWaitersGone = 0; } } } else if (op!"+="(m_numWaitersGone, 1) == (int).max / 2) { rc = WaitForSingleObject(cast(HANDLE)m_blockLock, INFINITE); assert(rc == WAIT_OBJECT_0); op!"-="(m_numWaitersBlocked, m_numWaitersGone); rc = ReleaseSemaphore(cast(HANDLE)m_blockLock, 1, null); assert(rc == WAIT_OBJECT_0); m_numWaitersGone = 0; } LeaveCriticalSection(&m_unblockLock); if (numSignalsLeft == 1) { for (; numWaitersGone > 0; --numWaitersGone) { { rc = WaitForSingleObject(cast(HANDLE)m_blockQueue, INFINITE); assert(rc == WAIT_OBJECT_0); } } rc = ReleaseSemaphore(cast(HANDLE)m_blockLock, 1, null); assert(rc); } else if (numSignalsLeft != 0) { rc = ReleaseSemaphore(cast(HANDLE)m_blockQueue, 1, null); assert(rc); } m_assocMutex.lock(); return !timedOut; } void notify_(this Q)(bool all) if (is(Q == Condition) || is(Q == shared(Condition))) { static if (is(Q == Condition)) { auto op(string o, T, V1)(ref T val, V1 mod) { return mixin("val " ~ o ~ "mod"); } } else { auto op(string o, T, V1)(ref shared T val, V1 mod) { import core.atomic : atomicOp; return atomicOp!o(val, mod); } } DWORD rc; EnterCriticalSection(&m_unblockLock); scope(failure) LeaveCriticalSection(&m_unblockLock); if (m_numWaitersToUnblock != 0) { if (m_numWaitersBlocked == 0) { LeaveCriticalSection(&m_unblockLock); return ; } if (all) { op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked); m_numWaitersBlocked = 0; } else { op!"+="(m_numWaitersToUnblock, 1); op!"-="(m_numWaitersBlocked, 1); } LeaveCriticalSection(&m_unblockLock); } else if (m_numWaitersBlocked > m_numWaitersGone) { rc = WaitForSingleObject(cast(HANDLE)m_blockLock, INFINITE); assert(rc == WAIT_OBJECT_0); if (0 != m_numWaitersGone) { op!"-="(m_numWaitersBlocked, m_numWaitersGone); m_numWaitersGone = 0; } if (all) { m_numWaitersToUnblock = m_numWaitersBlocked; m_numWaitersBlocked = 0; } else { m_numWaitersToUnblock = 1; op!"-="(m_numWaitersBlocked, 1); } LeaveCriticalSection(&m_unblockLock); rc = ReleaseSemaphore(cast(HANDLE)m_blockQueue, 1, null); assert(rc); } else { LeaveCriticalSection(&m_unblockLock); } } HANDLE m_blockLock; HANDLE m_blockQueue; Mutex m_assocMutex; CRITICAL_SECTION m_unblockLock; int m_numWaitersGone = 0; int m_numWaitersBlocked = 0; int m_numWaitersToUnblock = 0; } else { version (Posix) { Mutex m_assocMutex; pthread_cond_t m_hndl; } } }