ROOT logo
// $Id: UdpPacketTcpServer.cxx 2718 2012-04-07 03:31:13Z matevz $

// Copyright (C) 1999-2008, Matevz Tadel. All rights reserved.
// This file is part of GLED, released under GNU General Public License version 2.
// For the licensing terms see $GLEDSYS/LICENSE or http://www.gnu.org/.

#include "UdpPacketTcpServer.h"
#include "UdpPacketSource.h"
#include "Glasses/ZLog.h"
#include "UdpPacketTcpServer.c7"

#include "Stones/SServerSocket.h"
#include "Stones/SMessage.h"
#include "Stones/SUdpPacket.h"
#include "Gled/GThread.h"

#include <cerrno>


// UdpPacketTcpServer

//______________________________________________________________________________
//
//

ClassImp(UdpPacketTcpServer);

//==============================================================================

void UdpPacketTcpServer::_init()
{
  mServThread = 0;
  mServSocket = 0;
  mServPort   = 9940;

  mDeliThread = 0;
}

UdpPacketTcpServer::UdpPacketTcpServer(const Text_t* n, const Text_t* t) :
  ZGlass(n, t),
  mSelector(GMutex::recursive)
{
  _init();
}

UdpPacketTcpServer::~UdpPacketTcpServer()
{}

//==============================================================================

void* UdpPacketTcpServer::tl_Serve(UdpPacketTcpServer* s)
{
  s->Serve();
  return 0;
}

void UdpPacketTcpServer::Serve()
{
  static const Exc_t _eh("UdpPacketTcpServer::Serve ");

  GThread::SetCancelType(GThread::CT_Deferred);

  mSelector.Clear();
  mSelector.fRead.Add(mServSocket);

  while (true)
  {
    GThread::CancelOn();
    mSelector.Select();
    GThread::CancelOff();

    for (GFdSet_i i = mSelector.fReadOut.begin(); i != mSelector.fReadOut.end(); ++i)
    {
      SSocket* s = (SSocket*) i->first;
      if (s == mServSocket)
      {
        // New client.

        SSocket *cs = mServSocket->Accept();
        if (*mLog)
        {
          mLog->Form(ZLog::L_Message, _eh, "New connection from %s.",
                     cs->GetInetAddress().GetHostName());
        }
        AddClient(cs);
      }
      else
      {
        // Message from some other socket.
        // Can be close ... which is the only thing we MUST handle, to the
        // first order.
        // Eventually support other stuff, like streaming/compression control,
        // play-back, etc. Maybe.

        SMessage *m = SMessage::ReceiveOrReport(s, _eh, true, *mLog);
        if (m)
        {
          if (*mLog)
          {
            mLog->Form(ZLog::L_Message, _eh, "Got messge from %s, len=%u, what=%u.",
                       s->GetInetAddress().GetHostName(), m->Length(), m->What());
          }
          delete m;
        }
        else if (s->TestBit(SSocket::kBrokenConn))
        {
          RemoveClient(s);
        }
      }
    }
  }
}

void UdpPacketTcpServer::AddClient(SSocket *cs)
{
  mSelector.Lock();
  mSelector.fRead.Add(cs);
  mClients.push_back(cs);
  mSelector.Unlock();
}

void UdpPacketTcpServer::RemoveClient(SSocket* cs)
{
  mSelector.Lock();
  list<SSocket*>::iterator sli = find(mClients.begin(), mClients.end(), cs);
  if (sli != mClients.end())
  {
    RemoveClient(sli);
  }
  mSelector.Unlock();
}

void UdpPacketTcpServer::RemoveClient(list<SSocket*>::iterator sli)
{
  SSocket *cs = *sli;
  mSelector.Lock();
  mSelector.fRead.Remove(*sli);
  mClients.erase(sli);
  mSelector.Unlock();
  delete cs;
}

//==============================================================================

void* UdpPacketTcpServer::tl_Deliver(UdpPacketTcpServer* s)
{
  s->Deliver();
  return 0;
}

void UdpPacketTcpServer::Deliver()
{
  // wait on condifiton, while queue not empty, deliver to all sockets

  static const Exc_t _eh("UdpPacketTcpServer::Deliver ");

  while (true)
  {
    SUdpPacket *p = mUdpQueue.PopFront();

    // Loop over clients
    SMessage msg(444, p->NetBufferSize());
    p->NetStreamer(msg);
    msg.SetLength();

    mSelector.Lock();
    list<SSocket*>::iterator i = mClients.begin();
    while (i != mClients.end())
    {
      try
      {
        msg.Send(*i, false);
      }
      catch (Int_t err)
      {
        if (*mLog)
        {
          mLog->Form(ZLog::L_Error, _eh,
                     "Error %d sending to %s:%d (%s). Closing connection.",
                     err, (*i)->GetInetAddress().GetHostName(), (*i)->GetLocalPort(),
                     strerror(errno));
        }
        list<SSocket*>::iterator j = i++;
        RemoveClient(j);
        continue;
      }
      ++i;
    }
    mSelector.Unlock();

    p->DecRefCount();
  }
}

