123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <QCoreApplication>
#include <QDateTime>
#include <QMutex>

#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>

#include "cumsgbuffer.h"
#include "pcmsgbuffer.h"
#include "uploadstreammsg.grpc.pb.h"

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
- static cumsgbuffer gcumsgbuf;
- static pcmsgbuffer gpcmsgbuf;
- void uploadsend(::grpc::ServerReaderWriter<iv::UploadReplyStream, iv::UploadRequestStream>* stream,bool * pbrun,
- std::string * pstrvin,std::string * pstrmd5,bool *pbUpdatemd4orvin,QMutex * pmutex,int * preqid,qint64 * pnsendtime,qint64 * recvtime,QMutex * pmutexidtime)
- {
- std::string strvin;
- std::string strmd5;
- pmutex->lock();
- strvin = *pstrmd5;
- strmd5 = *pstrvin;
- pmutex->unlock();
- QTime xTime;
- xTime.start();
- int nlastsend = xTime.elapsed();
- while(*pbrun)
- {
- if(*pbUpdatemd4orvin)
- {
- pmutex->lock();
- strvin = *pstrvin;
- strmd5 = *pstrmd5;
- *pbUpdatemd4orvin = false;
- pmutex->unlock();
- }
- int id;
- qint64 ntime;
- float framerate;
- std::vector<char > xvectorctrldata;
- int nres = gpcmsgbuf.getmsg(strvin,strmd5,id,ntime,&xvectorctrldata,&framerate);
- iv::UploadReplyStream reply;
- pmutexidtime->lock();
- reply.set_nreqsendtime(*pnsendtime);
- reply.set_nreqid(*preqid);
- reply.set_npausetime(QDateTime::currentMSecsSinceEpoch() - *recvtime);
- pmutexidtime->unlock();
- if(nres == 1)
- {
- reply.set_nres(nres);
- reply.set_xdata(xvectorctrldata.data(),xvectorctrldata.size());
- reply.set_framerate(framerate);
- stream->Write(reply);
- nlastsend = xTime.elapsed();
- }
- else
- {
- if(abs(xTime.elapsed() - nlastsend)>1000)
- {
- reply.set_nres(nres);
- reply.set_framerate(0);
- stream->Write(reply);
- nlastsend = xTime.elapsed();
- }
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- }
/**
 * @brief Adds a latency sample and returns the moving average.
 *
 * The sample is appended to @p xvectorserverlatency, the history is trimmed
 * to the five most recent samples, and the integer mean of what remains is
 * returned (division truncates toward zero).
 *
 * @param xvectorserverlatency  Sample history; modified in place.
 * @param nlatency              New sample to append.
 * @return Mean of the retained samples.
 */
qint64 calcserverlatency(std::vector<qint64> & xvectorserverlatency,qint64 nlatency)
{
    const std::size_t kWindowSize = 5;

    xvectorserverlatency.push_back(nlatency);

    // Drop everything older than the last kWindowSize samples in one erase.
    if(xvectorserverlatency.size() > kWindowSize)
    {
        const std::ptrdiff_t nexcess =
                static_cast<std::ptrdiff_t>(xvectorserverlatency.size() - kWindowSize);
        xvectorserverlatency.erase(xvectorserverlatency.begin(),
                                   xvectorserverlatency.begin() + nexcess);
    }

    qint64 nsum = 0;
    for(const qint64 nsample : xvectorserverlatency)
    {
        nsum += nsample;
    }

    // The vector holds at least the sample just pushed, so the divisor is
    // never zero. Cast keeps the division signed for negative sums.
    return nsum / static_cast<qint64>(xvectorserverlatency.size());
}
- void queryctrlsend(::grpc::ServerReaderWriter<iv::queryReplyStream, iv::queryReqStream>* stream,bool * pbrun,
- std::string * pstrvin,std::string * pstrmd5,bool *pbUpdatemd4orvin,QMutex * pmutex,qint64 * pnserverlatency)
- {
- std::string strvin;
- std::string strmd5;
- pmutex->lock();
- strvin = *pstrmd5;
- strmd5 = *pstrvin;
- pmutex->unlock();
- QTime xTime;
- xTime.start();
- int nlastsend = xTime.elapsed();
- qint64 nlastdatatime = 0;
- qint64 nculatency;
- while(*pbrun)
- {
- if(*pbUpdatemd4orvin)
- {
- pmutex->lock();
- strvin = *pstrvin;
- strmd5 = *pstrmd5;
- *pbUpdatemd4orvin = false;
- pmutex->unlock();
- }
- int id;
- qint64 ntime;
- std::vector<char > xvectorquerydata;
- int nres = gcumsgbuf.getmsg(strvin,strmd5,nlastdatatime,id,ntime,&xvectorquerydata,&nculatency);
- nlastdatatime = ntime;
- iv::queryReplyStream reply;
- reply.set_nres(nres);
- if(nres > 0)
- {
- reply.set_xdata(xvectorquerydata.data(),xvectorquerydata.size());
- reply.set_id(id);
- reply.set_ntime(ntime);
- reply.set_nculatency(nculatency);
- reply.set_nserverlatency(*pnserverlatency);
- reply.set_nsendtime(QDateTime::currentMSecsSinceEpoch());
- stream->Write(reply);
- nlastsend = xTime.elapsed();
- }
- else
- {
- if(abs(xTime.elapsed() - nlastsend)>1000)
- {
- stream->Write(reply);
- nlastsend = xTime.elapsed();
- }
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- }
- // Logic and data behind the server's behavior.
- class UploadServiceImpl final : public iv::UploadStream::Service {
- Status upload(ServerContext* context, ::grpc::ServerReaderWriter<iv::UploadReplyStream, iv::UploadRequestStream>* stream) override {
- iv::UploadRequestStream request;
- bool brun = true;
- std::string strctrlmd5 = "md5";
- std::string strvin = "aaa";
- bool bUpdatemd4orvin = false;
- QMutex uploadmutex;
- int reqid = -1;
- qint64 pnsendtime = 0;
- qint64 nrecvtime = 0;
- QMutex mutexidtime;
- std::thread * pthread = new std::thread(uploadsend,stream,&brun,&strvin,&strctrlmd5,&bUpdatemd4orvin,&uploadmutex,
- &reqid,&pnsendtime,&nrecvtime,&mutexidtime);
- std::cout<<"new connect."<<std::endl;
- while (stream->Read(&request))
- {
- std::cout<<" rec req."<<std::endl;
- std::vector<char> xvectordata;
- qDebug("size is %d",request.xdata().size());
- if((strctrlmd5 != request.strctrlmd5())||(strvin != request.strvin()))
- {
- uploadmutex.lock();
- strctrlmd5 = request.strctrlmd5();
- strvin = request.strvin();
- bUpdatemd4orvin = true;
- uploadmutex.unlock();
- }
- if(request.xdata().size()>0)
- {
- xvectordata.resize(request.xdata().size());
- memcpy(xvectordata.data(),request.xdata().data(),request.xdata().size());
- }
- mutexidtime.lock();
- reqid = request.id();
- pnsendtime = request.nsendtime();
- nrecvtime = QDateTime::currentMSecsSinceEpoch();
- mutexidtime.unlock();
- gcumsgbuf.addmsg(request.id(),request.ntime(),request.strvin(),request.strquerymd5(),request.strctrlmd5(),
- &xvectordata,request.bimportant(),request.kepptime(),request.nlatency());
- // std::cout << "收到请求,类型为" << request.askmsg() <<"\n"<<std::endl;
- }
- std::cout<<" no conn"<<std::endl;
- brun = false;
- pthread->join();
- std::cout<<"dis connect."<<std::endl;
- return Status::OK;
- }
- Status queryctrl(ServerContext* context, ::grpc::ServerReaderWriter<iv::queryReplyStream, iv::queryReqStream>* stream) override {
- iv::queryReqStream request;
- bool brun = true;
- std::string strctrlmd5 = "md5";
- std::string strquerymd5 = "md5";
- std::string strvin = "aaa";
- bool bUpdatemd4orvin = false;
- QMutex uploadmutex;
- qint64 nreplysendtime;
- qint64 npausetime;
- std::vector<qint64> xvectorserverlatency;
- qint64 nserverlatency = 0;
- std::thread * pthread = new std::thread(queryctrlsend,stream,&brun,&strvin,&strquerymd5,&bUpdatemd4orvin,&uploadmutex,&nserverlatency);
- std::cout<<"new connect."<<std::endl;
- while (stream->Read(&request))
- {
- std::cout<<" rec req."<<std::endl;
- std::vector<char> xvectordata;
- qDebug("size is %d",request.xdata().size());
- if((strquerymd5 != request.strquerymd5())||(strvin != request.strvin()))
- {
- uploadmutex.lock();
- strquerymd5 = request.strquerymd5();
- strvin = request.strvin();
- bUpdatemd4orvin = true;
- uploadmutex.unlock();
- }
- if(request.xdata().size()>0)
- {
- xvectordata.resize(request.xdata().size());
- memcpy(xvectordata.data(),request.xdata().data(),request.xdata().size());
- }
- nreplysendtime = request.nreplysendtime();
- npausetime = request.npausetime();
- if(nreplysendtime > 0)
- {
- nserverlatency = calcserverlatency(xvectorserverlatency,QDateTime::currentMSecsSinceEpoch() - nreplysendtime - npausetime);
- }
- static int tempid = 0;
- tempid++;
- int nid = gpcmsgbuf.addmsg(tempid,request.ntime(),request.strvin(),request.strctrlmd5(),&xvectordata,
- request.bimportant(),request.kepptime(),request.nsuggestframerate());
- (void)&nid;
- // std::cout << "收到请求,类型为" << request.askmsg() <<"\n"<<std::endl;
- }
- std::cout<<" no conn"<<std::endl;
- brun = false;
- pthread->join();
- std::cout<<"dis connect."<<std::endl;
- return Status::OK;
- }
- // Status query(ServerContext* context, const iv::queryreq* request,
- // iv::queryReply* reply) override {
- // int id;
- // qint64 ntime;
- // std::vector<char > xvectorquerydata;
- // int nres = gcumsgbuf.getmsg(request->strvin(),request->strquerymd5(),request->nlasttime(),id,ntime,&xvectorquerydata);
- // reply->set_nres(nres);
- // if(nres > 0)
- // {
- // reply->set_data(xvectorquerydata.data(),xvectorquerydata.size());
- // reply->set_id(id);
- // reply->set_ntime(ntime);
- // }
- // return Status::OK;
- // }
- // Status ctrl(ServerContext* context, const iv::ctrlreq* request,
- // iv::ctrlReply * reply) override {
- // std::vector<char> xvectordata;
- // if(request->data().size()>0)
- // {
- // xvectordata.resize(request->data().size());
- // memcpy(xvectordata.data(),request->data().data(),request->data().size());
- // }
- // int nid = gpcmsgbuf.addmsg(request->id(),request->ntime(),request->strvin(),request->strctrlmd5(),&xvectordata,
- // request->bimportant(),request->kepptime());
- // reply->set_nsendid(nid);
- // return Status::OK;
- // }
- };
- void RunServer() {
- std::string server_address("0.0.0.0:50051");
- UploadServiceImpl service;
- grpc::EnableDefaultHealthCheckService(true);
- // grpc::reflection::InitProtoReflectionServerBuilderPlugin();
- ServerBuilder builder;
- // Listen on the given address without any authentication mechanism.
- builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
- builder.SetMaxReceiveMessageSize(300000000);
- // builder.SetMaxMessageSize(100000000);
- // builder.SetMaxSendMessageSize(100000000);
- // Register "service" as the instance through which we'll communicate with
- // clients. In this case it corresponds to an *synchronous* service.
- builder.RegisterService(&service);
- // Finally assemble the server.
- std::unique_ptr<Server> server(builder.BuildAndStart());
- std::cout << "Server listening on " << server_address << std::endl;
- // Wait for the server to shutdown. Note that some other thread must be
- // responsible for shutting down the server for this call to ever return.
- server->Wait();
- }
- int main(int argc, char *argv[])
- {
- QCoreApplication a(argc, argv);
- gpcmsgbuf.start();
- RunServer();
- return a.exec();
- }
|