Change in osmo-trx[master]: transceiver: get rid of the ctrl thread

This is merely a historical archive of the years 2008-2021, before the migration to mailman3.

A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.

Hoernchen gerrit-no-reply at lists.osmocom.org
Sun Apr 12 01:08:46 UTC 2020


Hoernchen has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmo-trx/+/17805 )


Change subject: transceiver: get rid of the ctrl thread
......................................................................

transceiver: get rid of the ctrl thread

There is no need to have n threads handle n ctrl sockets, since they all
will immediately respond to commands, so handle them from the existing
main osmo select loop.

Care must be taken to ensure that calls from within the command handler
do not block, or at least do not block for too long; this is currently
the case.

Change-Id: I642a34451e1825eafecf71a902df916ccee7944c
---
M Transceiver52M/Transceiver.cpp
M Transceiver52M/Transceiver.h
M Transceiver52M/osmo-trx.cpp
3 files changed, 155 insertions(+), 94 deletions(-)



  git pull ssh://gerrit.osmocom.org:29418/osmo-trx refs/changes/05/17805/1

diff --git a/Transceiver52M/Transceiver.cpp b/Transceiver52M/Transceiver.cpp
index dc218d7..da8db75 100644
--- a/Transceiver52M/Transceiver.cpp
+++ b/Transceiver52M/Transceiver.cpp
@@ -45,6 +45,8 @@
 
 using namespace GSM;
 
+Transceiver *transceiver;
+
 #define USB_LATENCY_INTRVL		10,0
 
 /* Number of running values use in noise average */
@@ -151,20 +153,28 @@
     close(mClockSocket);
 
   for (size_t i = 0; i < mChans; i++) {
-    if (mControlServiceLoopThreads[i]) {
-      mControlServiceLoopThreads[i]->cancel();
-      mControlServiceLoopThreads[i]->join();
-      delete mControlServiceLoopThreads[i];
-    }
-
     mTxPriorityQueues[i].clear();
-    if (mCtrlSockets[i] >= 0)
-      close(mCtrlSockets[i]);
     if (mDataSockets[i] >= 0)
       close(mDataSockets[i]);
   }
 }
 
+int Transceiver::ctrl_sock_cb(struct osmo_fd *bfd, unsigned int flags)
+{
+	int rc = 0;
+	int chan = static_cast<int>(reinterpret_cast<uintptr_t>(bfd->data));
+
+	if (flags & BSC_FD_READ)
+		rc = driveControl(bfd, chan);
+	if (rc < 0)
+		return rc;
+
+	if (flags & BSC_FD_WRITE)
+		rc = ctrl_sock_write(bfd, chan);
+
+	return rc;
+}
+
 /*
  * Initialize transceiver
  *
@@ -193,8 +203,7 @@
   mEdge = edge;
 
   mDataSockets.resize(mChans, -1);
-  mCtrlSockets.resize(mChans, -1);
-  mControlServiceLoopThreads.resize(mChans);
+  mCtrlSockets.resize(mChans);
   mTxPriorityQueueServiceLoopThreads.resize(mChans);
   mRxServiceLoopThreads.resize(mChans);
 
@@ -221,19 +230,27 @@
     d_srcport = mBasePort + 2 * i + 2;
     d_dstport = mBasePort + 2 * i + 102;
 
-    mCtrlSockets[i] = osmo_sock_init2(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP,
+    int rv = osmo_sock_init2_ofd(&mCtrlSockets[i].conn_bfd, AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP,
                                       mLocalAddr.c_str(), c_srcport,
                                       mRemoteAddr.c_str(), c_dstport,
 				      OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT);
-    if (mCtrlSockets[i] < 0)
+    if (rv < 0)
       return false;
 
+    mCtrlSockets[i].conn_bfd.cb = ctrl_sock_cb;
+    mCtrlSockets[i].conn_bfd.data = reinterpret_cast<void*>(i);
+
     mDataSockets[i] = osmo_sock_init2(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP,
                                       mLocalAddr.c_str(), d_srcport,
                                       mRemoteAddr.c_str(), d_dstport,
 				      OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT);
-    if (mCtrlSockets[i] < 0)
+    if (mDataSockets[i] < 0)
       return false;
+
+    if (i && filler == FILLER_DUMMY)
+      filler = FILLER_ZERO;
+
+    mStates[i].init(filler, mSPSTx, txFullScale, rtsc, rach_delay);
   }
 
   /* Randomize the central clock */
