Forráskód Böngészése

change groupctrl_grpc. not complete.

yuchuli 1 éve
szülő
commit
3562c6b67a

+ 68 - 1
src/driver/driver_cloud_swap_client/grpcclient.cpp

@@ -387,7 +387,7 @@ void grpcclient::shareswapmsg(iv::cloud::cloudmsg * pxmsg)
     for(i=0;i<nsize;i++)
     {
         int j;
-        int nquerysize = mvectorctrlmsgunit.size();
+        int nquerysize = static_cast<int>(mvectorctrlmsgunit.size());
         for(j=0;j<nquerysize;j++)
         {
             if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
@@ -397,7 +397,31 @@ void grpcclient::shareswapmsg(iv::cloud::cloudmsg * pxmsg)
                 break;
             }
         }
+
+        if((mnShareMode == Mode_Inter) &&(pxmsg->xclouddata(i).data().size()>0))
+        {
+            iv::swapmsgbuff xswapbuff;
+            xswapbuff.mstrmsgname = pxmsg->xclouddata(i).msgname();
+            xswapbuff.mndatalen = pxmsg->xclouddata(i).data().size();
+            xswapbuff.mpdata_ptr = std::shared_ptr<char>(new char[xswapbuff.mndatalen]);
+            memcpy(xswapbuff.mpdata_ptr.get(),pxmsg->xclouddata(i).data().data(),xswapbuff.mndatalen);
+            if(mvectorswapbuff.size() < 10000)
+            {
+                mmutexswapbuff.lock();
+                mvectorswapbuff.push_back(xswapbuff);
+                mmutexswapbuff.unlock();
+
+                mcv_swapbuff.notify_all();
+            }
+            else
+            {
+                std::cout<<" swap buff full. "<<std::endl;
+            }
+
+        }
     }
+
+
 }
 
 void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
@@ -426,3 +450,46 @@ void grpcclient::SetNodeId(std::string strnodeid,std::string strobjnodeid)
     mstrnodeid = strnodeid;
     mstrobjnodeid = strobjnodeid;
 }
+
+void grpcclient::SetShareMode(ShareMode xMode)
+{
+    mnShareMode = xMode;
+}
+
+int grpcclient::GetSwapBuffData(iv::swapmsgbuff & xswap,int nwaitms)
+{
+    int nrtn = 0;
+    mmutexswapbuff.lock();
+    if(mvectorswapbuff.size()>0)
+    {
+        xswap = mvectorswapbuff[0];
+        mvectorswapbuff.erase(mvectorswapbuff.begin());
+        nrtn = 1;
+    }
+    mmutexswapbuff.unlock();
+
+    if(nrtn == 1)return  nrtn;
+
+    if(nwaitms == 0)return 0;
+
+    std::unique_lock<std::mutex> lk(mcvmutex_swapbuff);
+    if(mcv_swapbuff.wait_for(lk, std::chrono::milliseconds(nwaitms)) == std::cv_status::timeout)
+    {
+        lk.unlock();
+    }
+    else
+    {
+        lk.unlock();
+    }
+
+    mmutexswapbuff.lock();
+    if(mvectorswapbuff.size()>0)
+    {
+        xswap = mvectorswapbuff[0];
+        mvectorswapbuff.erase(mvectorswapbuff.begin());
+        nrtn = 1;
+    }
+    mmutexswapbuff.unlock();
+
+    return nrtn;
+}

+ 28 - 0
src/driver/driver_cloud_swap_client/grpcclient.h

@@ -31,6 +31,8 @@
 
 #include "cloudswap.grpc.pb.h"
 
+#include <condition_variable>
+
 
 namespace iv {
 struct msgunit
@@ -45,6 +47,14 @@ struct msgunit
     bool mbImportant = false;
     int mnkeeptime = 100;
 };
+
+struct swapmsgbuff
+{
+    std::string mstrmsgname;
+    std::shared_ptr<char> mpdata_ptr;
+    unsigned int mndatalen;
+};
+
 }
 
 
@@ -54,6 +64,7 @@ using grpc::Status;
 
 class grpcclient : public QThread
 {
+
 public:
     grpcclient(std::string stryamlpath);
 
@@ -88,6 +99,19 @@ private:
 
     CalcPing * mpCalcPing;
 
+    enum ShareMode
+    {
+        Mode_ModuleShare,
+        Mode_Inter
+    };
+
+    ShareMode mnShareMode  = Mode_ModuleShare;
+
+    std::vector<iv::swapmsgbuff> mvectorswapbuff;
+    std::mutex mmutexswapbuff;
+    std::condition_variable mcv_swapbuff;
+    std::mutex mcvmutex_swapbuff;
+
 private:
     iv::cloud::cloudmsg mcloudemsg;
     std::mutex mmutexmsg;
@@ -98,6 +122,10 @@ public:
     void SetServerPort(std::string strServer,std::string strPort);
     void SetNodeId(std::string strnodeid,std::string strobjnodeid);
 
+    void SetShareMode(ShareMode xMode);
+
+    int GetSwapBuffData(iv::swapmsgbuff & xswap,int nwaitms = 0);
+
 private:
     void run();
     void dec_yaml(const char * stryamlpath);

+ 9 - 0
src/tool/groupctrl_grpc/mainwindow.cpp

@@ -9,6 +9,15 @@ MainWindow::MainWindow(QWidget *parent) :
 
     setWindowTitle(tr("群控控制程序"));
 
+    mpgrpcclient_ptr1 = std::shared_ptr<grpcclient>(new grpcclient(""));
+    mpgrpcclient_ptr2 = std::shared_ptr<grpcclient>(new grpcclient(""));
+
+    mpgrpcclient_ptr1->SetServerPort("127.0.0.1","50061");
+    mpgrpcclient_ptr2->SetServerPort("127.0.0.1","50061");
+
+    mpgrpcclient_ptr1->SetNodeId("10001","1");
+    mpgrpcclient_ptr2->SetNodeId("10002","2");
+
 }
 
 MainWindow::~MainWindow()

+ 3 - 0
src/tool/groupctrl_grpc/mainwindow.h

@@ -19,6 +19,9 @@ public:
 
 private:
     Ui::MainWindow *ui;
+
+    std::shared_ptr<grpcclient> mpgrpcclient_ptr1;
+    std::shared_ptr<grpcclient> mpgrpcclient_ptr2;
 };
 
 #endif // MAINWINDOW_H