00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_pipeline_H
00022 #define __TBB_pipeline_H
00023
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include "tbb_allocator.h"
00027 #include <cstddef>
00028
00029 namespace tbb {
00030
// Forward declarations. Both classes are defined further down in this header;
// they are named here so that declarations appearing before those definitions
// can refer to them.
class filter;
class pipeline;
00033
namespace internal {

//! Encodes pipeline interface version x as (x-2) shifted left by one bit.
/** Both the argument and the complete expansion are parenthesized, so the macro
    yields the same value when x is itself an expression (for example 4|1) and when
    the expansion is used next to operators that bind tighter than <<.
    NOTE(review): filter::version_mask is 0x7<<1, so x-2 presumably has to fit in
    three bits - confirm before introducing larger version numbers. */
#define __TBB_PIPELINE_VERSION(x) (((unsigned char)((x)-2))<<1)

//! Unsigned type used for the pipeline's token counters.
typedef unsigned long Token;
//! Signed counterpart of Token.
typedef long tokendiff_t;

// Helper classes that are only named in this header (as friends or pointer
// targets); their definitions are not part of it.
class stage_task;
class input_buffer;
class pipeline_root_task;
class pipeline_cleaner;

} // namespace internal
00048
namespace interface5 {
    // Declared early so that tbb::pipeline can name pipeline_proxy as a friend
    // before the rest of namespace interface5 is defined.
    namespace internal {
        class pipeline_proxy;
    }

    // Primary template only; the definition appears later in this header.
    template<typename T, typename U> class filter_t;
}
00056
00058
00060
00061 class filter: internal::no_copy {
00062 private:
00064 static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
00065
00067 static const unsigned char filter_is_serial = 0x1;
00068
00070
00072 static const unsigned char filter_is_out_of_order = 0x1<<4;
00073
00075 static const unsigned char filter_is_bound = 0x1<<5;
00076
00078 static const unsigned char exact_exception_propagation =
00079 #if TBB_USE_CAPTURED_EXCEPTION
00080 0x0;
00081 #else
00082 0x1<<7;
00083 #endif
00084
00085 static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
00086 static const unsigned char version_mask = 0x7<<1;
00087 public:
00088 enum mode {
00090 parallel = current_version | filter_is_out_of_order,
00092 serial_in_order = current_version | filter_is_serial,
00094 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00096 serial = serial_in_order
00097 };
00098 protected:
00099 filter( bool is_serial_ ) :
00100 next_filter_in_pipeline(not_in_pipeline()),
00101 my_input_buffer(NULL),
00102 my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
00103 prev_filter_in_pipeline(not_in_pipeline()),
00104 my_pipeline(NULL),
00105 next_segment(NULL)
00106 {}
00107
00108 filter( mode filter_mode ) :
00109 next_filter_in_pipeline(not_in_pipeline()),
00110 my_input_buffer(NULL),
00111 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
00112 prev_filter_in_pipeline(not_in_pipeline()),
00113 my_pipeline(NULL),
00114 next_segment(NULL)
00115 {}
00116
00117 public:
00119 bool is_serial() const {
00120 return bool( my_filter_mode & filter_is_serial );
00121 }
00122
00124 bool is_ordered() const {
00125 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00126 }
00127
00129 bool is_bound() const {
00130 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00131 }
00132
00134
00135 virtual void* operator()( void* item ) = 0;
00136
00138
00139 virtual __TBB_EXPORTED_METHOD ~filter();
00140
00141 #if __TBB_TASK_GROUP_CONTEXT
00143
00145 virtual void finalize( void* ) {};
00146 #endif
00147
00148 private:
00150 filter* next_filter_in_pipeline;
00151
00153
00154
00155 bool has_more_work();
00156
00158
00159 internal::input_buffer* my_input_buffer;
00160
00161 friend class internal::stage_task;
00162 friend class internal::pipeline_root_task;
00163 friend class pipeline;
00164 friend class thread_bound_filter;
00165
00167 const unsigned char my_filter_mode;
00168
00170 filter* prev_filter_in_pipeline;
00171
00173 pipeline* my_pipeline;
00174
00176
00177 filter* next_segment;
00178 };
00179
00181
00182 class thread_bound_filter: public filter {
00183 public:
00184 enum result_type {
00185
00186 success,
00187
00188 item_not_available,
00189
00190 end_of_stream
00191 };
00192 protected:
00193 thread_bound_filter(mode filter_mode):
00194 filter(static_cast<mode>(filter_mode | filter::filter_is_bound | filter::exact_exception_propagation))
00195 {}
00196 public:
00198
00203 result_type __TBB_EXPORTED_METHOD try_process_item();
00204
00206
00210 result_type __TBB_EXPORTED_METHOD process_item();
00211
00212 private:
00214 result_type internal_process_item(bool is_blocking);
00215 };
00216
00218
00219 class pipeline {
00220 public:
00222 __TBB_EXPORTED_METHOD pipeline();
00223
00226 virtual __TBB_EXPORTED_METHOD ~pipeline();
00227
00229 void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00230
00232 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00233
00234 #if __TBB_TASK_GROUP_CONTEXT
00236 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00237 #endif
00238
00240 void __TBB_EXPORTED_METHOD clear();
00241
00242 private:
00243 friend class internal::stage_task;
00244 friend class internal::pipeline_root_task;
00245 friend class filter;
00246 friend class thread_bound_filter;
00247 friend class internal::pipeline_cleaner;
00248 friend class tbb::interface5::internal::pipeline_proxy;
00249
00251 filter* filter_list;
00252
00254 filter* filter_end;
00255
00257 task* end_counter;
00258
00260 atomic<internal::Token> input_tokens;
00261
00263 atomic<internal::Token> token_counter;
00264
00266 bool end_of_input;
00267
00269 bool has_thread_bound_filters;
00270
00272 void remove_filter( filter& filter_ );
00273
00275 void __TBB_EXPORTED_METHOD inject_token( task& self );
00276
00277 #if __TBB_TASK_GROUP_CONTEXT
00279 void clear_filters();
00280 #endif
00281 };
00282
00283
00284
00285
00286
00287 namespace interface5 {
00288
namespace internal {
    // Forward declaration needed by the friend declaration inside flow_control.
    template<typename T, typename U, typename Body> class concrete_filter;
}

//! Object handed to the body of an input filter so that it can end the stream.
/** Only internal::concrete_filter can create instances or read the flag; user code
    can merely call stop(). */
class flow_control {
    //! Becomes true once stop() has been called.
    bool is_pipeline_stopped;

    flow_control() : is_pipeline_stopped(false) {}

    template<typename T, typename U, typename Body> friend class internal::concrete_filter;
public:
    //! Records that the body has no more items to produce.
    void stop() { is_pipeline_stopped = true; }
};
00301
00303 namespace internal {
00304
00305 template<typename T, typename U, typename Body>
00306 class concrete_filter: public tbb::filter {
00307 const Body& my_body;
00308
00309 typedef typename tbb::tbb_allocator<U> u_allocator;
00310 typedef typename tbb::tbb_allocator<T> t_allocator;
00311
00312 void* operator()(void* input) {
00313 T* temp_input = (T*)input;
00314
00315 U* output_u = u_allocator().allocate(1);
00316 void* output = (void*) new (output_u) U(my_body(*temp_input));
00317 t_allocator().destroy(temp_input);
00318 t_allocator().deallocate(temp_input,1);
00319 return output;
00320 }
00321
00322 public:
00323 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00324 };
00325
00326 template<typename U, typename Body>
00327 class concrete_filter<void,U,Body>: public filter {
00328 const Body& my_body;
00329
00330 typedef typename tbb::tbb_allocator<U> u_allocator;
00331
00332 void* operator()(void*) {
00333 flow_control control;
00334 U* output_u = u_allocator().allocate(1);
00335 (void) new (output_u) U(my_body(control));
00336 if(control.is_pipeline_stopped) {
00337 u_allocator().destroy(output_u);
00338 u_allocator().deallocate(output_u,1);
00339 output_u = NULL;
00340 }
00341 return (void*)output_u;
00342 }
00343 public:
00344 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00345 };
00346
00347 template<typename T, typename Body>
00348 class concrete_filter<T,void,Body>: public filter {
00349 const Body& my_body;
00350
00351 typedef typename tbb::tbb_allocator<T> t_allocator;
00352
00353 void* operator()(void* input) {
00354 T* temp_input = (T*)input;
00355 my_body(*temp_input);
00356 t_allocator().destroy(temp_input);
00357 t_allocator().deallocate(temp_input,1);
00358 return NULL;
00359 }
00360 public:
00361 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00362 };
00363
00364 template<typename Body>
00365 class concrete_filter<void,void,Body>: public filter {
00366 const Body& my_body;
00367
00369 void* operator()(void*) {
00370 flow_control control;
00371 my_body(control);
00372 void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
00373 return output;
00374 }
00375 public:
00376 concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00377 };
00378
00380
00381 class pipeline_proxy {
00382 tbb::pipeline my_pipe;
00383 public:
00384 pipeline_proxy( const filter_t<void,void>& filter_chain );
00385 ~pipeline_proxy() {
00386 while( filter* f = my_pipe.filter_list )
00387 delete f;
00388 }
00389 tbb::pipeline* operator->() { return &my_pipe; }
00390 };
00391
00393
00394 class filter_node: tbb::internal::no_copy {
00396 tbb::atomic<intptr_t> ref_count;
00397 protected:
00398 filter_node() {
00399 ref_count = 0;
00400 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00401 ++(__TBB_TEST_FILTER_NODE_COUNT);
00402 #endif
00403 }
00404 public:
00406 virtual void add_to( pipeline& ) = 0;
00408 void add_ref() {++ref_count;}
00410 void remove_ref() {
00411 __TBB_ASSERT(ref_count>0,"ref_count underflow");
00412 if( --ref_count==0 )
00413 delete this;
00414 }
00415 virtual ~filter_node() {
00416 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00417 --(__TBB_TEST_FILTER_NODE_COUNT);
00418 #endif
00419 }
00420 };
00421
00423 template<typename T, typename U, typename Body>
00424 class filter_node_leaf: public filter_node {
00425 const tbb::filter::mode mode;
00426 const Body body;
00427 void add_to( pipeline& p ) {
00428 concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
00429 p.add_filter( *f );
00430 }
00431 public:
00432 filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
00433 };
00434
00436 class filter_node_join: public filter_node {
00437 friend class filter_node;
00438 filter_node& left;
00439 filter_node& right;
00440 ~filter_node_join() {
00441 left.remove_ref();
00442 right.remove_ref();
00443 }
00444 void add_to( pipeline& p ) {
00445 left.add_to(p);
00446 right.add_to(p);
00447 }
00448 public:
00449 filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
00450 left.add_ref();
00451 right.add_ref();
00452 }
00453 };
00454
00455 }
00457
00459 template<typename T, typename U, typename Body>
00460 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
00461 return new internal::filter_node_leaf<T,U,Body>(mode, body);
00462 }
00463
00464 template<typename T, typename V, typename U>
00465 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
00466 __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
00467 __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
00468 return new internal::filter_node_join(*left.root,*right.root);
00469 }
00470
00472 template<typename T, typename U>
00473 class filter_t {
00474 typedef internal::filter_node filter_node;
00475 filter_node* root;
00476 filter_t( filter_node* root_ ) : root(root_) {
00477 root->add_ref();
00478 }
00479 friend class internal::pipeline_proxy;
00480 template<typename T_, typename U_, typename Body>
00481 friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
00482 template<typename T_, typename V_, typename U_>
00483 friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
00484 public:
00485 filter_t() : root(NULL) {}
00486 filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
00487 if( root ) root->add_ref();
00488 }
00489 template<typename Body>
00490 filter_t( tbb::filter::mode mode, const Body& body ) :
00491 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
00492 root->add_ref();
00493 }
00494
00495 void operator=( const filter_t<T,U>& rhs ) {
00496
00497
00498 filter_node* old = root;
00499 root = rhs.root;
00500 if( root ) root->add_ref();
00501 if( old ) old->remove_ref();
00502 }
00503 ~filter_t() {
00504 if( root ) root->remove_ref();
00505 }
00506 void clear() {
00507
00508 if( root ) {
00509 filter_node* old = root;
00510 root = NULL;
00511 old->remove_ref();
00512 }
00513 }
00514 };
00515
00516 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
00517 __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" );
00518 filter_chain.root->add_to(my_pipe);
00519 }
00520
00521 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
00522 #if __TBB_TASK_GROUP_CONTEXT
00523 , tbb::task_group_context& context
00524 #endif
00525 ) {
00526 internal::pipeline_proxy pipe(filter_chain);
00527
00528 pipe->run(max_number_of_live_tokens
00529 #if __TBB_TASK_GROUP_CONTEXT
00530 , context
00531 #endif
00532 );
00533 }
00534
#if __TBB_TASK_GROUP_CONTEXT
//! Runs filter_chain with a task_group_context that lives only for the duration of the call.
inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
    tbb::task_group_context local_context;
    parallel_pipeline(max_number_of_live_tokens, filter_chain, local_context);
}
#endif /* __TBB_TASK_GROUP_CONTEXT */
00541
00542 }
00543
00544 using interface5::flow_control;
00545 using interface5::filter_t;
00546 using interface5::make_filter;
00547 using interface5::parallel_pipeline;
00548
00549 }
00550
00551 #endif