123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 |
- #include <QCoreApplication>
- #include <yaml-cpp/yaml.h>
- #include <QDateTime>
- #include <iostream>
- #include <vector>
- #include <memory>
- #include <QMutex>
- #include <thread>
- #include "modulecomm.h"
- #include "rpc_client.hpp"
- #include "cloud.pb.h"
- std::string gstrserverip = "123.57.212.138";
- std::string gstrserverport = "9000";
- std::string gstruploadinterval = "1000";
- void * gpa;
- QMutex gMutexMsg;
- std::thread * guploadthread;
- namespace iv {
- struct msgunit
- {
- char mstrmsgname[256];
- int mnBufferSize = 10000;
- int mnBufferCount = 1;
- void * mpa;
- std::shared_ptr<char> mpstrmsgdata;
- int mndatasize = 0;
- bool mbRefresh = false;
- };
- }
- std::vector<iv::msgunit> mvectormsgunit;
- std::vector<iv::msgunit> mvectorctrlmsgunit;
- std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
- std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c592";
- std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c592";
- struct uploadresponsemsg {
- int nres; //0 no message 1 have message
- int id;
- qint64 ntime;
- std::string strVIN;
- std::string strctrlMD5;
- std::vector<char> xdata;
- MSGPACK_DEFINE(nres,id, ntime,strVIN, strctrlMD5,xdata);
- };
- int gindex = 0;
- void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
- {
- int nsize = mvectormsgunit.size();
- int i;
- for(i=0;i<nsize;i++)
- {
- if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
- {
- gMutexMsg.lock();
- char * strtem = new char[nSize];
- memcpy(strtem,strdata,nSize);
- mvectormsgunit[i].mpstrmsgdata.reset(strtem);
- mvectormsgunit[i].mndatasize = nSize;
- mvectormsgunit[i].mbRefresh = true;
- gMutexMsg.unlock();
- break;
- }
- }
- }
- void sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
- {
- int i;
- int nsize = pxmsg->xclouddata_size();
- for(i=0;i<nsize;i++)
- {
- int j;
- int nquerysize = mvectorctrlmsgunit.size();
- for(j=0;j<nquerysize;j++)
- {
- if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
- {
- // qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
- iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
- break;
- }
- }
- }
- }
- void threadupload()
- {
- int nsize = mvectormsgunit.size();
- int i;
- int ninterval = atoi(gstruploadinterval.data());
- if(ninterval<=0)ninterval = 100;
- QTime xTime;
- xTime.start();
- int nlastsend = xTime.elapsed();
- bool r;
- rest_rpc::rpc_client client;
- client.enable_auto_reconnect(); //automatic reconnect
- client.enable_auto_heartbeat(); //automatic heartbeat
- // bool r = client.connect("140.143.237.38", 9000);
- r = client.connect(gstrserverip, atoi(gstrserverport.data()));
- int count = 0;
- CT:
- while (true) {
- if (client.has_connected()) {
- std::cout << "connected ok\n";
- break;
- }
- else {
- std::cout << "connected failed: "<< count++<<"\n";
- }
- std::this_thread::sleep_for(std::chrono::seconds(1));
- }
- while(true)
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- if((xTime.elapsed()-nlastsend)<ninterval)
- {
- continue;
- }
- try
- {
- iv::cloud::cloudmsg xmsg;
- xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
- gMutexMsg.lock();
- for(i=0;i<nsize;i++)
- {
- if(mvectormsgunit[i].mbRefresh)
- {
- mvectormsgunit[i].mbRefresh = false;
- iv::cloud::cloudunit xcloudunit;
- xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
- xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
- iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
- pcu->CopyFrom(xcloudunit);
- }
- }
- gMutexMsg.unlock();
- int nbytesize = xmsg.ByteSize();
- std::vector<char> pvectordata;
- pvectordata.resize(nbytesize);
- if(xmsg.SerializeToArray(pvectordata.data(),nbytesize))
- {
- uploadresponsemsg result = client.call<60000,uploadresponsemsg>("get_clientresp",gindex,QDateTime::currentMSecsSinceEpoch(),
- gstrVIN,gstrqueryMD5,gstrctrlMD5,pvectordata);
- // qDebug("res is %d ",result.nres);
- if(result.nres == 1)
- {
- iv::cloud::cloudmsg xmsg;
- if(xmsg.ParseFromArray(result.xdata.data(),result.xdata.size()))
- {
- sharectrlmsg(&xmsg);
- }
- }
- gindex++;
- }
- nlastsend = xTime.elapsed();
- pvectordata.clear();
- }
- catch (const std::exception & e) {
- std::cout << e.what() << std::endl;
- // client.close();
- goto CT;
- }
- }
- }
- void dec_yaml(const char * stryamlpath)
- {
- YAML::Node config;
- try
- {
- config = YAML::LoadFile(stryamlpath);
- }
- catch(YAML::BadFile e)
- {
- qDebug("load error.");
- return;
- }
- std::vector<std::string> vecmodulename;
- if(config["server"])
- {
- gstrserverip = config["server"].as<std::string>();
- }
- if(config["port"])
- {
- gstrserverport = config["port"].as<std::string>();
- }
- if(config["uploadinterval"])
- {
- gstruploadinterval = config["uploadinterval"].as<std::string>();
- }
- std::string strmsgname;
- if(config["uploadmessage"])
- {
- for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
- {
- std::string strtitle = it->first.as<std::string>();
- std::cout<<strtitle<<std::endl;
- if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
- {
- iv::msgunit xmu;
- strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
- strncpy(xmu.mstrmsgname,strmsgname.data(),255);
- xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
- xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
- mvectormsgunit.push_back(xmu);
- }
- }
- }
- else
- {
- }
- if(config["ctrlmessage"])
- {
- std::string strnodename = "ctrlmessage";
- for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
- {
- std::string strtitle = it->first.as<std::string>();
- std::cout<<strtitle<<std::endl;
- if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
- {
- iv::msgunit xmu;
- strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
- strncpy(xmu.mstrmsgname,strmsgname.data(),255);
- xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
- xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
- mvectorctrlmsgunit.push_back(xmu);
- }
- }
- }
- else
- {
- }
- return;
- }
- int main(int argc, char *argv[])
- {
- QCoreApplication a(argc, argv);
- char stryamlpath[256];
- if(argc<2)
- {
- snprintf(stryamlpath,255,"driver_cloud_client.yaml");
- // strncpy(stryamlpath,abs_ymlpath,255);
- }
- else
- {
- strncpy(stryamlpath,argv[1],255);
- }
- dec_yaml(stryamlpath);
- int i;
- for(i=0;i<mvectormsgunit.size();i++)
- {
- mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
- }
- for(i=0;i<mvectorctrlmsgunit.size();i++)
- {
- mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
- mvectorctrlmsgunit[i].mnBufferCount);
- }
- guploadthread = new std::thread(threadupload);
- return a.exec();
- }
|