123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- #include "grpcclientthread.h"
- grpcclientthread * ggrpcclient;
- #ifdef Android
- #include "adcintelligentshow.h"
- extern ADCIntelligentShow * gAShow;
- #endif
- void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
- {
- ggrpcclient->UpdateData(strdata,nSize,strmemname);
- // std::cout<<"name is "<<strmemname<<std::endl;
- }
- grpcclientthread::grpcclientthread()
- {
- ggrpcclient = this;
- }
- void grpcclientthread::dec_yaml(const char * stryamlpath)
- {
- YAML::Node config;
- try
- {
- config = YAML::LoadFile(stryamlpath);
- }
- catch(YAML::BadFile e)
- {
- qDebug("load error.");
- return;
- }
- std::vector<std::string> vecmodulename;
- if(config["server"])
- {
- mstrserverip = config["server"].as<std::string>();
- }
- if(config["port"])
- {
- mstrserverport = config["port"].as<std::string>();
- }
- if(config["uploadinterval"])
- {
- mstrqueryinterval = config["uploadinterval"].as<std::string>();
- }
- std::string strmsgname;
- if(config["querymessage"])
- {
- for(YAML::const_iterator it= config["querymessage"].begin(); it != config["querymessage"].end();++it)
- {
- std::string strtitle = it->first.as<std::string>();
- std::cout<<strtitle<<std::endl;
- if(config["querymessage"][strtitle]["msgname"]&&config["querymessage"][strtitle]["buffersize"]&&config["querymessage"][strtitle]["buffercount"])
- {
- iv::rpcmsgunit xmu;
- strmsgname = config["querymessage"][strtitle]["msgname"].as<std::string>();
- strncpy(xmu.mstrmsgname,strmsgname.data(),255);
- xmu.mnBufferSize = config["querymessage"][strtitle]["buffersize"].as<int>();
- xmu.mnBufferCount = config["querymessage"][strtitle]["buffercount"].as<int>();
- mvectorquerymsgunit.push_back(xmu);
- }
- }
- }
- else
- {
- }
- if(config["ctrlmessage"])
- {
- std::string strnodename = "ctrlmessage";
- for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
- {
- std::string strtitle = it->first.as<std::string>();
- std::cout<<strtitle<<std::endl;
- if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
- {
- iv::rpcmsgunit xmu;
- strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
- strncpy(xmu.mstrmsgname,strmsgname.data(),255);
- xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
- xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
- mvectorctrlmsgunit.push_back(xmu);
- }
- }
- }
- else
- {
- }
- return;
- }
- void grpcclientthread::addquerymsgunit(std::string strquerymsg, int nbuffsize, int nbuffcount)
- {
- int i;
- for(i=0;i<mvectorquerymsgunit.size();i++)
- {
- if(strncmp(mvectorquerymsgunit[i].mstrmsgname,strquerymsg.data(),255)==0)
- {
- std::cout<<"grpcclientthread::addquerymsgunit msg "<<strquerymsg<<" is exist."<<std::endl;
- return;
- }
- }
- iv::rpcmsgunit xmu;
- strncpy(xmu.mstrmsgname,strquerymsg.data(),255);
- xmu.mnBufferSize = nbuffsize;
- xmu.mnBufferCount = nbuffcount;
- mvectorquerymsgunit.push_back(xmu);
- }
- void grpcclientthread::addctrlmsgunit(std::string strctrlmsg, int nbuffsize, int nbuffcount)
- {
- int i;
- for(i=0;i<mvectorctrlmsgunit.size();i++)
- {
- if(strncmp(mvectorctrlmsgunit[i].mstrmsgname,strctrlmsg.data(),255)==0)
- {
- std::cout<<"grpcclientthread::addquerymsgunit msg "<<strctrlmsg<<" is exist."<<std::endl;
- return;
- }
- }
- iv::rpcmsgunit xmu;
- strncpy(xmu.mstrmsgname,strctrlmsg.data(),255);
- xmu.mnBufferSize = nbuffsize;
- xmu.mnBufferCount = nbuffcount;
- mvectorctrlmsgunit.push_back(xmu);
- }
- void grpcclientthread::startlisten()
- {
- int i;
- for(i=0;i<mvectorctrlmsgunit.size();i++)
- {
- mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectorctrlmsgunit[i].mstrmsgname,ListenData);
- }
- for(i=0;i<mvectorquerymsgunit.size();i++)
- {
- mvectorquerymsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorquerymsgunit[i].mstrmsgname,mvectorquerymsgunit[i].mnBufferSize,
- mvectorquerymsgunit[i].mnBufferCount);
- }
- }
- void grpcclientthread::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
- {
- int nsize = mvectorctrlmsgunit.size();
- int i;
- for(i=0;i<nsize;i++)
- {
- if(strncmp(strmemname,mvectorctrlmsgunit[i].mstrmsgname,255) == 0)
- {
- mMutexMsg.lock();
- char * strtem = new char[nSize];
- memcpy(strtem,strdata,nSize);
- mvectorctrlmsgunit[i].mpstrmsgdata.reset(strtem);
- mvectorctrlmsgunit[i].mndatasize = nSize;
- mvectorctrlmsgunit[i].mbRefresh = true;
- mMutexMsg.unlock();
- break;
- }
- }
- }
- void grpcclientthread::run()
- {
- int nsize = mvectorquerymsgunit.size();
- int i;
- int ninterval = atoi(mstrqueryinterval.data());
- if(ninterval<=0)ninterval = 10;
- QTime xTime;
- xTime.start();
- int nlastsend = xTime.elapsed();
- std::string target_str = mstrserverip+":";
- target_str = target_str + mstrserverport ;//std::to_string()
- auto cargs = grpc::ChannelArguments();
- cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
- cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
- std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
- target_str, grpc::InsecureChannelCredentials(),cargs);
- std::unique_ptr<iv::ivgrpc::Stub> stub_ = iv::ivgrpc::NewStub(channel);
- int nid = 0;
- // Container for the data we expect from the server.
- gpr_timespec timespec;
- timespec.tv_sec = 3;//设置阻塞时间为2秒
- timespec.tv_nsec = 0;
- timespec.clock_type = GPR_TIMESPAN;
- int ndataindex = 0;
- // ClientContext context;
- int nctrlindex = 0;
- int nctrlnid = 0;
- while(!QThread::isInterruptionRequested())
- {
- iv::queryreq request;
- iv::queryReply reply;
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- if((xTime.elapsed()-nlastsend)<ninterval)
- {
- continue;
- }
- request.set_nindex(ndataindex);
- request.set_strdevicename(mstrdevname);
- for(i=0;i<nsize;i++)request.add_strmsgname();
- for(i=0;i<nsize;i++)
- {
- request.set_strmsgname(i,mvectorquerymsgunit[i].mstrmsgname);
- }
- // std::cout<<"size is "<<nsize<<" msg size is "<<request.strmsgname_size()<<std::endl;
- ClientContext context ;
- context.set_deadline(timespec);
- Status status = stub_->query(&context, request, &reply);
- if (status.ok()) {
- // std::cout<<nid<<" upload successfully"<<std::endl;
- ndataindex = reply.nlastindex();
- for(i=0;i<reply.msg_size();i++)
- {
- sharequerymsg(&reply.msg(i));
- // int j;
- // for(j=0;j<reply.msg_size();j++)
- // {
- // sharequerymsg(&reply.msg(j));
- // }
- }
- } else {
- #ifndef Android
- std::cout << status.error_code() << ": " << status.error_message()
- << std::endl;
- std::cout<<"RPC failed"<<std::endl;
- #endif
- if(status.error_code() == 4)
- {
- #ifndef Android
- std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
- #endif
- channel = grpc::CreateCustomChannel(
- target_str, grpc::InsecureChannelCredentials(),cargs);
- stub_ = iv::ivgrpc::NewStub(channel);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(900));
- }
- nlastsend = xTime.elapsed();
- iv::ctrlreq ctrlrequest;
- iv::ctrlReply ctrlreply;
- mMutexMsg.lock();
- for(i=0;i<mvectorctrlmsgunit.size();i++)
- {
- if(mvectorctrlmsgunit[i].mbRefresh)
- {
- mvectorctrlmsgunit[i].mbRefresh = false;
- iv::ModuleMsg xctrl;
- xctrl.set_msgname(mvectorctrlmsgunit[i].mstrmsgname);
- xctrl.set_index(nctrlindex);
- nctrlindex++;
- xctrl.set_xdata(mvectorctrlmsgunit[i].mpstrmsgdata.get(),mvectorctrlmsgunit[i].mndatasize);
- xctrl.set_nlen(mvectorctrlmsgunit[i].mndatasize);
- iv::ModuleMsg * pmsg = ctrlrequest.add_msg();
- pmsg->CopyFrom(xctrl);
- }
- }
- mMutexMsg.unlock();
- if(ctrlrequest.msg_size()>0)
- {
- ctrlrequest.set_nid(nctrlnid);
- nctrlnid++;
- ctrlrequest.set_strdevicename(mstrdevname);
- ClientContext contextctrl ;
- contextctrl.set_deadline(timespec);
- Status status2 = stub_->ctrl(&contextctrl, ctrlrequest, &ctrlreply);
- if (status2.ok()) {
- // std::cout<<nid<<" upload successfully"<<std::endl;
- } else {
- std::cout << status2.error_code() << ": " << status2.error_message()
- << std::endl;
- std::cout<<"RPC failed"<<std::endl;
- if(status2.error_code() == 4)
- {
- std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
- channel = grpc::CreateCustomChannel(
- target_str, grpc::InsecureChannelCredentials(),cargs);
- stub_ = iv::ivgrpc::NewStub(channel);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(900));
- }
- }
- }
- }
- void grpcclientthread::sharequerymsg(const iv::ModuleMsg *pxmsg)
- {
- int i;
- // std::cout<<"msg is "<<pxmsg->msgname()<<std::endl;
- int nsize = mvectorquerymsgunit.size();
- for(i=0;i<nsize;i++)
- {
- if(strncmp(pxmsg->msgname().data(),mvectorquerymsgunit[i].mstrmsgname,256) == 0)
- {
- // std::cout<<"msg is "<<pxmsg->msgname()<<" size is: "<<pxmsg->xdata().size()<<std::endl;
- if(strncmp(pxmsg->msgname().data(),"tracemap",256) == 0)
- {
- std::cout<<"trace map size is "<<pxmsg->xdata().size()<<std::endl;
- }
- #ifndef Android
- iv::modulecomm::ModuleSendMsg(mvectorquerymsgunit[i].mpa,pxmsg->xdata().data(),pxmsg->xdata().size());
- #else
- gAShow->AndroidSetMsg(pxmsg->msgname().data(),pxmsg->xdata().data(),pxmsg->xdata().size());
- #endif
- break;
- }
- }
- }
- void grpcclientthread::setdevname(std::string strdevname)
- {
- mstrdevname = strdevname;
- }
- void grpcclientthread::setserverip(std::string strip)
- {
- mstrserverip = strip;
- }
- void grpcclientthread::setserverport(std::string strport)
- {
- mstrserverport = strport;
- }
- void grpcclientthread::setqueryinterval(std::string strinterval)
- {
- mstrqueryinterval = strinterval;
- }
|