CFx SDK Documentation  2022 SP0
RxThreadPoolService.h
Go to the documentation of this file.
1 // Copyright (C) 2002-2017, Open Design Alliance (the "Alliance").
3 // All rights reserved.
4 //
5 // This software and its documentation and related materials are owned by
6 // the Alliance. The software may only be incorporated into application
7 // programs owned by members of the Alliance, subject to a signed
8 // Membership Agreement and Supplemental Software License Agreement with the
9 // Alliance. The structure and organization of this software are the valuable
10 // trade secrets of the Alliance and its suppliers. The software is also
11 // protected by copyright law and international treaty provisions. Application
12 // programs incorporating this software must include the following statement
13 // with their copyright notices:
14 //
15 // This application incorporates Teigha(R) software pursuant to a license
16 // agreement with Open Design Alliance.
17 // Teigha(R) Copyright (C) 2002-2017 by Open Design Alliance.
18 // All rights reserved.
19 //
20 // By use of this software, its documentation or related materials, you
21 // acknowledge and accept the above terms.
23 
24 #ifndef _ODRXTHREADPOOLSERVICE_INCLUDED_
25 #define _ODRXTHREADPOOLSERVICE_INCLUDED_ /* { Secret } **/
26 
27 #include "TD_PackPush.h"
28 
29 #include "RxModule.h"
30 #include "ThreadsCounter.h"
31 #include "OdArray.h"
32 #include "StaticRxObject.h"
33 
34 #include <algorithm>
35 
37 
38 typedef ptrdiff_t OdApcParamType;
39 
41 
42 typedef void (*OdApcEntryPointRxObjParam)( OdRxObject* parameter );
43 
50 public:
56  virtual void asyncProcCall( OdApcEntryPointVoidParam ep, OdApcParamType parameter ) = 0;
57 
63  virtual void asyncProcCall( OdApcEntryPointRxObjParam ep, OdRxObject* parameter ) = 0;
64 
68  virtual void wait() = 0;
69 
73  virtual unsigned int getId() const = 0;
74 };
75 
80 
81 
88 public:
92  virtual void apcEntryPoint( OdRxObject* pMessage ) {}
93 
97  virtual void apcEntryPoint( OdApcParamType pMessage ) {}
98 };
99 
104 
105 
// Abstract interface of an object pool. Only the operations are declared
// here; every member is pure virtual and implemented elsewhere.
111 class OdApcObjectPool : public OdRxObject {
112 public:
// Receives an object count n.
// NOTE(review): presumably pre-allocates room for n pooled objects - confirm
// against the implementation.
116  virtual void reserve( OdUInt32 n ) = 0;
117 
// Returns an object wrapped in an OdRxObjectPtr.
// NOTE(review): presumably removes that object from the pool; behavior on an
// empty pool is not visible here - confirm.
121  virtual OdRxObjectPtr take() = 0;
122 
// Hands pObj over to the pool.
// NOTE(review): presumably makes pObj available to later take() calls; the
// ownership rules are not visible here - confirm.
126  virtual void put( OdRxObject* pObj ) = 0;
127 };
128 
130 
131 
132 class OdApcObjectPool;
133 
140 public:
145 
149  virtual void setAtomPoolRef( OdApcObjectPool* pAtomPool ) = 0;
150 
154  virtual void addEntryPoint( OdApcAtom* pRecipient, OdRxObject* pMessage = 0 ) = 0;
155 
159  virtual void addEntryPoint( OdApcAtom* pRecipient, OdApcParamType pMessage ) = 0;
160 
166  virtual void wait() = 0;
167 
175  virtual void executeMainThreadAction( MainThreadFunc mtFunc, void *pArg ) = 0;
176 };
177 
182 
183 
194 public:
198  virtual void enter() = 0;
199 
203  virtual void leave() = 0;
204 
212  virtual void lock() = 0;
213 
217  virtual void unlock() = 0;
218 
224  virtual void lockFromInside() = 0;
225 
229  virtual void unlockFromInside() = 0;
230 };
231 
236 
237 
244 public:
248  virtual void set() = 0;
249 
253  virtual void reset() = 0;
254 
258  virtual void wait() = 0;
259 
263  virtual void waitAndReset() = 0;
264 };
265 
270 
271 
278 public:
284  virtual void lockByMain( OdUInt32 numThreads ) = 0;
285 
289  virtual void waitByMain() = 0;
290 
294  virtual void unlockByMain() = 0;
295 
297 
301  virtual void passBySecondary() = 0;
302 };
303 
308 
315 public:
316 
319  virtual void init( OdUInt32 numThreads ) = 0;
320 
324  virtual void passByMain() = 0;
325 
331  virtual void waitByMain() = 0;
332 
336  virtual void passByMainNoWait() = 0;
337 
339 
343  virtual void passBySecondary() = 0;
344 };
345 
350 
351 
358 {
359  kMtQueueNoFlags = 0, // Empty MtQueue flags.
360 
361  kMtQueueForceNewThreads = (1 << 0), // Spawn new threads and add them to the pool if there are not enough free threads.
362  kMtQueueAllowExecByMain = (1 << 1), // Allow to use main thread for execution if there are no free threads.
363  kMtQueueForceTopLevel = (1 << 2), // Make MtQueue top level even if other registered threads already run.
364 
366 };
367 
368 
374 public:
376 
380  virtual int numCPUs() const = 0;
381 
385  virtual int numPhysicalCores() const = 0;
386 
390  virtual int numThreads() const = 0;
391 
397  virtual int numFreeThreads() const = 0;
398 
402  virtual OdApcThreadPtr newThread() = 0;
403 
409  virtual OdApcQueuePtr newSTQueue() = 0;
410 
419  virtual OdApcQueuePtr newMTQueue( unsigned nThreadAttributes = ThreadsCounter::kNoAttributes,
420  int numThreads = 0, OdUInt32 nFlags = kMtQueueNoFlags ) = 0;
421 
426 
430  virtual OdApcEventPtr newEvent() = 0;
431 
436 
441 
447  virtual void executeMainThreadAction( MainThreadFunc mtFunc, void *pArg ) = 0;
448 
449  /* Support for external threads manager */
450 
461  virtual void registerExternalThreads( unsigned nThreads, const unsigned* aThreads, unsigned nThreadAttribs = ThreadsCounter::kNoAttributes ) = 0;
471  virtual void unregisterExternalThreads( unsigned nThreads, const unsigned* aThreads ) = 0;
475  virtual void externalThreadStart() = 0;
479  virtual void externalThreadStop() = 0;
484  virtual void setExternalMainThreadFunc( ExecuteMainThreadFunc execFunc ) = 0;
489 
490  //FELIX_CHANGE_BEGIN
491  virtual void init() = 0;
492  //FELIX_CHANGE_END
493 };
494 
496 
497 
502 class OdApcQueueHelper : public OdSmartPtr<OdApcQueue> {
503 public:
505  OdApcQueueHelper( const OdApcQueue* pObject, OdRxObjMod m ) : OdSmartPtr<OdApcQueue>( pObject, m ) {}
506  OdApcQueueHelper( const OdApcQueue* pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
507  OdApcQueueHelper( const OdRxObject* pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
508  OdApcQueueHelper( OdRxObject* pObject, OdRxObjMod m ) : OdSmartPtr<OdApcQueue>( pObject, m ) {}
510  OdApcQueueHelper( const OdRxObjectPtr& pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
511  OdApcQueueHelper( const OdBaseObjectPtr& pObject ) : OdSmartPtr<OdApcQueue>( pObject ) {}
512 
514 
515  void initST( OdRxThreadPoolService* pThreadPool ) {
516  *this = pThreadPool ? pThreadPool->newSTQueue().get() : 0;
517  }
518 
519  void initMT( OdRxThreadPoolService* pThreadPool ) {
520  *this = pThreadPool ? pThreadPool->newMTQueue().get() : 0;
521  }
522 
523  void call( OdApcAtom* pAction, OdRxObject* pParam = 0 ) {
524  if( m_pObject ) {
525  get()->addEntryPoint( pAction, pParam );
526  }
527  else {
528  pAction->apcEntryPoint( pParam );
529  }
530  }
531 
532  void call( OdApcAtom* pAction, OdApcParamType param ) {
533  if( m_pObject ) {
534  get()->addEntryPoint( pAction, param );
535  }
536  else {
537  pAction->apcEntryPoint( param );
538  }
539  }
540 
541  void setAtomPoolRef( OdApcObjectPool* pAtomPool ) {
542  if( m_pObject ) {
543  get()->setAtomPoolRef( pAtomPool );
544  }
545  }
546 
547  void wait() {
548  if( m_pObject ) {
549  get()->wait();
550  }
551  }
552 };
553 
554 
642 class OdApcQueueHelperArray : public OdArray<OdApcQueueHelper> {
643 public:
644  void initST( int n, OdRxThreadPoolService* pTP ) {
645  resize( n );
646  if ( pTP ) {
647  for( int i=0; i<n; ++i ) {
648  setAt( i, pTP->newSTQueue() );
649  }
650  }
651  }
652  void wait() {
653  size_type n = size();
654  for( size_type i=0; i<n; ++i ) {
655  at( i ).wait();
656  }
657  }
658 };
659 
660 
666  void* sync_next( OdMutex* mutex, OdUInt32 threadIndex, OdUInt32& itemIndex ) {
667  OdMutexAutoLock lock( *mutex );
668  return next( threadIndex, itemIndex );
669  }
670 protected:
671  virtual OdMutex* mutexForNext() = 0;
672 
673  virtual void* next( OdUInt32 threadIndex, OdUInt32& itemIndex ) = 0;
674 
675  virtual void doAction( OdUInt32 threadIndex, OdUInt32 itemIndex, void* pItem ) = 0;
676 
677  virtual void apcEntryPoint( OdApcParamType threadIndex ) {
678  OdMutex* mutex = mutexForNext();
679  OdUInt32 itemIndex;
680  while( void* pItem = sync_next( mutex, (OdUInt32)threadIndex, itemIndex ) ) {
681  doAction( (OdUInt32)threadIndex, itemIndex, pItem );
682  }
683  }
684 public:
685  void for_each( OdApcQueue* pQueue, OdUInt32 nThreads = 0 ) {
686  if ( pQueue!=0 ) {
687  OdUInt32 n = nThreads==0 ? pQueue->framework().numCPUs() : nThreads;
688  if( n > 1 ) {
689  while( --n ) {
690  pQueue->addEntryPoint( this, (OdApcParamType)n );
691  }
692  apcEntryPoint( (OdApcParamType)n );
693  pQueue->wait();
694  return;
695  }
696  }
697  apcEntryPoint( 0 );
698  }
699 };
700 
703 template< class It, class Fn >
704 class OdAsyncForEach : public OdStaticRxObject<OdApcAtom> {
705  OdMutex m_mutex;
706  It m_cur, m_last;
707  Fn m_fn;
708 
709  bool next( It& cur ) {
710  OdMutexAutoLock lock( m_mutex );
711  if ( m_cur < m_last ) {
712  cur = m_cur;
713  ++m_cur;
714  return true;
715  }
716  return false;
717  }
718 
719  void apcEntryPoint( OdRxObject* ) {
720  It cur;
721  if( next( cur ) ) {
722  do {
723  m_fn( *cur );
724  }
725  while( next( cur ) );
726  }
727  }
728 public:
729  void for_each( OdApcQueue* pQueue, It first, It last, Fn fn ) {
730  unsigned long n = last-first;
731  if( pQueue && n>1 ) {
732  m_cur = first;
733  m_last = last;
734  m_fn = fn;
735 
736  unsigned long numCPUs = pQueue->framework().numCPUs();
737  n = ( n < numCPUs ) ? n : numCPUs;
738  while( n-- ) {
739  pQueue->addEntryPoint( this );
740  }
741  pQueue->wait();
742  }
743  else {
744  std::for_each( first, last, fn );
745  }
746  }
747 
748 };
749 
750 template< class It, class Fn >
751 void od_async_for_each( OdApcQueue* pQueue, It first, It last, Fn fn ) {
752  ODA_ASSERT_ONCE( first <= last );
753  if( first < last ) {
754  OdAsyncForEach< It, Fn >().for_each( pQueue, first, last, fn );
755  }
756 }
759 template< class TObject >
// Call operator that accepts an object reference and leaves it untouched.
// NOTE(review): presumably the body of OdApcObjectPoolHelperDummyInitFn, the
// default TInitFn of OdApcObjectPoolHelper - the enclosing declaration is not
// visible in this listing.
761  void operator () ( TObject& ) const {
762  // does nothing
763  }
764 };
765 
768 template< class TObject, class TInitFn = OdApcObjectPoolHelperDummyInitFn<TObject> >
770  TObject* m_pEntries;
771  OdUInt32 m_nCount;
772 public:
// Creates an empty helper: no entry array (m_pEntries is NULL) and an entry
// count of zero. Entries are allocated later by init().
773  OdApcObjectPoolHelper() : m_pEntries( NULL ), m_nCount(0) {}
774  ~OdApcObjectPoolHelper() { release(); ::delete [] m_pEntries; m_pEntries = NULL; m_nCount=0; }
775  void init( OdRxThreadPoolService* pTP, int n = 0, const TInitFn* pInitFn = 0 ) {
776  if ( pTP ) {
777  if( n==0 ) {
778  n = pTP->numCPUs();
779  }
780  m_nCount = n;
781  m_pEntries = ::new TObject[n];
782  (*(OdApcObjectPoolPtr*)this) = pTP->newObjectPool();
783  while( --n ) {
784  TObject& obj = m_pEntries[ n ];
785  if( pInitFn ) {
786  (*pInitFn)( obj );
787  }
788  get()->put( &obj );
789  }
790  }
791  else {
792  m_nCount = 1;
793  m_pEntries = ::new TObject[1];
794  if( pInitFn ) {
795  (*pInitFn)( *m_pEntries );
796  }
797  }
798  }
799 
800  TObject* take() {
801  if( m_pObject ) {
802  return static_cast<TObject*>( get()->take().get() );
803  }
804  return m_pEntries;
805  }
806 
// Returns the number of entries recorded by init() (zero before init()).
807  OdUInt32 size() const {
808  return m_nCount;
809  }
810 
811  TObject& at( OdUInt32 i ) const {
812  if( i < m_nCount ) {
813  return m_pEntries[i];
814  }
815  throw OdError_InvalidIndex();
816  }
817 
// Implicit conversions to the raw pool pointer, as returned by get().
// The result is null when no pool is attached.
818  operator OdApcObjectPool* () { return get(); }
819  operator const OdApcObjectPool* () const { return get(); }
820 };
821 
822 #include "TD_PackPop.h"
823 
824 #endif //_ODRXTHREADPOOLSERVICE_INCLUDED_
#define ODA_ASSERT_ONCE(exp)
Definition: DebugStuff.h:51
#define NULL
Definition: GsProperties.h:177
unsigned int OdUInt32
#define ODRX_ABSTRACT
#define FIRSTDLL_EXPORT
Definition: RootExport.h:39
OdRxObjMod
Definition: RxObject.h:56
ptrdiff_t OdApcParamType
OdSmartPtr< OdApcGateway > OdApcGatewayPtr
OdSmartPtr< OdRxThreadPoolService > OdRxThreadPoolServicePtr
void(* OdApcEntryPointVoidParam)(OdApcParamType parameter)
OdApcMtQueueFlags
@ kMtQueueForceTopLevel
@ kMtQueueAllowExecByMain
@ kMtQueueNoFlags
@ kMtQueueLastFlag
@ kMtQueueForceNewThreads
OdSmartPtr< OdApcLoopedGateway > OdApcLoopedGatewayPtr
OdSmartPtr< OdApcObjectPool > OdApcObjectPoolPtr
void od_async_for_each(OdApcQueue *pQueue, It first, It last, Fn fn)
OdSmartPtr< OdApcEvent > OdApcEventPtr
OdSmartPtr< OdApcThread > OdApcThreadPtr
OdSmartPtr< OdApcDataReadWriteDispatcher > OdApcDataReadWriteDispatcherPtr
void(* OdApcEntryPointRxObjParam)(OdRxObject *parameter)
OdSmartPtr< OdApcAtom > OdApcAtomPtr
OdSmartPtr< OdApcQueue > OdApcQueuePtr
void(* ExecuteMainThreadFunc)(MainThreadFunc, void *)
void(* MainThreadFunc)(void *)
virtual void apcEntryPoint(OdRxObject *pMessage)
virtual void apcEntryPoint(OdApcParamType pMessage)
virtual void unlockFromInside()=0
virtual void lockFromInside()=0
virtual void reset()=0
virtual void set()=0
virtual void wait()=0
virtual void waitAndReset()=0
virtual void passBySecondary()=0
virtual void unlockByMain()=0
virtual void waitByMain()=0
virtual void lockByMain(OdUInt32 numThreads)=0
virtual void passByMainNoWait()=0
virtual void init(OdUInt32 numThreads)=0
virtual void waitByMain()=0
virtual void passBySecondary()=0
virtual void passByMain()=0
void init(OdRxThreadPoolService *pTP, int n=0, const TInitFn *pInitFn=0)
TObject & at(OdUInt32 i) const
virtual void put(OdRxObject *pObj)=0
virtual void reserve(OdUInt32 n)=0
virtual OdRxObjectPtr take()=0
void initST(int n, OdRxThreadPoolService *pTP)
OdApcQueueHelper(OdRxObject *pObject, OdRxObjMod m)
void initST(OdRxThreadPoolService *pThreadPool)
OdApcQueueHelper(const OdRxObject *pObject)
void initMT(OdRxThreadPoolService *pThreadPool)
void call(OdApcAtom *pAction, OdApcParamType param)
void call(OdApcAtom *pAction, OdRxObject *pParam=0)
OdApcQueueHelper(const OdApcQueue *pObject, OdRxObjMod m)
void setAtomPoolRef(OdApcObjectPool *pAtomPool)
OdApcQueueHelper(const OdBaseObjectPtr &pObject)
OdApcQueueHelper(const OdRxObjectPtr &pObject)
OdApcQueueHelper(const OdSmartPtr< OdApcQueue > &pObject)
OdApcQueueHelper(const OdApcQueue *pObject)
virtual void wait()=0
virtual void addEntryPoint(OdApcAtom *pRecipient, OdRxObject *pMessage=0)=0
virtual void executeMainThreadAction(MainThreadFunc mtFunc, void *pArg)=0
virtual void addEntryPoint(OdApcAtom *pRecipient, OdApcParamType pMessage)=0
virtual void setAtomPoolRef(OdApcObjectPool *pAtomPool)=0
virtual OdRxThreadPoolService & framework()=0
virtual void asyncProcCall(OdApcEntryPointRxObjParam ep, OdRxObject *parameter)=0
virtual unsigned int getId() const =0
virtual void asyncProcCall(OdApcEntryPointVoidParam ep, OdApcParamType parameter)=0
virtual void wait()=0
A::size_type size_type
Definition: OdArray.h:593
OdArray & setAt(size_type arrayIndex, const OdApcQueueHelper &value)
Definition: OdArray.h:1162
size_type size() const
Definition: OdArray.h:893
OdApcQueueHelper & at(size_type arrayIndex)
Definition: OdArray.h:1140
void resize(size_type logicalLength, const OdApcQueueHelper &value)
Definition: OdArray.h:834
virtual OdMutex * mutexForNext()=0
virtual void * next(OdUInt32 threadIndex, OdUInt32 &itemIndex)=0
virtual void apcEntryPoint(OdApcParamType threadIndex)
void for_each(OdApcQueue *pQueue, OdUInt32 nThreads=0)
virtual void doAction(OdUInt32 threadIndex, OdUInt32 itemIndex, void *pItem)=0
void for_each(OdApcQueue *pQueue, It first, It last, Fn fn)
OdRxObject * m_pObject
Definition: BaseObjectPtr.h:53
virtual OdApcThreadPtr newThread()=0
virtual int numThreads() const =0
virtual OdApcLoopedGatewayPtr newLoopedGateway()=0
virtual int numFreeThreads() const =0
virtual OdApcQueuePtr newSTQueue()=0
virtual int numCPUs() const =0
virtual void init()=0
virtual void externalThreadStop()=0
virtual void registerExternalThreads(unsigned nThreads, const unsigned *aThreads, unsigned nThreadAttribs=ThreadsCounter::kNoAttributes)=0
virtual OdApcObjectPoolPtr newObjectPool()=0
virtual void externalThreadStart()=0
ODRX_DECLARE_MEMBERS(OdRxThreadPoolService)
virtual void executeMainThreadAction(MainThreadFunc mtFunc, void *pArg)=0
virtual void setExternalMainThreadFunc(ExecuteMainThreadFunc execFunc)=0
virtual OdApcEventPtr newEvent()=0
virtual OdApcQueuePtr newMTQueue(unsigned nThreadAttributes=ThreadsCounter::kNoAttributes, int numThreads=0, OdUInt32 nFlags=kMtQueueNoFlags)=0
virtual void unregisterExternalThreads(unsigned nThreads, const unsigned *aThreads)=0
virtual ExecuteMainThreadFunc getExternalMainThreadFunc() const =0
virtual int numPhysicalCores() const =0
virtual OdApcGatewayPtr newGateway()=0
void release()
Definition: SmartPtr.h:264
const T * get() const
Definition: SmartPtr.h:326
typedef void(APIENTRYP PFNGLACTIVETEXTUREPROC)(GLenum texture)