Hoernchen submitted this change.

View Change

Approvals: laforge: Looks good to me, but someone else must approve pespin: Looks good to me, but someone else must approve Hoernchen: Looks good to me, approved Jenkins Builder: Verified
clean up mutex, scopedlock, and signal classes

This also uncovers very interesting design decisions like the copying of
mutexes and condition vars depending on recursive locks that were
previously hidden by shady c function calls..
We have perfectly good c++11 versions for all of that.

While we're at it, also use the initialization list for the other (still
copy constructable) vectors, which cleans up the radio interfaces.

Change-Id: Idc9e3b1144c5b93f5dad2f8e0e30f1058477aa52
---
M CommonLibs/Interthread.h
M CommonLibs/Threads.cpp
M CommonLibs/Threads.h
M Transceiver52M/Transceiver.cpp
M Transceiver52M/Transceiver.h
M Transceiver52M/device/common/radioDevice.h
M Transceiver52M/device/ipc/IPCDevice.cpp
M Transceiver52M/radioBuffer.cpp
M Transceiver52M/radioInterface.cpp
M Transceiver52M/radioInterface.h
M Transceiver52M/radioInterfaceMulti.cpp
M Transceiver52M/radioInterfaceResamp.cpp
M tests/CommonLibs/InterthreadTest.cpp
13 files changed, 96 insertions(+), 237 deletions(-)

