00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_concurrent_priority_queue_H
00022 #define __TBB_concurrent_priority_queue_H
00023
00024 #include "atomic.h"
00025 #include "cache_aligned_allocator.h"
00026 #include "tbb_exception.h"
00027 #include "tbb_stddef.h"
00028 #include "tbb_profiling.h"
00029 #include <iterator>
00030 #include <functional>
00031
00032 #if !TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE
00033 #error Set TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE to include concurrent_priority_queue.h
00034 #endif
00035
00036 namespace tbb {
00037 namespace interface5 {
00038
00040 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
00041 class concurrent_priority_queue {
00042 public:
00044 typedef T value_type;
00045
00047 typedef T& reference;
00048
00050 typedef const T& const_reference;
00051
00053 typedef size_t size_type;
00054
00056 typedef ptrdiff_t difference_type;
00057
00059 typedef A allocator_type;
00060
00062 explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : my_helper(a) {
00063 internal_construct(0);
00064 }
00065
00067 explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) : my_helper(a) {
00068 internal_construct(init_capacity);
00069 }
00070
00072 template<typename InputIterator>
00073 concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : my_helper(a)
00074 {
00075 internal_iterator_construct(begin, end, typename std::iterator_traits<InputIterator>::iterator_category());
00076 }
00077
00079
00080 concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a = allocator_type()) : my_helper(a)
00081 {
00082 internal_construct(src.my_size);
00083 my_size = src.my_size;
00084 mark = src.mark;
00085 __TBB_TRY {
00086 internal_copy(src.data, data, my_size);
00087 } __TBB_CATCH(...) {
00088 my_helper.deallocate(data, my_capacity);
00089 __TBB_RETHROW();
00090 }
00091 heapify();
00092 }
00093
00095
00096 concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
00097 if (this !=&src) {
00098 concurrent_priority_queue copy(src);
00099 copy.swap(*this);
00100 }
00101 return *this;
00102 }
00103
00105 ~concurrent_priority_queue() { internal_destroy(); }
00106
00108
00109 bool empty() const { return my_size==0; }
00110
00112
00113 size_type size() const { return my_size; }
00114
00116
00117 size_type capacity() const {
00118 return my_capacity;
00119 }
00120
00122 void push(const_reference elem) {
00123 cpq_operation op_data(elem, PUSH_OP);
00124 insert_handle_wait(&op_data);
00125 if (op_data.result == FAILED) {
00126 tbb::internal::throw_exception(tbb::internal::eid_bad_alloc);
00127 }
00128 }
00129
00131
00133 bool try_pop(reference elem) {
00134 cpq_operation op_data(POP_OP);
00135 op_data.elem = &elem;
00136 insert_handle_wait(&op_data);
00137 return op_data.result==SUCCESS;
00138 }
00139
00141 void reserve(size_type new_cap) {
00142 cpq_operation op_data(RESERVE_OP);
00143 op_data.sz = new_cap;
00144 insert_handle_wait(&op_data);
00145 if (op_data.result == FAILED) {
00146 tbb::internal::throw_exception(tbb::internal::eid_bad_alloc);
00147 }
00148 }
00149
00151
00153 void clear() {
00154 for (size_type i=my_size; i>0; --i) {
00155 data[i-1].~value_type();
00156 }
00157 my_size = 0;
00158 mark = 0;
00159 }
00160
00162 void shrink_to_fit() {
00163 internal_reserve(my_size);
00164 }
00165
00167 void swap(concurrent_priority_queue& q) {
00168 std::swap(data, q.data);
00169 std::swap(my_size, q.my_size);
00170 std::swap(my_capacity, q.my_capacity);
00171 std::swap(mark, q.mark);
00172 std::swap(my_helper, q.my_helper);
00173 }
00174
00176 allocator_type get_allocator() const { return my_helper; }
00177
00178 private:
00179 enum operation_type {INVALID_OP, PUSH_OP, POP_OP, RESERVE_OP};
00180 enum operation_status { WAITING = 0, SUCCESS, FAILED };
00181 class cpq_operation {
00182 public:
00183 operation_type type;
00184 uintptr_t result;
00185 union {
00186 value_type *elem;
00187 size_type sz;
00188 };
00189 cpq_operation *next;
00190 cpq_operation(const_reference e, operation_type t) :
00191 type(t), result(WAITING), elem(const_cast<value_type*>(&e)), next(NULL) {}
00192 cpq_operation(operation_type t) : type(t), result(WAITING), next(NULL) {}
00193 };
00194
00196 tbb::atomic<cpq_operation *> operation_list;
00198 char padding1[ tbb::internal::NFS_MaxLineSize - sizeof(tbb::atomic<cpq_operation *>)];
00199
00201 uintptr_t handler_busy;
00203 size_type my_size;
00205 size_type mark;
00207 char padding2[ tbb::internal::NFS_MaxLineSize - sizeof(uintptr_t) - 2*sizeof(size_type)];
00208
00210 size_type my_capacity;
00212
00230 value_type *data;
00232 struct helper_type : public allocator_type {
00234 helper_type(allocator_type const& a) : allocator_type(a) {}
00236 Compare compare;
00237 };
00238 helper_type my_helper;
00239
00241 void internal_construct(size_type init_sz) {
00242 my_size = my_capacity = mark = 0;
00243 data = NULL;
00244 operation_list = NULL;
00245 handler_busy = 0;
00246 internal_reserve(init_sz);
00247 }
00248
00250 template <typename ForwardIterator>
00251 void internal_iterator_construct(ForwardIterator begin, ForwardIterator end, std::forward_iterator_tag) {
00252 internal_construct(std::distance(begin, end));
00253 my_size = my_capacity;
00254 size_type i=0;
00255 __TBB_TRY {
00256 for(; begin != end; ++begin, ++i)
00257 new (&data[i]) value_type(*begin);
00258 } __TBB_CATCH(...) {
00259 my_size = i;
00260 clear();
00261 my_helper.deallocate(data, my_capacity);
00262 __TBB_RETHROW();
00263 }
00264 heapify();
00265 }
00266
00268 template <typename InputIterator>
00269 void internal_iterator_construct(InputIterator begin, InputIterator end, std::input_iterator_tag) {
00270 internal_construct(32);
00271 size_type i=0;
00272 __TBB_TRY {
00273 for(; begin != end; ++begin, ++i) {
00274 if (i>=my_capacity)
00275 internal_reserve(my_capacity<<1);
00276 new (&data[i]) value_type(*begin);
00277 ++my_size;
00278 }
00279 } __TBB_CATCH(...) {
00280 clear();
00281 my_helper.deallocate(data, my_capacity);
00282 __TBB_RETHROW();
00283 }
00284 heapify();
00285 shrink_to_fit();
00286 }
00287
00289 void internal_destroy() {
00290 #if TBB_USE_ASSERT
00291 cpq_operation *op_list = operation_list.fetch_and_store((cpq_operation *)(internal::poisoned_ptr));
00292 __TBB_ASSERT(op_list==NULL,"concurrent_priority_queue destroyed with pending operations.\n");
00293 __TBB_ASSERT(!handler_busy,"concurrent_priority_queue destroyed with pending operations.\n");
00294 #endif
00295 clear();
00296 if (data) my_helper.deallocate(data, my_capacity);
00297 }
00298
00300
00301 void internal_reserve(size_type desired_capacity);
00302
00304
00305 void internal_copy(const value_type *src, value_type *dst, size_type sz) {
00306 size_type i=0;
00307 __TBB_TRY {
00308 for (; i<sz; ++i) new (&dst[i]) value_type(src[i]);
00309 } __TBB_CATCH(...) {
00310
00311 for (; i>0; --i) {
00312 dst[i-1].~value_type();
00313 }
00314 __TBB_RETHROW();
00315 }
00316 }
00317
00319 void heapify();
00320
00322
00323 void reheap();
00324
00326 void handle_operations();
00327
00329 void insert_handle_wait(cpq_operation *op) {
00330 cpq_operation *res = operation_list, *tmp;
00331
00332 using namespace tbb::internal;
00333
00334 __TBB_ASSERT(operation_list!=(cpq_operation *)poisoned_ptr, "Attempt to use destroyed concurrent_priority_queue.\n");
00335
00336 do {
00337 op->next = tmp = res;
00338
00339
00340
00341
00342 call_itt_notify(releasing, &operation_list+1);
00343 } while ((res = operation_list.compare_and_swap(op, tmp)) != tmp);
00344 if (!tmp) {
00345
00346
00347 call_itt_notify(acquired, &operation_list);
00348 handle_operations();
00349 __TBB_ASSERT(op->result, NULL);
00350 }
00351 else {
00352 call_itt_notify(prepare, &(op->result));
00353 spin_wait_while_eq(op->result, uintptr_t(WAITING));
00354 itt_load_word_with_acquire(op->result);
00355 }
00356 }
00357 };
00358
00360 template <typename T, typename Compare, typename A>
00361 void concurrent_priority_queue<T, Compare, A>::internal_reserve(size_type desired_capacity) {
00362 value_type *tmp_data = NULL;
00363
00364
00365 if (desired_capacity<my_size) desired_capacity = my_size;
00366
00367 if (desired_capacity==0) {
00368
00369 __TBB_ASSERT(my_size==0, NULL);
00370 __TBB_ASSERT(mark==0, NULL);
00371 if (data) my_helper.deallocate(data, my_capacity);
00372 data = NULL;
00373 my_capacity = 0;
00374 return;
00375 }
00376
00377 tmp_data = static_cast<value_type*>(my_helper.allocate(desired_capacity));
00378 if( !tmp_data )
00379 tbb::internal::throw_exception(tbb::internal::eid_bad_alloc);
00380 if (data) {
00381
00382 __TBB_TRY {
00383 internal_copy(data, tmp_data, my_size);
00384 } __TBB_CATCH(...) {
00385 my_helper.deallocate(tmp_data, desired_capacity);
00386 __TBB_RETHROW();
00387 }
00388
00389 for (size_type i=my_size; i>0; --i) {
00390 data[i-1].~value_type();
00391 }
00392 my_helper.deallocate(data, my_capacity);
00393 }
00394
00395
00396 data = tmp_data;
00397 my_capacity = desired_capacity;
00398 }
00399
00401 template <typename T, typename Compare, typename A>
00402 void concurrent_priority_queue<T, Compare, A>::heapify() {
00403 value_type *loc = data;
00404
00405 if (!mark) mark = 1;
00406 for (; mark<my_size; ++mark) {
00407 size_type cur_pos = mark;
00408 value_type to_place = loc[mark];
00409 do {
00410 size_type parent = (cur_pos-1)>>1;
00411 if (!my_helper.compare(loc[parent], to_place)) break;
00412 loc[cur_pos] = loc[parent];
00413 cur_pos = parent;
00414 } while( cur_pos );
00415 loc[cur_pos] = to_place;
00416 }
00417 }
00418
00420
00422 template <typename T, typename Compare, typename A>
00423 void concurrent_priority_queue<T, Compare, A>::reheap() {
00424 size_type cur_pos=0, child=1;
00425 value_type *loc = data;
00426
00427 while (child < mark) {
00428 size_type target = child;
00429 if (child+1 < mark && my_helper.compare(loc[child], loc[child+1])) ++target;
00430
00431 if (my_helper.compare(loc[target], loc[my_size])) break;
00432 loc[cur_pos] = loc[target];
00433 cur_pos = target;
00434 child = (cur_pos<<1)+1;
00435 }
00436 loc[cur_pos] = loc[my_size];
00437 }
00438
00439 template <typename T, typename Compare, typename A>
00440 void concurrent_priority_queue<T, Compare, A>::handle_operations() {
00441 cpq_operation *op_list, *tmp, *pop_list=NULL;
00442
00443 using namespace tbb::internal;
00444
00445
00446
00447
00448
00449
00450
00451 call_itt_notify(prepare, &handler_busy);
00452 spin_wait_until_eq(handler_busy, uintptr_t(0));
00453 call_itt_notify(acquired, &handler_busy);
00454
00455 __TBB_store_with_release(handler_busy, 1);
00456
00457
00458
00459
00460
00461 call_itt_notify(releasing, &operation_list);
00462 op_list = operation_list.fetch_and_store(NULL);
00463
00464
00465
00466
00467 call_itt_notify(acquired, &operation_list+1);
00468
00469
00470 while (op_list) {
00471 __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
00472 tmp = op_list;
00473 op_list = op_list->next;
00474 if (tmp->type == PUSH_OP) {
00475 __TBB_TRY {
00476 if (my_size >= my_capacity) internal_reserve(my_capacity?my_capacity<<1:1);
00477 new (&data[my_size]) value_type(*(tmp->elem));
00478 ++my_size;
00479 itt_store_word_with_release(tmp->result, uintptr_t(SUCCESS));
00480 } __TBB_CATCH(...) {
00481 itt_store_word_with_release(tmp->result, uintptr_t(FAILED));
00482 }
00483 }
00484 else if (tmp->type == POP_OP) {
00485 if (!my_size) {
00486 itt_store_word_with_release(tmp->result, uintptr_t(FAILED));
00487 }
00488 else {
00489 if (mark < my_size && my_helper.compare(data[0], data[my_size-1])) {
00490
00491 *(tmp->elem) = data[my_size-1];
00492 data[my_size-1].~value_type();
00493 --my_size;
00494 itt_store_word_with_release(tmp->result, uintptr_t(SUCCESS));
00495 __TBB_ASSERT(mark<=my_size, NULL);
00496 }
00497 else {
00498 tmp->next = pop_list;
00499 pop_list = tmp;
00500 }
00501 }
00502 }
00503 else {
00504 __TBB_ASSERT(tmp->type == RESERVE_OP, NULL);
00505 __TBB_TRY {
00506 internal_reserve(tmp->sz);
00507 itt_store_word_with_release(tmp->result, uintptr_t(SUCCESS));
00508 } __TBB_CATCH(...) {
00509 itt_store_word_with_release(tmp->result, uintptr_t(FAILED));
00510 };
00511 }
00512 }
00513
00514
00515 while (pop_list) {
00516 tmp = pop_list;
00517 pop_list = pop_list->next;
00518 __TBB_ASSERT(tmp->type == POP_OP, NULL);
00519 if (!my_size) {
00520 itt_store_word_with_release(tmp->result, uintptr_t(FAILED));
00521 }
00522 else {
00523 __TBB_ASSERT(mark<=my_size, NULL);
00524 if (mark < my_size && my_helper.compare(data[0], data[my_size-1])) {
00525
00526 *(tmp->elem) = data[my_size-1];
00527 --my_size;
00528 itt_store_word_with_release(tmp->result, uintptr_t(SUCCESS));
00529 data[my_size].~value_type();
00530 }
00531 else {
00532 *(tmp->elem) = data[0];
00533 if (mark == my_size) --mark;
00534 --my_size;
00535 itt_store_word_with_release(tmp->result, uintptr_t(SUCCESS));
00536 data[0] = data[my_size];
00537 if (my_size > 1)
00538 reheap();
00539 data[my_size].~value_type();
00540 }
00541 __TBB_ASSERT(mark<=my_size, NULL);
00542 }
00543 }
00544
00545
00546 if (mark<my_size) heapify();
00547 __TBB_ASSERT(mark<=my_size, NULL);
00548
00549
00550 itt_store_word_with_release(handler_busy, uintptr_t(0));
00551 }
00552
00553 }
00554
00555 using interface5::concurrent_priority_queue;
00556
00557 }
00558
00559
00560 #endif