//==============================================================================

void UdpPacketTcpServer::StartAllServices()
{
  static const Exc_t _eh("UdpPacketTcpServer::StartAllServices ");

  // XXX Assert some stuff about source

  {
    GLensReadHolder _lck(this);
    if (mDeliThread || mServThread)
      throw _eh + "already running.";

    mServSocket = new SServerSocket(mServPort, true);
    if (!mServSocket->IsValid())
    {
      delete mServSocket; mServSocket = 0;
      throw _eh + "Creation of server socket failed.";
    }

    mDeliThread = new GThread("UdpPacketTcpServer-Deliverer",
                              (GThread_foo) tl_Deliver, this,
                              false);
    mDeliThread->SetNice(10);

    mServThread = new GThread("UdpPacketTcpServer-Server",
                                (GThread_foo) tl_Serve, this,
                                false);
    mServThread->SetNice(20);
  }

  mSource->RegisterConsumer(&mUdpQueue);

  mDeliThread->Spawn();
  mServThread->Spawn();
}

void UdpPacketTcpServer::StopAllServices()
{
  static const Exc_t _eh("UdpPacketTcpServer::StopAllServices ");

  GThread *thr = 0;
  {
    GLensReadHolder _lck(this);
    if ( ! GThread::IsValidPtr(mServThread))
      throw _eh + "not running.";
    thr = mServThread;
    GThread::InvalidatePtr(mServThread);
  }

  // XXX Somewhere, close all sockets.

  mSource->UnregisterConsumer(&mUdpQueue);

  thr->Cancel();
  thr->Join();

  mDeliThread->Cancel();
  mDeliThread->Join();

  {
    GLensReadHolder _lck(this);
    mServThread = 0;
    mDeliThread = 0;
  }
}
 UdpPacketTcpServer.cxx:1
 UdpPacketTcpServer.cxx:2
 UdpPacketTcpServer.cxx:3
 UdpPacketTcpServer.cxx:4
 UdpPacketTcpServer.cxx:5
 UdpPacketTcpServer.cxx:6
 UdpPacketTcpServer.cxx:7
 UdpPacketTcpServer.cxx:8
 UdpPacketTcpServer.cxx:9
 UdpPacketTcpServer.cxx:10
 UdpPacketTcpServer.cxx:11
 UdpPacketTcpServer.cxx:12
 UdpPacketTcpServer.cxx:13
 UdpPacketTcpServer.cxx:14
 UdpPacketTcpServer.cxx:15
 UdpPacketTcpServer.cxx:16
 UdpPacketTcpServer.cxx:17
 UdpPacketTcpServer.cxx:18
 UdpPacketTcpServer.cxx:19
 UdpPacketTcpServer.cxx:20
 UdpPacketTcpServer.cxx:21
 UdpPacketTcpServer.cxx:22
 UdpPacketTcpServer.cxx:23
 UdpPacketTcpServer.cxx:24
 UdpPacketTcpServer.cxx:25
 UdpPacketTcpServer.cxx:26
 UdpPacketTcpServer.cxx:27
 UdpPacketTcpServer.cxx:28
 UdpPacketTcpServer.cxx:29
 UdpPacketTcpServer.cxx:30
 UdpPacketTcpServer.cxx:31
 UdpPacketTcpServer.cxx:32
 UdpPacketTcpServer.cxx:33
 UdpPacketTcpServer.cxx:34
 UdpPacketTcpServer.cxx:35
 UdpPacketTcpServer.cxx:36
 UdpPacketTcpServer.cxx:37
 UdpPacketTcpServer.cxx:38
 UdpPacketTcpServer.cxx:39
 UdpPacketTcpServer.cxx:40
 UdpPacketTcpServer.cxx:41
 UdpPacketTcpServer.cxx:42
 UdpPacketTcpServer.cxx:43
 UdpPacketTcpServer.cxx:44
 UdpPacketTcpServer.cxx:45
 UdpPacketTcpServer.cxx:46
 UdpPacketTcpServer.cxx:47
 UdpPacketTcpServer.cxx:48
 UdpPacketTcpServer.cxx:49
 UdpPacketTcpServer.cxx:50
 UdpPacketTcpServer.cxx:51
 UdpPacketTcpServer.cxx:52
 UdpPacketTcpServer.cxx:53
 UdpPacketTcpServer.cxx:54
 UdpPacketTcpServer.cxx:55
 UdpPacketTcpServer.cxx:56
 UdpPacketTcpServer.cxx:57
 UdpPacketTcpServer.cxx:58
 UdpPacketTcpServer.cxx:59
 UdpPacketTcpServer.cxx:60
 UdpPacketTcpServer.cxx:61
 UdpPacketTcpServer.cxx:62
 UdpPacketTcpServer.cxx:63
 UdpPacketTcpServer.cxx:64
 UdpPacketTcpServer.cxx:65
 UdpPacketTcpServer.cxx:66
 UdpPacketTcpServer.cxx:67
 UdpPacketTcpServer.cxx:68
 UdpPacketTcpServer.cxx:69
 UdpPacketTcpServer.cxx:70
 UdpPacketTcpServer.cxx:71
 UdpPacketTcpServer.cxx:72
 UdpPacketTcpServer.cxx:73
 UdpPacketTcpServer.cxx:74
 UdpPacketTcpServer.cxx:75
 UdpPacketTcpServer.cxx:76
 UdpPacketTcpServer.cxx:77
 UdpPacketTcpServer.cxx:78
 UdpPacketTcpServer.cxx:79
 UdpPacketTcpServer.cxx:80
 UdpPacketTcpServer.cxx:81
 UdpPacketTcpServer.cxx:82
 UdpPacketTcpServer.cxx:83
 UdpPacketTcpServer.cxx:84
 UdpPacketTcpServer.cxx:85
 UdpPacketTcpServer.cxx:86
 UdpPacketTcpServer.cxx:87
 UdpPacketTcpServer.cxx:88
 UdpPacketTcpServer.cxx:89
 UdpPacketTcpServer.cxx:90
 UdpPacketTcpServer.cxx:91
 UdpPacketTcpServer.cxx:92
 UdpPacketTcpServer.cxx:93
 UdpPacketTcpServer.cxx:94
 UdpPacketTcpServer.cxx:95
 UdpPacketTcpServer.cxx:96
 UdpPacketTcpServer.cxx:97
 UdpPacketTcpServer.cxx:98
 UdpPacketTcpServer.cxx:99
 UdpPacketTcpServer.cxx:100
 UdpPacketTcpServer.cxx:101
 UdpPacketTcpServer.cxx:102
 UdpPacketTcpServer.cxx:103
 UdpPacketTcpServer.cxx:104
 UdpPacketTcpServer.cxx:105
 UdpPacketTcpServer.cxx:106
 UdpPacketTcpServer.cxx:107
 UdpPacketTcpServer.cxx:108
 UdpPacketTcpServer.cxx:109
 UdpPacketTcpServer.cxx:110
 UdpPacketTcpServer.cxx:111
 UdpPacketTcpServer.cxx:112
 UdpPacketTcpServer.cxx:113
 UdpPacketTcpServer.cxx:114
 UdpPacketTcpServer.cxx:115
 UdpPacketTcpServer.cxx:116
 UdpPacketTcpServer.cxx:117
 UdpPacketTcpServer.cxx:118
 UdpPacketTcpServer.cxx:119
 UdpPacketTcpServer.cxx:120
 UdpPacketTcpServer.cxx:121
 UdpPacketTcpServer.cxx:122
 UdpPacketTcpServer.cxx:123
 UdpPacketTcpServer.cxx:124
 UdpPacketTcpServer.cxx:125
 UdpPacketTcpServer.cxx:126
 UdpPacketTcpServer.cxx:127
 UdpPacketTcpServer.cxx:128
 UdpPacketTcpServer.cxx:129
 UdpPacketTcpServer.cxx:130
 UdpPacketTcpServer.cxx:131
 UdpPacketTcpServer.cxx:132
 UdpPacketTcpServer.cxx:133
 UdpPacketTcpServer.cxx:134
 UdpPacketTcpServer.cxx:135
 UdpPacketTcpServer.cxx:136
 UdpPacketTcpServer.cxx:137
 UdpPacketTcpServer.cxx:138
 UdpPacketTcpServer.cxx:139
 UdpPacketTcpServer.cxx:140
 UdpPacketTcpServer.cxx:141
 UdpPacketTcpServer.cxx:142
 UdpPacketTcpServer.cxx:143
 UdpPacketTcpServer.cxx:144
 UdpPacketTcpServer.cxx:145
 UdpPacketTcpServer.cxx:146
 UdpPacketTcpServer.cxx:147
 UdpPacketTcpServer.cxx:148
 UdpPacketTcpServer.cxx:149
 UdpPacketTcpServer.cxx:150
 UdpPacketTcpServer.cxx:151
 UdpPacketTcpServer.cxx:152
 UdpPacketTcpServer.cxx:153
 UdpPacketTcpServer.cxx:154
 UdpPacketTcpServer.cxx:155
 UdpPacketTcpServer.cxx:156
 UdpPacketTcpServer.cxx:157
 UdpPacketTcpServer.cxx:158
 UdpPacketTcpServer.cxx:159
 UdpPacketTcpServer.cxx:160
 UdpPacketTcpServer.cxx:161
 UdpPacketTcpServer.cxx:162
 UdpPacketTcpServer.cxx:163
 UdpPacketTcpServer.cxx:164
 UdpPacketTcpServer.cxx:165
 UdpPacketTcpServer.cxx:166
 UdpPacketTcpServer.cxx:167
 UdpPacketTcpServer.cxx:168
 UdpPacketTcpServer.cxx:169
 UdpPacketTcpServer.cxx:170
 UdpPacketTcpServer.cxx:171
 UdpPacketTcpServer.cxx:172
 UdpPacketTcpServer.cxx:173
 UdpPacketTcpServer.cxx:174
 UdpPacketTcpServer.cxx:175
 UdpPacketTcpServer.cxx:176
 UdpPacketTcpServer.cxx:177
 UdpPacketTcpServer.cxx:178
 UdpPacketTcpServer.cxx:179
 UdpPacketTcpServer.cxx:180
 UdpPacketTcpServer.cxx:181
 UdpPacketTcpServer.cxx:182
 UdpPacketTcpServer.cxx:183
 UdpPacketTcpServer.cxx:184
 UdpPacketTcpServer.cxx:185
 UdpPacketTcpServer.cxx:186
 UdpPacketTcpServer.cxx:187
 UdpPacketTcpServer.cxx:188
 UdpPacketTcpServer.cxx:189
 UdpPacketTcpServer.cxx:190
 UdpPacketTcpServer.cxx:191
 UdpPacketTcpServer.cxx:192
 UdpPacketTcpServer.cxx:193
 UdpPacketTcpServer.cxx:194
 UdpPacketTcpServer.cxx:195
 UdpPacketTcpServer.cxx:196
 UdpPacketTcpServer.cxx:197
 UdpPacketTcpServer.cxx:198
 UdpPacketTcpServer.cxx:199
 UdpPacketTcpServer.cxx:200
 UdpPacketTcpServer.cxx:201
 UdpPacketTcpServer.cxx:202
 UdpPacketTcpServer.cxx:203
 UdpPacketTcpServer.cxx:204
 UdpPacketTcpServer.cxx:205
 UdpPacketTcpServer.cxx:206
 UdpPacketTcpServer.cxx:207
 UdpPacketTcpServer.cxx:208
 UdpPacketTcpServer.cxx:209
 UdpPacketTcpServer.cxx:210
 UdpPacketTcpServer.cxx:211
 UdpPacketTcpServer.cxx:212
 UdpPacketTcpServer.cxx:213
 UdpPacketTcpServer.cxx:214
 UdpPacketTcpServer.cxx:215
 UdpPacketTcpServer.cxx:216
 UdpPacketTcpServer.cxx:217
 UdpPacketTcpServer.cxx:218
 UdpPacketTcpServer.cxx:219
 UdpPacketTcpServer.cxx:220
 UdpPacketTcpServer.cxx:221
 UdpPacketTcpServer.cxx:222
 UdpPacketTcpServer.cxx:223
 UdpPacketTcpServer.cxx:224
 UdpPacketTcpServer.cxx:225
 UdpPacketTcpServer.cxx:226
 UdpPacketTcpServer.cxx:227
 UdpPacketTcpServer.cxx:228
 UdpPacketTcpServer.cxx:229
 UdpPacketTcpServer.cxx:230
 UdpPacketTcpServer.cxx:231
 UdpPacketTcpServer.cxx:232
 UdpPacketTcpServer.cxx:233
 UdpPacketTcpServer.cxx:234
 UdpPacketTcpServer.cxx:235
 UdpPacketTcpServer.cxx:236
 UdpPacketTcpServer.cxx:237
 UdpPacketTcpServer.cxx:238
 UdpPacketTcpServer.cxx:239
 UdpPacketTcpServer.cxx:240
 UdpPacketTcpServer.cxx:241
 UdpPacketTcpServer.cxx:242
 UdpPacketTcpServer.cxx:243
 UdpPacketTcpServer.cxx:244
 UdpPacketTcpServer.cxx:245
 UdpPacketTcpServer.cxx:246
 UdpPacketTcpServer.cxx:247
 UdpPacketTcpServer.cxx:248
 UdpPacketTcpServer.cxx:249
 UdpPacketTcpServer.cxx:250
 UdpPacketTcpServer.cxx:251
 UdpPacketTcpServer.cxx:252
 UdpPacketTcpServer.cxx:253
 UdpPacketTcpServer.cxx:254
 UdpPacketTcpServer.cxx:255
 UdpPacketTcpServer.cxx:256
 UdpPacketTcpServer.cxx:257
 UdpPacketTcpServer.cxx:258
 UdpPacketTcpServer.cxx:259
 UdpPacketTcpServer.cxx:260