If you want to enable chunked encoding in libcurl, you just need to add the header "Transfer-Encoding: chunked" to the request, and curl will take care of everything after that.
For the scenario we're discussing, the one where you don't have all the data at hand when making a request with curl, things need to be handled a bit differently.
The function curl_easy_perform() performs a HTTP request. When curl_easy_perform() returns, the HTTP call is finished, all the data that was available before the call will be sent, and the call completes. Curl handles all the encoding part, for chunked encoding, as described here, so we just need to feed it the data.
Now, if we don't have all the data before the call, we will need to get more data during the curl_easy_perform() execution. This means that in the curl read callback, we will have to wait for new data, and send it as it arrives. This requires some manual synchronization.
I will post a simple C++ program which does this synchronization, in order to send a file read from the disk, 1000 bytes at a time, simulating the scenario we are following.
First, the headers and variables we are going to need:
#include <iostream> #include <curl/curl.h> #include <string.h> #include <pthread.h> #include <time.h> #include <errno.h> static const int TIMEOUT = 2; const std::string userpwd = "..."; CURL* g_pCurl = 0; static pthread_cond_t cv_main; static pthread_cond_t cv_processthread; static pthread_mutex_t mp; pthread_mutexattr_t Attr; static bool bDataReady; static bool bProcessed; static bool bEnd; pthread_t threadProcess; //shared memory for communication between the curl read callback thread and the main thread FILE* g_memData; size_t g_dataRead;Next is the write callback for curl, which we will need to write the HTTP response in a std::string object:
//curl write callback static size_t write_data(void *buffer, size_t size, size_t nmemb, void *userp) { std::string* pResponse = 0; if(userp != NULL){ pResponse = reinterpret_cast<std::string*>(userp); } if(pResponse != NULL){ *pResponse += (char*)buffer; } return size*nmemb; }After this, the main synchronization and chunked transfer algorithm that I will explain in detail below:
//curl read callback static size_t read_callback(void *ptr, size_t size, size_t nmemb, void *stream) { size_t retcode; do { pthread_mutex_lock(&mp); if(!bDataReady){ while(!bDataReady){ std::cout << "read_callback: waiting for main thread" << std::endl; timespec to; to.tv_sec = time(NULL) + TIMEOUT; to.tv_nsec = 0; if( ETIMEDOUT == pthread_cond_timedwait(&cv_processthread, &mp, &to) ){ std::cout << "read_callback: timed out" << std::endl; } if(bEnd){ std::cout << "read_callback: exiting" << std::endl; break; } } } pthread_mutex_unlock(&mp); if(bEnd){ std::cout << "read_callback: exiting function" << std::endl; return 0; } //read data retcode = fread(ptr, size, nmemb, (FILE*)stream); if(retcode == 0){ bDataReady = false; bProcessed = true; //signal main thread pthread_cond_signal(&cv_main); } }while(retcode == 0); return retcode; } //separate curl thread funcyion for chunked transfer static void* perform_work(void* argument) { std::cout << "perform_work: thread started" << std::endl; //wait for the first chunk pthread_mutex_lock(&(mp)); while(!bDataReady){ std::cout << "perform_work: waiting for data" << std::endl; pthread_cond_wait(&cv_processthread, &mp); } pthread_mutex_unlock(&mp); //continue with the other chunk in the curl read callback std::string response; std::string header; if(g_pCurl) { CURLcode cc; curl_slist *slist = NULL; slist = curl_slist_append(slist, "Accept: text/xml"); slist = curl_slist_append(slist, "Connection: keep-alive"); slist = curl_slist_append(slist, "Transfer-Encoding: chunked"); slist = curl_slist_append(slist, "Content-Type: application/octet-stream"); std::string strUrl = "..."; curl_easy_setopt(g_pCurl, CURLOPT_READFUNCTION, read_callback); curl_easy_setopt(g_pCurl, CURLOPT_UPLOAD, 1L); curl_easy_setopt(g_pCurl, CURLOPT_READDATA, g_memData); curl_easy_setopt(g_pCurl, CURLOPT_PUT, 1L); curl_easy_setopt(g_pCurl, CURLOPT_VERBOSE, 1); curl_easy_setopt(g_pCurl, CURLOPT_URL, strUrl.c_str()); curl_easy_setopt(g_pCurl, CURLOPT_HTTPHEADER, slist); curl_easy_setopt(g_pCurl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); curl_easy_setopt(g_pCurl, CURLOPT_USERPWD, userpwd.c_str()); curl_easy_setopt(g_pCurl, CURLOPT_WRITEFUNCTION, write_data); curl_easy_setopt(g_pCurl, CURLOPT_WRITEDATA, &response); curl_easy_setopt(g_pCurl, CURLOPT_HEADER, 1); curl_easy_setopt(g_pCurl, CURLOPT_HEADERDATA, &header); cc = curl_easy_perform(g_pCurl); long errorCode = 0; curl_easy_getinfo (g_pCurl, CURLINFO_RESPONSE_CODE, &errorCode); if(CURLE_OK != cc || ( errorCode >= 200 || errorCode < 300 ) ){ std::cout << "Error during curl_easy_perform: " << curl_easy_strerror(cc) << std::endl; return 0; } curl_slist_free_all(slist); curl_easy_reset(g_pCurl); } else{ std::cout<< "curl object not initialized" << std::endl; return 0; } return 0; } int main() { curl_global_init(CURL_GLOBAL_SSL); g_pCurl = curl_easy_init(); const std::string testFile = "..."; FILE* f = fopen(testFile.c_str(), "rb"); if(f != NULL){ static const size_t chunkSize = 1000; char buf[chunkSize]; memset(buf, 0, chunkSize*sizeof(char)); //initialize synchronization data { g_dataRead = 0; g_memData = 0; bDataReady = false; bProcessed = false; bEnd = false; //init cond variables int result_code = pthread_cond_init(&cv_main, NULL); if(result_code != 0){ std::cout << "failed to create condition variable" << std::endl; curl_easy_cleanup(g_pCurl); curl_global_cleanup(); return 1; } result_code = pthread_cond_init(&cv_processthread, NULL); if(result_code != 0){ std::cout << "failed to create condition variable" << std::endl; pthread_cond_destroy(&cv_main); curl_easy_cleanup(g_pCurl); curl_global_cleanup(); return 1; } pthread_mutexattr_init(&Attr); pthread_mutexattr_settype(&Attr, PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init(&mp, &Attr); //create sender thread result_code = pthread_create(&threadProcess, NULL, perform_work, NULL); if(result_code != 0){ std::cout << "main: failed to create thread" << std::endl; pthread_cond_destroy(&cv_main); pthread_cond_destroy(&cv_processthread); return 1; } } while( (g_dataRead = fread(buf, sizeof(char), chunkSize, f)) > 0){ //using fmemopen in order to user fread in the curl callback g_memData = fmemopen(buf, chunkSize, "rb"); if(NULL == g_memData){ std::cout << "Could not open data in memory" << std::endl; return 1; } bDataReady = true; pthread_cond_signal(&cv_processthread); std::cout << " waiting for data to be processed" << std::endl; pthread_mutex_lock(&mp); while(!bProcessed){ timespec to; to.tv_sec = time(NULL) + TIMEOUT; to.tv_nsec = 0; if( ETIMEDOUT == pthread_cond_timedwait(&cv_main, &mp, &to) ){ std::cout << " main: timed out" << std::endl; } } //reset all data bDataReady = false; bProcessed = false; fclose(g_memData); g_memData = 0; pthread_mutex_unlock(&mp); } //cleanup and wait for threads to exit bEnd = true; pthread_cond_signal(&cv_main); pthread_cond_signal(&cv_processthread); //wait for everyone pthread_join(threadProcess, NULL); pthread_cond_destroy(&cv_main); pthread_cond_destroy(&cv_processthread); pthread_mutex_destroy(&mp); fclose(f); } else{ std::cout << "could not open file " << testFile << std::endl; } curl_easy_cleanup(g_pCurl); curl_global_cleanup(); return 0; }The code should be self-explanatory, so I'll try to give only a high level explanation.
The function curl_easy_perform() requires a separate thread in order for it to block in waiting for data from the main thread.
The program above creates a new thread for curl_easy_perform() and then waits for main() to supply the data. When data is available, the read_callback() thread is unblocked and data is transferred, after which the read_callback() thread waits some more. This continues until there is no more data to send, at which point, the read_callback() function will return 0, and curl_easy_perform() will exit, ending the HTTP call.
No comments:
Post a Comment