30 #include "BESInternalError.h"
33 #include "DmrppRequestHandler.h"
34 #include "CurlHandlePool.h"
35 #include "DmrppArray.h"
36 #include "DmrppNames.h"
38 #include "SuperChunk.h"
40 #define prolog std::string("SuperChunk::").append(__func__).append("() - ")
42 #define SUPER_CHUNK_MODULE "dmrpp:3"
44 using std::stringstream;
// Mutex locked by the start_one_chunk_*_compute_thread() helpers while they
// test and update chunk_processing_thread_counter.
51 std::mutex chunk_processing_thread_pool_mtx;
// Atomic count of chunk-processing threads. It is compared against
// DmrppRequestHandler::d_max_compute_threads before a new std::async task is launched.
52 atomic_uint chunk_processing_thread_counter(0);
// NOTE(review): not referenced in the visible part of this file; presumably a
// configuration key name - confirm.
53 #define COMPUTE_THREADS "compute_threads"
/**
 * @brief Applies any filters to a chunk and inserts its values into the array.
 *
 * When the array reports a non-empty filter list the chunk is first run
 * through Chunk::filter_chunk(); DmrppArray::insert_chunk() is then called
 * starting at dimension 0.
 *
 * @param chunk The chunk to process.
 * @param array The array that receives the chunk's values.
 * @param constrained_array_shape Forwarded unchanged to insert_chunk();
 *        presumably the shape of the constrained array - confirm.
 */
73 void process_one_chunk(shared_ptr<Chunk> chunk, DmrppArray *array,
const vector<unsigned long long> &constrained_array_shape)
75 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
// Filtering is skipped entirely when the array has no filters.
80 if (!array->is_filters_empty())
81 chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->var()->width());
// Local copy of the position the chunk reports for itself in the array.
83 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
// One zero entry per array dimension.
84 vector<unsigned long long> chunk_source_address(array->dimensions(), 0);
86 array->insert_chunk(0 , &target_element_address, &chunk_source_address, chunk,
87 constrained_array_shape);
89 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"END" << endl );
/**
 * @brief Applies any filters to a chunk and inserts it via the unconstrained path.
 *
 * Same filtering step as process_one_chunk(), followed by a call to
 * DmrppArray::insert_chunk_unconstrained() with zero for the three scalar
 * arguments and the chunk's own position in the array.
 *
 * @param chunk The chunk to process.
 * @param chunk_shape Forwarded to insert_chunk_unconstrained().
 * @param array The array that receives the chunk's values.
 * @param array_shape Forwarded to insert_chunk_unconstrained().
 */
111 void process_one_chunk_unconstrained(shared_ptr<Chunk> chunk,
const vector<unsigned long long> &chunk_shape,
112 DmrppArray *array,
const vector<unsigned long long> &array_shape)
114 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
// Filtering is skipped entirely when the array has no filters.
119 if (!array->is_filters_empty())
120 chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->var()->width());
122 array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
124 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"END" << endl );
/**
 * @brief Task function handed to std::async by start_one_chunk_compute_thread().
 *
 * Unpacks the argument bundle and calls process_one_chunk().
 *
 * @param args Owning pointer to the chunk, array and array shape to process.
 * @return NOTE(review): the return statement is not visible in this listing.
 */
133 bool one_chunk_compute_thread(unique_ptr<one_chunk_args> args)
// Timer tag (compiled only when DMRPP_ENABLE_THREAD_TIMERS is set): this
// thread's id, the parent thread id and the parent SuperChunk id.
136 #if DMRPP_ENABLE_THREAD_TIMERS
137 stringstream timer_tag;
138 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
139 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" parent_sc: " << args->parent_super_chunk_id;
141 sw.start(timer_tag.str());
144 process_one_chunk(args->chunk, args->array, args->array_shape);
/**
 * @brief Task function handed to std::async by
 *        start_one_chunk_unconstrained_compute_thread().
 *
 * Unpacks the argument bundle and calls process_one_chunk_unconstrained().
 *
 * @param args Owning pointer to the chunk, chunk shape, array and array shape.
 * @return NOTE(review): the return statement is not visible in this listing.
 */
