CFx SDK Documentation 2024 SP0
Loading...
Searching...
No Matches
RxThreadPoolService.h
Go to the documentation of this file.
1
2// Copyright (C) 2002-2022, Open Design Alliance (the "Alliance").
3// All rights reserved.
4//
5// This software and its documentation and related materials are owned by
6// the Alliance. The software may only be incorporated into application
7// programs owned by members of the Alliance, subject to a signed
8// Membership Agreement and Supplemental Software License Agreement with the
9// Alliance. The structure and organization of this software are the valuable
10// trade secrets of the Alliance and its suppliers. The software is also
11// protected by copyright law and international treaty provisions. Application
12// programs incorporating this software must include the following statement
13// with their copyright notices:
14//
15// This application incorporates Open Design Alliance software pursuant to a license
16// agreement with Open Design Alliance.
17// Open Design Alliance Copyright (C) 2002-2022 by Open Design Alliance.
18// All rights reserved.
19//
20// By use of this software, its documentation or related materials, you
21// acknowledge and accept the above terms.
23
24#ifndef _ODRXTHREADPOOLSERVICE_INCLUDED_
25#define _ODRXTHREADPOOLSERVICE_INCLUDED_ /* { Secret } **/
26
27#include "TD_PackPush.h"
28
29#include "RxModule.h"
30#include "ThreadsCounter.h"
31#include "OdArray.h"
32#include "StaticRxObject.h"
33
34#include <algorithm>
35
37
38typedef ptrdiff_t OdApcParamType;
39
41
42typedef void (*OdApcEntryPointRxObjParam)( OdRxObject* parameter );
43
50public:
56 virtual void asyncProcCall( OdApcEntryPointVoidParam ep, OdApcParamType parameter ) = 0;
57
63 virtual void asyncProcCall( OdApcEntryPointRxObjParam ep, OdRxObject* parameter ) = 0;
64
69 virtual void wait(bool bNoThrow = false) = 0;
70
74 virtual unsigned int getId() const = 0;
75
79 virtual bool hasException() const = 0;
80
86 virtual void processException(bool bReThrow = true, bool bClear = true) = 0;
87};
88
93
94
101public:
105 virtual void apcEntryPoint( OdRxObject* pMessage ) {}
106
110 virtual void apcEntryPoint( OdApcParamType pMessage ) {}
111};
112
117
118
125public:
129 virtual void reserve( OdUInt32 n ) = 0;
130
134 virtual OdRxObjectPtr take() = 0;
135
139 virtual void put( OdRxObject* pObj ) = 0;
140};
141
143
144
145class OdApcObjectPool;
146
153public:
158
162 virtual void setAtomPoolRef( OdApcObjectPool* pAtomPool ) = 0;
163
167 virtual void addEntryPoint( OdApcAtom* pRecipient, OdRxObject* pMessage = 0 ) = 0;
168
172 virtual void addEntryPoint( OdApcAtom* pRecipient, OdApcParamType pMessage ) = 0;
173
179 virtual void wait() = 0;
180
188 virtual void executeMainThreadAction( MainThreadFunc mtFunc, void *pArg ) = 0;
189
193 virtual int numThreads() const = 0;
194};
195
200
201
212public:
216 virtual void enter() = 0;
217
221 virtual void leave() = 0;
222
230 virtual void lock() = 0;
231
235 virtual void unlock() = 0;
236
242 virtual void lockFromInside() = 0;
243
247 virtual void unlockFromInside() = 0;
248};
249
254
255
262public:
266 virtual void set() = 0;
267
271 virtual void reset() = 0;
272
276 virtual void wait() = 0;
277
281 virtual void waitAndReset() = 0;
282};
283
288
289
296public:
302 virtual void lockByMain( OdUInt32 numThreads ) = 0;
303
307 virtual void waitByMain() = 0;
308
312 virtual void unlockByMain() = 0;
313
315
319 virtual void passBySecondary() = 0;
320};
321
326
333public:
334
337 virtual void init( OdUInt32 numThreads ) = 0;
338
342 virtual void passByMain() = 0;
343
349 virtual void waitByMain() = 0;
350
354 virtual void passByMainNoWait() = 0;
355
357
361 virtual void passBySecondary() = 0;
362};
363
368
369
376{
377 kStQueueNoFlags = 0, // Empty StQueue flags.
378
379 kStQueueExecByMain = (1 << 0), // Use main thread for queue tasks execution.
380 kStQueueForceTopLevel = (1 << 1), // Make StQueue top level even if other registered threads already run.
381
384
385
392{
393 kMtQueueNoFlags = 0, // Empty MtQueue flags.
394
395 kMtQueueForceNewThreads = (1 << 0), // Spawn new threads and add them to the pool if there are not enough free threads.
396 kMtQueueAllowExecByMain = (1 << 1), // Allow to use main thread for execution if there are no free threads.
397 kMtQueueForceTopLevel = (1 << 2), // Make MtQueue top level even if other registered threads already run.
398
401
402
408public:
410
414 virtual int numCPUs() const = 0;
415
419 virtual int numPhysicalCores() const = 0;
420
424 virtual int numThreads() const = 0;
425
431 virtual int numFreeThreads() const = 0;
432
437
443 virtual OdApcQueuePtr newSTQueue( unsigned nThreadAttributes = ThreadsCounter::kNoAttributes,
444 OdUInt32 nFlags = kStQueueNoFlags ) = 0;
445
454 virtual OdApcQueuePtr newMTQueue( unsigned nThreadAttributes = ThreadsCounter::kNoAttributes,
455 int numThreads = 0, OdUInt32 nFlags = kMtQueueNoFlags ) = 0;
456
461
465 virtual OdApcEventPtr newEvent() = 0;
466
471
476
482 virtual void executeMainThreadAction( MainThreadFunc mtFunc, void *pArg ) = 0;
483
484 /* Support for external threads manager */
485
496 virtual void registerExternalThreads( unsigned nThreads, const unsigned* aThreads, unsigned nThreadAttribs = ThreadsCounter::kNoAttributes ) = 0;
506 virtual void unregisterExternalThreads( unsigned nThreads, const unsigned* aThreads ) = 0;
510 virtual void externalThreadStart() = 0;
514 virtual void externalThreadStop() = 0;
524
528 virtual unsigned int getMainThreadId() const = 0;
532 virtual unsigned int getCurrentThreadId() const = 0;
536 inline bool isMainThread() const
537 {
538 return getMainThreadId() == getCurrentThreadId();
539 }
540
541 //FELIX_CHANGE_BEGIN
542 virtual void init() = 0;
543 //FELIX_CHANGE_END
544};
545
547
548
553class OdApcQueueHelper : public OdSmartPtr<OdApcQueue> {
554public:
556 OdApcQueueHelper( const OdApcQueue* pObject, OdRxObjMod m ) : OdSmartPtr<OdApcQueue>( pObject, m ) {}
557 OdApcQueueHelper( const OdApcQueue* pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
558 OdApcQueueHelper( const OdRxObject* pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
559 OdApcQueueHelper( OdRxObject* pObject, OdRxObjMod m ) : OdSmartPtr<OdApcQueue>( pObject, m ) {}
561 OdApcQueueHelper( const OdRxObjectPtr& pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
562 OdApcQueueHelper( const OdBaseObjectPtr& pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
563
564 //FELIX_CHANGE_BEGIN
569 //FELIX_CHANGE_END
570
571 using OdSmartPtr<OdApcQueue>::operator =;
572
573 void initST( OdRxThreadPoolService* pThreadPool ) {
574 *this = pThreadPool ? pThreadPool->newSTQueue().get() : 0;
575 }
576
577 void initMT( OdRxThreadPoolService* pThreadPool ) {
578 *this = pThreadPool ? pThreadPool->newMTQueue().get() : 0;
579 }
580
581 void call( OdApcAtom* pAction, OdRxObject* pParam = 0 ) {
582 if( m_pObject ) {
583 get()->addEntryPoint( pAction, pParam );
584 }
585 else {
586 pAction->apcEntryPoint( pParam );
587 }
588 }
589
590 void call( OdApcAtom* pAction, OdApcParamType param ) {
591 if( m_pObject ) {
592 get()->addEntryPoint( pAction, param );
593 }
594 else {
595 pAction->apcEntryPoint( param );
596 }
597 }
598
599 void setAtomPoolRef( OdApcObjectPool* pAtomPool ) {
600 if( m_pObject ) {
601 get()->setAtomPoolRef( pAtomPool );
602 }
603 }
604
605 void wait() {
606 if( m_pObject ) {
607 get()->wait();
608 }
609 }
610};
611
612
700class OdApcQueueHelperArray : public OdArray<OdApcQueueHelper> {
701public:
702 void initST( int n, OdRxThreadPoolService* pTP ) {
703 resize( n );
704 if ( pTP ) {
705 for( int i=0; i<n; ++i ) {
706 setAt( i, pTP->newSTQueue() );
707 }
708 }
709 }
710 void wait() {
711 size_type n = size();
712 for( size_type i=0; i<n; ++i ) {
713 at( i ).wait();
714 }
715 }
716};
717
718
724 void* sync_next( OdMutex* mutex, OdUInt32 threadIndex, OdUInt32& itemIndex ) {
725 OdMutexAutoLock lock( *mutex );
726 return next( threadIndex, itemIndex );
727 }
728protected:
729 virtual OdMutex* mutexForNext() = 0;
730
731 virtual void* next( OdUInt32 threadIndex, OdUInt32& itemIndex ) = 0;
732
733 virtual void doAction( OdUInt32 threadIndex, OdUInt32 itemIndex, void* pItem ) = 0;
734
735 virtual void apcEntryPoint( OdApcParamType threadIndex ) {
736 OdMutex* mutex = mutexForNext();
737 OdUInt32 itemIndex;
738 while( void* pItem = sync_next( mutex, (OdUInt32)threadIndex, itemIndex ) ) {
739 doAction( (OdUInt32)threadIndex, itemIndex, pItem );
740 }
741 }
742public:
743 void for_each( OdApcQueue* pQueue, OdUInt32 nThreads = 0 ) {
744 if ( pQueue!=0 ) {
745 OdUInt32 n = nThreads==0 ? pQueue->framework().numCPUs() : nThreads;
746 if( n > 1 ) {
747 while( --n ) {
748 pQueue->addEntryPoint( this, (OdApcParamType)n );
749 }
750 apcEntryPoint( (OdApcParamType)n );
751 pQueue->wait();
752 return;
753 }
754 }
755 apcEntryPoint( 0 );
756 }
757};
758
761template< class It, class Fn >
762class OdAsyncForEach : public OdStaticRxObject<OdApcAtom> {
763 OdMutex m_mutex;
764 It m_cur, m_last;
765 Fn m_fn;
766
767 bool next( It& cur ) {
768 OdMutexAutoLock lock( m_mutex );
769 if ( m_cur < m_last ) {
770 cur = m_cur;
771 ++m_cur;
772 return true;
773 }
774 return false;
775 }
776
777 void apcEntryPoint( OdRxObject* ) {
778 It cur;
779 if( next( cur ) ) {
780 do {
781 m_fn( *cur );
782 }
783 while( next( cur ) );
784 }
785 }
786public:
787 void for_each( OdApcQueue* pQueue, It first, It last, Fn fn ) {
788 unsigned long n = last-first;
789 if( pQueue && n>1 ) {
790 m_cur = first;
791 m_last = last;
792 m_fn = fn;
793
794 unsigned long numCPUs = pQueue->framework().numCPUs();
795 n = ( n < numCPUs ) ? n : numCPUs;
796 while( n-- ) {
797 pQueue->addEntryPoint( this );
798 }
799 pQueue->wait();
800 }
801 else {
802 std::for_each( first, last, fn );
803 }
804 }
805
806};
807
808//DOM-IGNORE-BEGIN
809template< class It, class Fn >
810void od_async_for_each( OdApcQueue* pQueue, It first, It last, Fn fn ) {
811 ODA_ASSERT_ONCE( first <= last );
812 if( first < last ) {
813 OdAsyncForEach< It, Fn >().for_each( pQueue, first, last, fn );
814 }
815}
816//DOM-IGNORE-END
817
820template< class TObject >
825 void operator () ( TObject& ) const {
826 // does nothing
827 }
828};
829
832template< class TObject, class TInitFn = OdApcObjectPoolHelperDummyInitFn<TObject> >
834 TObject* m_pEntries;
835 OdUInt32 m_nCount;
836public:
837 OdApcObjectPoolHelper() : m_pEntries( NULL ), m_nCount(0) {}
838 ~OdApcObjectPoolHelper() { release(); ::delete [] m_pEntries; m_pEntries = NULL; m_nCount=0; }
839 void init( OdRxThreadPoolService* pTP, int n = 0, const TInitFn* pInitFn = 0 ) {
840 if ( pTP ) {
841 if( n==0 ) {
842 n = pTP->numCPUs();
843 }
844 m_nCount = n;
845 m_pEntries = ::new TObject[n];
846 (*(OdApcObjectPoolPtr*)this) = pTP->newObjectPool();
847 while( --n ) {
848 TObject& obj = m_pEntries[ n ];
849 if( pInitFn ) {
850 (*pInitFn)( obj );
851 }
852 get()->put( &obj );
853 }
854 }
855 else {
856 m_nCount = 1;
857 m_pEntries = ::new TObject[1];
858 if( pInitFn ) {
859 (*pInitFn)( *m_pEntries );
860 }
861 }
862 }
863
864 TObject* take() {
865 if( m_pObject ) {
866 return static_cast<TObject*>( get()->take().get() );
867 }
868 return m_pEntries;
869 }
870
871 OdUInt32 size() const {
872 return m_nCount;
873 }
874
875 TObject& at( OdUInt32 i ) const {
876 if( i < m_nCount ) {
877 return m_pEntries[i];
878 }
879 throw OdError_InvalidIndex();
880 }
881
882 operator OdApcObjectPool* () { return get(); }
883 operator const OdApcObjectPool* () const { return get(); }
884};
885
886#include "TD_PackPop.h"
887
888#endif //_ODRXTHREADPOOLSERVICE_INCLUDED_
#define ODA_ASSERT_ONCE(exp)
Definition: DebugStuff.h:73
unsigned int OdUInt32
#define ODRX_ABSTRACT
#define FIRSTDLL_EXPORT
Definition: RootExport.h:39
OdRxObjMod
Definition: RxObject.h:56
ptrdiff_t OdApcParamType
OdSmartPtr< OdApcGateway > OdApcGatewayPtr
OdSmartPtr< OdRxThreadPoolService > OdRxThreadPoolServicePtr
void(* OdApcEntryPointVoidParam)(OdApcParamType parameter)
OdApcMtQueueFlags
@ kMtQueueForceTopLevel
@ kMtQueueAllowExecByMain
@ kMtQueueNoFlags
@ kMtQueueLastFlag
@ kMtQueueForceNewThreads
OdSmartPtr< OdApcLoopedGateway > OdApcLoopedGatewayPtr
OdSmartPtr< OdApcObjectPool > OdApcObjectPoolPtr
void od_async_for_each(OdApcQueue *pQueue, It first, It last, Fn fn)
OdApcStQueueFlags
@ kStQueueExecByMain
@ kStQueueLastFlag
@ kStQueueForceTopLevel
@ kStQueueNoFlags
OdSmartPtr< OdApcEvent > OdApcEventPtr
OdSmartPtr< OdApcThread > OdApcThreadPtr
OdSmartPtr< OdApcDataReadWriteDispatcher > OdApcDataReadWriteDispatcherPtr
void(* OdApcEntryPointRxObjParam)(OdRxObject *parameter)
OdSmartPtr< OdApcAtom > OdApcAtomPtr
OdSmartPtr< OdApcQueue > OdApcQueuePtr
void(* ExecuteMainThreadFunc)(MainThreadFunc, void *)
void(* MainThreadFunc)(void *)
virtual void apcEntryPoint(OdRxObject *pMessage)
virtual void apcEntryPoint(OdApcParamType pMessage)
virtual void unlockFromInside()=0
virtual void lockFromInside()=0
virtual void reset()=0
virtual void set()=0
virtual void wait()=0
virtual void waitAndReset()=0
virtual void passBySecondary()=0
virtual void unlockByMain()=0
virtual void waitByMain()=0
virtual void lockByMain(OdUInt32 numThreads)=0
virtual void passByMainNoWait()=0
virtual void init(OdUInt32 numThreads)=0
virtual void waitByMain()=0
virtual void passBySecondary()=0
virtual void passByMain()=0
TObject & at(OdUInt32 i) const
void init(OdRxThreadPoolService *pTP, int n=0, const TInitFn *pInitFn=0)
virtual void put(OdRxObject *pObj)=0
virtual void reserve(OdUInt32 n)=0
virtual OdRxObjectPtr take()=0
void initST(int n, OdRxThreadPoolService *pTP)
OdApcQueueHelper(OdRxObject *pObject, OdRxObjMod m)
OdApcQueueHelper & operator=(const OdApcQueueHelper &)=default
void initST(OdRxThreadPoolService *pThreadPool)
OdApcQueueHelper(const OdRxObject *pObject)
void initMT(OdRxThreadPoolService *pThreadPool)
void call(OdApcAtom *pAction, OdApcParamType param)
void call(OdApcAtom *pAction, OdRxObject *pParam=0)
OdApcQueueHelper(const OdApcQueueHelper &)=default
OdApcQueueHelper(const OdApcQueue *pObject, OdRxObjMod m)
void setAtomPoolRef(OdApcObjectPool *pAtomPool)
OdApcQueueHelper(const OdBaseObjectPtr &pObject)
OdApcQueueHelper(const OdRxObjectPtr &pObject)
OdApcQueueHelper(const OdSmartPtr< OdApcQueue > &pObject)
OdApcQueueHelper(OdApcQueueHelper &&)=default
OdApcQueueHelper(const OdApcQueue *pObject)
virtual void wait()=0
virtual void addEntryPoint(OdApcAtom *pRecipient, OdRxObject *pMessage=0)=0
virtual void executeMainThreadAction(MainThreadFunc mtFunc, void *pArg)=0
virtual void addEntryPoint(OdApcAtom *pRecipient, OdApcParamType pMessage)=0
virtual void setAtomPoolRef(OdApcObjectPool *pAtomPool)=0
virtual OdRxThreadPoolService & framework()=0
virtual int numThreads() const =0
virtual void asyncProcCall(OdApcEntryPointRxObjParam ep, OdRxObject *parameter)=0
virtual void processException(bool bReThrow=true, bool bClear=true)=0
virtual unsigned int getId() const =0
virtual bool hasException() const =0
virtual void asyncProcCall(OdApcEntryPointVoidParam ep, OdApcParamType parameter)=0
virtual void wait(bool bNoThrow=false)=0
size_type size() const
Definition: OdArray.h:1247
OdArray & setAt(size_type arrayIndex, const OdApcQueueHelper &value)
Definition: OdArray.h:1668
typename A::size_type size_type
Definition: OdArray.h:831
OdApcQueueHelper & at(size_type arrayIndex)
Definition: OdArray.h:1642
void resize(size_type logicalLength, const OdApcQueueHelper &value)
Definition: OdArray.h:1185
virtual void apcEntryPoint(OdApcParamType threadIndex)
virtual void * next(OdUInt32 threadIndex, OdUInt32 &itemIndex)=0
virtual OdMutex * mutexForNext()=0
void for_each(OdApcQueue *pQueue, OdUInt32 nThreads=0)
virtual void doAction(OdUInt32 threadIndex, OdUInt32 itemIndex, void *pItem)=0
void for_each(OdApcQueue *pQueue, It first, It last, Fn fn)
OdRxObject * m_pObject
Definition: BaseObjectPtr.h:54
virtual OdApcThreadPtr newThread()=0
virtual int numThreads() const =0
virtual OdApcLoopedGatewayPtr newLoopedGateway()=0
virtual int numFreeThreads() const =0
virtual int numCPUs() const =0
virtual void init()=0
virtual void externalThreadStop()=0
virtual void registerExternalThreads(unsigned nThreads, const unsigned *aThreads, unsigned nThreadAttribs=ThreadsCounter::kNoAttributes)=0
virtual OdApcObjectPoolPtr newObjectPool()=0
virtual void externalThreadStart()=0
ODRX_DECLARE_MEMBERS(OdRxThreadPoolService)
virtual void executeMainThreadAction(MainThreadFunc mtFunc, void *pArg)=0
virtual void setExternalMainThreadFunc(ExecuteMainThreadFunc execFunc)=0
virtual OdApcQueuePtr newSTQueue(unsigned nThreadAttributes=ThreadsCounter::kNoAttributes, OdUInt32 nFlags=kStQueueNoFlags)=0
virtual OdApcEventPtr newEvent()=0
virtual OdApcQueuePtr newMTQueue(unsigned nThreadAttributes=ThreadsCounter::kNoAttributes, int numThreads=0, OdUInt32 nFlags=kMtQueueNoFlags)=0
virtual void unregisterExternalThreads(unsigned nThreads, const unsigned *aThreads)=0
virtual unsigned int getCurrentThreadId() const =0
virtual ExecuteMainThreadFunc getExternalMainThreadFunc() const =0
virtual unsigned int getMainThreadId() const =0
virtual int numPhysicalCores() const =0
virtual OdApcGatewayPtr newGateway()=0
void release()
Definition: SmartPtr.h:269
const T * get() const
Definition: SmartPtr.h:339
typedef void(APIENTRYP PFNGLACTIVETEXTUREPROC)(GLenum texture)