Browse Source

change driver_cloud_grpc_..._stream.

yuchuli 4 years ago
parent
commit
0dcf273d20

+ 112 - 132
src/driver/driver_cloud_grpc_client_stream/grpcclient.cpp

@@ -11,6 +11,7 @@ void ListenData(const char * strdata,const unsigned int nSize,const unsigned int
 
 grpcclient::grpcclient(std::string stryamlpath)
 {
+
     ggrpcclient = this;
     dec_yaml(stryamlpath.data());
 
@@ -27,14 +28,19 @@ grpcclient::grpcclient(std::string stryamlpath)
     }
 }
 
-void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,bool * pbrun)
+
+void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
 {
+    std::cout<<"threadsend start "<<std::endl;
     int nsize = mvectormsgunit.size();
     int i;
 
     int ninterval = atoi(gstruploadinterval.data());
     if(ninterval<=0)ninterval = 100;
 
+    int nrawinterval = ninterval;
+    int nok = 0;
+
     QTime xTime;
     xTime.start();
     int nlastsend = xTime.elapsed();
@@ -98,28 +104,75 @@ void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::Uploa
 
             nlastsend = xTime.elapsed();
 
-            bool bsend = writer->Write(request);
-            std::cout<<"send msg. rtn is "<<bsend<<std::endl;
+            QTime xt;
+            xt.start();
+            ::grpc::WriteOptions wo;
+//            wo.set_write_through();
+//            wo.clear_buffer_hint();
+    //        writer->Write(request,(void * )2);
+
+    //        bool bsend = true;
+            bool bsend = writer->Write(request,wo);
+
+            *nlastreftime = QDateTime::currentMSecsSinceEpoch();
+
+            if(xt.elapsed()>10)
+            {
+                nok = 0;
+                if(ninterval < 1000)ninterval = ninterval * 11/10;
+                qDebug("send ela is %d ninterval is %d",xt.elapsed(),ninterval);
+            }
+            else
+            {
+                nok++;
+                if((ninterval > nrawinterval)&&(nok>10))
+                {
+                    nok = 0;
+
+                    ninterval = ninterval*10/11;
+                    std::cout<<"ninterval is "<<ninterval<<std::endl;
+                }
+            }
+            if(bsend == false)std::cout<<"send msg. rtn is "<<bsend<<std::endl;
 
 
         }
 
 
     }
+    std::cout<<"thread send end."<<std::endl;
 }
 
-void grpcclient::run()
+
+
+void grpcclient::threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer, std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime)
 {
-    int nsize = mvectormsgunit.size();
-    int i;
+    std::cout<<"threadrecv start"<<std::endl;
+    iv::UploadReplyStream reply;
+    while (writer->Read(&reply)) {
+
+        *nlastreftime = QDateTime::currentMSecsSinceEpoch();
+//        nfail = 0;
+//        std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
+        if(reply.nres() == 1)
+        {
+            iv::cloud::cloudmsg xmsg;
+            if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
+            {
+                sharectrlmsg(&xmsg);
+            }
+        }
 
-    int ninterval = atoi(gstruploadinterval.data());
-    if(ninterval<=0)ninterval = 100;
+//            std::cout<<"read data from server."<<std::endl;
+    }
+    std::cout<<"threadrecv end."<<std::endl;
+    *pbrun = false;
+}
 
-    QTime xTime;
-    xTime.start();
-    int nlastsend = xTime.elapsed();
 
