// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. /*! * @file TopicsSubscriber.cpp * This file contains the implementation of the subscriber functions. * * This file was generated by the tool fastcdrgen. */ #ifdef USE_FASTRTPS #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "TopicsSubscriber.h" using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; using namespace eprosima::fastdds::rtps; #include TopicsSubscriber::TopicsSubscriber() : mp_participant(nullptr), mp_subscriber(nullptr) {} TopicsSubscriber::~TopicsSubscriber() { Domain::removeParticipant(mp_participant);} bool TopicsSubscriber::init(const char * strtopic,const char * strpubip,const unsigned short nPort,int ntype) { strncpy(mstrtopic,strtopic,255); // Create RTPSParticipant ParticipantAttributes PParam; PParam.rtps.setName("Participant_subscriber"); //You can put the name you want PParam.rtps.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE; PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true; PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true; PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader = true; PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite; // SharedMem transport configuration PParam.rtps.useBuiltinTransports = false; if(ntype == 0) { auto sm_transport = std::make_shared(); sm_transport->segment_size(100*1024*1024); PParam.rtps.userTransports.push_back(sm_transport); // UDP auto udp_transport = std::make_shared(); //udp_transport->interfaceWhiteList.push_back("127.0.0.1"); PParam.rtps.userTransports.push_back(udp_transport); } else { auto tcp2_transport = std::make_shared(); //Set initial peers. Locator_t initial_peer_locator; initial_peer_locator.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(initial_peer_locator, strpubip); initial_peer_locator.port = nPort; PParam.rtps.builtin.initialPeersList.push_back(initial_peer_locator); // PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite; // PParam.rtps.builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(5, 0); // PParam.rtps.setName("Participant_sub"); tcp2_transport->wait_for_tcp_negotiation = false; //Link the Transport Layer to the Participant. PParam.rtps.userTransports.push_back(tcp2_transport); } mp_participant = Domain::createParticipant(PParam); if(mp_participant == nullptr) { return false; } //Register the type Domain::registerType(mp_participant, static_cast(&myType)); // Create Subscriber SubscriberAttributes Rparam; Rparam.topic.topicKind = NO_KEY; Rparam.topic.topicDataType = myType.getName(); //Must be registered before the creation of the subscriber Rparam.topic.topicName = strtopic;//"TopicsPubSubTopic"; Rparam.topic.historyQos.depth = 30; Rparam.topic.resourceLimitsQos.max_samples = 50; Rparam.topic.resourceLimitsQos.allocated_samples = 20; Rparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; mp_subscriber = Domain::createSubscriber(mp_participant,Rparam, static_cast(&m_listener)); if(mp_subscriber == nullptr) { return false; } return true; } void TopicsSubscriber::SubListener::onSubscriptionMatched(Subscriber* sub,MatchingInfo& info) { (void)sub; if (info.status == MATCHED_MATCHING) { n_matched++; std::cout << "Subscriber matched" << std::endl; } else { n_matched--; std::cout << "Subscriber unmatched" << std::endl; } } void TopicsSubscriber::SubListener::setReceivedTopicFunction(ModuleFun xFun) { mFun = xFun; mbSetFun = true; } void TopicsSubscriber::SubListener::onNewDataMessage(Subscriber* sub) { // Take data TopicSample::Message st; static int ncount = 0; static int nmaxlatancy = 0; std::cout<<"new msg"<takeNextData(pxcdr, &m_info)) // { // if(m_info.sampleKind == ALIVE) // { // // Print your structure data here. // ++n_msg; // std::cout << "Sample received, count=" << n_msg<get_first_untaken_info(&m_info); std::cout<<"count is "<getUnreadCount()<takeNextData(&st, &m_info)) { if(m_info.sampleKind == ALIVE) { // Print your structure data here. ++n_msg; ncount++; std::cout << "Sample received, count=" << st.counter() <<" total: "<nmaxlatancy)nmaxlatancy = nlatancy; std::cout<<" latency is "<