// ROOT logo
// $Id: XrdFileCloseReporterAmq.cxx 2821 2012-07-14 01:30:37Z matevz $

// Copyright (C) 1999-2008, Matevz Tadel. All rights reserved.
// This file is part of GLED, released under GNU General Public License version 2.
// For the licensing terms see $GLEDSYS/LICENSE or http://www.gnu.org/.

#include "XrdFileCloseReporterAmq.h"
#include "XrdFileCloseReporterAmq.c7"

#include "XrdFile.h"
#include "XrdUser.h"
#include "XrdServer.h"

#include "Glasses/ZLog.h"

#include <activemq/library/ActiveMQCPP.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>

#include <cerrno>

// XrdFileCloseReporterAmq

//______________________________________________________________________________
//
// File-close reporter that publishes its reports through ActiveMQ (CMS API).
// ReportLoopInit() opens the connection to the broker given by mAmqHost /
// mAmqPort and creates a producer for the topic mAmqTopic; ReportFileClosed()
// formats one text message per call from the file, user and server records
// and sends it; ReportLoopFinalize() deletes the AMQ objects again.
//

ClassImp(XrdFileCloseReporterAmq);

//==============================================================================

// Put the object into its default state: no AMQ objects allocated yet and
// the broker parameters set to their built-in defaults.
void XrdFileCloseReporterAmq::_init()
{
  // AMQ handles; they are allocated in ReportLoopInit().
  mProd    = 0;
  mDest    = 0;
  mSess    = 0;
  mConn    = 0;
  mConnFac = 0;

  // Default destination topic and credentials.
  mAmqTopic = "xrdpop.uscms_test_popularity";
  mAmqPswd  = "xyzz";
  mAmqUser  = "xrdpop";

  // Default broker endpoint.
  mAmqPort  = 6163;
  mAmqHost  = "gridmsg007.cern.ch";
}

// Constructor: forwards n and t to the XrdFileCloseReporter base and then
// applies the defaults set up by _init().
XrdFileCloseReporterAmq::XrdFileCloseReporterAmq(const Text_t* n, const Text_t* t)
  : XrdFileCloseReporter(n, t)
{ _init(); }

// Destructor. Does nothing itself; the AMQ objects are deleted in
// ReportLoopFinalize().
XrdFileCloseReporterAmq::~XrdFileCloseReporterAmq()
{
}

//==============================================================================

// Open the ActiveMQ connection and create the session, topic and producer
// that ReportFileClosed() uses for sending.
//
// The broker URI is built from mAmqHost and mAmqPort (failover transport over
// TCP, STOMP wire format); mAmqUser and mAmqPswd are passed as credentials.
//
// Throws Exc_t when any of the CMS calls fails. In that case every object
// created so far is deleted and the corresponding members are reset to 0, so
// nothing is leaked and a later ReportLoopFinalize() has nothing left to do.
void XrdFileCloseReporterAmq::ReportLoopInit()
{
  static const Exc_t _eh("XrdFileCloseReporterAmq::ReportLoopInit ");

  TString uri;
  uri.Form("failover://(tcp://%s:%hu?wireFormat=stomp)", mAmqHost.Data(), mAmqPort);
  mConnFac = new activemq::core::ActiveMQConnectionFactory(uri.Data(), mAmqUser.Data(), mAmqPswd.Data());

  try
  {
    mConn = mConnFac->createConnection();
    mConn->start();

    // Session, destination and producer creation can raise CMSException just
    // like the connection calls above, so they share the same handler.
    mSess = mConn->createSession(); // Default is AUTO_ACKNOWLEDGE
    mDest = mSess->createTopic(mAmqTopic.Data());
    mProd = mSess->createProducer(mDest);
    mProd->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT); // Copied from examples, NFI.
  }
  catch (cms::CMSException& e)
  {
    // Release whatever was created before the failure, children before the
    // objects they were created from. Members that were never assigned are
    // still 0 and deleting them is a no-op.
    delete mProd;    mProd    = 0;
    delete mDest;    mDest    = 0;
    delete mSess;    mSess    = 0;
    delete mConn;    mConn    = 0;
    delete mConnFac; mConnFac = 0;

    throw _eh + "Exception during connection creation: " + e.getStackTraceString();
  }
}

// File-local numeric helpers used when formatting the report message.
namespace
{
  // Convert a double to a 64-bit integer (plain static_cast, i.e. truncation).
  Long64_t dtoll(Double_t x)
  {
    return static_cast<Long64_t>(x);
  }