153 bool one_chunk_unconstrained_compute_thread(unique_ptr<one_chunk_unconstrained_args> args)
// Timer tag (compiled only when DMRPP_ENABLE_THREAD_TIMERS is set): this
// thread's id, the parent thread id and the parent SuperChunk id.
156 #if DMRPP_ENABLE_THREAD_TIMERS
157 stringstream timer_tag;
158 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
159 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" parent_sc: " << args->parent_super_chunk_id ;
161 sw.start(timer_tag.str());
163 process_one_chunk_unconstrained(args->chunk, args->chunk_shape, args->array, args->array_shape);
/**
 * @brief Launches one_chunk_compute_thread() via std::async if the thread limit allows.
 *
 * Holds chunk_processing_thread_pool_mtx while comparing
 * chunk_processing_thread_counter with DmrppRequestHandler::d_max_compute_threads.
 * When below the limit, the counter is incremented and the new future is
 * appended to futures.
 *
 * NOTE(review): the counter is incremented before std::async() is called,
 * whereas start_one_chunk_unconstrained_compute_thread() increments it
 * afterwards. If std::async() throws here the counter stays incremented -
 * consider aligning the two functions.
 *
 * @param futures List that receives the future of a newly started task.
 * @param args Argument bundle; ownership is moved into the task.
 * @return NOTE(review): the return statement is not visible in this listing;
 *         callers store the result as thread_started.
 */