+void grpcclient::threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<bool> pbRun)
+{
+    std::cout<<"threadrpc start."<<std::endl;
     std::string target_str = gstrserverip+":";
     target_str = target_str + gstrserverport ;//std::to_string()
     auto cargs = grpc::ChannelArguments();
@@ -132,151 +185,78 @@ void grpcclient::run()
     std::unique_ptr<iv::UploadStream::Stub> stub_ = iv::UploadStream::NewStub(channel);
 
 
-    iv::UploadRequestStream request;
-
     int nfail = 0;
 
     while(!QThread::isInterruptionRequested())
     {
         ClientContext context ;
+//        std::shared_ptr<ClientContext> pcontext(new ClientContext);
+        std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writerRead(stub_->upload(&context));
 
 
+//        std::shared_ptr<bool> pbRun(new bool);
+        *pbRun = true;
+        std::shared_ptr<qint64> pntime = pnrpctime ;
+        *pntime = QDateTime::currentMSecsSinceEpoch();
+        std::thread * pthread = new std::thread(&grpcclient::threadsend,this,writerRead,pbRun,pntime);
+        (void )pthread;
+        std::thread * precvthread = new std::thread(&grpcclient::threadrecv,this,writerRead,pbRun,pntime);
+        (void)precvthread;
 
+        pthread->join();
+        precvthread->join();
 
-        std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writerRead(stub_->upload(&context));
+ //       std::cout<<"threadRPC end"<<std::endl;
+ //       *pbRun = false;
 
-        bool bRun = true;
-        std::thread * pthread = new std::thread(&grpcclient::threadsend,this,writerRead,&bRun);
-        (void )pthread;
-        iv::UploadReplyStream reply;
-        while (writerRead->Read(&reply)) {
-            nfail = 0;
-    //        std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
-            if(reply.nres() == 1)
-            {
-                iv::cloud::cloudmsg xmsg;
-                if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
-                {
-                    sharectrlmsg(&xmsg);
-                }
-            }
-            std::cout<<"read data from server."<<std::endl;
-        }
-        bRun = false;
-        pthread->join();
-        nfail++;
-        if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
-        else std::this_thread::sleep_for(std::chrono::milliseconds(100));
-        std::cout<<"reconnnect to server. nfail is "<<nfail<<std::endl;
-    }
+//        while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<3000)&&(*pbRun))
+//        {
+//            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+//        }
+
+//        *pbRun = false;
 
-    int nid = 0;
 
-    // Container for the data we expect from the server.
-//    iv::UploadReply reply;
 
-    gpr_timespec timespec;
-      timespec.tv_sec = 30;//设置阻塞时间为2秒
-      timespec.tv_nsec = 0;
-      timespec.clock_type = GPR_TIMESPAN;
+        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
 
- //   ClientContext context;
+        channel = grpc::CreateCustomChannel(
+                 target_str, grpc::InsecureChannelCredentials(),cargs);
+        stub_ = iv::UploadStream::NewStub(channel);
+        nfail++;
+//        if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
+//        else std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        std::cout<<"reconnnect to server. nfail is "<<nfail<<std::endl;
+    }
+}
 
