CFx SDK Documentation 2026 SP0
Loading...
Searching...
No Matches
RxThreadPoolService.h
Go to the documentation of this file.
1
2// Copyright (C) 2002-2024, Open Design Alliance (the "Alliance").
3// All rights reserved.
4//
5// This software and its documentation and related materials are owned by
6// the Alliance. The software may only be incorporated into application
7// programs owned by members of the Alliance, subject to a signed
8// Membership Agreement and Supplemental Software License Agreement with the
9// Alliance. The structure and organization of this software are the valuable
10// trade secrets of the Alliance and its suppliers. The software is also
11// protected by copyright law and international treaty provisions. Application
12// programs incorporating this software must include the following statement
13// with their copyright notices:
14//
15// This application incorporates Open Design Alliance software pursuant to a license
16// agreement with Open Design Alliance.
17// Open Design Alliance Copyright (C) 2002-2024 by Open Design Alliance.
18// All rights reserved.
19//
20// By use of this software, its documentation or related materials, you
21// acknowledge and accept the above terms.
23
24#ifndef _ODRXTHREADPOOLSERVICE_INCLUDED_
25#define _ODRXTHREADPOOLSERVICE_INCLUDED_ /* { Secret } **/
26
27#include "TD_PackPush.h"
28
29#include "RxModule.h"
30#include "ThreadsCounter.h"
31#include "OdArray.h"
32#include "StaticRxObject.h"
33
34#include <algorithm>
35
37
38typedef ptrdiff_t OdApcParamType;
39
41
42typedef void (*OdApcEntryPointRxObjParam)( OdRxObject* parameter );
43
49class ODRX_ABSTRACT OdApcThread : public OdRxObject {
50public:
56 virtual void asyncProcCall( OdApcEntryPointVoidParam ep, OdApcParamType parameter ) = 0;
57
63 virtual void asyncProcCall( OdApcEntryPointRxObjParam ep, OdRxObject* parameter ) = 0;
64
69 virtual void wait(bool bNoThrow = false) = 0;
70
74 virtual unsigned int getId() const = 0;
75
79 virtual bool hasException() const = 0;
80
86 virtual void processException(bool bReThrow = true, bool bClear = true) = 0;
87};
88
93
94
100class ODRX_ABSTRACT OdApcAtom : public OdRxObject {
101public:
105 virtual void apcEntryPoint( OdRxObject* /*pMessage*/ ) {}
106
110 virtual void apcEntryPoint( OdApcParamType /*pMessage*/ ) {}
111};
112
117
118
124class OdApcObjectPool : public OdRxObject {
125public:
129 virtual void reserve( OdUInt32 n ) = 0;
130
134 virtual OdRxObjectPtr take() = 0;
135
139 virtual void put( OdRxObject* pObj ) = 0;
140};
141
143
144
145class OdApcObjectPool;
146
152class ODRX_ABSTRACT OdApcQueue : public OdRxObject {
153public:
158
162 virtual void setAtomPoolRef( OdApcObjectPool* pAtomPool ) = 0;
163
167 virtual void addEntryPoint( OdApcAtom* pRecipient, OdRxObject* pMessage = 0 ) = 0;
168
172 virtual void addEntryPoint( OdApcAtom* pRecipient, OdApcParamType pMessage ) = 0;
173
179 virtual void wait() = 0;
180
188 virtual void executeMainThreadAction( MainThreadFunc mtFunc, void *pArg ) = 0;
189
193 virtual int numThreads() const = 0;
194};
195
200
201
211class OdApcDataReadWriteDispatcher : public OdRxObject {
212public:
216 virtual void enter() = 0;
217
221 virtual void leave() = 0;
222
230 virtual void lock() = 0;
231
235 virtual void unlock() = 0;
236
242 virtual void lockFromInside() = 0;
243
247 virtual void unlockFromInside() = 0;
248};
249
254
255
261class ODRX_ABSTRACT OdApcEvent : public OdRxObject {
262public:
266 virtual void set() = 0;
267
271 virtual void reset() = 0;
272
276 virtual void wait() = 0;
277
281 virtual void waitAndReset() = 0;
282};
283
288
289
295class ODRX_ABSTRACT OdApcGateway : public OdRxObject {
296public:
302 virtual void lockByMain( OdUInt32 numThreads ) = 0;
303
307 virtual void waitByMain() = 0;
308
312 virtual void unlockByMain() = 0;
313
315
319 virtual void passBySecondary() = 0;
320};
321
326
332class ODRX_ABSTRACT OdApcLoopedGateway : public OdRxObject {
333public:
334
337 virtual void init( OdUInt32 numThreads ) = 0;
338
342 virtual void passByMain() = 0;
343
349 virtual void waitByMain() = 0;
350
354 virtual void passByMainNoWait() = 0;
355
357
361 virtual void passBySecondary() = 0;
362};
363
368
369
376{
377 kStQueueNoFlags = 0, // Empty StQueue flags.
378
379 kStQueueExecByMain = (1 << 0), // Use main thread for queue tasks execution.
380 kStQueueForceTopLevel = (1 << 1), // Make StQueue top level even if other registered threads already run.
381
383};
384
385
392{
393 kMtQueueNoFlags = 0, // Empty MtQueue flags.
394
395 kMtQueueForceNewThreads = (1 << 0), // Spawn new threads and add them to the pool if there are not enough free threads.
396 kMtQueueAllowExecByMain = (1 << 1), // Allow to use main thread for execution if there are no free threads.
397 kMtQueueForceTopLevel = (1 << 2), // Make MtQueue top level even if other registered threads already run.
398
400};
401
402
408public:
410
414 virtual int numCPUs() const = 0;
415
419 virtual int numPhysicalCores() const = 0;
420
424 virtual int numThreads() const = 0;
425
431 virtual int numFreeThreads() const = 0;
432
441
447 virtual OdApcQueuePtr newSTQueue( unsigned nThreadAttributes = ThreadsCounter::kNoAttributes,
448 OdUInt32 nFlags = kStQueueNoFlags ) = 0;
449
458 virtual OdApcQueuePtr newMTQueue( unsigned nThreadAttributes = ThreadsCounter::kNoAttributes,
459 int numThreads = 0, OdUInt32 nFlags = kMtQueueNoFlags ) = 0;
460
465
469 virtual OdApcEventPtr newEvent() = 0;
470
475
480
486 virtual void executeMainThreadAction( MainThreadFunc mtFunc, void *pArg ) = 0;
487
488 /* Support for external threads manager */
489
500 virtual void registerExternalThreads( unsigned nThreads, const unsigned* aThreads, unsigned nThreadAttribs = ThreadsCounter::kNoAttributes ) = 0;
510 virtual void unregisterExternalThreads( unsigned nThreads, const unsigned* aThreads ) = 0;
514 virtual void externalThreadStart() = 0;
518 virtual void externalThreadStop() = 0;
528
532 virtual unsigned int getMainThreadId() const = 0;
536 virtual unsigned int getCurrentThreadId() const = 0;
540 inline bool isMainThread() const
541 {
543 }
544
545 //FELIX_CHANGE_BEGIN
546 virtual void init() = 0;
547 //FELIX_CHANGE_END
548};
549
551
552
557class OdApcQueueHelper : public OdSmartPtr<OdApcQueue> {
558public:
560 OdApcQueueHelper( const OdApcQueue* pObject, OdRxObjMod m ) : OdSmartPtr<OdApcQueue>( pObject, m ) {}
561 OdApcQueueHelper( const OdApcQueue* pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
562 OdApcQueueHelper( const OdRxObject* pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
563 OdApcQueueHelper( OdRxObject* pObject, OdRxObjMod m ) : OdSmartPtr<OdApcQueue>( pObject, m ) {}
565 OdApcQueueHelper( const OdRxObjectPtr& pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
566 OdApcQueueHelper( const OdBaseObjectPtr& pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
567
568 //FELIX_CHANGE_BEGIN
573 //FELIX_CHANGE_END
574
575 using OdSmartPtr<OdApcQueue>::operator =;
576
577 void initST( OdRxThreadPoolService* pThreadPool ) {
578 *this = pThreadPool ? pThreadPool->newSTQueue().get() : 0;
579 }
580
581 void initMT( OdRxThreadPoolService* pThreadPool ) {
582 *this = pThreadPool ? pThreadPool->newMTQueue().get() : 0;
583 }
584
585 void call( OdApcAtom* pAction, OdRxObject* pParam = 0 ) {
586 if( m_pObject ) {
587 get()->addEntryPoint( pAction, pParam );
588 }
589 else {
590 pAction->apcEntryPoint( pParam );
591 }
592 }
593
594 void call( OdApcAtom* pAction, OdApcParamType param ) {
595 if( m_pObject ) {
596 get()->addEntryPoint( pAction, param );
597 }
598 else {
599 pAction->apcEntryPoint( param );
600 }
601 }
602
603 void setAtomPoolRef( OdApcObjectPool* pAtomPool ) {
604 if( m_pObject ) {
605 get()->setAtomPoolRef( pAtomPool );
606 }
607 }
608
609 void wait() {
610 if( m_pObject ) {
611 get()->wait();
612 }
613 }
614};
615
616
698
699
704class OdApcQueueHelperArray : public OdArray<OdApcQueueHelper> {
705public:
706 void initST( int n, OdRxThreadPoolService* pTP ) {
707 resize( n );
708 if ( pTP ) {
709 for( int i=0; i<n; ++i ) {
710 setAt( i, pTP->newSTQueue() );
711 }
712 }
713 }
714 void wait() {
715 size_type n = size();
716 for( size_type i=0; i<n; ++i ) {
717 at( i ).wait();
718 }
719 }
720};
721
722
728 void* sync_next( OdMutex* mutex, OdUInt32 threadIndex, OdUInt32& itemIndex ) {
729 OdMutexAutoLock lock( *mutex );
730 return next( threadIndex, itemIndex );
731 }
732protected:
733 virtual OdMutex* mutexForNext() = 0;
734
735 virtual void* next( OdUInt32 threadIndex, OdUInt32& itemIndex ) = 0;
736
737 virtual void doAction( OdUInt32 threadIndex, OdUInt32 itemIndex, void* pItem ) = 0;
738
739 virtual void apcEntryPoint( OdApcParamType threadIndex ) {
740 OdMutex* mutex = mutexForNext();
741 OdUInt32 itemIndex;
742 while( void* pItem = sync_next( mutex, (OdUInt32)threadIndex, itemIndex ) ) {
743 doAction( (OdUInt32)threadIndex, itemIndex, pItem );
744 }
745 }
746public:
747 void for_each( OdApcQueue* pQueue, OdUInt32 nThreads = 0 ) {
748 if ( pQueue!=0 ) {
749 OdUInt32 n = nThreads==0 ? pQueue->framework().numCPUs() : nThreads;
750 if( n > 1 ) {
751 while( --n ) {
752 pQueue->addEntryPoint( this, (OdApcParamType)n );
753 }
755 pQueue->wait();
756 return;
757 }
758 }
759 apcEntryPoint( 0 );
760 }
761};
762
765template< class It, class Fn >
766class OdAsyncForEach : public OdStaticRxObject<OdApcAtom> {
767 OdMutex m_mutex;
768 It m_cur, m_last;
769 Fn m_fn;
770
771 bool next( It& cur ) {
772 OdMutexAutoLock lock( m_mutex );
773 if ( m_cur < m_last ) {
774 cur = m_cur;
775 ++m_cur;
776 return true;
777 }
778 return false;
779 }
780
781 void apcEntryPoint( OdRxObject* ) {
782 It cur;
783 if( next( cur ) ) {
784 do {
785 m_fn( *cur );
786 }
787 while( next( cur ) );
788 }
789 }
790public:
791 void for_each( OdApcQueue* pQueue, It first, It last, Fn fn ) {
792 unsigned long n = static_cast<unsigned long>(last-first);
793 if( pQueue && n>1 ) {
794 m_cur = first;
795 m_last = last;
796 m_fn = fn;
797
798 unsigned long numCPUs = pQueue->framework().numCPUs();
799 n = ( n < numCPUs ) ? n : numCPUs;
800 while( n-- ) {
801 pQueue->addEntryPoint( this );
802 }
803 pQueue->wait();
804 }
805 else {
806 std::for_each( first, last, fn );
807 }
808 }
809
810};
811
812//DOM-IGNORE-BEGIN
813template< class It, class Fn >
814void od_async_for_each( OdApcQueue* pQueue, It first, It last, Fn fn ) {
815 ODA_ASSERT_ONCE( first <= last );
816 if( first < last ) {
817 OdAsyncForEach< It, Fn >().for_each( pQueue, first, last, fn );
818 }
819}
820//DOM-IGNORE-END
821
824template< class TObject >
829 void operator () ( TObject& ) const {
830 // does nothing
831 }
832};
833
836template< class TObject, class TInitFn = OdApcObjectPoolHelperDummyInitFn<TObject> >
838 TObject* m_pEntries;
839 OdUInt32 m_nCount;
840public:
841 OdApcObjectPoolHelper() : m_pEntries( NULL ), m_nCount(0) {}
842 ~OdApcObjectPoolHelper() { release(); ::delete [] m_pEntries; m_pEntries = NULL; m_nCount=0; }
843 void init( OdRxThreadPoolService* pTP, int n = 0, const TInitFn* pInitFn = 0 ) {
844 if ( pTP ) {
845 if( n==0 ) {
846 n = pTP->numCPUs();
847 }
848 m_nCount = n;
849 m_pEntries = ::new TObject[n];
850 (*(OdApcObjectPoolPtr*)this) = pTP->newObjectPool();
851 while( --n ) {
852 TObject& obj = m_pEntries[ n ];
853 if( pInitFn ) {
854 (*pInitFn)( obj );
855 }
856 get()->put( &obj );
857 }
858 }
859 else {
860 m_nCount = 1;
861 m_pEntries = ::new TObject[1];
862 if( pInitFn ) {
863 (*pInitFn)( *m_pEntries );
864 }
865 }
866 }
867
868 TObject* take() {
869 if( m_pObject ) {
870 return static_cast<TObject*>( get()->take().get() );
871 }
872 return m_pEntries;
873 }
874
875 OdUInt32 size() const {
876 return m_nCount;
877 }
878
879 TObject& at( OdUInt32 i ) const {
880 if( i < m_nCount ) {
881 return m_pEntries[i];
882 }
883 throw OdError_InvalidIndex();
884 }
885
886 operator OdApcObjectPool* () { return get(); }
887 operator const OdApcObjectPool* () const { return get(); }
888};
889
890#include "TD_PackPop.h"
891
892#endif //_ODRXTHREADPOOLSERVICE_INCLUDED_
#define ODA_ASSERT_ONCE(exp)
Definition DebugStuff.h:73
unsigned int OdUInt32
#define ODRX_ABSTRACT
#define FIRSTDLL_EXPORT
Definition RootExport.h:39
OdRxObjMod
Definition RxObject.h:56
ptrdiff_t OdApcParamType
OdSmartPtr< OdApcGateway > OdApcGatewayPtr
OdSmartPtr< OdRxThreadPoolService > OdRxThreadPoolServicePtr
void(* OdApcEntryPointVoidParam)(OdApcParamType parameter)
OdApcMtQueueFlags
@ kMtQueueForceTopLevel
@ kMtQueueAllowExecByMain
@ kMtQueueNoFlags
@ kMtQueueLastFlag
@ kMtQueueForceNewThreads
OdSmartPtr< OdApcLoopedGateway > OdApcLoopedGatewayPtr
OdSmartPtr< OdApcObjectPool > OdApcObjectPoolPtr
void od_async_for_each(OdApcQueue *pQueue, It first, It last, Fn fn)
OdApcStQueueFlags
@ kStQueueExecByMain
@ kStQueueLastFlag
@ kStQueueForceTopLevel
@ kStQueueNoFlags
OdSmartPtr< OdApcEvent > OdApcEventPtr
OdSmartPtr< OdApcThread > OdApcThreadPtr
OdSmartPtr< OdApcDataReadWriteDispatcher > OdApcDataReadWriteDispatcherPtr
void(* OdApcEntryPointRxObjParam)(OdRxObject *parameter)
OdSmartPtr< OdApcAtom > OdApcAtomPtr
OdSmartPtr< OdApcQueue > OdApcQueuePtr
void(* ExecuteMainThreadFunc)(MainThreadFunc, void *)
void(* MainThreadFunc)(void *)
virtual void apcEntryPoint(OdRxObject *)
virtual void apcEntryPoint(OdApcParamType)
virtual void unlockFromInside()=0
virtual void lockFromInside()=0
virtual void reset()=0
virtual void set()=0
virtual void wait()=0
virtual void waitAndReset()=0
virtual void passBySecondary()=0
virtual void unlockByMain()=0
virtual void waitByMain()=0
virtual void lockByMain(OdUInt32 numThreads)=0
virtual void passByMainNoWait()=0
virtual void init(OdUInt32 numThreads)=0
virtual void waitByMain()=0
virtual void passBySecondary()=0
virtual void passByMain()=0
TObject & at(OdUInt32 i) const
void init(OdRxThreadPoolService *pTP, int n=0, const TInitFn *pInitFn=0)
virtual void put(OdRxObject *pObj)=0
virtual void reserve(OdUInt32 n)=0
virtual OdRxObjectPtr take()=0
void initST(int n, OdRxThreadPoolService *pTP)
OdApcQueueHelper(OdRxObject *pObject, OdRxObjMod m)
OdApcQueueHelper & operator=(const OdApcQueueHelper &)=default
void initST(OdRxThreadPoolService *pThreadPool)
OdApcQueueHelper(const OdRxObject *pObject)
void initMT(OdRxThreadPoolService *pThreadPool)
void call(OdApcAtom *pAction, OdApcParamType param)
void call(OdApcAtom *pAction, OdRxObject *pParam=0)
OdApcQueueHelper(const OdApcQueueHelper &)=default
OdApcQueueHelper(const OdApcQueue *pObject, OdRxObjMod m)
void setAtomPoolRef(OdApcObjectPool *pAtomPool)
OdApcQueueHelper(const OdBaseObjectPtr &pObject)
OdApcQueueHelper(const OdRxObjectPtr &pObject)
OdApcQueueHelper(const OdSmartPtr< OdApcQueue > &pObject)
OdApcQueueHelper(OdApcQueueHelper &&)=default
OdApcQueueHelper(const OdApcQueue *pObject)
virtual void wait()=0
virtual void addEntryPoint(OdApcAtom *pRecipient, OdRxObject *pMessage=0)=0
virtual void executeMainThreadAction(MainThreadFunc mtFunc, void *pArg)=0
virtual void addEntryPoint(OdApcAtom *pRecipient, OdApcParamType pMessage)=0
virtual void setAtomPoolRef(OdApcObjectPool *pAtomPool)=0
virtual OdRxThreadPoolService & framework()=0
virtual int numThreads() const =0
virtual void asyncProcCall(OdApcEntryPointRxObjParam ep, OdRxObject *parameter)=0
virtual void processException(bool bReThrow=true, bool bClear=true)=0
virtual unsigned int getId() const =0
virtual bool hasException() const =0
virtual void asyncProcCall(OdApcEntryPointVoidParam ep, OdApcParamType parameter)=0
virtual void wait(bool bNoThrow=false)=0
OdArray(size_type physicalLength, int growLength=8)
Definition OdArray.h:1941
size_type size() const
Definition OdArray.h:1254
OdArray & setAt(size_type arrayIndex, const OdApcQueueHelper &value)
Definition OdArray.h:1690
typename A::size_type size_type
Definition OdArray.h:837
OdApcQueueHelper & at(size_type arrayIndex)
Definition OdArray.h:1664
void resize(size_type logicalLength, const OdApcQueueHelper &value)
Definition OdArray.h:1192
virtual void apcEntryPoint(OdApcParamType threadIndex)
virtual void * next(OdUInt32 threadIndex, OdUInt32 &itemIndex)=0
virtual OdMutex * mutexForNext()=0
void for_each(OdApcQueue *pQueue, OdUInt32 nThreads=0)
virtual void doAction(OdUInt32 threadIndex, OdUInt32 itemIndex, void *pItem)=0
void for_each(OdApcQueue *pQueue, It first, It last, Fn fn)
OdRxObject * m_pObject
OdRxObject * get()
Definition RxObject.h:503
virtual OdApcThreadPtr newThread()=0
virtual int numThreads() const =0
virtual OdApcLoopedGatewayPtr newLoopedGateway()=0
virtual int numFreeThreads() const =0
virtual int numCPUs() const =0
virtual void init()=0
virtual void externalThreadStop()=0
virtual void registerExternalThreads(unsigned nThreads, const unsigned *aThreads, unsigned nThreadAttribs=ThreadsCounter::kNoAttributes)=0
virtual OdApcObjectPoolPtr newObjectPool()=0
virtual void externalThreadStart()=0
ODRX_DECLARE_MEMBERS(OdRxThreadPoolService)
virtual void executeMainThreadAction(MainThreadFunc mtFunc, void *pArg)=0
virtual void setExternalMainThreadFunc(ExecuteMainThreadFunc execFunc)=0
virtual OdApcQueuePtr newSTQueue(unsigned nThreadAttributes=ThreadsCounter::kNoAttributes, OdUInt32 nFlags=kStQueueNoFlags)=0
virtual OdApcEventPtr newEvent()=0
virtual OdApcQueuePtr newMTQueue(unsigned nThreadAttributes=ThreadsCounter::kNoAttributes, int numThreads=0, OdUInt32 nFlags=kMtQueueNoFlags)=0
virtual void unregisterExternalThreads(unsigned nThreads, const unsigned *aThreads)=0
virtual unsigned int getCurrentThreadId() const =0
virtual ExecuteMainThreadFunc getExternalMainThreadFunc() const =0
virtual unsigned int getMainThreadId() const =0
virtual int numPhysicalCores() const =0
virtual OdApcGatewayPtr newGateway()=0
const T * get() const
Definition SmartPtr.h:315
typedef void(APIENTRYP PFNGLACTIVETEXTUREPROC)(GLenum texture)