grpcpc.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. #include "grpcpc.h"
  2. static grpcpc * ggrpcpc;
  3. void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  4. {
  5. ggrpcpc->UpdateData(strdata,nSize,strmemname);
  6. }
  7. grpcpc::grpcpc(std::string stryamlpath)
  8. {
  9. ggrpcpc = this;
  10. dec_yaml(stryamlpath.data());
  11. int i;
  12. for(i=0;i<mvectormsgunit.size();i++)
  13. {
  14. mvectormsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectormsgunit[i].mstrmsgname,mvectormsgunit[i].mnBufferSize,mvectormsgunit[i].mnBufferCount);
  15. }
  16. for(i=0;i<mvectorctrlmsgunit.size();i++)
  17. {
  18. mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectorctrlmsgunit[i].mstrmsgname,ListenData);
  19. }
  20. }
  21. void grpcpc::setframerate(float framerate)
  22. {
  23. mframerate = framerate;
  24. }
  25. void grpcpc::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::queryReqStream, iv::queryReplyStream> > writer, bool *pbrun)
  26. {
  27. int nctrlsize = mvectorctrlmsgunit.size();
  28. int i;
  29. int ninterval = atoi(gstruploadinterval.data());
  30. if(ninterval<=0)ninterval = 100;
  31. QTime xTime;
  32. xTime.start();
  33. int nlastsend = xTime.elapsed();
  34. int nid= 0;
  35. while(*pbrun)
  36. {
  37. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  38. if((xTime.elapsed()-nlastsend)<ninterval)
  39. {
  40. continue;
  41. }
  42. iv::queryReqStream request;
  43. bool bImportant = false;
  44. int nkeeptime = 0;
  45. iv::cloud::cloudmsg xmsg;
  46. xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
  47. nlastsend = xTime.elapsed();
  48. gMutexMsg.lock();
  49. // std::vector<iv::msgunit> xv = mvectorctrlmsgunit;
  50. for(i=0;i<nctrlsize;i++)
  51. {
  52. if(mvectorctrlmsgunit[i].mbRefresh)
  53. {
  54. mvectorctrlmsgunit[i].mbRefresh = false;
  55. if(mvectorctrlmsgunit[i].mbImportant)
  56. {
  57. bImportant = true;
  58. }
  59. if(mvectorctrlmsgunit[i].mnkeeptime > nkeeptime)
  60. {
  61. nkeeptime = mvectorctrlmsgunit[i].mnkeeptime;
  62. }
  63. iv::cloud::cloudunit xcloudunit;
  64. xcloudunit.set_msgname(mvectorctrlmsgunit[i].mstrmsgname);
  65. xcloudunit.set_data(mvectorctrlmsgunit[i].mpstrmsgdata.get(),mvectorctrlmsgunit[i].mndatasize);
  66. iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
  67. pcu->CopyFrom(xcloudunit);
  68. }
  69. }
  70. gMutexMsg.unlock();
  71. request.set_strquerymd5(gstrqueryMD5);
  72. request.set_strvin(gstrVIN);
  73. // request.set_ntime(QDateTime::currentMSecsSinceEpoch());
  74. request.set_ntype(0);
  75. if(xmsg.xclouddata_size()>0)
  76. {
  77. int nbytesize = xmsg.ByteSize();
  78. std::vector<char> pvectordata;
  79. pvectordata.resize(nbytesize);
  80. if(xmsg.SerializeToArray(pvectordata.data(),nbytesize))
  81. {
  82. // request.set_id(nctrlid);nctrlid++;
  83. request.set_strctrlmd5(gstrctrlMD5);
  84. request.set_nsuggestframerate(mframerate);
  85. request.set_xdata(pvectordata.data(),pvectordata.size());
  86. request.set_bimportant(bImportant);
  87. request.set_kepptime(nkeeptime);
  88. request.set_ntype(1);
  89. }
  90. }
  91. nlastsend = xTime.elapsed();
  92. bool bsend = writer->Write(request);
  93. std::cout<<"send msg. rtn is "<<bsend<<std::endl;
  94. }
  95. }
  96. void grpcpc::run()
  97. {
  98. int nsize = mvectormsgunit.size();
  99. int nctrlsize = mvectorctrlmsgunit.size();
  100. int i;
  101. qint64 nlasttime = 0;
  102. int ninterval = atoi(gstruploadinterval.data());
  103. if(ninterval<=0)ninterval = 100;
  104. QTime xTime;
  105. xTime.start();
  106. int nlastsend = xTime.elapsed();
  107. std::string target_str = gstrserverip+":";
  108. target_str = target_str + gstrserverport ;//std::to_string()
  109. auto cargs = grpc::ChannelArguments();
  110. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  111. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  112. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  113. target_str, grpc::InsecureChannelCredentials(),cargs);
  114. std::unique_ptr<iv::UploadStream::Stub> stub_ = iv::UploadStream::NewStub(channel);
  115. int nfail = 0;
  116. while(!QThread::isInterruptionRequested())
  117. {
  118. ClientContext context ;
  119. std::shared_ptr<::grpc::ClientReaderWriter<iv::queryReqStream, iv::queryReplyStream> > writerRead(stub_->queryctrl(&context));
  120. bool bRun = true;
  121. std::thread * pthread = new std::thread(&grpcpc::threadsend,this,writerRead,&bRun);
  122. (void )pthread;
  123. iv::queryReplyStream reply;
  124. while (writerRead->Read(&reply)) {
  125. nfail = 0;
  126. // std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
  127. if(reply.nres() == 1)
  128. {
  129. std::cout<<"data size is "<<reply.xdata().size()<<"time is"<<QDateTime::currentMSecsSinceEpoch()<<std::endl;
  130. iv::cloud::cloudmsg xmsg;
  131. if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
  132. {
  133. sharequerymsg(&xmsg);
  134. }
  135. }
  136. std::cout<<"read data from server. nres :"<< reply.nres()<<std::endl;
  137. }
  138. bRun = false;
  139. pthread->join();
  140. nfail++;
  141. if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
  142. else std::this_thread::sleep_for(std::chrono::milliseconds(100));
  143. std::cout<<"reconnnect to server. nfail is "<<nfail<<std::endl;
  144. }
  145. }
  146. void grpcpc::dec_yaml(const char *stryamlpath)
  147. {
  148. YAML::Node config;
  149. try
  150. {
  151. config = YAML::LoadFile(stryamlpath);
  152. }
  153. catch(YAML::BadFile e)
  154. {
  155. qDebug("load error.");
  156. return;
  157. }
  158. std::vector<std::string> vecmodulename;
  159. if(config["server"])
  160. {
  161. gstrserverip = config["server"].as<std::string>();
  162. }
  163. if(config["port"])
  164. {
  165. gstrserverport = config["port"].as<std::string>();
  166. }
  167. if(config["uploadinterval"])
  168. {
  169. gstruploadinterval = config["uploadinterval"].as<std::string>();
  170. }
  171. if(config["VIN"])
  172. {
  173. gstrVIN = config["VIN"].as<std::string>();
  174. }
  175. if(config["queryMD5"])
  176. {
  177. gstrqueryMD5 = config["queryMD5"].as<std::string>();
  178. }
  179. if(config["ctrlMD5"])
  180. {
  181. gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
  182. }
  183. std::string strmsgname;
  184. if(config["querymessage"])
  185. {
  186. for(YAML::const_iterator it= config["querymessage"].begin(); it != config["querymessage"].end();++it)
  187. {
  188. std::string strtitle = it->first.as<std::string>();
  189. std::cout<<strtitle<<std::endl;
  190. if(config["querymessage"][strtitle]["msgname"]&&config["querymessage"][strtitle]["buffersize"]&&config["querymessage"][strtitle]["buffercount"])
  191. {
  192. iv::msgunit xmu;
  193. strmsgname = config["querymessage"][strtitle]["msgname"].as<std::string>();
  194. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  195. xmu.mnBufferSize = config["querymessage"][strtitle]["buffersize"].as<int>();
  196. xmu.mnBufferCount = config["querymessage"][strtitle]["buffercount"].as<int>();
  197. mvectormsgunit.push_back(xmu);
  198. }
  199. }
  200. }
  201. else
  202. {
  203. }
  204. if(config["ctrlmessage"])
  205. {
  206. std::string strnodename = "ctrlmessage";
  207. for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
  208. {
  209. std::string strtitle = it->first.as<std::string>();
  210. std::cout<<strtitle<<std::endl;
  211. if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
  212. {
  213. iv::msgunit xmu;
  214. strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
  215. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  216. xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
  217. xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
  218. if(config[strnodename][strtitle]["bimportant"])
  219. {
  220. std::string strimportant = config[strnodename][strtitle]["bimportant"].as<std::string>();
  221. if(strimportant == "true")
  222. {
  223. xmu.mbImportant = true;
  224. }
  225. }
  226. if(config[strnodename][strtitle]["keeptime"])
  227. {
  228. std::string strkeep = config[strnodename][strtitle]["keeptime"].as<std::string>();
  229. xmu.mnkeeptime = atoi(strkeep.data());
  230. }
  231. mvectorctrlmsgunit.push_back(xmu);
  232. }
  233. }
  234. }
  235. else
  236. {
  237. }
  238. return;
  239. }
  240. void grpcpc::sharequerymsg(iv::cloud::cloudmsg *pxmsg)
  241. {
  242. int i;
  243. int nsize = pxmsg->xclouddata_size();
  244. for(i=0;i<nsize;i++)
  245. {
  246. int j;
  247. int nquerysize = mvectormsgunit.size();
  248. for(j=0;j<nquerysize;j++)
  249. {
  250. if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectormsgunit[j].mstrmsgname,255) == 0)
  251. {
  252. qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
  253. iv::modulecomm::ModuleSendMsg(mvectormsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
  254. break;
  255. }
  256. }
  257. }
  258. }
  259. void grpcpc::UpdateData(const char *strdata, const unsigned int nSize,const char * strmemname)
  260. {
  261. int nsize = mvectorctrlmsgunit.size();
  262. int i;
  263. for(i=0;i<nsize;i++)
  264. {
  265. if(strncmp(strmemname,mvectorctrlmsgunit[i].mstrmsgname,255) == 0)
  266. {
  267. gMutexMsg.lock();
  268. char * strtem = new char[nSize];
  269. memcpy(strtem,strdata,nSize);
  270. mvectorctrlmsgunit[i].mpstrmsgdata.reset(strtem);
  271. mvectorctrlmsgunit[i].mndatasize = nSize;
  272. mvectorctrlmsgunit[i].mbRefresh = true;
  273. gMutexMsg.unlock();
  274. break;
  275. }
  276. }
  277. }
  278. std::string grpcpc::GetVIN()
  279. {
  280. return gstrVIN;
  281. }
  282. float grpcpc::getframerate()
  283. {
  284. return mframerate;
  285. }