+void grpcclient::run()
+{
+            std::shared_ptr<bool> pbRun(new bool);
+            *pbRun = true;
+                    std::shared_ptr<qint64> pntime(new qint64);
+                    *pntime = QDateTime::currentMSecsSinceEpoch();
 
+    std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
+    return;
 
 //    while(!QThread::isInterruptionRequested())
 //    {
-//        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-//        if((xTime.elapsed()-nlastsend)<ninterval)
+//        std::shared_ptr<bool> pbRun(new bool);
+//        *pbRun = true;
+//        std::shared_ptr<qint64> pntime(new qint64);
+//        *pntime = QDateTime::currentMSecsSinceEpoch();
+//        std::thread * pthread = new std::thread(&grpcclient::threadRPC,this,pntime,pbRun);
+//        (void )pthread;
+//        while(((QDateTime::currentMSecsSinceEpoch() - *pntime)<10000)&&(*pbRun))
 //        {
-//            continue;
+//            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
 //        }
-
-//            bool bImportant = false;
-//            int nkeeptime = 0;
-//            iv::cloud::cloudmsg xmsg;
-//            xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
-//            gMutexMsg.lock();
-//            for(i=0;i<nsize;i++)
-//            {
-//                if(mvectormsgunit[i].mbRefresh)
-//                {
-//                    mvectormsgunit[i].mbRefresh = false;
-//                    if(mvectormsgunit[i].mbImportant)
-//                    {
-//                        bImportant = true;
-//                    }
-//                    if(mvectormsgunit[i].mnkeeptime > nkeeptime)
-//                    {
-//                        nkeeptime = mvectormsgunit[i].mnkeeptime;
-//                    }
-//                    iv::cloud::cloudunit xcloudunit;
-//                    xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
-//                    xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
-//                    iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
-//                    pcu->CopyFrom(xcloudunit);
-//                }
-
-//            }
-//            gMutexMsg.unlock();
-
-//            int nbytesize = xmsg.ByteSize();
-//            char * strbuf = new char[nbytesize];
-//            std::shared_ptr<char> pstrbuf;
-//            pstrbuf.reset(strbuf);
-//            if(xmsg.SerializeToArray(strbuf,nbytesize))
-//            {
-
-//                ClientContext context ;
-//                context.set_deadline(timespec);
-//                qint64 time1 = QDateTime::currentMSecsSinceEpoch();
-
-//                request.set_id(nid);
-//                request.set_ntime(time1);
-//                request.set_strquerymd5(gstrqueryMD5);
-//                request.set_strctrlmd5(gstrctrlMD5);
-//                request.set_strvin(gstrVIN);
-//                request.set_xdata(strbuf,nbytesize);
-//                request.set_kepptime(nkeeptime);
-//                request.set_bimportant(bImportant);
-//                nid++;
-
-//                nlastsend = xTime.elapsed();
-//                // The actual RPC.
-//                Status status = stub_->upload(&context, request, &reply);
-//                if (status.ok()) {
-//                    std::cout<<"  data size is "<<nbytesize<<std::endl;
-//                    std::cout<<nid<<" upload successfully"<<std::endl;
-//                    if(reply.nres() == 1)
-//                    {
-//                        iv::cloud::cloudmsg xmsg;
-//                        if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
-//                        {
-//                            sharectrlmsg(&xmsg);
-//                        }
-//                    }
-//                } else {
-//                  std::cout << status.error_code() << ": " << status.error_message()
-//                            << std::endl;
-//                  std::cout<<"RPC failed"<<std::endl;
-//                  if(status.error_code() == 4)
-//                  {
-//                      std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
-//                      channel = grpc::CreateCustomChannel(
-//                               target_str, grpc::InsecureChannelCredentials(),cargs);
-
-//                      stub_ = iv::Upload::NewStub(channel);
-//                  }
-//                  std::this_thread::sleep_for(std::chrono::milliseconds(900));
-
-//                }
-
-//            }
-
+//        std::cout<<" reconect."<<std::endl;
+//        *pbRun = false;
 
 //    }
 
-
 }
 
 

+ 4 - 3
src/driver/driver_cloud_grpc_client_stream/grpcclient.h

@@ -64,8 +64,6 @@ private:
     std::thread * guploadthread;
 
 
-
-
     std::vector<iv::msgunit> mvectormsgunit;
 
     std::vector<iv::msgunit> mvectorctrlmsgunit;
@@ -85,7 +83,10 @@ private:
     void dec_yaml(const char * stryamlpath);
     void sharectrlmsg(iv::cloud::cloudmsg * pxmsg);
 
-    void threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,bool * pbrun);
+    void threadRPC(std::shared_ptr<qint64> pnrpctime,std::shared_ptr<bool> pbRun);
+
+    void threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime);
+    void threadrecv(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,std::shared_ptr<bool> pbrun,std::shared_ptr<qint64> nlastreftime);
 };
 
 #endif // GRPCCLIENT_H

+ 8 - 0
src/driver/driver_cloud_grpc_client_stream/uploadstreammsg.proto

@@ -42,12 +42,17 @@ message UploadRequestStream {
     bytes xdata = 6;
     bool bimportant = 7;  //if 1, is important.
     int32 kepptime = 8;   //If important keep this data before query ms.
+    float fFrameRate = 9;
+    int64 nSendTime = 10;
+    int64 nLatency = 11;
 }
 
 // The response message containing the greetings
 message UploadReplyStream {
   int32 nres = 1;  //0 no message 1 have ctrl message
   bytes xdata = 2;
+  int32 nreqid = 3;  
+  int64 nreqSendTime = 4;  //id and SendTime used for calculate latency. 
 //  string message = 1;
 }
 
