Return to httpclientcxx.cpp CVS log | Up to [OMI] / omi / deprecated / httpclientcxx |
File: [OMI] / omi / deprecated / httpclientcxx / httpclientcxx.cpp
(download)
Revision: 1.1.1.1 (vendor branch), Wed May 30 21:47:49 2012 UTC (12 years, 1 month ago) by mike Branch: TOG CVS Tags: OMI_1_0_2_Branch, OMI_1_0_1_PRE, OMI_1_0_1, OMI_1_0_0 Changes since 1.1: +0 -0 lines Initial Import |
/* **============================================================================== ** ** Open Management Infrastructure (OMI) ** ** Copyright (c) Microsoft Corporation ** ** Licensed under the Apache License, Version 2.0 (the "License"); you may not ** use this file except in compliance with the License. You may obtain a copy ** of the License at ** ** http://www.apache.org/licenses/LICENSE-2.0 ** ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABLITY OR NON-INFRINGEMENT. ** ** See the Apache 2 License for the specific language governing permissions ** and limitations under the License. ** **============================================================================== */ #include "httpclientcxx.h" #include <common.h> #include <base/atomic.h> #include <base/time.h> #include <protocol/httpclient.h> #include <protocol/thread.h> using namespace std; // callback for thread (run function) BEGIN_EXTERNC static void* MI_CALL _proc(void* ); // callback for user's funcitons executed in background thread static void threadDelegation(void* self, Message* message); // http callbacks static void httpClientCallbackOnStatus( ::HttpClient* http, void* callbackData, MI_Result result); static MI_Boolean httpClientCallbackOnResponse( ::HttpClient* http, void* callbackData, const HttpClientResponseHeader* headers, MI_Sint64 contentSize, MI_Boolean lastChunk, Page** data); END_EXTERNC HTTPCLIENT_BEGIN /* helper strucutre used to post messages from calling thread to background processing thread */ class NotifyItem { public: // data generic for all calls enum Type { CONNECT, START_REQUEST, DELETE_HTTP, SET_TIMEOUT } _type; class HttpClientRep* _rep; Message* _msg; /* Connect-specific */ std::string _host; unsigned short _port; bool _secure; /* start-request- specific */ std::string _verb; std::string _uri; std::map< std::string, std::string > _extraHeaders; std::vector< unsigned char > _data; // connect request NotifyItem( class HttpClientRep* rep, const char* host, unsigned short port, bool secure): _type(CONNECT), _rep(rep), _host(host), _port(port), _secure(secure) { _InitMsg(); } // start-request NotifyItem( class HttpClientRep* rep, const char* verb, const char* uri, const std::map< std::string, std::string >& extraHeaders, const std::vector< unsigned char >& data): _type(START_REQUEST), _rep(rep), _port(0), _secure(false), _verb(verb), _uri(uri), _extraHeaders(extraHeaders), _data(data) { _InitMsg(); } // delete-http item NotifyItem( class HttpClientRep* rep): _type(DELETE_HTTP), _rep(rep), _port(0), _secure(false) { _InitMsg(); } // set Timeout item NotifyItem( class HttpClientRep* rep, int /*timeout*/): _type(SET_TIMEOUT), _rep(rep), _port(0), _secure(false) { _InitMsg(); } ~NotifyItem() { Message_Release(_msg); } private: NotifyItem(const NotifyItem&); void operator=(const NotifyItem&); void _InitMsg() { _msg = __Message_New( NoOpReqTag, sizeof(NoOpReq), 0, 0); _msg->clientID = PtrToUint64(this); } }; /* Helper class - background thread operations */ class IOThread { public: IOThread(); ~IOThread(); bool Start(); /* delegate work to background thread */ bool PostItem(NotifyItem*item); //private: // not supported IOThread(const IOThread&); void operator = (const IOThread&); // impl // mirror funcitons for public API. // note: these functions are always called form context of background thread void ConnectTh(NotifyItem* item); void StartRequestTh(NotifyItem* item); void DeleteHttpTh(NotifyItem* item); void SetTimeoutTh(NotifyItem* item); // data ThreadHandle _th; Selector _selector; }; /* thread handle - used to store ref-counted pointer to IOThread */ class IOThreadHandle { struct Item { AtomicInt ref; IOThread t; Item() :ref(0){} }; Item* _p; void _AddRef() { if (_p) AtomicInc(&_p->ref); } void _Release() { if (_p && AtomicDec(&_p->ref)) { delete _p; } } public: // full set of ctors/dtors/assign operators ~IOThreadHandle() {_Release();} IOThreadHandle() : _p(0) {} IOThreadHandle(const IOThreadHandle& x) : _p(x._p) {_AddRef();} IOThreadHandle& operator =(const IOThreadHandle& x) { if (_p != x._p) { _Release(); _p = x._p; _AddRef(); } return *this; } // accessor IOThread* operator ->() { return &_p->t; } // allocator void Alloc() { _Release(); _p = new Item; _AddRef(); } }; // forward declaration static IOThreadHandle _GetThreadObj(); /* impl class */ class HttpClientRep { public: HttpClientRep(HttpClientCallback* callback) : _callback(callback), _timeoutMS(90 * 1000), _httpClient(0), _notify(false), _destroyed(false) { _th = _GetThreadObj(); } ~HttpClientRep() { if (_httpClient) HttpClient_Delete(_httpClient); } // data HttpClientCallback* _callback; IOThreadHandle _th; int _timeoutMS; ::HttpClient* _httpClient; bool _notify; bool _destroyed; }; /* ****************************************************** */ IOThread::IOThread() : _th(0) { } IOThread::~IOThread() { // notify about stopping! Selector_StopRunning(&_selector); if (_th) Thread_Destroy(_th, MI_TRUE); // clean up Selector_RemoveAllHandlers(&_selector); Selector_Destroy(&_selector); } bool IOThread::Start() { if (MI_RESULT_OK != Selector_Init(&_selector)) return false; Selector_SetAllowEmptyFlag(&_selector, MI_TRUE); if (MI_RESULT_OK != Thread_Create(_proc, this, &_th)) return false; return true; } bool IOThread::PostItem(NotifyItem*item) { MI_Result res = Selector_CallInIOThread(&_selector, threadDelegation, this, item->_msg); return res == MI_RESULT_OK; } void IOThread::ConnectTh(NotifyItem* item) { MI_Result res = HttpClient_New_Connector( &item->_rep->_httpClient, &_selector, item->_host.c_str(), item->_port, item->_secure, httpClientCallbackOnStatus, httpClientCallbackOnResponse, item->_rep); if (MI_RESULT_OK != res) { item->_rep->_callback->OnStatus(httpclient::FAILED); } HttpClient_SetTimeout(item->_rep->_httpClient, ((MI_Uint64)item->_rep->_timeoutMS) * 1000); } void IOThread::StartRequestTh(NotifyItem* item) { Page* c_data = 0; const char* c_verb = item->_verb.c_str(); HttpClientRequestHeaders c_headers; std::vector< std::string > headers_strings; std::vector< const char* > headers_pointers; memset(&c_headers, 0, sizeof(c_headers)); if (!item->_data.empty() > 0) { c_data = (Page*)malloc(item->_data.size() + sizeof(Page)); /* clear header */ memset(c_data, 0, sizeof(Page)); c_data->u.s.size = (unsigned int)item->_data.size(); memcpy(c_data+1, &item->_data[0], item->_data.size()); } if (!item->_extraHeaders.empty()) { // create array of strings for (std::map< std::string, std::string >::const_iterator it = item->_extraHeaders.begin(); it != item->_extraHeaders.end(); it++) { std::string s = it->first; s += ": "; s += it->second; headers_strings.push_back(s); } // create array of pointers for (size_t i = 0; i < headers_strings.size(); i++) { headers_pointers.push_back(headers_strings[i].c_str()); } // initialize c-struct: c_headers.size = headers_pointers.size(); c_headers.data = &headers_pointers[0]; } MI_Result res = HttpClient_StartRequest( item->_rep->_httpClient, c_verb, item->_uri.c_str(), &c_headers, &c_data); if (c_data) free(c_data); if (MI_RESULT_OK != res) { item->_rep->_callback->OnStatus(httpclient::FAILED); } } void IOThread::DeleteHttpTh(NotifyItem* item) { HttpClient_Delete(item->_rep->_httpClient); item->_rep->_httpClient = 0; item->_rep->_destroyed = true; } void IOThread::SetTimeoutTh(NotifyItem* item) { HttpClient_SetTimeout(item->_rep->_httpClient, ((MI_Uint64)item->_rep->_timeoutMS) * 1000); } static IOThreadHandle _GetThreadObj() { static IOThreadHandle s_obj; static int s_init = 0; static pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER; /* check if we may need to init */ if (!s_init) { pthread_mutex_lock(&s_mutex); /* check if we really need to init or get here by race-condition */ if (!s_init) { s_obj.Alloc(); s_obj->Start(); s_init = 1; } pthread_mutex_unlock(&s_mutex); } return s_obj; } /* ******************************************** */ HttpClient::~HttpClient() { NotifyItem* item = new NotifyItem(_rep); if (_rep->_th->PostItem(item)) { /* wait for thread to complete operation */ while (!_rep->_destroyed) Time_Sleep(50); } delete _rep; } HttpClient::HttpClient(HttpClientCallback* callback) { _rep = new HttpClientRep(callback); } Result HttpClient::Connect( const char* host, unsigned short port, bool secure) { NotifyItem* item = new NotifyItem(_rep, host, port, secure); if (!_rep->_th->PostItem(item)) { delete item; return httpclient::FAILED; } return httpclient::OKAY; } Result HttpClient::StartRequest( const char* verb, const char* uri, const std::map< std::string, std::string >& extraHeaders, const std::vector< unsigned char >& data, bool blockUntilCompleted) { NotifyItem* item = new NotifyItem(_rep, verb, uri, extraHeaders, data); if (!_rep->_th->PostItem(item)) { delete item; return httpclient::FAILED; } /* wait for thread to complete operation */ while (blockUntilCompleted && !_rep->_notify) Time_Sleep(50); if (blockUntilCompleted) _rep->_notify = false; return OKAY; } void HttpClient::SetOperationTimeout( int timeoutMS) { _rep->_timeoutMS = timeoutMS; NotifyItem* item = new NotifyItem(_rep, timeoutMS); if (!_rep->_th->PostItem(item)) { delete item; } } HTTPCLIENT_END static void* _proc(void* self) { httpclient::IOThread* pThis = (httpclient::IOThread*)self; // keep runnning until terminated Selector_Run(&pThis->_selector, TIME_NEVER); return 0; } static void threadDelegation(void* self, Message* message) { httpclient::IOThread* pThis = (httpclient::IOThread*)self; httpclient::NotifyItem* item =(httpclient::NotifyItem*)Uint64ToPtr(message->clientID); switch (item->_type) { case httpclient::NotifyItem::CONNECT: pThis->ConnectTh(item); break; case httpclient::NotifyItem::START_REQUEST: pThis->StartRequestTh(item); break; case httpclient::NotifyItem::DELETE_HTTP: pThis->DeleteHttpTh(item); break; case httpclient::NotifyItem::SET_TIMEOUT: pThis->SetTimeoutTh(item); break; default: assert(!"unexpected item type"); break; } delete item; } static void httpClientCallbackOnStatus( ::HttpClient* http, void* callbackData, MI_Result result) { httpclient::HttpClientRep* rep = (httpclient::HttpClientRep*)callbackData; httpclient::Result user_res= httpclient::FAILED; if (MI_RESULT_OK == result) user_res= httpclient::OKAY; else if (MI_RESULT_TIME_OUT == result) user_res= httpclient::TIMEOUT; rep->_callback->OnStatus(user_res); rep->_notify = true; } static MI_Boolean httpClientCallbackOnResponse( ::HttpClient* http, void* callbackData, const HttpClientResponseHeader* headers, MI_Sint64 contentSize, MI_Boolean lastChunk, Page** data) { httpclient::HttpClientRep* rep = (httpclient::HttpClientRep*)callbackData; if (headers) { std::map< std::string, std::string > user_headers; for (MI_Uint32 i = 0; i < headers->sizeHeaders; i++) { user_headers[headers->headers[i].name] = headers->headers[i].value; } if (!rep->_callback->OnResponseHeader( headers->httpError, user_headers, (int)contentSize)) return MI_FALSE; } if (data && *data) { std::vector< unsigned char > user_data( (unsigned char*) ((*data)+1), ((unsigned char*)((*data)+1)) + (*data)->u.s.size ); if (!rep->_callback->OnResponseData( user_data, lastChunk)) return MI_FALSE; } return MI_TRUE; }
ViewCVS 0.9.2 |