  // Scale x by 1024 * 1024; applied in this file to quantities obtained from
  // *MB() accessors.
  Double_t dmtod(Double_t x)
  {
    return 1048576.0 * x;
  }

  // Scale x by 1024 * 1024 and convert the result to a 64-bit integer.
  Long64_t dmtoll(Double_t x)
  {
    return static_cast<Long64_t>(dmtod(x));
  }
}

// Format a report for one closed file and send it as a text message through
// the producer created in ReportLoopInit().
//
// fus supplies the file, user and server objects the report is built from.
// Each of them is accessed under its own GLensReadHolder, one at a time.
//
// The message is a single brace-enclosed list of 'key':'value' pairs. Keys
// and values are wrapped in single quotes and string values are inserted
// verbatim, without any escaping. Values that go through dmtoll() / dmtod()
// are scaled by 1024 * 1024 before being printed.
//
// Exceptions raised by the CMS calls propagate to the caller unchanged; the
// message object is released in either case.
void XrdFileCloseReporterAmq::ReportFileClosed(FileUserServer& fus)
{
  static const Exc_t _eh("XrdFileCloseReporterAmq::ReportFileClosed ");

  XrdFile   *file   = fus.fFile;
  XrdUser   *user   = fus.fUser;
  XrdServer *server = fus.fServer;

  TString msg("{");

  // File section: identity, open/close times and read/write statistics.
  {
    GLensReadHolder _flck(file);

    // Disabled unique-id generation, kept for reference.
    // Long64_t unique_id = 1000ll * file->RefCloseTime().ToMiliSec();
    // if (unique_id == mLastUidBase)
    // {
    //   unique_id = mLastUidBase + ++mLastUidInner;
    //   if (mLastUidInner >= 1000ll)
    //   {
    //     ZLog::Helper log(*mLog, ZLog::L_Warning, _eh);
    //     log.Form("Inner counter for unique-id overflowed for file='%s'.", file->GetName());
    //     return;
    //   }
    // }
    // else
    // {
    //   mLastUidBase  = unique_id;
    //   mLastUidInner = 0;
    // }

    const SRange &RS   = file->RefReadStats();
    const SRange &RSS  = file->RefSingleReadStats();
    const SRange &RSV  = file->RefVecReadStats();
    const SRange &RSVC = file->RefVecReadCntStats();
    const SRange &WS   = file->RefWriteStats();
    // The argument rows below follow the rows of the format string one to one.
    msg += TString::Format
      ("'file_lfn':'%s', 'file_size':'%lld', 'start_time':'%llu', 'end_time':'%llu', "
       "'read_bytes':'%lld', 'read_operations':'%llu', 'read_min':'%lld', 'read_max':'%lld', 'read_average':'%f', 'read_sigma':'%f', "
       "'read_single_bytes':'%lld', 'read_single_operations':'%llu', 'read_single_min':'%lld', 'read_single_max':'%lld', 'read_single_average':'%f', 'read_single_sigma':'%f', "
       "'read_vector_bytes':'%lld', 'read_vector_operations':'%llu', 'read_vector_min':'%lld', 'read_vector_max':'%lld', 'read_vector_average':'%f', 'read_vector_sigma':'%f', "
       "'read_vector_count_min':'%lld', 'read_vector_count_max':'%lld', 'read_vector_count_average':'%f', 'read_vector_count_sigma':'%f', "
       "'write_bytes':'%lld', 'write_operations':'%llu', 'write_min':'%lld', 'write_max':'%lld', 'write_average':'%f', 'write_sigma':'%f', "
       "'read_bytes_at_close':'%lld', "
       "'write_bytes_at_close':'%lld', ",
       file->GetName(), dmtoll(file->GetSizeMB()), file->RefOpenTime().GetSec(), file->RefCloseTime().GetSec(),
       dmtoll(RS .GetSumX()), RS .GetN(), dmtoll(RS .GetMin()), dmtoll(RS .GetMax()), dmtod(RS .GetAverage()), dmtod(RS .GetSigma()),
       dmtoll(RSS.GetSumX()), RSS.GetN(), dmtoll(RSS.GetMin()), dmtoll(RSS.GetMax()), dmtod(RSS.GetAverage()), dmtod(RSS.GetSigma()),
       dmtoll(RSV.GetSumX()), RSV.GetN(), dmtoll(RSV.GetMin()), dmtoll(RSV.GetMax()), dmtod(RSV.GetAverage()), dmtod(RSV.GetSigma()),
                                          dtoll(RSVC.GetMin()), dtoll(RSVC.GetMax()), RSVC.GetAverage(),       RSVC.GetSigma(),
       dmtoll(WS .GetSumX()), WS .GetN(), dmtoll(WS .GetMin()), dmtoll(WS .GetMax()), dmtod(WS .GetAverage()), dmtod(WS .GetSigma()),
       dmtoll(file->GetRTotalMB()),
       dmtoll(file->GetWTotalMB()));
  }
  // User section: identity of the client and where it connected from.
  {
    GLensReadHolder _ulck(user);
    msg += TString::Format
      ("'user_dn':'%s', 'user_vo':'%s', 'user_role':'%s', 'user_fqan':'%s', 'client_domain':'%s', 'client_host':'%s', "
       "'server_username':'%s', 'app_info':'%s', ",
       user->GetDN(), user->GetVO(), user->GetRole(), user->GetGroup(),
       user->GetFromDomain(), user->GetFromHost(), user->GetServerUsername(), user->GetAppInfo());
  }
  // Server section: last entry, hence no trailing comma in the format.
  {
    GLensReadHolder _slck(server);
    msg += TString::Format
      ("'server_domain':'%s', 'server_host':'%s'",
       server->GetDomain(), server->GetHost());
  }

  msg += "}";

  cms::TextMessage* aqm = mSess->createTextMessage(msg.Data());
  try
  {
    mProd->send(aqm);
  }
  catch (...)
  {
    // send() failed: free the message before the exception leaves this
    // function, otherwise it would be leaked.
    delete aqm;
    throw;
  }
  delete aqm;
}