177 bool start_one_chunk_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_args> args) {
179 std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
180 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads <<
" chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
181 if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
182 chunk_processing_thread_counter++;
// std::launch::async is requested explicitly, so deferred execution is not allowed.
183 futures.push_back(std::async(std::launch::async, one_chunk_compute_thread, std::move(args)));
185 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Got std::future '" << futures.size() <<
186 "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
/**
 * @brief Launches one_chunk_unconstrained_compute_thread() via std::async if
 *        the thread limit allows.
 *
 * Holds chunk_processing_thread_pool_mtx while comparing
 * chunk_processing_thread_counter with DmrppRequestHandler::d_max_compute_threads.
 * When below the limit, the new future is appended to futures and the counter
 * is then incremented.
 *
 * @param futures List that receives the future of a newly started task.
 * @param args Argument bundle; ownership is moved into the task.
 * @return NOTE(review): the return statement is not visible in this listing;
 *         callers store the result as thread_started.
 */
201 bool start_one_chunk_unconstrained_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_unconstrained_args> args) {
203 std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
204 if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
// std::launch::async is requested explicitly, so deferred execution is not allowed.
205 futures.push_back(std::async(std::launch::async, one_chunk_unconstrained_compute_thread, std::move(args)));
206 chunk_processing_thread_counter++;
208 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Got std::future '" << futures.size() <<
209 "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
/**
 * @brief Processes the queued chunks using std::async tasks.
 *
 * Visible flow: a local list of futures is kept and get_next_future() is
 * called with DMRPP_WAIT_FOR_FUTURE_MS. While chunks remain, a one_chunk_args
 * bundle is built for the chunk at the front of the queue and handed to
 * start_one_chunk_compute_thread(). Launching stops as soon as a start
 * attempt reports false.
 *
 * NOTE(review): several lines of this function are not visible in this
 * listing, including the declaration of the `array` parameter used below,
 * the enclosing loop and any removal of chunks from the queue.
 *
 * @param super_chunk_id Identifier stored in each one_chunk_args.
 * @param chunks Queue of chunks awaiting processing.
 * @param constrained_array_shape Shape forwarded to each task via one_chunk_args.
 */
236 void process_chunks_concurrent(
237 const string &super_chunk_id,
238 queue<shared_ptr<Chunk>> &chunks,
240 const vector<unsigned long long> &constrained_array_shape ){
// Futures for the tasks started by this call.
243 list<future<bool>> futures;
246 bool future_finished =
true;
250 future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
254 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
256 if (!chunks.empty()){
// Keep launching until the queue is empty or a start attempt is refused.
258 bool thread_started =
true;
259 while(thread_started && !chunks.empty()) {
260 auto chunk = chunks.front();
261 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Starting thread for " << chunk->to_string() << endl);
263 auto args = unique_ptr<one_chunk_args>(
new one_chunk_args(super_chunk_id, chunk, array, constrained_array_shape));
264 thread_started = start_one_chunk_compute_thread(futures, std::move(args));
266 if (thread_started) {
268 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"STARTED thread for " << chunk->to_string() << endl);
271 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Thread not started. args deleted, Chunk remains in queue.) " <<
272 "chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
" futures.size(): " << futures.size() << endl);
281 future_finished =
false;
// Walks the remaining futures, calling get() on the last one when it is valid.
// NOTE(review): the statement that removes the entry from the list, and the
// context this loop runs in (presumably error cleanup), are not visible here.
286 while(!futures.empty()){
287 if(futures.back().valid())
288 futures.back().get();
/**
 * @brief Processes the queued chunks on the unconstrained path using std::async tasks.
 *
 * Visible flow: a local list of futures is kept and get_next_future() is
 * called with DMRPP_WAIT_FOR_FUTURE_MS. While chunks remain, a
 * one_chunk_unconstrained_args bundle is built for the chunk at the front of
 * the queue and handed to start_one_chunk_unconstrained_compute_thread().
 * Launching stops as soon as a start attempt reports false.
 *
 * NOTE(review): several lines of this function are not visible in this
 * listing, including the declaration of the `array` parameter used below,
 * the enclosing loop and any removal of chunks from the queue.
 *
 * @param super_chunk_id Identifier stored in each argument bundle.
 * @param chunks Queue of chunks awaiting processing.
 * @param chunk_shape Chunk shape forwarded to each task.
 * @param array_shape Array shape forwarded to each task.
 */
321 void process_chunks_unconstrained_concurrent(
322 const string &super_chunk_id,
323 queue<shared_ptr<Chunk>> &chunks,
324 const vector<unsigned long long> &chunk_shape,
326 const vector<unsigned long long> &array_shape){
// Futures for the tasks started by this call.
329 list<future<bool>> futures;
332 bool future_finished =
true;
336 future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
340 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
342 if (!chunks.empty()){
// Keep launching until the queue is empty or a start attempt is refused.
344 bool thread_started =
true;
345 while(thread_started && !chunks.empty()) {
346 auto chunk = chunks.front();
347 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Starting thread for " << chunk->to_string() << endl);
349 auto args = unique_ptr<one_chunk_unconstrained_args>(
350 new one_chunk_unconstrained_args(super_chunk_id, chunk, array, array_shape, chunk_shape) );
351 thread_started = start_one_chunk_unconstrained_compute_thread(futures, std::move(args));
353 if (thread_started) {
355 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"STARTED thread for " << chunk->to_string() << endl);
358 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
359 " chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
360 " futures.size(): " << futures.size() << endl);
369 future_finished =
false;
// Walks the remaining futures, calling get() on the last one when it is valid.
// NOTE(review): the statement that removes the entry from the list, and the
// context this loop runs in (presumably error cleanup), are not visible here.
374 while(!futures.empty()){
375 if(futures.back().valid())
376 futures.back().get();
// Body of the routine that tries to append candidate_chunk to d_chunks.
// NOTE(review): the signature is not visible in this listing; presumably
// SuperChunk::add_chunk() - confirm.
// Returns true when the chunk was appended, false otherwise.
401 bool chunk_was_added =
false;
// First chunk: it defines the offset, size and data URL of the whole SuperChunk.
402 if(d_chunks.empty()){
403 d_chunks.push_back(candidate_chunk);
404 d_offset = candidate_chunk->get_offset();
405 d_size = candidate_chunk->get_size();
406 d_data_url = candidate_chunk->get_data_url();
407 chunk_was_added =
true;
// Later chunks are accepted only when is_contiguous() approves them; the
// aggregate size then grows by the chunk's size.
409 else if(is_contiguous(candidate_chunk) ){
410 this->d_chunks.push_back(candidate_chunk);
411 d_size += candidate_chunk->get_size();
412 chunk_was_added =
true;
414 return chunk_was_added;
/**
 * @brief Tests whether candidate_chunk can directly follow the chunks already held.
 *
 * Compares the candidate's data URL string with d_data_url, and the
 * candidate's offset with d_offset + d_size.
 *
 * NOTE(review): the lines between the two assignments and the return
 * statement are not visible in this listing; presumably the offset test only
 * runs when the URLs match - confirm.
 *
 * @param candidate_chunk The chunk being considered.
 */
430 bool SuperChunk::is_contiguous(
const std::shared_ptr<Chunk> candidate_chunk) {
432 bool contiguous = candidate_chunk->get_data_url()->str() == d_data_url->str();
// The candidate must begin exactly where the bytes already covered end.
435 contiguous = (d_offset + d_size) == candidate_chunk->get_offset();
/**
 * @brief Points each child chunk's read buffer at its slice of d_read_buffer.
 *
 * Chunks are visited in d_chunks order; each one is given the address
 * d_read_buffer + bindex and its own size, after which bindex advances by
 * that size.
 *
 * NOTE(review): the condition guarding the error message below and the
 * statement that raises it are not visible in this listing.
 */
448 void SuperChunk::map_chunks_to_buffer()
// Running byte offset into d_read_buffer.
450 unsigned long long bindex = 0;
451 for(
const auto &chunk : d_chunks){
452 chunk->set_read_buffer(d_read_buffer + bindex, chunk->get_size(),0,
false);
453 bindex += chunk->get_size();
456 msg <<
"ERROR The computed buffer index, " << bindex <<
" is larger than expected size of the SuperChunk. ";
457 msg <<
"d_size: " << d_size;
/**
 * @brief Reads the SuperChunk's whole byte range into d_read_buffer.
 *
 * A temporary Chunk describing d_size bytes at d_offset of d_data_url is
 * pointed at d_read_buffer, and a libcurl easy handle is requested for it from
 * DmrppRequestHandler::curl_handle_pool. Afterwards the number of bytes the
 * temporary chunk reports as read is compared with d_size.
 *
 * NOTE(review): the statements that perform the transfer, the conditions
 * guarding the throw, and the reason release_handle() appears twice
 * (presumably one call per success/error path) are not visible in this listing.
 *
 * @throws BESInternalError with "No more libcurl handles."
 */
468 void SuperChunk::read_aggregate_bytes()
473 Chunk chunk(d_data_url,
"NOT_USED", d_size, d_offset);
475 chunk.set_read_buffer(d_read_buffer, d_size,0,
false);
477 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(&chunk);
479 throw BESInternalError(prolog +
"No more libcurl handles.", __FILE__, __LINE__);
483 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
486 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
// A short or long read is reported with both the actual and expected counts.
491 if (d_size != chunk.get_bytes_read()) {
493 oss <<
"Wrong number of bytes read for chunk; read: " << chunk.get_bytes_read() <<
", expected: " << d_size;
// Body of the routine that reads the SuperChunk's bytes and marks its chunks
// as read.
// NOTE(review): the signature and the already-read test are not visible in
// this listing; presumably SuperChunk::retrieve_data() - confirm.
505 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"SuperChunk (" << (
void **)
this <<
") has already been read! Returning." << endl);
// One buffer of d_size bytes backs every child chunk.
// NOTE(review): raw new[]; the matching release is not visible in this listing.
512 d_read_buffer =
new char[d_size];
518 map_chunks_to_buffer();
523 read_aggregate_bytes();
// Each child chunk is flagged as read, with its full size recorded as bytes read.
529 for(
auto chunk : d_chunks){
530 chunk->set_is_read(
true);
531 chunk->set_bytes_read(chunk->get_size());
// Body of the routine that processes every child chunk against the parent
// array's shape, either serially or with compute threads.
// NOTE(review): the signature and several interior lines are not visible in
// this listing; presumably SuperChunk::process_child_chunks() - confirm.
541 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
// Shape obtained from the parent array with the `constrained` flag set to true.
544 vector<unsigned long long> constrained_array_shape = d_parent_array->
get_shape(
true);
545 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ?
"true" :
"false") << endl);
546 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
// Serial path: each chunk is processed in turn on the calling thread.
548 if(!DmrppRequestHandler::d_use_compute_threads){
549 #if DMRPP_ENABLE_THREAD_TIMERS
551 sw.
start(prolog+
"Serial Chunk Processing. id: " + d_id);
553 for(
const auto &chunk :get_chunks()){
554 process_one_chunk(chunk,d_parent_array,constrained_array_shape);
558 #if DMRPP_ENABLE_THREAD_TIMERS
559 stringstream timer_name;
560 timer_name << prolog <<
"Concurrent Chunk Processing. id: " << d_id;
562 sw.
start(timer_name.str());
// Concurrent path: the chunks are copied into a queue for process_chunks_concurrent().
564 queue<shared_ptr<Chunk>> chunks_to_process;
565 for(
const auto &chunk:get_chunks())
566 chunks_to_process.push(chunk);
568 process_chunks_concurrent(d_id, chunks_to_process, d_parent_array, constrained_array_shape);
570 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"END" << endl );
// Body of the routine that processes every child chunk on the unconstrained
// path, either serially or with compute threads.
// NOTE(review): the signature, the definition of chunk_shape and several
// interior lines are not visible in this listing; presumably
// SuperChunk::process_child_chunks_unconstrained() - confirm.
580 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
// Shape obtained from the parent array with the `constrained` flag set to true.
584 const vector<unsigned long long> array_shape = d_parent_array->
get_shape(
true);
// Serial path: each chunk is processed in turn on the calling thread.
588 if(!DmrppRequestHandler::d_use_compute_threads){
589 #if DMRPP_ENABLE_THREAD_TIMERS
591 sw.
start(prolog +
"Serial Chunk Processing. sc_id: " + d_id );
593 for(
auto &chunk :get_chunks()){
594 process_one_chunk_unconstrained(chunk, chunk_shape, d_parent_array, array_shape);
598 #if DMRPP_ENABLE_THREAD_TIMERS
599 stringstream timer_name;
600 timer_name << prolog <<
"Concurrent Chunk Processing. sc_id: " << d_id;
602 sw.
start(timer_name.str());
// Concurrent path: the chunks are copied into a queue for
// process_chunks_unconstrained_concurrent().
604 queue<shared_ptr<Chunk>> chunks_to_process;
605 for (
auto &chunk:get_chunks())
606 chunks_to_process.push(chunk);
608 process_chunks_unconstrained_concurrent(d_id,chunks_to_process, chunk_shape, d_parent_array, array_shape);
// Body of the routine that builds a text description of this SuperChunk:
// its address, offset, size and chunk count, followed by one line per child chunk.
// NOTE(review): the signature, the declaration of msg and whatever guards the
// per-chunk loop are not visible in this listing; presumably
// SuperChunk::to_string(bool verbose) - confirm.
621 msg <<
"[SuperChunk: " << (
void **)
this;
622 msg <<
" offset: " << d_offset;
623 msg <<
" size: " << d_size ;
624 msg <<
" chunk_count: " << d_chunks.size();
// Each child chunk contributes its own to_string() output on a separate line.
629 for (
auto chunk: d_chunks) {
630 msg << chunk->to_string() << endl;
exception thrown if internal error encountered
virtual bool start(std::string name)
virtual std::vector< unsigned long long > get_shape(bool constrained)
Get the array shape.
virtual const std::vector< unsigned long long > & get_chunk_dimension_sizes() const
The chunk dimension sizes held in a const vector.
virtual void retrieve_data()
Cause the SuperChunk and all of its subordinate Chunks to be read.
virtual bool add_chunk(std::shared_ptr< Chunk > candidate_chunk)
Attempts to add a new Chunk to this SuperChunk.
std::string to_string(bool verbose) const
Makes a string representation of the SuperChunk.
virtual void dump(std::ostream &strm) const
Writes the to_string() output to the stream strm.
virtual void process_child_chunks()
Reads the SuperChunk, inflates/deshuffles the subordinate chunks as required and copies the values in...
virtual void process_child_chunks_unconstrained()
Reads the SuperChunk, inflates/deshuffles the subordinate chunks as required and copies the values in...