main.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. #include <QCoreApplication>
  2. #include "grpcclient.h"
  3. #include "ivversion.h"
  4. /*
  5. #include <yaml-cpp/yaml.h>
  6. #include <QDateTime>
  7. #include <iostream>
  8. #include <vector>
  9. #include <memory>
  10. #include <QMutex>
  11. #include <thread>
  12. #include "modulecomm.h"
  13. #include "cloud.pb.h"
  14. #include <iostream>
  15. #include <memory>
  16. #include <string>
  17. #include <grpcpp/grpcpp.h>
  18. #include "uploadmsg.grpc.pb.h"
  19. using grpc::Channel;
  20. using grpc::ClientContext;
  21. using grpc::Status;
  22. void test()
  23. {
  24. std::string target_str = "0.0.0.0:50051";
  25. auto cargs = grpc::ChannelArguments();
  26. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  27. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  28. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  29. target_str, grpc::InsecureChannelCredentials(),cargs);
  30. std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
  31. iv::UploadRequest request;
  32. // Container for the data we expect from the server.
  33. iv::UploadReply reply;
  34. int nid = 0;
  35. nid = 1;
  36. while(1)
  37. {
  38. // Context for the client. It could be used to convey extra information to
  39. // the server and/or tweak certain RPC behaviors.
  40. ClientContext context ;
  41. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  42. qint64 time1 = QDateTime::currentMSecsSinceEpoch();
  43. request.set_id(nid);
  44. request.set_ntime(time1);
  45. nid++;
  46. // The actual RPC.
  47. Status status = stub_->upload(&context, request, &reply);
  48. if (status.ok()) {
  49. std::cout<<nid<<" upload successfully"<<std::endl;
  50. // qint64 time2;
  51. // memcpy(&time2,reply.data().data(),8);
  52. // qint64 time3 = QDateTime::currentMSecsSinceEpoch();
  53. // std::cout<<"reply data size is "<<reply.data().size()<<std::endl;
  54. // std::cout<<" latency is "<<(time2 - time1)<<" 2 is "<<(time3 - time2)<<std::endl;
  55. // return reply.message();
  56. } else {
  57. std::cout << status.error_code() << ": " << status.error_message()
  58. << std::endl;
  59. std::cout<<"RPC failed"<<std::endl;
  60. std::this_thread::sleep_for(std::chrono::milliseconds(900));
  61. // delete pcontext;
  62. // pcontext = new ClientContext;
  63. // channel = grpc::CreateCustomChannel(
  64. // target_str, grpc::InsecureChannelCredentials(),cargs);
  65. // stub_ = iv::Upload::NewStub(channel);
  66. }
  67. }
  68. }
  69. std::string gstrserverip = "0.0.0.0";//"123.57.212.138";
  70. std::string gstrserverport = "50051";//"9000";
  71. std::string gstruploadinterval = "1000";
  72. void * gpa;
  73. QMutex gMutexMsg;
  74. std::thread * guploadthread;
  75. std::vector<iv::msgunit> mvectormsgunit;
  76. std::vector<iv::msgunit> mvectorctrlmsgunit;
  77. std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
  78. std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c591";//"5d41402abc4b2a76b9719d911017c592";
  79. std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c591";
  80. int gindex = 0;
  81. void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  82. {
  83. int nsize = mvectormsgunit.size();
  84. int i;
  85. for(i=0;i<nsize;i++)
  86. {
  87. if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
  88. {
  89. gMutexMsg.lock();
  90. char * strtem = new char[nSize];
  91. memcpy(strtem,strdata,nSize);
  92. mvectormsgunit[i].mpstrmsgdata.reset(strtem);
  93. mvectormsgunit[i].mndatasize = nSize;
  94. mvectormsgunit[i].mbRefresh = true;
  95. gMutexMsg.unlock();
  96. break;
  97. }
  98. }
  99. }
  100. void sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
  101. {
  102. int i;
  103. int nsize = pxmsg->xclouddata_size();
  104. for(i=0;i<nsize;i++)
  105. {
  106. int j;
  107. int nquerysize = mvectorctrlmsgunit.size();
  108. for(j=0;j<nquerysize;j++)
  109. {
  110. if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
  111. {
  112. // qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
  113. iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
  114. break;
  115. }
  116. }
  117. }
  118. }
  119. void threadupload()
  120. {
  121. int nsize = mvectormsgunit.size();
  122. int i;
  123. int ninterval = atoi(gstruploadinterval.data());
  124. if(ninterval<=0)ninterval = 100;
  125. QTime xTime;
  126. xTime.start();
  127. int nlastsend = xTime.elapsed();
  128. std::string target_str = gstrserverip+":";
  129. target_str = target_str + gstrserverport ;//std::to_string()
  130. auto cargs = grpc::ChannelArguments();
  131. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  132. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  133. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  134. target_str, grpc::InsecureChannelCredentials(),cargs);
  135. std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
  136. iv::UploadRequest request;
  137. int nid = 0;
  138. // Container for the data we expect from the server.
  139. iv::UploadReply reply;
  140. gpr_timespec timespec;
  141. timespec.tv_sec = 30;// blocking time (RPC deadline, relative time span) is 30 seconds
  142. timespec.tv_nsec = 0;
  143. timespec.clock_type = GPR_TIMESPAN;
  144. // ClientContext context;
  145. while(true)
  146. {
  147. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  148. if((xTime.elapsed()-nlastsend)<ninterval)
  149. {
  150. continue;
  151. }
  152. bool bImportant = false;
  153. int nkeeptime = 0;
  154. iv::cloud::cloudmsg xmsg;
  155. xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
  156. gMutexMsg.lock();
  157. for(i=0;i<nsize;i++)
  158. {
  159. if(mvectormsgunit[i].mbRefresh)
  160. {
  161. mvectormsgunit[i].mbRefresh = false;
  162. if(mvectormsgunit[i].mbImportant)
  163. {
  164. bImportant = true;
  165. }
  166. if(mvectormsgunit[i].mnkeeptime > nkeeptime)
  167. {
  168. nkeeptime = mvectormsgunit[i].mnkeeptime;
  169. }
  170. iv::cloud::cloudunit xcloudunit;
  171. xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
  172. xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
  173. iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
  174. pcu->CopyFrom(xcloudunit);
  175. }
  176. }
  177. gMutexMsg.unlock();
  178. int nbytesize = xmsg.ByteSize();
  179. char * strbuf = new char[nbytesize];
  180. std::shared_ptr<char> pstrbuf;
  181. pstrbuf.reset(strbuf);
  182. if(xmsg.SerializeToArray(strbuf,nbytesize))
  183. {
  184. ClientContext context ;
  185. context.set_deadline(timespec);
  186. qint64 time1 = QDateTime::currentMSecsSinceEpoch();
  187. request.set_id(nid);
  188. request.set_ntime(time1);
  189. request.set_strquerymd5(gstrqueryMD5);
  190. request.set_strctrlmd5(gstrctrlMD5);
  191. request.set_strvin(gstrVIN);
  192. request.set_xdata(strbuf,nbytesize);
  193. request.set_kepptime(nkeeptime);
  194. request.set_bimportant(bImportant);
  195. nid++;
  196. // The actual RPC.
  197. Status status = stub_->upload(&context, request, &reply);
  198. if (status.ok()) {
  199. std::cout<<nid<<" upload successfully"<<std::endl;
  200. if(reply.nres() == 1)
  201. {
  202. iv::cloud::cloudmsg xmsg;
  203. if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
  204. {
  205. sharectrlmsg(&xmsg);
  206. }
  207. }
  208. } else {
  209. std::cout << status.error_code() << ": " << status.error_message()
  210. << std::endl;
  211. std::cout<<"RPC failed"<<std::endl;
  212. if(status.error_code() == 4)
  213. {
  214. std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
  215. channel = grpc::CreateCustomChannel(
  216. target_str, grpc::InsecureChannelCredentials(),cargs);
  217. stub_ = iv::Upload::NewStub(channel);
  218. }
  219. std::this_thread::sleep_for(std::chrono::milliseconds(900));
  220. }
  221. }
  222. nlastsend = xTime.elapsed();
  223. }
  224. }
  225. void dec_yaml(const char * stryamlpath)
  226. {
  227. YAML::Node config;
  228. try
  229. {
  230. config = YAML::LoadFile(stryamlpath);
  231. }
  232. catch(YAML::BadFile e)
  233. {
  234. qDebug("load error.");
  235. return;
  236. }
  237. std::vector<std::string> vecmodulename;
  238. if(config["server"])
  239. {
  240. gstrserverip = config["server"].as<std::string>();
  241. }
  242. if(config["port"])
  243. {
  244. gstrserverport = config["port"].as<std::string>();
  245. }
  246. if(config["uploadinterval"])
  247. {
  248. gstruploadinterval = config["uploadinterval"].as<std::string>();
  249. }
  250. if(config["VIN"])
  251. {
  252. gstrVIN = config["VIN"].as<std::string>();
  253. }
  254. if(config["queryMD5"])
  255. {
  256. gstrqueryMD5 = config["queryMD5"].as<std::string>();
  257. }
  258. else
  259. {
  260. return;
  261. }
  262. if(config["ctrlMD5"])
  263. {
  264. gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
  265. }
  266. std::string strmsgname;
  267. if(config["uploadmessage"])
  268. {
  269. for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
  270. {
  271. std::string strtitle = it->first.as<std::string>();
  272. std::cout<<strtitle<<std::endl;
  273. if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
  274. {
  275. iv::msgunit xmu;
  276. strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
  277. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  278. xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
  279. xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
  280. if(config["uploadmessage"][strtitle]["bimportant"])
  281. {
  282. std::string strimportant = config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
  283. if(strimportant == "true")
  284. {
  285. xmu.mbImportant = true;
  286. }
  287. }
  288. if(config["uploadmessage"][strtitle]["keeptime"])
  289. {
  290. std::string strkeep = config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
  291. xmu.mnkeeptime = atoi(strkeep.data());
  292. }
  293. mvectormsgunit.push_back(xmu);
  294. }
  295. }
  296. }
  297. else
  298. {
  299. }
  300. if(!config["ctrlMD5"])
  301. {
  302. return;
  303. }
  304. if(config["ctrlmessage"])
  305. {
  306. std::string strnodename = "ctrlmessage";
  307. for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
  308. {
  309. std::string strtitle = it->first.as<std::string>();
  310. std::cout<<strtitle<<std::endl;
  311. if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
  312. {
  313. iv::msgunit xmu;
  314. strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
  315. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  316. xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
  317. xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
  318. mvectorctrlmsgunit.push_back(xmu);
  319. }
  320. }
  321. }
  322. else
  323. {
  324. }
  325. return;
  326. }
  327. */
  328. int main(int argc, char *argv[])
  329. {
  330. showversion("driver_cloud_grpc_client");
  331. QCoreApplication a(argc, argv);
  332. // std::thread * ptest = new std::thread(test);
  333. // return a.exec();
  334. char stryamlpath[256];
  335. if(argc<2)
  336. {
  337. snprintf(stryamlpath,255,"driver_cloud_grpc_client.yaml");
  338. // strncpy(stryamlpath,abs_ymlpath,255);
  339. }
  340. else
  341. {
  342. strncpy(stryamlpath,argv[1],255);
  343. }
  344. // dec_yaml(stryamlpath);
  345. grpcclient * pgrpcclient = new grpcclient(stryamlpath);
  346. pgrpcclient->start();
  347. // int i;
  348. // for(i=0;i<mvectormsgunit.size();i++)
  349. // {
  350. // mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
  351. // }
  352. // for(i=0;i<mvectorctrlmsgunit.size();i++)
  353. // {
  354. // mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
  355. // mvectorctrlmsgunit[i].mnBufferCount);
  356. // }
  357. // guploadthread = new std::thread(threadupload);
  358. return a.exec();
  359. }