StatelessWriter.h 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @file StatelessWriter.h
  16. */
  17. #ifndef _FASTDDS_RTPS_STATELESSWRITER_H_
  18. #define _FASTDDS_RTPS_STATELESSWRITER_H_
  19. #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
  20. #include <fastdds/rtps/writer/RTPSWriter.h>
  21. #include <fastdds/rtps/writer/ReaderLocator.h>
  22. #include <fastdds/rtps/common/Time_t.h>
  23. #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
  24. #include <list>
  25. namespace eprosima {
  26. namespace fastrtps {
  27. namespace rtps {
  28. /**
  29. * Class StatelessWriter, specialization of RTPSWriter that manages writers that don't keep state of the matched readers.
  30. * @ingroup WRITER_MODULE
  31. */
  32. class StatelessWriter : public RTPSWriter
  33. {
  34. friend class RTPSParticipantImpl;
  35. protected:
  36. StatelessWriter(
  37. RTPSParticipantImpl* participant,
  38. GUID_t& guid,
  39. WriterAttributes& attributes,
  40. WriterHistory* history,
  41. WriterListener* listener = nullptr);
  42. public:
  43. virtual ~StatelessWriter();
  44. /**
  45. * Add a specific change to all ReaderLocators.
  46. * @param change Pointer to the change.
  47. * @param max_blocking_time
  48. */
  49. void unsent_change_added_to_history(
  50. CacheChange_t* change,
  51. const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
  52. /**
  53. * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
  54. * @param change Pointer to the change that is going to be removed.
  55. * @return True if removed correctly.
  56. */
  57. bool change_removed_by_history(
  58. CacheChange_t* change) override;
  59. /**
  60. * Add a matched reader.
  61. * @param data Pointer to the ReaderProxyData object added.
  62. * @return True if added.
  63. */
  64. bool matched_reader_add(
  65. const ReaderProxyData& data) override;
  66. /**
  67. * Remove a matched reader.
  68. * @param reader_guid GUID of the reader to remove.
  69. * @return True if removed.
  70. */
  71. bool matched_reader_remove(
  72. const GUID_t& reader_guid) override;
  73. /**
  74. * Tells us if a specific Reader is matched against this writer
  75. * @param reader_guid GUID of the reader to check.
  76. * @return True if it was matched.
  77. */
  78. bool matched_reader_is_matched(
  79. const GUID_t& reader_guid) override;
  80. /**
  81. * Method to indicate that there are changes not sent in some of all ReaderProxy.
  82. */
  83. void send_any_unsent_changes() override;
  84. /**
  85. * Update the Attributes of the Writer.
  86. * @param att New attributes
  87. */
  88. void updateAttributes(
  89. const WriterAttributes& att) override
  90. {
  91. (void)att;
  92. //FOR NOW THERE IS NOTHING TO UPDATE.
  93. }
  94. bool set_fixed_locators(
  95. const LocatorList_t& locator_list);
  96. void update_unsent_changes(
  97. const SequenceNumber_t& seq_num,
  98. const FragmentNumber_t& frag_num);
  99. //!Reset the unsent changes.
  100. void unsent_changes_reset();
  101. bool is_acked_by_all(
  102. const CacheChange_t* change) const override;
  103. bool try_remove_change(
  104. const std::chrono::steady_clock::time_point&,
  105. std::unique_lock<RecursiveTimedMutex>&) override;
  106. void add_flow_controller(
  107. std::unique_ptr<FlowController> controller) override;
  108. /**
  109. * Send a message through this interface.
  110. *
  111. * @param message Pointer to the buffer with the message already serialized.
  112. * @param max_blocking_time_point Future timepoint where blocking send should end.
  113. */
  114. bool send(
  115. CDRMessage_t* message,
  116. std::chrono::steady_clock::time_point& max_blocking_time_point) const override;
  117. private:
  118. void get_builtin_guid();
  119. bool has_builtin_guid();
  120. void update_reader_info(
  121. bool create_sender_resources);
  122. bool intraprocess_delivery(
  123. CacheChange_t* change,
  124. ReaderLocator& reader_locator);
  125. void send_all_unsent_changes();
  126. void send_unsent_changes_with_flow_control();
  127. bool is_inline_qos_expected_ = false;
  128. LocatorList_t fixed_locators_;
  129. ResourceLimitedVector<ReaderLocator> matched_readers_;
  130. ResourceLimitedVector<GUID_t> late_joiner_guids_;
  131. SequenceNumber_t first_seq_for_all_readers_;
  132. bool ignore_fixed_locators_ = false;
  133. ResourceLimitedVector<ChangeForReader_t, std::true_type> unsent_changes_;
  134. std::vector<std::unique_ptr<FlowController> > flow_controllers_;
  135. uint64_t last_intraprocess_sequence_number_;
  136. bool there_are_remote_readers_ = false;
  137. };
  138. } /* namespace rtps */
  139. } /* namespace fastrtps */
  140. } /* namespace eprosima */
  141. #endif
  142. #endif /* _FASTDDS_RTPS_STATELESSWRITER_H_ */