// Delete the AMQ objects created in ReportLoopInit() and reset the members.
//
// The objects are destroyed in reverse order of creation: the producer and
// destination go before the session they were created from, the session
// before its connection, and the connection factory last. This way no object
// is deleted while something created from it is still alive.
//
// Every member is set to 0 after deletion, so calling this again (or calling
// it when ReportLoopInit() never ran) only deletes null pointers.
void XrdFileCloseReporterAmq::ReportLoopFinalize()
{
  delete mProd;    mProd    = 0;
  delete mDest;    mDest    = 0;
  delete mSess;    mSess    = 0;
  delete mConn;    mConn    = 0;
  delete mConnFac; mConnFac = 0;
}


//==============================================================================
// XrdMon libset user-init
//==============================================================================

// Temporary location, as AMQ is the only thing that needs global
// initialization and XrdFileCloseReporterAmq is the only thing that uses AMQ.
//
// This should go elsewhere, eventually.

void libXrdMon_GLED_user_init()
{
  activemq::library::ActiveMQCPP::initializeLibrary();
}

void* XrdMon_GLED_user_init = (void*) libXrdMon_GLED_user_init;

void libXrdMon_GLED_user_shutdown()
{
  activemq::library::ActiveMQCPP::shutdownLibrary();
}

void* XrdMon_GLED_user_shutdown = (void*) libXrdMon_GLED_user_shutdown;
 XrdFileCloseReporterAmq.cxx:1
 XrdFileCloseReporterAmq.cxx:2
 XrdFileCloseReporterAmq.cxx:3
 XrdFileCloseReporterAmq.cxx:4
 XrdFileCloseReporterAmq.cxx:5
 XrdFileCloseReporterAmq.cxx:6
 XrdFileCloseReporterAmq.cxx:7
 XrdFileCloseReporterAmq.cxx:8
 XrdFileCloseReporterAmq.cxx:9
 XrdFileCloseReporterAmq.cxx:10
 XrdFileCloseReporterAmq.cxx:11
 XrdFileCloseReporterAmq.cxx:12
 XrdFileCloseReporterAmq.cxx:13
 XrdFileCloseReporterAmq.cxx:14
 XrdFileCloseReporterAmq.cxx:15
 XrdFileCloseReporterAmq.cxx:16
 XrdFileCloseReporterAmq.cxx:17
 XrdFileCloseReporterAmq.cxx:18
 XrdFileCloseReporterAmq.cxx:19
 XrdFileCloseReporterAmq.cxx:20
 XrdFileCloseReporterAmq.cxx:21
 XrdFileCloseReporterAmq.cxx:22
 XrdFileCloseReporterAmq.cxx:23
 XrdFileCloseReporterAmq.cxx:24
 XrdFileCloseReporterAmq.cxx:25
 XrdFileCloseReporterAmq.cxx:26
 XrdFileCloseReporterAmq.cxx:27
 XrdFileCloseReporterAmq.cxx:28
 XrdFileCloseReporterAmq.cxx:29
 XrdFileCloseReporterAmq.cxx:30
 XrdFileCloseReporterAmq.cxx:31
 XrdFileCloseReporterAmq.cxx:32
 XrdFileCloseReporterAmq.cxx:33
 XrdFileCloseReporterAmq.cxx:34
 XrdFileCloseReporterAmq.cxx:35
 XrdFileCloseReporterAmq.cxx:36
 XrdFileCloseReporterAmq.cxx:37
 XrdFileCloseReporterAmq.cxx:38
 XrdFileCloseReporterAmq.cxx:39
 XrdFileCloseReporterAmq.cxx:40
 XrdFileCloseReporterAmq.cxx:41
 XrdFileCloseReporterAmq.cxx:42
 XrdFileCloseReporterAmq.cxx:43
 XrdFileCloseReporterAmq.cxx:44
 XrdFileCloseReporterAmq.cxx:45
 XrdFileCloseReporterAmq.cxx:46
 XrdFileCloseReporterAmq.cxx:47
 XrdFileCloseReporterAmq.cxx:48
 XrdFileCloseReporterAmq.cxx:49
 XrdFileCloseReporterAmq.cxx:50
 XrdFileCloseReporterAmq.cxx:51
 XrdFileCloseReporterAmq.cxx:52
 XrdFileCloseReporterAmq.cxx:53
 XrdFileCloseReporterAmq.cxx:54
 XrdFileCloseReporterAmq.cxx:55
 XrdFileCloseReporterAmq.cxx:56
 XrdFileCloseReporterAmq.cxx:57
 XrdFileCloseReporterAmq.cxx:58
 XrdFileCloseReporterAmq.cxx:59
 XrdFileCloseReporterAmq.cxx:60
 XrdFileCloseReporterAmq.cxx:61
 XrdFileCloseReporterAmq.cxx:62
 XrdFileCloseReporterAmq.cxx:63
 XrdFileCloseReporterAmq.cxx:64
 XrdFileCloseReporterAmq.cxx:65
 XrdFileCloseReporterAmq.cxx:66
 XrdFileCloseReporterAmq.cxx:67
 XrdFileCloseReporterAmq.cxx:68
 XrdFileCloseReporterAmq.cxx:69
 XrdFileCloseReporterAmq.cxx:70
 XrdFileCloseReporterAmq.cxx:71
 XrdFileCloseReporterAmq.cxx:72
 XrdFileCloseReporterAmq.cxx:73
 XrdFileCloseReporterAmq.cxx:74
 XrdFileCloseReporterAmq.cxx:75
 XrdFileCloseReporterAmq.cxx:76
 XrdFileCloseReporterAmq.cxx:77
 XrdFileCloseReporterAmq.cxx:78
 XrdFileCloseReporterAmq.cxx:79
 XrdFileCloseReporterAmq.cxx:80
 XrdFileCloseReporterAmq.cxx:81
 XrdFileCloseReporterAmq.cxx:82
 XrdFileCloseReporterAmq.cxx:83
 XrdFileCloseReporterAmq.cxx:84
 XrdFileCloseReporterAmq.cxx:85
 XrdFileCloseReporterAmq.cxx:86
 XrdFileCloseReporterAmq.cxx:87
 XrdFileCloseReporterAmq.cxx:88
 XrdFileCloseReporterAmq.cxx:89
 XrdFileCloseReporterAmq.cxx:90
 XrdFileCloseReporterAmq.cxx:91
 XrdFileCloseReporterAmq.cxx:92
 XrdFileCloseReporterAmq.cxx:93
 XrdFileCloseReporterAmq.cxx:94
 XrdFileCloseReporterAmq.cxx:95
 XrdFileCloseReporterAmq.cxx:96
 XrdFileCloseReporterAmq.cxx:97
 XrdFileCloseReporterAmq.cxx:98
 XrdFileCloseReporterAmq.cxx:99
 XrdFileCloseReporterAmq.cxx:100
 XrdFileCloseReporterAmq.cxx:101
 XrdFileCloseReporterAmq.cxx:102
 XrdFileCloseReporterAmq.cxx:103
 XrdFileCloseReporterAmq.cxx:104
 XrdFileCloseReporterAmq.cxx:105
 XrdFileCloseReporterAmq.cxx:106
 XrdFileCloseReporterAmq.cxx:107
 XrdFileCloseReporterAmq.cxx:108
 XrdFileCloseReporterAmq.cxx:109
 XrdFileCloseReporterAmq.cxx:110
 XrdFileCloseReporterAmq.cxx:111
 XrdFileCloseReporterAmq.cxx:112
 XrdFileCloseReporterAmq.cxx:113
 XrdFileCloseReporterAmq.cxx:114
 XrdFileCloseReporterAmq.cxx:115
 XrdFileCloseReporterAmq.cxx:116
 XrdFileCloseReporterAmq.cxx:117
 XrdFileCloseReporterAmq.cxx:118
 XrdFileCloseReporterAmq.cxx:119
 XrdFileCloseReporterAmq.cxx:120
 XrdFileCloseReporterAmq.cxx:121
 XrdFileCloseReporterAmq.cxx:122
 XrdFileCloseReporterAmq.cxx:123
 XrdFileCloseReporterAmq.cxx:124
 XrdFileCloseReporterAmq.cxx:125
 XrdFileCloseReporterAmq.cxx:126
 XrdFileCloseReporterAmq.cxx:127
 XrdFileCloseReporterAmq.cxx:128
 XrdFileCloseReporterAmq.cxx:129
 XrdFileCloseReporterAmq.cxx:130
 XrdFileCloseReporterAmq.cxx:131
 XrdFileCloseReporterAmq.cxx:132
 XrdFileCloseReporterAmq.cxx:133
 XrdFileCloseReporterAmq.cxx:134
 XrdFileCloseReporterAmq.cxx:135
 XrdFileCloseReporterAmq.cxx:136
 XrdFileCloseReporterAmq.cxx:137
 XrdFileCloseReporterAmq.cxx:138
 XrdFileCloseReporterAmq.cxx:139
 XrdFileCloseReporterAmq.cxx:140
 XrdFileCloseReporterAmq.cxx:141
 XrdFileCloseReporterAmq.cxx:142
 XrdFileCloseReporterAmq.cxx:143
 XrdFileCloseReporterAmq.cxx:144
 XrdFileCloseReporterAmq.cxx:145
 XrdFileCloseReporterAmq.cxx:146
 XrdFileCloseReporterAmq.cxx:147
 XrdFileCloseReporterAmq.cxx:148
 XrdFileCloseReporterAmq.cxx:149
 XrdFileCloseReporterAmq.cxx:150
 XrdFileCloseReporterAmq.cxx:151
 XrdFileCloseReporterAmq.cxx:152
 XrdFileCloseReporterAmq.cxx:153
 XrdFileCloseReporterAmq.cxx:154
 XrdFileCloseReporterAmq.cxx:155
 XrdFileCloseReporterAmq.cxx:156
 XrdFileCloseReporterAmq.cxx:157
 XrdFileCloseReporterAmq.cxx:158
 XrdFileCloseReporterAmq.cxx:159
 XrdFileCloseReporterAmq.cxx:160
 XrdFileCloseReporterAmq.cxx:161
 XrdFileCloseReporterAmq.cxx:162
 XrdFileCloseReporterAmq.cxx:163
 XrdFileCloseReporterAmq.cxx:164
 XrdFileCloseReporterAmq.cxx:165
 XrdFileCloseReporterAmq.cxx:166
 XrdFileCloseReporterAmq.cxx:167
 XrdFileCloseReporterAmq.cxx:168
 XrdFileCloseReporterAmq.cxx:169
 XrdFileCloseReporterAmq.cxx:170
 XrdFileCloseReporterAmq.cxx:171
 XrdFileCloseReporterAmq.cxx:172
 XrdFileCloseReporterAmq.cxx:173
 XrdFileCloseReporterAmq.cxx:174
 XrdFileCloseReporterAmq.cxx:175
 XrdFileCloseReporterAmq.cxx:176
 XrdFileCloseReporterAmq.cxx:177
 XrdFileCloseReporterAmq.cxx:178
 XrdFileCloseReporterAmq.cxx:179
 XrdFileCloseReporterAmq.cxx:180
 XrdFileCloseReporterAmq.cxx:181
 XrdFileCloseReporterAmq.cxx:182
 XrdFileCloseReporterAmq.cxx:183
 XrdFileCloseReporterAmq.cxx:184
 XrdFileCloseReporterAmq.cxx:185
 XrdFileCloseReporterAmq.cxx:186
 XrdFileCloseReporterAmq.cxx:187
 XrdFileCloseReporterAmq.cxx:188
 XrdFileCloseReporterAmq.cxx:189
 XrdFileCloseReporterAmq.cxx:190
 XrdFileCloseReporterAmq.cxx:191
 XrdFileCloseReporterAmq.cxx:192
 XrdFileCloseReporterAmq.cxx:193
 XrdFileCloseReporterAmq.cxx:194
 XrdFileCloseReporterAmq.cxx:195
 XrdFileCloseReporterAmq.cxx:196
 XrdFileCloseReporterAmq.cxx:197