grpcclient.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. #include "grpcclient.h"
  2. grpcclient * ggrpcclient;
  3. void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  4. {
  5. ggrpcclient->UpdateData(strdata,nSize,strmemname);
  6. }
  7. void ListenPicData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  8. {
  9. ggrpcclient->UpdatePicData(strdata,nSize,strmemname);
  10. }
  11. grpcclient::grpcclient(std::string stryamlpath)
  12. {
  13. ggrpcclient = this;
  14. dec_yaml(stryamlpath.data());
  15. mstrpicmsgname[0] = "PicFront";
  16. mstrpicmsgname[1] = "PicRear";
  17. mstrpicmsgname[2] = "PicLeft";
  18. mstrpicmsgname[3] = "PicRight";
  19. unsigned int i;
  20. for(i=0;i<mvectormsgunit.size();i++)
  21. {
  22. mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
  23. }
  24. for(i=0;i<mvectorctrlmsgunit.size();i++)
  25. {
  26. mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
  27. mvectorctrlmsgunit[i].mnBufferCount);
  28. }
  29. for(i=0;i<NUM_CAM;i++)
  30. {
  31. mpaPic[i] = iv::modulecomm::RegisterRecv(mstrpicmsgname[i].data(),ListenPicData);
  32. }
  33. for(i=0;i<NUM_CAM;i++)
  34. {
  35. unsigned int j;
  36. for(j=0;j<NUM_THREAD_PERCAM;j++)
  37. {
  38. mpThread[i*NUM_THREAD_PERCAM + j] = new std::thread(&grpcclient::threadpicupload,this,i);
  39. }
  40. }
  41. }
  42. void grpcclient::run()
  43. {
  44. int nsize = mvectormsgunit.size();
  45. int i;
  46. int ninterval = atoi(gstruploadinterval.data());
  47. if(ninterval<=0)ninterval = 100;
  48. QTime xTime;
  49. xTime.start();
  50. int nlastsend = xTime.elapsed();
  51. std::string target_str = gstrserverip+":";
  52. target_str = target_str + gstrserverport ;//std::to_string()
  53. auto cargs = grpc::ChannelArguments();
  54. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  55. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  56. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  57. target_str, grpc::InsecureChannelCredentials(),cargs);
  58. std::unique_ptr<iv::UploadThread::Stub> stub_ = iv::UploadThread::NewStub(channel);
  59. iv::UploadRequestThread request;
  60. int nid = 0;
  61. // Container for the data we expect from the server.
  62. iv::UploadReplyThread reply;
  63. gpr_timespec timespec;
  64. timespec.tv_sec = 30;//设置阻塞时间为2秒
  65. timespec.tv_nsec = 0;
  66. timespec.clock_type = GPR_TIMESPAN;
  67. // ClientContext context;
  68. while(!QThread::isInterruptionRequested())
  69. {
  70. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  71. if((xTime.elapsed()-nlastsend)<ninterval)
  72. {
  73. continue;
  74. }
  75. bool bImportant = false;
  76. int nkeeptime = 0;
  77. iv::cloud::cloudmsg xmsg;
  78. xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
  79. gMutexMsg.lock();
  80. for(i=0;i<nsize;i++)
  81. {
  82. if(mvectormsgunit[i].mbRefresh)
  83. {
  84. mvectormsgunit[i].mbRefresh = false;
  85. if(mvectormsgunit[i].mbImportant)
  86. {
  87. bImportant = true;
  88. }
  89. if(mvectormsgunit[i].mnkeeptime > nkeeptime)
  90. {
  91. nkeeptime = mvectormsgunit[i].mnkeeptime;
  92. }
  93. iv::cloud::cloudunit xcloudunit;
  94. xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
  95. xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
  96. iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
  97. pcu->CopyFrom(xcloudunit);
  98. }
  99. }
  100. gMutexMsg.unlock();
  101. int nbytesize = xmsg.ByteSize();
  102. char * strbuf = new char[nbytesize];
  103. std::shared_ptr<char> pstrbuf;
  104. pstrbuf.reset(strbuf);
  105. if(xmsg.SerializeToArray(strbuf,nbytesize))
  106. {
  107. ClientContext context ;
  108. context.set_deadline(timespec);
  109. qint64 time1 = QDateTime::currentMSecsSinceEpoch();
  110. request.set_id(nid);
  111. request.set_ntime(time1);
  112. request.set_strquerymd5(gstrqueryMD5);
  113. request.set_strctrlmd5(gstrctrlMD5);
  114. request.set_strvin(gstrVIN);
  115. request.set_xdata(strbuf,nbytesize);
  116. request.set_kepptime(nkeeptime);
  117. request.set_bimportant(bImportant);
  118. nid++;
  119. nlastsend = xTime.elapsed();
  120. // The actual RPC.
  121. Status status = stub_->uploaddata(&context, request, &reply);
  122. if (status.ok()) {
  123. std::cout<<" data size is "<<nbytesize<<std::endl;
  124. std::cout<<nid<<" upload successfully"<<std::endl;
  125. if(reply.nres() == 1)
  126. {
  127. iv::cloud::cloudmsg xmsg;
  128. if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
  129. {
  130. sharectrlmsg(&xmsg);
  131. }
  132. }
  133. } else {
  134. std::cout << status.error_code() << ": " << status.error_message()
  135. << std::endl;
  136. std::cout<<"RPC failed"<<std::endl;
  137. if(status.error_code() == 4)
  138. {
  139. std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
  140. channel = grpc::CreateCustomChannel(
  141. target_str, grpc::InsecureChannelCredentials(),cargs);
  142. stub_ = iv::UploadThread::NewStub(channel);
  143. }
  144. std::this_thread::sleep_for(std::chrono::milliseconds(900));
  145. }
  146. }
  147. }
  148. }
  149. void grpcclient::dec_yaml(const char * stryamlpath)
  150. {
  151. YAML::Node config;
  152. try
  153. {
  154. config = YAML::LoadFile(stryamlpath);
  155. }
  156. catch(YAML::BadFile e)
  157. {
  158. qDebug("load error.");
  159. return;
  160. }
  161. std::vector<std::string> vecmodulename;
  162. if(config["server"])
  163. {
  164. gstrserverip = config["server"].as<std::string>();
  165. }
  166. if(config["port"])
  167. {
  168. gstrserverport = config["port"].as<std::string>();
  169. }
  170. if(config["uploadinterval"])
  171. {
  172. gstruploadinterval = config["uploadinterval"].as<std::string>();
  173. }
  174. if(config["VIN"])
  175. {
  176. gstrVIN = config["VIN"].as<std::string>();
  177. }
  178. if(config["queryMD5"])
  179. {
  180. gstrqueryMD5 = config["queryMD5"].as<std::string>();
  181. }
  182. else
  183. {
  184. return;
  185. }
  186. if(config["ctrlMD5"])
  187. {
  188. gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
  189. }
  190. std::string strmsgname;
  191. if(config["uploadmessage"])
  192. {
  193. for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
  194. {
  195. std::string strtitle = it->first.as<std::string>();
  196. std::cout<<strtitle<<std::endl;
  197. if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
  198. {
  199. iv::msgunit xmu;
  200. strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
  201. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  202. xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
  203. xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
  204. if(config["uploadmessage"][strtitle]["bimportant"])
  205. {
  206. std::string strimportant = config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
  207. if(strimportant == "true")
  208. {
  209. xmu.mbImportant = true;
  210. }
  211. }
  212. if(config["uploadmessage"][strtitle]["keeptime"])
  213. {
  214. std::string strkeep = config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
  215. xmu.mnkeeptime = atoi(strkeep.data());
  216. }
  217. mvectormsgunit.push_back(xmu);
  218. }
  219. }
  220. }
  221. else
  222. {
  223. }
  224. if(!config["ctrlMD5"])
  225. {
  226. return;
  227. }
  228. if(config["ctrlmessage"])
  229. {
  230. std::string strnodename = "ctrlmessage";
  231. for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
  232. {
  233. std::string strtitle = it->first.as<std::string>();
  234. std::cout<<strtitle<<std::endl;
  235. if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
  236. {
  237. iv::msgunit xmu;
  238. strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
  239. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  240. xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
  241. xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
  242. mvectorctrlmsgunit.push_back(xmu);
  243. }
  244. }
  245. }
  246. else
  247. {
  248. }
  249. return;
  250. }
  251. void grpcclient::sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
  252. {
  253. int i;
  254. int nsize = pxmsg->xclouddata_size();
  255. for(i=0;i<nsize;i++)
  256. {
  257. int j;
  258. int nquerysize = mvectorctrlmsgunit.size();
  259. for(j=0;j<nquerysize;j++)
  260. {
  261. if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
  262. {
  263. // qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
  264. iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
  265. break;
  266. }
  267. }
  268. }
  269. }
  270. void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
  271. {
  272. int nsize = mvectormsgunit.size();
  273. int i;
  274. for(i=0;i<nsize;i++)
  275. {
  276. if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
  277. {
  278. gMutexMsg.lock();
  279. char * strtem = new char[nSize];
  280. memcpy(strtem,strdata,nSize);
  281. mvectormsgunit[i].mpstrmsgdata.reset(strtem);
  282. mvectormsgunit[i].mndatasize = nSize;
  283. mvectormsgunit[i].mbRefresh = true;
  284. gMutexMsg.unlock();
  285. break;
  286. }
  287. }
  288. }
  289. void grpcclient::UpdatePicData(const char *strdata, const unsigned int nSize, const char *strmemname)
  290. {
  291. int npos = -1;
  292. unsigned int i;
  293. for(i=0;i<NUM_CAM;i++)
  294. {
  295. if(strncmp(strmemname,mstrpicmsgname[i].data(),255) == 0)
  296. {
  297. npos = i;
  298. break;
  299. }
  300. }
  301. if(npos<0)
  302. {
  303. std::cout<<"grpcclient::UpdatePicData not found pic. msg name is "<<strmemname<<std::endl;
  304. return;
  305. }
  306. if(npos>= NUM_CAM)
  307. {
  308. std::cout<<"Camera count is "<<NUM_CAM<<" NOW camear is "<<npos<<std::endl;
  309. return;
  310. }
  311. mpicbuf[npos].mMutex.lock();
  312. mpicbuf[npos].mnMsgTime = QDateTime::currentMSecsSinceEpoch();
  313. mpicbuf[npos].mbRefresh = true;
  314. mpicbuf[npos].mpstrmsgdata = std::shared_ptr<char>(new char[nSize]);
  315. mpicbuf[npos].mDataSize = nSize;
  316. memcpy(mpicbuf[npos].mpstrmsgdata.get(),strdata,nSize);
  317. mpicbuf[npos].mMutex.unlock();
  318. }
  319. void grpcclient::threadpicupload(int nCamPos)
  320. {
  321. std::cout<<"thread cam "<<nCamPos<<"run"<<std::endl;
  322. int nsize = mvectormsgunit.size();
  323. int i;
  324. int ninterval = atoi(gstruploadinterval.data());
  325. if(ninterval<=0)ninterval = 100;
  326. QTime xTime;
  327. xTime.start();
  328. int nlastsend = xTime.elapsed();
  329. std::string target_str = gstrserverip+":";
  330. target_str = target_str + gstrserverport ;//std::to_string()
  331. auto cargs = grpc::ChannelArguments();
  332. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  333. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  334. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  335. target_str, grpc::InsecureChannelCredentials(),cargs);
  336. std::unique_ptr<iv::UploadThread::Stub> stub_ = iv::UploadThread::NewStub(channel);
  337. iv::PicUpRequestThread request;
  338. int nid = 0;
  339. // Container for the data we expect from the server.
  340. iv::PicUpReplyThread reply;
  341. gpr_timespec timespec;
  342. timespec.tv_sec = 30;//设置阻塞时间为2秒
  343. timespec.tv_nsec = 0;
  344. timespec.clock_type = GPR_TIMESPAN;
  345. // ClientContext context;
  346. while(true)
  347. {
  348. std::shared_ptr<char> pstr_ptr;
  349. if((nCamPos<0)||(nCamPos >= NUM_CAM))
  350. {
  351. std::cout<<"Cam Pos Error. "<<"Pos: "<<nCamPos<<" TOTAL:"<<NUM_CAM<<std::endl;
  352. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  353. continue;
  354. }
  355. bool bUpdate = false;
  356. qint64 nMsgTime = 0;
  357. int nSize = 0;
  358. mpicbuf[nCamPos].mMutex.lock();
  359. bUpdate = mpicbuf[nCamPos].mbRefresh;
  360. if(bUpdate)
  361. {
  362. nMsgTime = mpicbuf[nCamPos].mnMsgTime;
  363. mpicbuf[nCamPos].mbRefresh = false;
  364. pstr_ptr = mpicbuf[nCamPos].mpstrmsgdata;
  365. nSize = mpicbuf[nCamPos].mDataSize;
  366. }
  367. mpicbuf[nCamPos].mMutex.unlock();
  368. if(bUpdate == false)
  369. {
  370. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  371. continue;
  372. }
  373. ClientContext context ;
  374. context.set_deadline(timespec);
  375. qint64 time1 = QDateTime::currentMSecsSinceEpoch();
  376. request.set_npictime(nMsgTime);
  377. request.set_ncampos(nCamPos);
  378. request.set_strvin(gstrVIN);
  379. request.set_xdata(pstr_ptr.get(),nSize);
  380. nid++;
  381. nlastsend = xTime.elapsed();
  382. // The actual RPC.
  383. Status status = stub_->uploadpic(&context, request, &reply);
  384. if (status.ok()) {
  385. if(reply.nres() == 1)
  386. {
  387. // iv::cloud::cloudmsg xmsg;
  388. // if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
  389. // {
  390. // sharectrlmsg(&xmsg);
  391. // }
  392. }
  393. } else {
  394. std::cout << status.error_code() << ": " << status.error_message()
  395. << std::endl;
  396. std::cout<<"RPC failed"<<std::endl;
  397. if(status.error_code() == 4)
  398. {
  399. std::cout<<nCamPos<<" RPC Exceed Time, Create New stub_"<<std::endl;
  400. channel = grpc::CreateCustomChannel(
  401. target_str, grpc::InsecureChannelCredentials(),cargs);
  402. stub_ = iv::UploadThread::NewStub(channel);
  403. }
  404. std::this_thread::sleep_for(std::chrono::milliseconds(900));
  405. }
  406. }
  407. }