@@ -61,6 +66,7 @@ message queryReqStream {
   bool bimportant = 6;  //if 1, is important.
   int32 kepptime = 7;   //If important keep this data before ctrl ms.  if -1 must send.
   int32 ntype = 8;  //0 only query  1 ctrl.
+  float nSuggestFrameRate = 9;
 }
 
 message queryReplyStream {
@@ -68,6 +74,8 @@ message queryReplyStream {
     int32 id = 2;
     int64 ntime = 3;
     bytes xdata = 4;
+    int64 nculatency = 5;
+    float ncuFrameRage = 6;
 }
 
 

+ 8 - 0
src/driver/driver_cloud_grpc_pc_stream/uploadstreammsg.proto

@@ -42,12 +42,17 @@ message UploadRequestStream {
     bytes xdata = 6;
     bool bimportant = 7;  //if 1, is important.
     int32 kepptime = 8;   //If important keep this data before query ms.
+    float fFrameRate = 9;
+    int64 nSendTime = 10;
+    int64 nLatency = 11;
 }
 
 // The response message containing the greetings
 message UploadReplyStream {
   int32 nres = 1;  //0 no message 1 have ctrl message
   bytes xdata = 2;
+  int32 nreqid = 3;  
+  int64 nreqSendTime = 4;  //id and SendTime used for calculate latency. 
 //  string message = 1;
 }
 
@@ -61,6 +66,7 @@ message queryReqStream {
   bool bimportant = 6;  //if 1, is important.
   int32 kepptime = 7;   //If important keep this data before ctrl ms.  if -1 must send.
   int32 ntype = 8;  //0 only query  1 ctrl.
+  float nSuggestFrameRate = 9;
 }
 
 message queryReplyStream {
@@ -68,6 +74,8 @@ message queryReplyStream {
     int32 id = 2;
     int64 ntime = 3;
     bytes xdata = 4;
+    int64 nculatency = 5;
+    float ncuFrameRage = 6;
 }
 
 

+ 8 - 0
src/driver/driver_cloud_grpc_server_stream/uploadstreammsg.proto

@@ -42,12 +42,17 @@ message UploadRequestStream {
     bytes xdata = 6;
     bool bimportant = 7;  //if 1, is important.
     int32 kepptime = 8;   //If important keep this data before query ms.
+    float fFrameRate = 9;
+    int64 nSendTime = 10;
+    int64 nLatency = 11;
 }
 
 // The response message containing the greetings
 message UploadReplyStream {
   int32 nres = 1;  //0 no message 1 have ctrl message
   bytes xdata = 2;
+  int32 nreqid = 3;  
+  int64 nreqSendTime = 4;  //id and SendTime used for calculate latency. 
 //  string message = 1;
 }
 
@@ -61,6 +66,7 @@ message queryReqStream {
   bool bimportant = 6;  //if 1, is important.
   int32 kepptime = 7;   //If important keep this data before ctrl ms.  if -1 must send.
   int32 ntype = 8;  //0 only query  1 ctrl.
+  float nSuggestFrameRate = 9;
 }
 
 message queryReplyStream {
@@ -68,6 +74,8 @@ message queryReplyStream {
     int32 id = 2;
     int64 ntime = 3;
     bytes xdata = 4;
+    int64 nculatency = 5;
+    float ncuFrameRage = 6;
 }
 
 

+ 8 - 0
src/include/proto3/uploadstreammsg.proto

@@ -42,12 +42,17 @@ message UploadRequestStream {
     bytes xdata = 6;
     bool bimportant = 7;  //if 1, is important.
     int32 kepptime = 8;   //If important keep this data before query ms.
+    float fFrameRate = 9;
+    int64 nSendTime = 10;
+    int64 nLatency = 11;
 }
 
 // The response message containing the greetings
 message UploadReplyStream {
   int32 nres = 1;  //0 no message 1 have ctrl message
   bytes xdata = 2;
+  int32 nreqid = 3;  
+  int64 nreqSendTime = 4;  //id and SendTime used for calculate latency. 
 //  string message = 1;
 }
 
@@ -61,6 +66,7 @@ message queryReqStream {
   bool bimportant = 6;  //if 1, is important.
   int32 kepptime = 7;   //If important keep this data before ctrl ms.  if -1 must send.
   int32 ntype = 8;  //0 only query  1 ctrl.
+  float nSuggestFrameRate = 9;
 }
 
 message queryReplyStream {
@@ -68,6 +74,8 @@ message queryReplyStream {
     int32 id = 2;
     int64 ntime = 3;
     bytes xdata = 4;
+    int64 nculatency = 5;
+    float ncuFrameRage = 6;
 }