+#include <QCoreApplication>
+#include "grpcclient.h"
+#include "ivversion.h"
+#include <yaml-cpp/yaml.h>
+#include <QDateTime>
+#include <iostream>
+#include <vector>
+#include <memory>
+#include <QMutex>
+#include <thread>
+#include "modulecomm.h"
+#include "cloud.pb.h"
+#include <iostream>
+#include <memory>
+#include <string>
+#include <grpcpp/grpcpp.h>
+#include "uploadmsg.grpc.pb.h"
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::Status;
+void test()
+ std::string target_str = "";
+ auto cargs = grpc::ChannelArguments();
+ cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+ cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+ std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+ target_str, grpc::InsecureChannelCredentials(),cargs);
+ std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
+ iv::UploadRequest request;
+ // Container for the data we expect from the server.
+ iv::UploadReply reply;
+ int nid = 0;
+ nid = 1;
+ while(1)
+ {
+ // Context for the client. It could be used to convey extra information to
+ // the server and/or tweak certain RPC behaviors.
+ ClientContext context ;
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+ request.set_id(nid);
+ request.set_ntime(time1);
+ nid++;
+ // The actual RPC.
+ Status status = stub_->upload(&context, request, &reply);
+ if (status.ok()) {
+ std::cout<<nid<<" upload successfully"<<std::endl;
+// qint64 time2;
+// memcpy(&time2,reply.data().data(),8);
+// qint64 time3 = QDateTime::currentMSecsSinceEpoch();
+// std::cout<<"reply data size is "<<reply.data().size()<<std::endl;
+// std::cout<<" latency is "<<(time2 - time1)<<" 2 is "<<(time3 - time2)<<std::endl;
+// return reply.message();
+ } else {
+ std::cout << status.error_code() << ": " << status.error_message()
+ << std::endl;
+ std::cout<<"RPC failed"<<std::endl;
+ std::this_thread::sleep_for(std::chrono::milliseconds(900));
+// delete pcontext;
+// pcontext = new ClientContext;
+// channel = grpc::CreateCustomChannel(
+// target_str, grpc::InsecureChannelCredentials(),cargs);
+// stub_ = iv::Upload::NewStub(channel);
+ }
+ }
+std::string gstrserverip = "";//"";
+std::string gstrserverport = "50051";//"9000";
+std::string gstruploadinterval = "1000";
+void * gpa;
+QMutex gMutexMsg;
+std::thread * guploadthread;
+std::vector<iv::msgunit> mvectormsgunit;
+std::vector<iv::msgunit> mvectorctrlmsgunit;
+std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
+std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c591";//"5d41402abc4b2a76b9719d911017c592";
+std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c591";
+int gindex = 0;
+void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
+ int nsize = mvectormsgunit.size();
+ int i;
+ for(i=0;i<nsize;i++)
+ {
+ if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
+ {
+ gMutexMsg.lock();
+ char * strtem = new char[nSize];
+ memcpy(strtem,strdata,nSize);
+ mvectormsgunit[i].mpstrmsgdata.reset(strtem);
+ mvectormsgunit[i].mndatasize = nSize;
+ mvectormsgunit[i].mbRefresh = true;
+ gMutexMsg.unlock();
+ break;
+ }
+ }
+void sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
+ int i;
+ int nsize = pxmsg->xclouddata_size();
+ for(i=0;i<nsize;i++)
+ {
+ int j;
+ int nquerysize = mvectorctrlmsgunit.size();
+ for(j=0;j<nquerysize;j++)
+ {
+ if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
+ {
+ // qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
+ iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
+ break;
+ }
+ }
+ }
+void threadupload()
+ int nsize = mvectormsgunit.size();
+ int i;
+ int ninterval = atoi(gstruploadinterval.data());
+ if(ninterval<=0)ninterval = 100;
+ QTime xTime;
+ xTime.start();
+ int nlastsend = xTime.elapsed();
+ std::string target_str = gstrserverip+":";
+ target_str = target_str + gstrserverport ;//std::to_string()
+ auto cargs = grpc::ChannelArguments();
+ cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+ cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+ std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+ target_str, grpc::InsecureChannelCredentials(),cargs);
+ std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
+ iv::UploadRequest request;
+ int nid = 0;
+ // Container for the data we expect from the server.
+ iv::UploadReply reply;
+ gpr_timespec timespec;
+ timespec.tv_sec = 30;//设置阻塞时间为2秒
+ timespec.tv_nsec = 0;
+ timespec.clock_type = GPR_TIMESPAN;
+ // ClientContext context;
+ while(true)
+ {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ if((xTime.elapsed()-nlastsend)<ninterval)
+ {
+ continue;
+ }
+ bool bImportant = false;
+ int nkeeptime = 0;
+ iv::cloud::cloudmsg xmsg;
+ xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
+ gMutexMsg.lock();
+ for(i=0;i<nsize;i++)
+ {
+ if(mvectormsgunit[i].mbRefresh)
+ {
+ mvectormsgunit[i].mbRefresh = false;
+ if(mvectormsgunit[i].mbImportant)
+ {
+ bImportant = true;
+ }
+ if(mvectormsgunit[i].mnkeeptime > nkeeptime)
+ {
+ nkeeptime = mvectormsgunit[i].mnkeeptime;
+ }
+ iv::cloud::cloudunit xcloudunit;
+ xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
+ xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
+ iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
+ pcu->CopyFrom(xcloudunit);
+ }
+ }
+ gMutexMsg.unlock();
+ int nbytesize = xmsg.ByteSize();
+ char * strbuf = new char[nbytesize];
+ std::shared_ptr<char> pstrbuf;
+ pstrbuf.reset(strbuf);
+ if(xmsg.SerializeToArray(strbuf,nbytesize))
+ {
+ ClientContext context ;
+ context.set_deadline(timespec);
+ qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+ request.set_id(nid);
+ request.set_ntime(time1);
+ request.set_strquerymd5(gstrqueryMD5);
+ request.set_strctrlmd5(gstrctrlMD5);
+ request.set_strvin(gstrVIN);
+ request.set_xdata(strbuf,nbytesize);
+ request.set_kepptime(nkeeptime);
+ request.set_bimportant(bImportant);
+ nid++;
+ // The actual RPC.
+ Status status = stub_->upload(&context, request, &reply);
+ if (status.ok()) {
+ std::cout<<nid<<" upload successfully"<<std::endl;
+ if(reply.nres() == 1)
+ {
+ iv::cloud::cloudmsg xmsg;
+ if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
+ {
+ sharectrlmsg(&xmsg);
+ }
+ }
+ } else {
+ std::cout << status.error_code() << ": " << status.error_message()
+ << std::endl;
+ std::cout<<"RPC failed"<<std::endl;
+ if(status.error_code() == 4)
+ {
+ std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
+ channel = grpc::CreateCustomChannel(
+ target_str, grpc::InsecureChannelCredentials(),cargs);
+ stub_ = iv::Upload::NewStub(channel);
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(900));
+ }
+ }
+ nlastsend = xTime.elapsed();
+ }
+void dec_yaml(const char * stryamlpath)
+ YAML::Node config;
+ try
+ {
+ config = YAML::LoadFile(stryamlpath);
+ }
+ catch(YAML::BadFile e)
+ {
+ qDebug("load error.");
+ return;
+ }
+ std::vector<std::string> vecmodulename;
+ if(config["server"])
+ {
+ gstrserverip = config["server"].as<std::string>();
+ }
+ if(config["port"])
+ {
+ gstrserverport = config["port"].as<std::string>();
+ }
+ if(config["uploadinterval"])
+ {
+ gstruploadinterval = config["uploadinterval"].as<std::string>();
+ }
+ if(config["VIN"])
+ {
+ gstrVIN = config["VIN"].as<std::string>();
+ }
+ if(config["queryMD5"])
+ {
+ gstrqueryMD5 = config["queryMD5"].as<std::string>();
+ }
+ else
+ {
+ return;
+ }
+ if(config["ctrlMD5"])
+ {
+ gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
+ }
+ std::string strmsgname;
+ if(config["uploadmessage"])
+ {
+ for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
+ {
+ std::string strtitle = it->first.as<std::string>();
+ std::cout<<strtitle<<std::endl;
+ if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
+ {
+ iv::msgunit xmu;
+ strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
+ strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+ xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
+ xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
+ if(config["uploadmessage"][strtitle]["bimportant"])
+ {
+ std::string strimportant = config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
+ if(strimportant == "true")
+ {
+ xmu.mbImportant = true;
+ }
+ }
+ if(config["uploadmessage"][strtitle]["keeptime"])
+ {
+ std::string strkeep = config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
+ xmu.mnkeeptime = atoi(strkeep.data());
+ }
+ mvectormsgunit.push_back(xmu);
+ }
+ }
+ }
+ else
+ {
+ }
+ if(!config["ctrlMD5"])
+ {
+ return;
+ }
+ if(config["ctrlmessage"])
+ {
+ std::string strnodename = "ctrlmessage";
+ for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
+ {
+ std::string strtitle = it->first.as<std::string>();
+ std::cout<<strtitle<<std::endl;
+ if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
+ {
+ iv::msgunit xmu;
+ strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
+ strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+ xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
+ xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
+ mvectorctrlmsgunit.push_back(xmu);
+ }
+ }
+ }
+ else
+ {
+ }
+ return;
+int main(int argc, char *argv[])
+ showversion("driver_cloud_grpc_client");
+ QCoreApplication a(argc, argv);
+ // std::thread * ptest = new std::thread(test);
+ // return a.exec();
+ char stryamlpath[256];
+ if(argc<2)
+ {
+ snprintf(stryamlpath,255,"driver_cloud_grpc_client.yaml");
+// strncpy(stryamlpath,abs_ymlpath,255);
+ }
+ else
+ {
+ strncpy(stryamlpath,argv[1],255);
+ }
+// dec_yaml(stryamlpath);
+ grpcclient * pgrpcclient = new grpcclient(stryamlpath);
+ pgrpcclient->start();
+// int i;
+// for(i=0;i<mvectormsgunit.size();i++)
+// {
+// mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
+// }
+// for(i=0;i<mvectorctrlmsgunit.size();i++)
+// {
+// mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
+// mvectorctrlmsgunit[i].mnBufferCount);
+// }
+// guploadthread = new std::thread(threadupload);
+ return a.exec();