diff --git a/CommonLibs/Interthread.h b/CommonLibs/Interthread.h
index 881e1a8..207ada2 100644
--- a/CommonLibs/Interthread.h
+++ b/CommonLibs/Interthread.h
@@ -517,7 +517,7 @@
@param timeout The blocking timeout in ms.
@return Pointer at key or NULL on timeout.
*/
- D* read(const K &key, unsigned timeout) const
+ D* read(const K &key, unsigned timeout)
{
if (timeout==0) return readNoBlock(key);
ScopedLock lock(mLock);
@@ -537,7 +537,7 @@
@param key The key to read from.
@return Pointer at key.
*/
- D* read(const K &key) const
+ D* read(const K &key)
{
ScopedLock lock(mLock);
typename Map::const_iterator iter = mMap.find(key);
diff --git a/CommonLibs/Threads.cpp b/CommonLibs/Threads.cpp
index b6750ab..377a1b0 100644
--- a/CommonLibs/Threads.cpp
+++ b/CommonLibs/Threads.cpp
@@ -43,71 +43,6 @@
#endif


-Mutex gStreamLock; ///< Global lock to control access to cout and cerr.
-
-void lockCout()
-{
- gStreamLock.lock();
- Timeval entryTime;
- cout << entryTime << " " << osmo_gettid() << ": ";
-}
-
-
-void unlockCout()
-{
- cout << dec << endl << flush;
- gStreamLock.unlock();
-}
-
-
-void lockCerr()
-{
- gStreamLock.lock();
- Timeval entryTime;
- cerr << entryTime << " " << osmo_gettid() << ": ";
-}
-
-void unlockCerr()
-{
- cerr << dec << endl << flush;
- gStreamLock.unlock();
-}
-
-
-
-
-
-
-
-Mutex::Mutex()
-{
- bool res;
- res = pthread_mutexattr_init(&mAttribs);
- assert(!res);
- res = pthread_mutexattr_settype(&mAttribs,PTHREAD_MUTEX_RECURSIVE);
- assert(!res);
- res = pthread_mutex_init(&mMutex,&mAttribs);
- assert(!res);
-}
-
-
-Mutex::~Mutex()
-{
- pthread_mutex_destroy(&mMutex);
- bool res = pthread_mutexattr_destroy(&mAttribs);
- assert(!res);
-}
-
-
-
-
-/** Block for the signal up to the cancellation timeout. */
-void Signal::wait(Mutex& wMutex, unsigned timeout) const
-{
- Timeval then(timeout);
- struct timespec waitTime = then.timespec();
- pthread_cond_timedwait(&mSignal,&wMutex.mMutex,&waitTime);
-}

void set_selfthread_name(const char *name)
{
diff --git a/CommonLibs/Threads.h b/CommonLibs/Threads.h
index 5ff137b..24fec00 100644
--- a/CommonLibs/Threads.h
+++ b/CommonLibs/Threads.h
@@ -28,143 +28,96 @@
#ifndef THREADS_H
#define THREADS_H

-#include "config.h"
-
+#include <chrono>
+#include <mutex>
+#include <condition_variable>
#include <pthread.h>
#include <iostream>
-#include <assert.h>
+#include <cassert>
#include <unistd.h>

+#include "config.h"
+#include "Timeval.h"
+
class Mutex;

-
-/**@name Multithreaded access for standard streams. */
-//@{
-
-/**@name Functions for gStreamLock. */
-//@{
-extern Mutex gStreamLock; ///< global lock for cout and cerr
-void lockCerr(); ///< call prior to writing cerr
-void unlockCerr(); ///< call after writing cerr
-void lockCout(); ///< call prior to writing cout
-void unlockCout(); ///< call after writing cout
-//@}
-
-/**@name Macros for standard messages. */
-//@{
-#define COUT(text) { lockCout(); std::cout << text; unlockCout(); }
-#define CERR(text) { lockCerr(); std::cerr << __FILE__ << ":" << __LINE__ << ": " << text; unlockCerr(); }
-#ifdef NDEBUG
-#define DCOUT(text) {}
-#define OBJDCOUT(text) {}
-#else
-#define DCOUT(text) { COUT(__FILE__ << ":" << __LINE__ << " " << text); }
-#define OBJDCOUT(text) { DCOUT(this << " " << text); }
-#endif
-//@}
-//@}
-
-
-
/**@defgroup C++ wrappers for pthread mechanisms. */
//@{

-/** A class for recursive mutexes based on pthread_mutex. */
+/** A class for recursive mutexes. */
class Mutex {
+ std::recursive_mutex m;

- private:
+ public:

- pthread_mutex_t mMutex;
- pthread_mutexattr_t mAttribs;
+ void lock() {
+ m.lock();
+ }

- public:
+ bool trylock() {
+ return m.try_lock();
+ }

- Mutex();
-
- ~Mutex();
-
- void lock() { pthread_mutex_lock(&mMutex); }
-
- bool trylock() { return pthread_mutex_trylock(&mMutex)==0; }
-
- void unlock() { pthread_mutex_unlock(&mMutex); }
+ void unlock() {
+ m.unlock();
+ }

friend class Signal;
-
};

-
class ScopedLock {
+ Mutex &mMutex;

- private:
- Mutex& mMutex;
-
- public:
- ScopedLock(Mutex& wMutex) :mMutex(wMutex) { mMutex.lock(); }
- ~ScopedLock() { mMutex.unlock(); }
-
+ public:
+ ScopedLock(Mutex &wMutex) : mMutex(wMutex) {
+ mMutex.lock();
+ }
+ ~ScopedLock() {
+ mMutex.unlock();
+ }
};

-
-
-
-/** A C++ interthread signal based on pthread condition variables. */
+/** A C++ interthread signal. */
class Signal {
+ /* any, because for some reason our mutex is recursive... */
+ std::condition_variable_any mSignal;

- private:
+ public:

- mutable pthread_cond_t mSignal;
+ void wait(Mutex &wMutex, unsigned timeout) {
+ mSignal.wait_for(wMutex.m, std::chrono::milliseconds(timeout));
+ }

- public:
+ void wait(Mutex &wMutex) {
+ mSignal.wait(wMutex.m);
+ }

- Signal() { int s = pthread_cond_init(&mSignal,NULL); assert(!s); }
+ void signal() {
+ mSignal.notify_one();
+ }

- ~Signal() { pthread_cond_destroy(&mSignal); }
-
- /**
- Block for the signal up to the cancellation timeout.
- Under Linux, spurious returns are possible.
- */
- void wait(Mutex& wMutex, unsigned timeout) const;
-
- /**
- Block for the signal.
- Under Linux, spurious returns are possible.
- */
- void wait(Mutex& wMutex) const
- { pthread_cond_wait(&mSignal,&wMutex.mMutex); }
-
- void signal() { pthread_cond_signal(&mSignal); }
-
- void broadcast() { pthread_cond_broadcast(&mSignal); }
-
+ void broadcast() {
+ mSignal.notify_all();
+ }
};

-
-
-#define START_THREAD(thread,function,argument) \
- thread.start((void *(*)(void*))function, (void*)argument);
-
void set_selfthread_name(const char *name);
void thread_enable_cancel(bool cancel);

/** A C++ wrapper for pthread threads. */
class Thread {
-
- private:
-
+ private:
pthread_t mThread;
pthread_attr_t mAttrib;
// FIXME -- Can this be reduced now?
size_t mStackSize;

-
- public:
-
+ public:
/** Create a thread in a non-running state. */
- Thread(size_t wStackSize = 0):mThread((pthread_t)0) {
- pthread_attr_init(&mAttrib); // (pat) moved this here.
- mStackSize=wStackSize;
+ Thread(size_t wStackSize = 0) : mThread((pthread_t)0)
+ {
+ pthread_attr_init(&mAttrib); // (pat) moved this here.
+ mStackSize = wStackSize;
}

/**
@@ -172,14 +125,17 @@
It should be stopped and joined.
*/
// (pat) If the Thread is destroyed without being started, then mAttrib is undefined. Oops.
- ~Thread() { pthread_attr_destroy(&mAttrib); }
-
+ ~Thread()
+ {
+ pthread_attr_destroy(&mAttrib);
+ }

/** Start the thread on a task. */
- void start(void *(*task)(void*), void *arg);
+ void start(void *(*task)(void *), void *arg);

/** Join a thread that will stop on its own. */
- void join() {
+ void join()
+ {
if (mThread) {
int s = pthread_join(mThread, NULL);
assert(!s);
@@ -187,7 +143,10 @@
}

/** Send cancellation to thread */
- void cancel() { pthread_cancel(mThread); }
+ void cancel()
+ {
+ pthread_cancel(mThread);
+ }
};

#ifdef HAVE_ATOMIC_OPS
diff --git a/Transceiver52M/Transceiver.cpp b/Transceiver52M/Transceiver.cpp
index cbe1e27..c9894f0 100644
--- a/Transceiver52M/Transceiver.cpp
+++ b/Transceiver52M/Transceiver.cpp
@@ -135,12 +135,13 @@
Transceiver::Transceiver(const struct trx_cfg *cfg,
GSM::Time wTransmitLatency,
RadioInterface *wRadioInterface)
- : cfg(cfg), mClockSocket(-1),
- mRxLowerLoopThread(nullptr), mTxLowerLoopThread(nullptr),
- mTransmitLatency(wTransmitLatency), mRadioInterface(wRadioInterface),
- mChans(cfg->num_chans), mOn(false), mForceClockInterface(false),
- mTxFreq(0.0), mRxFreq(0.0), mTSC(0), mMaxExpectedDelayAB(0),
- mMaxExpectedDelayNB(0), mWriteBurstToDiskMask(0)
+ : mChans(cfg->num_chans), cfg(cfg),
+ mCtrlSockets(mChans), mClockSocket(-1),
+ mTxPriorityQueues(mChans), mReceiveFIFO(mChans),
+ mRxServiceLoopThreads(mChans), mRxLowerLoopThread(nullptr), mTxLowerLoopThread(nullptr),
+ mTxPriorityQueueServiceLoopThreads(mChans), mTransmitLatency(wTransmitLatency), mRadioInterface(wRadioInterface),
+ mOn(false),mForceClockInterface(false), mTxFreq(0.0), mRxFreq(0.0), mTSC(0), mMaxExpectedDelayAB(0),
+ mMaxExpectedDelayNB(0), mWriteBurstToDiskMask(0), mVersionTRXD(mChans), mStates(mChans)
{
txFullScale = mRadioInterface->fullScaleInputValue();
rxFullScale = mRadioInterface->fullScaleOutputValue();
@@ -208,14 +209,7 @@
}

mDataSockets.resize(mChans, -1);
- mCtrlSockets.resize(mChans);
- mTxPriorityQueueServiceLoopThreads.resize(mChans);
- mRxServiceLoopThreads.resize(mChans);

- mTxPriorityQueues.resize(mChans);
- mReceiveFIFO.resize(mChans);
- mStates.resize(mChans);
- mVersionTRXD.resize(mChans);

/* Filler table retransmissions - support only on channel 0 */
if (cfg->filler == FILLER_DUMMY)
diff --git a/Transceiver52M/Transceiver.h b/Transceiver52M/Transceiver.h
index 0389e60..1860884 100644
--- a/Transceiver52M/Transceiver.h
+++ b/Transceiver52M/Transceiver.h
@@ -148,6 +148,7 @@
} ChannelCombination;

private:
+ size_t mChans;
struct ctrl_msg {
char data[101];
ctrl_msg() {};
@@ -218,7 +219,7 @@
/** drive handling of control messages from GSM core */
int ctrl_sock_handle_rx(int chan);

- size_t mChans;
+
bool mOn; ///< flag to indicate that transceiver is powered on
bool mForceClockInterface; ///< flag to indicate whether IND CLOCK shall be sent unconditionally after transceiver is started
bool mHandover[8][8]; ///< expect handover to the timeslot/subslot
diff --git a/Transceiver52M/device/common/radioDevice.h b/Transceiver52M/device/common/radioDevice.h
index 3f5da1f..404ef75 100644
--- a/Transceiver52M/device/common/radioDevice.h
+++ b/Transceiver52M/device/common/radioDevice.h
@@ -169,14 +169,13 @@
const std::vector<std::string>& tx_paths,
const std::vector<std::string>& rx_paths):
tx_sps(tx_sps), rx_sps(rx_sps), iface(type), chans(chan_num), lo_offset(offset),
- tx_paths(tx_paths), rx_paths(rx_paths)
+ tx_paths(tx_paths), rx_paths(rx_paths), m_ctr(chans)
{
if (iface == MULTI_ARFCN) {
LOGC(DDEV, INFO) << "Multi-ARFCN: "<< chan_num << " logical chans -> 1 physical chans";
chans = 1;
}

- m_ctr.resize(chans);
for (size_t i = 0; i < chans; i++) {
memset(&m_ctr[i], 0, sizeof(m_ctr[i]));
m_ctr[i].chan = i;
diff --git a/Transceiver52M/device/ipc/IPCDevice.cpp b/Transceiver52M/device/ipc/IPCDevice.cpp
index 2e40aa3..8bf4836 100644
--- a/Transceiver52M/device/ipc/IPCDevice.cpp
+++ b/Transceiver52M/device/ipc/IPCDevice.cpp
@@ -58,18 +58,12 @@

IPCDevice::IPCDevice(size_t tx_sps, size_t rx_sps, InterfaceType iface, size_t chan_num, double lo_offset,
const std::vector<std::string> &tx_paths, const std::vector<std::string> &rx_paths)
- : RadioDevice(tx_sps, rx_sps, iface, chan_num, lo_offset, tx_paths, rx_paths), tx_attenuation(),
- tmp_state(IPC_IF_MSG_GREETING_REQ), shm(NULL), shm_dec(0), started(false)
+ : RadioDevice(tx_sps, rx_sps, iface, chan_num, lo_offset, tx_paths, rx_paths), sk_chan_state(chans, ipc_per_trx_sock_state()),
+ tx_attenuation(), tmp_state(IPC_IF_MSG_GREETING_REQ), shm(NULL), shm_dec(0),
+ rx_buffers(chans), started(false), tx_gains(chans), rx_gains(chans)
{
LOGC(DDEV, INFO) << "creating IPC device...";

- rx_gains.resize(chans);
- tx_gains.resize(chans);
-
- rx_buffers.resize(chans);
-
- sk_chan_state.resize(chans, ipc_per_trx_sock_state());
-
/* Set up per-channel Rx timestamp based Ring buffers */
for (size_t i = 0; i < rx_buffers.size(); i++)
rx_buffers[i] = new smpl_buf(SAMPLE_BUF_SZ / sizeof(uint32_t));
diff --git a/Transceiver52M/radioBuffer.cpp b/Transceiver52M/radioBuffer.cpp
index 62f6553..ec868e5 100644
--- a/Transceiver52M/radioBuffer.cpp
+++ b/Transceiver52M/radioBuffer.cpp
@@ -28,7 +28,7 @@

RadioBuffer::RadioBuffer(size_t numSegments, size_t segmentLen,
size_t hLen, bool outDirection)
- : writeIndex(0), readIndex(0), availSamples(0)
+ : writeIndex(0), readIndex(0), availSamples(0), segments(numSegments)
{
if (!outDirection)
hLen = 0;
@@ -36,7 +36,6 @@
buffer = new float[2 * (hLen + numSegments * segmentLen)];
bufferLen = numSegments * segmentLen;

- segments.resize(numSegments);

for (size_t i = 0; i < numSegments; i++)
segments[i] = &buffer[2 * (hLen + i * segmentLen)];
diff --git a/Transceiver52M/radioInterface.cpp b/Transceiver52M/radioInterface.cpp
index 875245d..311af34 100644
--- a/Transceiver52M/radioInterface.cpp
+++ b/Transceiver52M/radioInterface.cpp
@@ -39,9 +39,10 @@
RadioInterface::RadioInterface(RadioDevice *wDevice, size_t tx_sps,
size_t rx_sps, size_t chans,
int wReceiveOffset, GSM::Time wStartTime)
- : mDevice(wDevice), mSPSTx(tx_sps), mSPSRx(rx_sps), mChans(chans),
- underrun(false), overrun(false), writeTimestamp(0), readTimestamp(0),
- receiveOffset(wReceiveOffset), mOn(false)
+ : mSPSTx(tx_sps), mSPSRx(rx_sps), mChans(chans), mReceiveFIFO(mChans), mDevice(wDevice),
+ sendBuffer(mChans), recvBuffer(mChans), convertRecvBuffer(mChans),
+ convertSendBuffer(mChans), powerScaling(mChans), underrun(false), overrun(false),
+ writeTimestamp(0), readTimestamp(0), receiveOffset(wReceiveOffset), mOn(false)
{
mClock.set(wStartTime);
}
@@ -58,15 +59,6 @@
return false;
}

- close();
-
- sendBuffer.resize(mChans);
- recvBuffer.resize(mChans);
- convertSendBuffer.resize(mChans);
- convertRecvBuffer.resize(mChans);
- mReceiveFIFO.resize(mChans);
- powerScaling.resize(mChans);
-
for (size_t i = 0; i < mChans; i++) {
sendBuffer[i] = new RadioBuffer(NUMCHUNKS, CHUNK * mSPSTx, 0, true);
recvBuffer[i] = new RadioBuffer(NUMCHUNKS, CHUNK * mSPSRx, 0, false);
diff --git a/Transceiver52M/radioInterface.h b/Transceiver52M/radioInterface.h
index efe5606..cd40ddf 100644
--- a/Transceiver52M/radioInterface.h
+++ b/Transceiver52M/radioInterface.h
@@ -31,6 +31,9 @@
class RadioInterface {

protected:
+ size_t mSPSTx;
+ size_t mSPSRx;
+ size_t mChans;

Thread mAlignRadioServiceLoopThread; ///< thread that synchronizes transmit and receive sections

@@ -38,10 +41,6 @@

RadioDevice *mDevice; ///< the USRP object

- size_t mSPSTx;
- size_t mSPSRx;
- size_t mChans;
-
std::vector<RadioBuffer *> sendBuffer;
std::vector<RadioBuffer *> recvBuffer;

diff --git a/Transceiver52M/radioInterfaceMulti.cpp b/Transceiver52M/radioInterfaceMulti.cpp
index eaf0886..eeb0c97 100644
--- a/Transceiver52M/radioInterfaceMulti.cpp
+++ b/Transceiver52M/radioInterfaceMulti.cpp
@@ -44,8 +44,9 @@
RadioInterfaceMulti::RadioInterfaceMulti(RadioDevice *radio, size_t tx_sps,
size_t rx_sps, size_t chans)
: RadioInterface(radio, tx_sps, rx_sps, chans),
- outerSendBuffer(NULL), outerRecvBuffer(NULL),
- dnsampler(NULL), upsampler(NULL), channelizer(NULL), synthesis(NULL)
+ outerSendBuffer(NULL), outerRecvBuffer(NULL), history(mChans), active(MCHANS, false),
+ rx_freq_state(mChans), tx_freq_state(mChans), dnsampler(NULL), upsampler(NULL), channelizer(NULL),
+ synthesis(NULL)
{
}

@@ -74,12 +75,12 @@
for (std::vector<signalVector*>::iterator it = history.begin(); it != history.end(); ++it)
delete *it;

- mReceiveFIFO.resize(0);
- powerScaling.resize(0);
- history.resize(0);
- active.resize(0);
- rx_freq_state.resize(0);
- tx_freq_state.resize(0);
+ mReceiveFIFO.clear();
+ powerScaling.clear();
+ history.clear();
+ active.clear();
+ rx_freq_state.clear();
+ tx_freq_state.clear();

RadioInterface::close();
}
@@ -154,18 +155,9 @@

close();

- sendBuffer.resize(mChans);
- recvBuffer.resize(mChans);
convertSendBuffer.resize(1);
convertRecvBuffer.resize(1);

- mReceiveFIFO.resize(mChans);
- powerScaling.resize(mChans);
- history.resize(mChans);
- rx_freq_state.resize(mChans);
- tx_freq_state.resize(mChans);
- active.resize(MCHANS, false);
-
/* 4 == sps */
inchunk = RESAMP_INRATE * 4;
outchunk = RESAMP_OUTRATE * 4;
diff --git a/Transceiver52M/radioInterfaceResamp.cpp b/Transceiver52M/radioInterfaceResamp.cpp
index b92432f..37167ae 100644
--- a/Transceiver52M/radioInterfaceResamp.cpp
+++ b/Transceiver52M/radioInterfaceResamp.cpp
@@ -100,13 +100,6 @@

close();

- sendBuffer.resize(1);
- recvBuffer.resize(1);
- convertSendBuffer.resize(1);
- convertRecvBuffer.resize(1);
- mReceiveFIFO.resize(1);
- powerScaling.resize(1);
-
switch (type) {
case RadioDevice::RESAMP_64M:
resamp_inrate = RESAMP_64M_INRATE;
diff --git a/tests/CommonLibs/InterthreadTest.cpp b/tests/CommonLibs/InterthreadTest.cpp
index 462df08..a00980f 100644
--- a/tests/CommonLibs/InterthreadTest.cpp
+++ b/tests/CommonLibs/InterthreadTest.cpp
@@ -29,9 +29,9 @@
#include "Threads.h"
#include "Interthread.h"
#include <iostream>
+#include <mutex>

-using namespace std;
-
+std::mutex dbg_cout;

InterthreadQueue<int> gQ;
InterthreadMap<int,int> gMap;
@@ -41,6 +41,8 @@
int m_last_read_val;
int m_last_write_val;

+#define CERR(text) { dbg_cout.lock() ; std::cerr << text; dbg_cout.unlock(); }
+
void* qWriter(void*)
{
int *p;

To view, visit change 30486. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-Project: osmo-trx
Gerrit-Branch: master
Gerrit-Change-Id: Idc9e3b1144c5b93f5dad2f8e0e30f1058477aa52
Gerrit-Change-Number: 30486
Gerrit-PatchSet: 21
Gerrit-Owner: Hoernchen <ewild@sysmocom.de>
Gerrit-Reviewer: Hoernchen <ewild@sysmocom.de>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <laforge@osmocom.org>
Gerrit-Reviewer: pespin <pespin@sysmocom.de>
Gerrit-MessageType: merged