Hoernchen has submitted this change. ( https://gerrit.osmocom.org/c/osmo-trx/+/32954 )
Change subject: ms: rearrange code to allow clean exits ......................................................................
ms: rearrange code to allow clean exits
This allows gracefully terminating the application by introducing queue timeouts.
Change-Id: I0b8deebc63cf4d936666fd68e1666d1917e89a5d --- M Transceiver52M/ms/bladerf_specific.h M Transceiver52M/ms/itrq.h M Transceiver52M/ms/ms.cpp M Transceiver52M/ms/ms.h M Transceiver52M/ms/ms_rx_lower.cpp M Transceiver52M/ms/ms_upper.cpp M Transceiver52M/ms/ms_upper.h 7 files changed, 97 insertions(+), 39 deletions(-)
Approvals: Jenkins Builder: Verified pespin: Looks good to me, but someone else must approve laforge: Looks good to me, but someone else must approve Hoernchen: Looks good to me, approved
diff --git a/Transceiver52M/ms/bladerf_specific.h b/Transceiver52M/ms/bladerf_specific.h index 3dc4777..e32d77c 100644 --- a/Transceiver52M/ms/bladerf_specific.h +++ b/Transceiver52M/ms/bladerf_specific.h @@ -192,7 +192,7 @@ struct bladerf_stream *rx_stream; struct bladerf_stream *tx_stream; // using pkt2buf = blade_otw_buffer<2, blade_speed_buffer_type::SS>; - using tx_buf_q_type = spsc_cond<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>; + using tx_buf_q_type = spsc_cond_timeout<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>; const unsigned int rxFullScale, txFullScale; const int rxtxdelay;
diff --git a/Transceiver52M/ms/itrq.h b/Transceiver52M/ms/itrq.h index 1d9e217..69ff515 100644 --- a/Transceiver52M/ms/itrq.h +++ b/Transceiver52M/ms/itrq.h @@ -29,7 +29,58 @@
namespace spsc_detail { -template <bool block_read, bool block_write> class spsc_cond_detail { +template <bool block_read, bool block_write> +class spsc_cond_timeout_detail { + std::condition_variable cond_r, cond_w; + std::mutex lr, lw; + std::atomic_int r_flag, w_flag; + const int timeout_ms = 200; + + public: + explicit spsc_cond_timeout_detail() : r_flag(0), w_flag(0) + { + } + + ~spsc_cond_timeout_detail() + { + } + + ssize_t spsc_check_r() + { + std::unique_lockstd::mutex lk(lr); + if (cond_r.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return r_flag != 0; })) { + r_flag--; + return 1; + } else { + return 0; + } + } + ssize_t spsc_check_w() + { + std::unique_lockstd::mutex lk(lw); + if (cond_w.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return w_flag != 0; })) { + w_flag--; + return 1; + } else { + return 0; + } + } + void spsc_notify_r() + { + std::unique_lockstd::mutex lk(lr); + r_flag++; + cond_r.notify_one(); + } + void spsc_notify_w() + { + std::unique_lockstd::mutex lk(lw); + w_flag++; + cond_w.notify_one(); + } +}; + +template <bool block_read, bool block_write> +class spsc_cond_detail { std::condition_variable cond_r, cond_w; std::mutex lr, lw; std::atomic_int r_flag, w_flag; @@ -74,7 +125,8 @@ };
// originally designed for select loop integration -template <bool block_read, bool block_write> class spsc_efd_detail { +template <bool block_read, bool block_write> +class spsc_efd_detail { int efd_r, efd_w; /* eventfds used to block/notify readers/writers */
public: @@ -191,4 +243,7 @@ template <unsigned int SZ, typename ELEM, bool block_read, bool block_write> class spsc_evfd : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_efd_detail> {}; template <unsigned int SZ, typename ELEM, bool block_read, bool block_write> -class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {}; \ No newline at end of file +class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {}; +template <unsigned int SZ, typename ELEM, bool block_read, bool block_write> +class spsc_cond_timeout + : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_timeout_detail> {}; \ No newline at end of file diff --git a/Transceiver52M/ms/ms.cpp b/Transceiver52M/ms/ms.cpp index 2e91cae..e587c05 100644 --- a/Transceiver52M/ms/ms.cpp +++ b/Transceiver52M/ms/ms.cpp @@ -78,7 +78,7 @@ }; }
-void ms_trx::start() +void ms_trx::start_lower_ms() { if (stop_lower_threads_flag) return; diff --git a/Transceiver52M/ms/ms.h b/Transceiver52M/ms/ms.h index efccffc..8ca9b02 100644 --- a/Transceiver52M/ms/ms.h +++ b/Transceiver52M/ms/ms.h @@ -117,7 +117,7 @@ }; };
-using rx_queue_t = spsc_cond<8 * NUM_RXQ_FRAMES, one_burst, true, false>; +using rx_queue_t = spsc_cond_timeout<8 * NUM_RXQ_FRAMES, one_burst, true, false>;
enum class SCH_STATE { SEARCHING, FOUND };
@@ -267,7 +267,7 @@ sched_params::target hw_target; single_thread_pool worker_thread;
- void start(); + void start_lower_ms(); std::atomic<bool> upper_is_ready; void set_upper_ready(bool is_ready);
diff --git a/Transceiver52M/ms/ms_rx_lower.cpp b/Transceiver52M/ms/ms_rx_lower.cpp index 4d6ce18..26ee131 100644 --- a/Transceiver52M/ms/ms_rx_lower.cpp +++ b/Transceiver52M/ms/ms_rx_lower.cpp @@ -142,10 +142,8 @@ memcpy(brst.sch_bits, sch_demod_bits, sizeof(sch_demod_bits)); }
- if (upper_is_ready) { // this is blocking, so only submit if there is a reader - only if upper exists! - while (!rxqueue.spsc_push(&brst)) - ; - } + while (upper_is_ready && !rxqueue.spsc_push(&brst)) + ;
if (do_auto_gain) maybe_update_gain(brst); diff --git a/Transceiver52M/ms/ms_upper.cpp b/Transceiver52M/ms/ms_upper.cpp index a10d542..4b2f919 100644 --- a/Transceiver52M/ms/ms_upper.cpp +++ b/Transceiver52M/ms/ms_upper.cpp @@ -80,37 +80,16 @@ while (!g_exit_flag) { driveControl(); } - std::cerr << "exit control!" << std::endl; + std::cerr << "exit U control!" << std::endl; }); - msleep(1); thr_tx = std::thread([this] { set_name_aff_sched(sched_params::thread_names::U_TX); while (!g_exit_flag) { driveTx(); } - std::cerr << "exit tx U!" << std::endl; + std::cerr << "exit U tx!" << std::endl; });
- // atomic ensures data is not written to q until loop reads - start_lower_ms(); - - set_name_aff_sched(sched_params::thread_names::U_RX); - while (!g_exit_flag) { - // set_upper_ready(true) needs to happen during cmd handling: - // the main loop is driven by rx, so unless rx is on AND transceiver is on we get stuck.. - driveReceiveFIFO(); - osmo_select_main(1); - - trxcon_phyif_rsp r; - if (cmdq_from_phy.spsc_pop(&r)) { - DBGLG() << "HAVE RESP:" << r.type << std::endl; - trxcon_phyif_handle_rsp(g_trxcon, &r); - } - } - set_upper_ready(false); - std::cerr << "exit rx U!" << std::endl; - mOn = false; - #ifdef LSANDEBUG std::thread([this] { set_name_aff_sched(sched_params::thread_names::LEAKCHECK); @@ -123,9 +102,23 @@ #endif }
-void upper_trx::start_lower_ms() +void upper_trx::main_loop() { - ms_trx::start(); + set_name_aff_sched(sched_params::thread_names::U_RX); + set_upper_ready(true); + while (!g_exit_flag) { + driveReceiveFIFO(); + osmo_select_main(1); + + trxcon_phyif_rsp r; + if (cmdq_from_phy.spsc_pop(&r)) { + DBGLG() << "HAVE RESP:" << r.type << std::endl; + trxcon_phyif_handle_rsp(g_trxcon, &r); + } + } + set_upper_ready(false); + std::cerr << "exit U rx!" << std::endl; + mOn = false; }
// signalvector is owning despite claiming not to, but we can pretend, too.. @@ -346,7 +339,7 @@ case TRXCON_PHYIF_CMDT_POWERON: if (!mOn) { mOn = true; - set_upper_ready(true); + start_lower_ms(); } break; case TRXCON_PHYIF_CMDT_POWEROFF: @@ -430,7 +423,7 @@
// blocking, will return when global exit is requested trx->start_threads(); - + trx->main_loop(); trx->stop_threads(); trx->stop_upper_threads();
diff --git a/Transceiver52M/ms/ms_upper.h b/Transceiver52M/ms/ms_upper.h index bc9bd14..2362365 100644 --- a/Transceiver52M/ms/ms_upper.h +++ b/Transceiver52M/ms/ms_upper.h @@ -41,7 +41,7 @@
public: void start_threads(); - void start_lower_ms(); + void main_loop(); void stop_upper_threads();
upper_trx(){};