123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- #ifndef _FASTDDS_RTPS_STATEFULWRITER_H_
- #define _FASTDDS_RTPS_STATEFULWRITER_H_
- #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
- #include <fastdds/rtps/writer/RTPSWriter.h>
- #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
- #include <condition_variable>
- #include <mutex>
- namespace eprosima {
- namespace fastrtps {
- namespace rtps {
- class ReaderProxy;
- class TimedEvent;
- class StatefulWriter : public RTPSWriter
- {
- friend class RTPSParticipantImpl;
- friend class ReaderProxy;
- public:
-
- virtual ~StatefulWriter();
- protected:
-
- StatefulWriter(
- RTPSParticipantImpl*,
- const GUID_t& guid,
- const WriterAttributes& att,
- WriterHistory* hist,
- WriterListener* listen = nullptr);
- private:
-
- TimedEvent* periodic_hb_event_;
-
- TimedEvent* nack_response_event_;
-
- TimedEvent* ack_event_;
-
- Count_t m_heartbeatCount;
-
- WriterTimes m_times;
-
- ResourceLimitedVector<ReaderProxy*> matched_readers_;
-
- ResourceLimitedVector<ReaderProxy*> matched_readers_pool_;
- using ReaderProxyIterator = ResourceLimitedVector<ReaderProxy*>::iterator;
- using ReaderProxyConstIterator = ResourceLimitedVector<ReaderProxy*>::const_iterator;
-
- SequenceNumber_t next_all_acked_notify_sequence_;
-
- std::mutex all_acked_mutex_;
- std::condition_variable all_acked_cond_;
-
- bool all_acked_;
- std::condition_variable_any may_remove_change_cond_;
- unsigned int may_remove_change_;
- public:
-
- void unsent_change_added_to_history(
- CacheChange_t* p,
- const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
-
- bool change_removed_by_history(
- CacheChange_t* a_change) override;
-
- void send_any_unsent_changes() override;
-
- bool intraprocess_delivery(
- CacheChange_t* change,
- ReaderProxy* reader_proxy);
- bool intraprocess_gap(
- ReaderProxy* reader_proxy,
- const SequenceNumber_t& seq_num);
- bool intraprocess_heartbeat(
- ReaderProxy* reader_proxy,
- bool liveliness = false);
-
- inline void incrementHBCount()
- {
- ++m_heartbeatCount;
- }
-
- bool matched_reader_add(
- const ReaderProxyData& data) override;
-
- bool matched_reader_remove(
- const GUID_t& reader_guid) override;
-
- bool matched_reader_is_matched(
- const GUID_t& reader_guid) override;
- bool is_acked_by_all(
- const CacheChange_t* a_change) const override;
- bool wait_for_all_acked(
- const Duration_t& max_wait) override;
- bool all_readers_updated();
-
- bool try_remove_change(
- const std::chrono::steady_clock::time_point& max_blocking_time_point,
- std::unique_lock<RecursiveTimedMutex>& lock) override;
-
- void updateAttributes(
- const WriterAttributes& att) override;
-
- bool matched_reader_lookup(
- GUID_t& readerGuid,
- ReaderProxy** RP);
-
- inline Count_t getHeartbeatCount() const
- {
- return this->m_heartbeatCount;
- }
-
- inline RTPSParticipantImpl* getRTPSParticipant() const
- {
- return mp_RTPSParticipant;
- }
-
- inline size_t getMatchedReadersSize() const
- {
- std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
- return matched_readers_.size();
- }
-
- inline bool get_disable_positive_acks() const
- {
- return disable_positive_acks_;
- }
-
- void updateTimes(
- const WriterTimes& times);
- void add_flow_controller(
- std::unique_ptr<FlowController> controller) override;
- SequenceNumber_t next_sequence_number() const;
-
- bool send_periodic_heartbeat(
- bool final = false,
- bool liveliness = false);
-
- void send_heartbeat_to_nts(
- ReaderProxy& remoteReaderProxy,
- bool liveliness = false);
- void perform_nack_response();
- void perform_nack_supression(
- const GUID_t& reader_guid);
-
- bool process_acknack(
- const GUID_t& writer_guid,
- const GUID_t& reader_guid,
- uint32_t ack_count,
- const SequenceNumberSet_t& sn_set,
- bool final_flag,
- bool& result) override;
-
- virtual bool process_nack_frag(
- const GUID_t& writer_guid,
- const GUID_t& reader_guid,
- uint32_t ack_count,
- const SequenceNumber_t& seq_num,
- const FragmentNumberSet_t fragments_state,
- bool& result) override;
- private:
- void update_reader_info(
- bool create_sender_resources);
- void send_heartbeat_piggyback_nts_(
- ReaderProxy* reader,
- RTPSMessageGroup& message_group,
- uint32_t& last_bytes_processed);
- void send_heartbeat_nts_(
- size_t number_of_readers,
- RTPSMessageGroup& message_group,
- bool final,
- bool liveliness = false);
- void check_acked_status();
-
- bool ack_timer_expired();
- void send_heartbeat_to_all_readers();
- void send_changes_separatedly(
- SequenceNumber_t max_sequence,
- bool& activateHeartbeatPeriod);
- void send_all_intraprocess_changes(
- SequenceNumber_t max_sequence);
- void send_all_unsent_changes(
- SequenceNumber_t max_sequence,
- bool& activateHeartbeatPeriod);
- void send_unsent_changes_with_flow_control(
- SequenceNumber_t max_sequence,
- bool& activateHeartbeatPeriod);
- bool send_hole_gaps_to_group(
- RTPSMessageGroup& group);
-
- bool disable_heartbeat_piggyback_;
-
- bool disable_positive_acks_;
-
- std::chrono::duration<double, std::ratio<1, 1000000> > keep_duration_us_;
-
- SequenceNumber_t last_sequence_number_;
-
- SequenceNumber_t biggest_removed_sequence_number_;
- const uint32_t sendBufferSize_;
- int32_t currentUsageSendBufferSize_;
- std::vector<std::unique_ptr<FlowController> > m_controllers;
- bool there_are_remote_readers_ = false;
- bool there_are_local_readers_ = false;
- StatefulWriter& operator =(
- const StatefulWriter&) = delete;
- };
- }
- }
- }
- #endif
- #endif
|