Sunday, September 4, 2016

HTTP chunked encoding using C++ and libcurl

HTTP chunked encoding is a way to transfer data via HTTP when the total size is large or not known in advance. It is pretty easy to use with libcurl if you have all the data up front. If you don't have all the data available when the upload starts, things get a bit tricky. This latter scenario is what this post focuses on.

If you want to enable chunked encoding in libcurl, you just need to add the header "Transfer-Encoding: chunked" to the request, and curl will take care of everything after that.

For the scenario we're discussing, the one where you don't have all the data at hand when making a request with curl, things need to be handled a bit differently.
The function curl_easy_perform() performs an HTTP request and blocks until that request has completed: by the time it returns, all the data that was made available to it has been sent and the HTTP call is finished. Curl handles all the encoding part, for chunked encoding, as described here, so we just need to feed it the data.
Now, if we don't have all the data before the call, we will need to get more data during the curl_easy_perform() execution. This means that in the curl read callback, we will have to wait for new data, and send it as it arrives. This requires some manual synchronization.

I will post a simple C++ program which does this synchronization, in order to send a file read from the disk, 1000 bytes at a time, simulating the scenario we are following.

First, the headers and variables we are going to need:
#include <iostream>
#include <curl/curl.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>

//number of seconds added to the current time to build the deadline of each timed wait
static const int TIMEOUT = 2;
//credentials given to curl through CURLOPT_USERPWD
const std::string userpwd = "...";

//curl easy handle: created in main(), used by the sender thread
CURL* g_pCurl = NULL;

//mutex (and the attribute object used to initialize it) protecting the handshake below
static pthread_mutex_t mp;
pthread_mutexattr_t Attr;

//condition variables: main() waits on cv_main, the sender thread waits on cv_processthread
static pthread_cond_t cv_main;
static pthread_cond_t cv_processthread;

//handshake flags exchanged between main() and the sender thread
static bool bDataReady = false;  //a chunk is available for the read callback
static bool bProcessed = false;  //the read callback has consumed the current chunk
static bool bEnd = false;        //there is no more data, the read callback must finish

//handle of the sender thread running perform_work()
pthread_t threadProcess;

//shared memory for communication between the curl read callback thread and the main thread
FILE* g_memData = NULL;
size_t g_dataRead = 0;
Next is the write callback for curl, which we will need to write the HTTP response in a std::string object:
//curl write callback
//
//Appends the bytes delivered by curl to the std::string whose address was
//registered as the callback's user pointer.
//  buffer - received bytes; the data is NOT NUL-terminated and may contain
//           embedded zero bytes, so exactly size*nmemb bytes are copied
//  size   - size of one element
//  nmemb  - number of elements in buffer
//  userp  - std::string* receiving the data; may be NULL, in which case the
//           data is discarded
//Returns size*nmemb, i.e. every byte is always reported as handled.
static size_t write_data(void *buffer, size_t size, size_t nmemb, void *userp)
{
    const size_t total = size*nmemb;

    if(userp != NULL && buffer != NULL && total > 0){
        std::string* pResponse = static_cast<std::string*>(userp);
        //append by explicit length: treating buffer as a C string would read past
        //its end and would stop at the first zero byte
        pResponse->append(static_cast<const char*>(buffer), total);
    }
    return total;
}
After this, the main synchronization and chunked transfer algorithm that I will explain in detail below:
//curl read callback
static size_t read_callback(void *ptr, size_t size, size_t nmemb, void *stream)
{
    size_t retcode;

    do {
        pthread_mutex_lock(&mp);
        if(!bDataReady){
            while(!bDataReady){
                std::cout << "read_callback: waiting for main thread" << std::endl;
                timespec to;
                to.tv_sec = time(NULL) + TIMEOUT;
                to.tv_nsec = 0;
                if( ETIMEDOUT == pthread_cond_timedwait(&cv_processthread, &mp, &to) ){
                    std::cout << "read_callback: timed out" << std::endl;
                }
                if(bEnd){
                    std::cout << "read_callback: exiting" << std::endl;
                    break;
                }
            }
        }
        pthread_mutex_unlock(&mp);
        if(bEnd){
            std::cout << "read_callback: exiting function" << std::endl;
            return 0;
        }

        //read data
        retcode = fread(ptr, size, nmemb, (FILE*)stream);

        if(retcode == 0){
            bDataReady = false;
            bProcessed = true;
            //signal main thread
            pthread_cond_signal(&cv_main);
        }
    }while(retcode == 0);

    return retcode;
}

//separate curl thread funcyion for chunked transfer
static void* perform_work(void* argument)
{
    std::cout << "perform_work: thread started" << std::endl;

    //wait for the first chunk
    pthread_mutex_lock(&(mp));
    while(!bDataReady){
        std::cout << "perform_work: waiting for data" << std::endl;
        pthread_cond_wait(&cv_processthread, &mp);
    }
    pthread_mutex_unlock(&mp);

    //continue with the other chunk in the curl read callback
    std::string response;
    std::string header;
    if(g_pCurl)
    {
        CURLcode cc;
        curl_slist *slist = NULL;
        slist = curl_slist_append(slist, "Accept: text/xml");
        slist = curl_slist_append(slist, "Connection: keep-alive");
        slist = curl_slist_append(slist, "Transfer-Encoding: chunked");
        slist = curl_slist_append(slist, "Content-Type: application/octet-stream");

        std::string strUrl = "...";

        curl_easy_setopt(g_pCurl, CURLOPT_READFUNCTION, read_callback);
        curl_easy_setopt(g_pCurl, CURLOPT_UPLOAD, 1L);
        curl_easy_setopt(g_pCurl, CURLOPT_READDATA, g_memData);
        curl_easy_setopt(g_pCurl, CURLOPT_PUT, 1L);

        curl_easy_setopt(g_pCurl, CURLOPT_VERBOSE, 1);
        curl_easy_setopt(g_pCurl, CURLOPT_URL, strUrl.c_str());
        curl_easy_setopt(g_pCurl, CURLOPT_HTTPHEADER, slist);
        curl_easy_setopt(g_pCurl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
        curl_easy_setopt(g_pCurl, CURLOPT_USERPWD, userpwd.c_str());
        curl_easy_setopt(g_pCurl, CURLOPT_WRITEFUNCTION, write_data);
        curl_easy_setopt(g_pCurl, CURLOPT_WRITEDATA, &response);
        curl_easy_setopt(g_pCurl, CURLOPT_HEADER, 1);
        curl_easy_setopt(g_pCurl, CURLOPT_HEADERDATA, &header);

        cc = curl_easy_perform(g_pCurl);

        long errorCode = 0;
        curl_easy_getinfo (g_pCurl, CURLINFO_RESPONSE_CODE, &errorCode);
        if(CURLE_OK != cc || ( errorCode >= 200 ||
                               errorCode < 300 ) ){
            std::cout << "Error during curl_easy_perform: " << curl_easy_strerror(cc) << std::endl;
            return 0;
        }

        curl_slist_free_all(slist);
        curl_easy_reset(g_pCurl);
    }
    else{
        std::cout<< "curl object not initialized" << std::endl;
        return 0;
    }
    return 0;
}


//Closes the input file (if any) and releases the curl handle and curl's global state.
static void release_curl_and_file(FILE* f)
{
    if(f != NULL){
        fclose(f);
    }
    curl_easy_cleanup(g_pCurl);
    g_pCurl = 0;
    curl_global_cleanup();
}

//Reads the test file in pieces of at most chunkSize bytes and hands every piece to
//the sender thread, which uploads it through the curl read callback. For each piece
//main() publishes an in-memory stream in g_memData, raises bDataReady and then waits
//until the callback reports bProcessed. When the file is exhausted bEnd is raised
//and the sender thread is joined.
//Returns 0 when the whole file was handed over, 1 on any local failure.
int main()
{
    curl_global_init(CURL_GLOBAL_SSL);
    g_pCurl = curl_easy_init();
    if(g_pCurl == NULL){
        std::cout << "main: failed to create curl handle" << std::endl;
        curl_global_cleanup();
        return 1;
    }

    const std::string testFile = "...";
    FILE* f = fopen(testFile.c_str(), "rb");
    if(f == NULL){
        std::cout << "could not open file " << testFile << std::endl;
        release_curl_and_file(NULL);
        return 1;
    }

    static const size_t chunkSize = 1000;
    char buf[chunkSize];
    memset(buf, 0, chunkSize*sizeof(char));

    //initialize synchronization data
    g_dataRead = 0;
    g_memData = 0;
    bDataReady = false;
    bProcessed = false;
    bEnd = false;

    //init cond variables
    int result_code = pthread_cond_init(&cv_main, NULL);
    if(result_code != 0){
        std::cout << "failed to create condition variable" << std::endl;
        release_curl_and_file(f);
        return 1;
    }

    result_code = pthread_cond_init(&cv_processthread, NULL);
    if(result_code != 0){
        std::cout << "failed to create condition variable" << std::endl;
        pthread_cond_destroy(&cv_main);
        release_curl_and_file(f);
        return 1;
    }

    pthread_mutexattr_init(&Attr);
    pthread_mutexattr_settype(&Attr, PTHREAD_MUTEX_RECURSIVE);
    result_code = pthread_mutex_init(&mp, &Attr);
    //the attribute object is no longer needed once the mutex has been initialized
    pthread_mutexattr_destroy(&Attr);
    if(result_code != 0){
        std::cout << "main: failed to create mutex" << std::endl;
        pthread_cond_destroy(&cv_main);
        pthread_cond_destroy(&cv_processthread);
        release_curl_and_file(f);
        return 1;
    }

    //create sender thread
    result_code = pthread_create(&threadProcess, NULL, perform_work, NULL);
    if(result_code != 0){
        std::cout << "main: failed to create thread" << std::endl;
        pthread_mutex_destroy(&mp);
        pthread_cond_destroy(&cv_main);
        pthread_cond_destroy(&cv_processthread);
        release_curl_and_file(f);
        return 1;
    }

    int exitCode = 0;
    while( (g_dataRead = fread(buf, sizeof(char), chunkSize, f)) > 0){
        //using fmemopen in order to use fread in the curl callback; the stream covers
        //only the g_dataRead bytes that were actually read, so a short final chunk
        //does not drag stale bytes of the previous chunk along
        FILE* chunk = fmemopen(buf, g_dataRead, "rb");
        if(NULL == chunk){
            std::cout << "Could not open data in memory" << std::endl;
            exitCode = 1;
            break;
        }

        //publish the chunk and raise the flag under the mutex so the sender thread
        //cannot miss the wakeup between checking the flag and starting to wait
        pthread_mutex_lock(&mp);
        g_memData = chunk;
        bDataReady = true;
        pthread_cond_signal(&cv_processthread);
        std::cout << " waiting for data to be processed" << std::endl;

        //NOTE(review): if the transfer aborts, perform_work() returns without ever
        //setting bProcessed and this loop keeps timing out forever - a "transfer
        //finished" flag set by the sender thread would be needed to detect that
        while(!bProcessed){
            timespec to;
            to.tv_sec = time(NULL) + TIMEOUT;
            to.tv_nsec = 0;
            if( ETIMEDOUT == pthread_cond_timedwait(&cv_main, &mp, &to) ){
                std::cout << " main: timed out" << std::endl;
            }
        }
        //reset all data
        bDataReady = false;
        bProcessed = false;
        fclose(g_memData);
        g_memData = 0;
        pthread_mutex_unlock(&mp);
    }

    if(ferror(f)){
        std::cout << "main: error while reading " << testFile << std::endl;
        exitCode = 1;
    }

    //cleanup and wait for threads to exit
    //bDataReady is raised together with bEnd so that a sender thread that is still
    //blocked waiting for its very first chunk (empty input file) wakes up as well;
    //the read callback checks bEnd before it touches any data
    pthread_mutex_lock(&mp);
    bEnd = true;
    bDataReady = true;
    pthread_cond_signal(&cv_main);
    pthread_cond_signal(&cv_processthread);
    pthread_mutex_unlock(&mp);

    //wait for everyone
    pthread_join(threadProcess, NULL);
    pthread_cond_destroy(&cv_main);
    pthread_cond_destroy(&cv_processthread);
    pthread_mutex_destroy(&mp);

    release_curl_and_file(f);

    return exitCode;
}
The code should be self-explanatory, so I'll try to give only a high level explanation.
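The function curl_easy_perform() blocks until the whole transfer is finished, so it has to run in a separate thread: that way its read callback can block while waiting for data, and the main thread stays free to produce that data.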
The program above creates a new thread for curl_easy_perform() and then waits for main() to supply the data. When data is available, the read_callback() thread is unblocked and data is transferred, after which the read_callback() thread waits some more. This continues until there is no more data to send, at which point, the read_callback() function will return 0, and curl_easy_perform() will exit, ending the HTTP call.

No comments:

Post a Comment