@@ -243,19 +260,10 @@
   mLastClockUpdateTime = startTime;
   mLatencyUpdateTime = startTime;
 
-  /* Start control threads */
   for (size_t i = 0; i < mChans; i++) {
-    TrxChanThParams *params = (TrxChanThParams *)malloc(sizeof(struct TrxChanThParams));
-    params->trx = this;
-    params->num = i;
-    mControlServiceLoopThreads[i] = new Thread(stackSize);
-    mControlServiceLoopThreads[i]->start((void * (*)(void*))
-                                 ControlServiceLoopAdapter, (void*) params);
-
-    if (i && filler == FILLER_DUMMY)
-      filler = FILLER_ZERO;
-
-    mStates[i].init(filler, mSPSTx, txFullScale, rtsc, rach_delay);
+     if (i && filler == FILLER_DUMMY)
+        filler = FILLER_ZERO;
+     mStates[i].init(filler, mSPSTx, txFullScale, rtsc, rach_delay);
   }
 
   return true;
@@ -719,8 +727,6 @@
 }
 
 
-#define MAX_PACKET_LENGTH 100
-
 /**
  * Matches a buffer with a command.
  * @param  buf    a buffer to look command in
@@ -750,19 +756,72 @@
   return true;
 }
 
-bool Transceiver::driveControl(size_t chan)
+int Transceiver::ctrl_sock_send(ctrl_msg& m, int chan)
 {
-  char buffer[MAX_PACKET_LENGTH + 1];
-  char response[MAX_PACKET_LENGTH + 1];
+	struct osmo_fd *conn_bfd;
+	conn_bfd = &transceiver->mCtrlSockets[chan].conn_bfd;
+
+	if (conn_bfd->fd <= 0) {
+		return -EIO;
+	}
+
+	transceiver->mCtrlSockets[chan].txmsgqueue.push_back(m);
+	conn_bfd->when |= BSC_FD_WRITE;
+
+	return 0;
+}
+
+int Transceiver::ctrl_sock_write(struct osmo_fd *bfd, int chan)
+{
+	int rc;
+
+
+	while (transceiver->mCtrlSockets[chan].txmsgqueue.size()) {
+		const ctrl_msg m = transceiver->mCtrlSockets[chan].txmsgqueue.front();
+
+		bfd->when &= ~BSC_FD_WRITE;
+
+		/* try to send it over the socket */
+		rc = write(bfd->fd, m.data, strlen(m.data) + 1);
+		if (rc == 0)
+			goto close;
+		if (rc < 0) {
+			if (errno == EAGAIN) {
+				bfd->when |= BSC_FD_WRITE;
+				break;
+			}
+			goto close;
+		}
+
+		transceiver->mCtrlSockets[chan].txmsgqueue.pop_front();
+	}
+	return 0;
+
+close:
+	LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets write(" << transceiver->mCtrlSockets[chan].conn_bfd.fd << ") failed: " << rc;
+	osmo_signal_dispatch(SS_MAIN, S_MAIN_STOP_REQUIRED, NULL);
+
+	return -1;
+}
+
+bool Transceiver::driveControl(struct osmo_fd *bfd, int chan)
+{
+  ctrl_msg cmd_received;
+  ctrl_msg cmd_to_send;
+  char *buffer = cmd_received.data;
+  char *response = cmd_to_send.data;
   char *command, *params;
   int msgLen;
 
   /* Attempt to read from control socket */
-  msgLen = read(mCtrlSockets[chan], buffer, MAX_PACKET_LENGTH);
+  msgLen = read(bfd->fd, buffer, sizeof(cmd_received.data));
   if (msgLen <= 0) {
-    LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets read(" << mCtrlSockets[chan] << ") failed: " << msgLen;
-    return false;
+	  LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets read(" << transceiver->mCtrlSockets[chan].conn_bfd.fd << ") failed: " << msgLen;
+	  osmo_signal_dispatch(SS_MAIN, S_MAIN_STOP_REQUIRED, NULL);
+	  return -EIO;
   }
+  if (msgLen < 0 && errno == EAGAIN)
+	  return false; /* Try again later */
 
   /* Zero-terminate received string */
   buffer[msgLen] = '\0';
@@ -778,16 +837,16 @@
   LOGCHAN(chan, DTRXCTRL, INFO) << "command is '" << command << "'";
 
   if (match_cmd(command, "POWEROFF", NULL)) {
-    stop();
+    transceiver->stop();
     sprintf(response,"RSP POWEROFF 0");
   } else if (match_cmd(command, "POWERON", NULL)) {
-    if (!start()) {
+    if (!transceiver->start()) {
       sprintf(response,"RSP POWERON 1");
     } else {
       sprintf(response,"RSP POWERON 0");
       for (int i = 0; i < 8; i++) {
         for (int j = 0; j < 8; j++)
-          mHandover[i][j] = false;
+          transceiver->mHandover[i][j] = false;
       }
     }
   } else if (match_cmd(command, "HANDOVER", &params)) {
@@ -796,7 +855,7 @@
     if (ts > 7 || ss > 7) {
       sprintf(response, "RSP HANDOVER 1 %u %u", ts, ss);
     } else {
-      mHandover[ts][ss] = true;
+      transceiver->mHandover[ts][ss] = true;
       sprintf(response, "RSP HANDOVER 0 %u %u", ts, ss);
     }
   } else if (match_cmd(command, "NOHANDOVER", &params)) {
@@ -805,32 +864,32 @@
     if (ts > 7 || ss > 7) {
       sprintf(response, "RSP NOHANDOVER 1 %u %u", ts, ss);
     } else {
-      mHandover[ts][ss] = false;
+      transceiver->mHandover[ts][ss] = false;
       sprintf(response, "RSP NOHANDOVER 0 %u %u", ts, ss);
     }
   } else if (match_cmd(command, "SETMAXDLY", &params)) {
     //set expected maximum time-of-arrival
     int maxDelay;
     sscanf(params, "%d", &maxDelay);
-    mMaxExpectedDelayAB = maxDelay; // 1 GSM symbol is approx. 1 km
+    transceiver->mMaxExpectedDelayAB = maxDelay; // 1 GSM symbol is approx. 1 km
     sprintf(response,"RSP SETMAXDLY 0 %d",maxDelay);
   } else if (match_cmd(command, "SETMAXDLYNB", &params)) {
     //set expected maximum time-of-arrival
     int maxDelay;
     sscanf(params, "%d", &maxDelay);
-    mMaxExpectedDelayNB = maxDelay; // 1 GSM symbol is approx. 1 km
+    transceiver->mMaxExpectedDelayNB = maxDelay; // 1 GSM symbol is approx. 1 km
     sprintf(response,"RSP SETMAXDLYNB 0 %d",maxDelay);
   } else if (match_cmd(command, "SETRXGAIN", &params)) {
     //set expected maximum time-of-arrival
     int newGain;
     sscanf(params, "%d", &newGain);
-    newGain = mRadioInterface->setRxGain(newGain, chan);
+    newGain =transceiver-> mRadioInterface->setRxGain(newGain, chan);
     sprintf(response,"RSP SETRXGAIN 0 %d",newGain);
   } else if (match_cmd(command, "NOISELEV", NULL)) {
-    if (mOn) {
-      float lev = mStates[chan].mNoiseLev;
+    if (transceiver->mOn) {
+      float lev = transceiver->mStates[chan].mNoiseLev;
       sprintf(response,"RSP NOISELEV 0 %d",
-              (int) round(20.0 * log10(rxFullScale / lev)));
+              (int) round(20.0 * log10(transceiver->rxFullScale / lev)));
     }
     else {
       sprintf(response,"RSP NOISELEV 1  0");
@@ -838,22 +897,22 @@
   } else if (match_cmd(command, "SETPOWER", &params)) {
     int power;
     sscanf(params, "%d", &power);
-    power = mRadioInterface->setPowerAttenuation(power, chan);
-    mStates[chan].mPower = power;
+    power = transceiver->mRadioInterface->setPowerAttenuation(power, chan);
+    transceiver->mStates[chan].mPower = power;
     sprintf(response, "RSP SETPOWER 0 %d", power);
   } else if (match_cmd(command, "ADJPOWER", &params)) {
     int power, step;
     sscanf(params, "%d", &step);
-    power = mStates[chan].mPower + step;
-    power = mRadioInterface->setPowerAttenuation(power, chan);
-    mStates[chan].mPower = power;
+    power = transceiver->mStates[chan].mPower + step;
+    power = transceiver->mRadioInterface->setPowerAttenuation(power, chan);
+    transceiver->mStates[chan].mPower = power;
     sprintf(response, "RSP ADJPOWER 0 %d", power);
   } else if (match_cmd(command, "RXTUNE", &params)) {
     // tune receiver
     int freqKhz;
     sscanf(params, "%d", &freqKhz);
-    mRxFreq = freqKhz * 1e3;
-    if (!mRadioInterface->tuneRx(mRxFreq, chan)) {
+    transceiver->mRxFreq = freqKhz * 1e3;
+    if (!transceiver->mRadioInterface->tuneRx(transceiver->mRxFreq, chan)) {
        LOGCHAN(chan, DTRXCTRL, FATAL) << "RX failed to tune";
        sprintf(response,"RSP RXTUNE 1 %d",freqKhz);
     }
@@ -863,8 +922,8 @@
     // tune txmtr
     int freqKhz;
     sscanf(params, "%d", &freqKhz);
-    mTxFreq = freqKhz * 1e3;
-    if (!mRadioInterface->tuneTx(mTxFreq, chan)) {
+   transceiver-> mTxFreq = freqKhz * 1e3;
+    if (!transceiver->mRadioInterface->tuneTx(transceiver->mTxFreq, chan)) {
        LOGCHAN(chan, DTRXCTRL, FATAL) << "TX failed to tune";
        sprintf(response,"RSP TXTUNE 1 %d",freqKhz);
     }
@@ -877,8 +936,8 @@
     if (TSC > 7) {
       sprintf(response, "RSP SETTSC 1 %d", TSC);
     } else {
-      LOGC(DTRXCTRL, NOTICE) << "Changing TSC from " << mTSC << " to " << TSC;
-      mTSC = TSC;
+      LOGC(DTRXCTRL, NOTICE) << "Changing TSC from " << transceiver->mTSC << " to " << TSC;
+      transceiver->mTSC = TSC;
       sprintf(response,"RSP SETTSC 0 %d", TSC);
     }
   } else if (match_cmd(command, "SETSLOT", &params)) {
@@ -891,8 +950,8 @@
       sprintf(response,"RSP SETSLOT 1 %d %d",timeslot,corrCode);
       return true;
     }
-    mStates[chan].chanType[timeslot] = (ChannelCombination) corrCode;
-    setModulus(timeslot, chan);
+    transceiver->mStates[chan].chanType[timeslot] = (ChannelCombination) corrCode;
+    transceiver->setModulus(timeslot, chan);
     sprintf(response,"RSP SETSLOT 0 %d %d",timeslot,corrCode);
   } else if (match_cmd(command, "SETFORMAT", &params)) {
     // set TRXD protocol version
@@ -905,7 +964,7 @@
       sprintf(response, "RSP SETFORMAT %u %u", TRX_DATA_FORMAT_VER, version_recv);
     } else {
       LOGCHAN(chan, DTRXCTRL, NOTICE) << "switching to TRXD version " << version_recv;
-      mVersionTRXD[chan] = version_recv;
+      transceiver->mVersionTRXD[chan] = version_recv;
       sprintf(response, "RSP SETFORMAT %u %u", version_recv, version_recv);
     }
   } else if (match_cmd(command, "_SETBURSTTODISKMASK", &params)) {
@@ -913,7 +972,7 @@
     // set a mask which bursts to dump to disk
     int mask;
     sscanf(params, "%d", &mask);
-    mWriteBurstToDiskMask = mask;
+    transceiver->mWriteBurstToDiskMask = mask;
     sprintf(response,"RSP _SETBURSTTODISKMASK 0 %d",mask);
   } else {
     LOGCHAN(chan, DTRXCTRL, NOTICE) << "bogus command " << command << " on control interface.";
@@ -921,9 +980,9 @@
   }
 
   LOGCHAN(chan, DTRXCTRL, INFO) << "response is '" << response << "'";
-  msgLen = write(mCtrlSockets[chan], response, strlen(response) + 1);
+  msgLen = transceiver->ctrl_sock_send(cmd_to_send, chan);
   if (msgLen <= 0) {
-    LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets write(" << mCtrlSockets[chan] << ") failed: " << msgLen;
+    LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets write(" << transceiver->mCtrlSockets[chan].conn_bfd.fd << ") failed: " << msgLen;
     return false;
   }
   return true;
@@ -940,7 +999,7 @@
   // check data socket
   msgLen = read(mDataSockets[chan], buffer, sizeof(buffer));
   if (msgLen <= 0) {
-    LOGCHAN(chan, DTRXDDL, NOTICE) << "mDataSockets read(" << mCtrlSockets[chan] << ") failed: " << msgLen;
+    LOGCHAN(chan, DTRXDDL, NOTICE) << "mDataSockets read(" << mDataSockets[chan] << ") failed: " << msgLen;
     return false;
   }
 
@@ -1179,28 +1238,6 @@
   return NULL;
 }
 
-void *ControlServiceLoopAdapter(TrxChanThParams *params)
-{
-  char thread_name[16];
-  Transceiver *trx = params->trx;
-  size_t num = params->num;
-
-  free(params);
-
-  snprintf(thread_name, 16, "CtrlService%zu", num);
-  set_selfthread_name(thread_name);
-
-  while (1) {
-    if (!trx->driveControl(num)) {
-      LOGCHAN(num, DTRXCTRL, FATAL) << "Something went wrong in thread " << thread_name << ", requesting stop";
-      osmo_signal_dispatch(SS_MAIN, S_MAIN_STOP_REQUIRED, NULL);
-      break;
-    }
-    pthread_testcancel();
-  }
-  return NULL;
-}
-
 void *TxUpperLoopAdapter(TrxChanThParams *params)
 {
   char thread_name[16];
diff --git a/Transceiver52M/Transceiver.h b/Transceiver52M/Transceiver.h
index ed063dd..76416c3 100644
--- a/Transceiver52M/Transceiver.h
+++ b/Transceiver52M/Transceiver.h
@@ -33,11 +33,14 @@
 
 extern "C" {
 #include <osmocom/core/signal.h>
+#include <osmocom/core/select.h>
 #include "config_defs.h"
 }
 
 class Transceiver;
 
+extern Transceiver *transceiver;
+
 /** Channel descriptor for transceiver object and channel number pair */
 struct TrxChanThParams {
 	Transceiver *trx;
@@ -142,12 +145,34 @@
   } ChannelCombination;
 
 private:
+
+struct ctrl_msg {
+  char data[101];
+  ctrl_msg() {};
+  ctrl_msg(char* src) {
+	  strncpy(data,src,99);
+	  data[100] = 0;
+  }
+};
+
+struct ctrl_sock_state {
+  osmo_fd conn_bfd;
+  std::deque<ctrl_msg> txmsgqueue;
+  ~ctrl_sock_state() {
+      if(conn_bfd.fd > 0) {
+          close(conn_bfd.fd);
+          conn_bfd.fd = -1;
+          osmo_fd_unregister(&conn_bfd);
+      }
+  }
+};
+
   int mBasePort;
   std::string mLocalAddr;
   std::string mRemoteAddr;
 
   std::vector<int> mDataSockets;  ///< socket for writing to/reading from GSM core
-  std::vector<int> mCtrlSockets;  ///< socket for writing/reading control commands from GSM core
+  std::vector<ctrl_sock_state> mCtrlSockets;  ///< socket for writing/reading control commands from GSM core
   int mClockSocket;               ///< socket for writing clock updates to GSM core
 
   std::vector<VectorQueue> mTxPriorityQueues;   ///< priority queue of transmit bursts received from GSM core
@@ -156,7 +181,6 @@
   std::vector<Thread *> mRxServiceLoopThreads;  ///< thread to pull bursts into receive FIFO
   Thread *mRxLowerLoopThread;                   ///< thread to pull bursts into receive FIFO
   Thread *mTxLowerLoopThread;                   ///< thread to push bursts into transmit FIFO
-  std::vector<Thread *> mControlServiceLoopThreads;         ///< thread to process control messages from GSM core
   std::vector<Thread *> mTxPriorityQueueServiceLoopThreads; ///< thread to process transmit bursts from GSM core
 
   GSM::Time mTransmitLatency;             ///< latency between basestation clock and transmit deadline clock
@@ -193,6 +217,12 @@
   /** send messages over the clock socket */
   bool writeClockInterface(void);
 
+  static int ctrl_sock_cb(struct osmo_fd *bfd, unsigned int flags);
+  static int ctrl_sock_write(struct osmo_fd *bfd, int chan);
+  static int ctrl_sock_send(ctrl_msg& m, int chan);
+  /** drive handling of control messages from GSM core */
+  static bool driveControl(struct osmo_fd *bfd, int chan);
+
   int mSPSTx;                          ///< number of samples per Tx symbol
   int mSPSRx;                          ///< number of samples per Rx symbol
   size_t mChans;
@@ -229,9 +259,6 @@
   /** drive transmission of GSM bursts */
   void driveTxFIFO();
 
-  /** drive handling of control messages from GSM core */
-  bool driveControl(size_t chan);
-
   /**
     drive modulation and sorting of GSM bursts from GSM core
     @return true if a burst was transferred successfully
@@ -242,7 +269,7 @@
   friend void *TxUpperLoopAdapter(TrxChanThParams *params);
   friend void *RxLowerLoopAdapter(Transceiver *transceiver);
   friend void *TxLowerLoopAdapter(Transceiver *transceiver);
-  friend void *ControlServiceLoopAdapter(TrxChanThParams *params);
+
 
 
   void reset();
@@ -256,8 +283,5 @@
 void *RxLowerLoopAdapter(Transceiver *transceiver);
 void *TxLowerLoopAdapter(Transceiver *transceiver);
 
-/** control message handler thread loop */
-void *ControlServiceLoopAdapter(TrxChanThParams *params);
-
 /** transmit queueing thread loop */
 void *TxUpperLoopAdapter(TrxChanThParams *params);
diff --git a/Transceiver52M/osmo-trx.cpp b/Transceiver52M/osmo-trx.cpp
index 0ad60ef..de504c6 100644
--- a/Transceiver52M/osmo-trx.cpp
+++ b/Transceiver52M/osmo-trx.cpp
@@ -79,7 +79,7 @@
 
 static RadioDevice *usrp;
 static RadioInterface *radio;
-static Transceiver *transceiver;
+
 
 /* Create radio interface
  *     The interface consists of sample rate changes, frequency shifts,

-- 
To view, visit https://gerrit.osmocom.org/c/osmo-trx/+/17805
To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings

Gerrit-Project: osmo-trx
Gerrit-Branch: master
Gerrit-Change-Id: I642a34451e1825eafecf71a902df916ccee7944c
Gerrit-Change-Number: 17805
Gerrit-PatchSet: 1
Gerrit-Owner: Hoernchen <ewild at sysmocom.de>
Gerrit-MessageType: newchange
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.osmocom.org/pipermail/gerrit-log/attachments/20200412/3781e1ed/attachment.htm>


More information about the gerrit-log mailing list