#include "XrdMonSucker.h"
#include "XrdFileCloseReporter.h"
#include "Glasses/UdpPacketSource.h"
#include "Glasses/ZHashList.h"
#include "Glasses/ZLog.h"
#include "XrdMonSucker.c7"
#include "XrdDomain.h"
#include "XrdServer.h"
#include "XrdUser.h"
#include "XrdFile.h"
#include "Stones/SUdpPacket.h"
#include "Gled/GThread.h"
#include "XrdMon/XrdXrootdMonData.h"
#include <cerrno>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
namespace
{
const Double_t One_MB = 1024 * 1024;
}
ClassImp(XrdMonSucker);
void XrdMonSucker::_init()
{
mSuckPort = 9929;
mUserKeepSec = 300;
mUserDeadSec = 86400;
mServDeadSec = 86400;
mServIdentSec = 300;
mServIdentCnt = 5;
mPacketCount = mSeqIdFailCount = 0;
mSocket = 0;
mSuckerThread = 0;
mLastOldUserCheck = mLastDeadUserCheck =
mLastDeadServCheck = mLastIdentServCheck = GTime(GTime::I_Never);
bTraceAllNull = true;
}
XrdMonSucker::XrdMonSucker(const Text_t* n, const Text_t* t) :
ZNameMap(n, t)
{
_init();
SetElementFID(XrdDomain::FID());
}
XrdMonSucker::~XrdMonSucker()
{}
void XrdMonSucker::AdEnlightenment()
{
PARENT_GLASS::AdEnlightenment();
if (mOpenFiles == 0)
{
assign_link<ZHashList>(mOpenFiles, FID(), "OpenFiles");
mOpenFiles->SetElementFID(XrdFile::FID());
}
if (mFCReporters == 0)
{
assign_link<ZHashList>(mFCReporters, FID(), "FileCloseReporters");
mFCReporters->SetElementFID(XrdFileCloseReporter::FID());
mFCReporters->SetMIRActive(false);
}
}
void XrdMonSucker::AddFileCloseReporter(XrdFileCloseReporter* fcr)
{
mFCReporters->Add(fcr);
}
void XrdMonSucker::RemoveFileCloseReporter(XrdFileCloseReporter* fcr)
{
mFCReporters->Remove(fcr);
}
void XrdMonSucker::on_file_open(XrdFile* file)
{
auto_ptr<ZMIR> mir( mOpenFiles->S_Add(file) );
mSaturn->ShootMIR(mir);
}
void XrdMonSucker::on_file_close(XrdFile* file, XrdUser* user, XrdServer* server)
{
Stepper<XrdFileCloseReporter> stepper(*mFCReporters);
while (stepper.step())
{
stepper->FileClosed(file, user, server);
}
{
auto_ptr<ZMIR> mir( mOpenFiles->S_Remove(file) );
mSaturn->ShootMIR(mir);
}
}
void XrdMonSucker::disconnect_user_and_close_open_files(XrdUser* user, XrdServer* server,
const GTime& time)
{
static const Exc_t _eh("XrdMonSucker::disconnect_user_and_close_open_files ");
{
GLensReadHolder _lck(user);
user->SetDisconnectTime(time);
}
list<XrdFile*> open_files;
user->CopyListByGlass<XrdFile>(open_files);
for (list<XrdFile*>::iterator fi = open_files.begin(); fi != open_files.end(); ++fi)
{
XrdFile *file = *fi;
Bool_t closed = false;
{
GLensReadHolder _lck(file);
if (file->IsOpen())
{
file->SetCloseTime(time);
closed = true;
}
}
if (closed)
{
{
GLensReadHolder _lck(server);
try
{
server->RemoveFile(file);
}
catch (Exc_t exc)
{
if (*mLog)
mLog->Put(ZLog::L_Error, _eh, exc);
}
}
on_file_close(file, user, server);
}
}
{
GLensWriteHolder _lck(server);
try
{
server->DisconnectUser(user);
}
catch (Exc_t exc)
{
if (*mLog)
mLog->Put(ZLog::L_Error, _eh, exc);
}
}
}
void XrdMonSucker::disconnect_server(XrdServer* server, XrdDomain *domain,
const GTime& time)
{
{
GMutexHolder _lck(m_xrd_servers_mutex);
m_xrd_servers.erase(server->m_server_id);
}
{
list<XrdUser*> users;
server->CopyListByGlass<XrdUser>(users);
for (list<XrdUser*>::iterator ui = users.begin(); ui != users.end(); ++ui)
{
XrdUser *user = *ui;
disconnect_user_and_close_open_files(user, server, time);
}
}
mSaturn->ShootMIR( mQueen->S_RemoveLenses(server->GetPrevUsers()) );
mSaturn->ShootMIR( domain->S_RemoveAll(server) );
}
void* XrdMonSucker::tl_Suck(XrdMonSucker* s)
{
s->Suck();
s->mSuckerThread = 0;
return 0;
}
void XrdMonSucker::Suck()
{
static const Exc_t _eh("XrdMonSucker::Suck ");
TPMERegexp ip4addr_re ("(\\d+\\.\\d+\\.\\d+)\\.(\\d+)", "o");
TPMERegexp username_re("(\\w+)\\.(\\d+):(\\d+)@(.+)", "o");
TPMERegexp hostname_re("([^\\.]+)\\.(.*)", "o");
TPMERegexp authinfo_re("^&p=(.*)&n=(.*)&h=(.*)&o=(.*)&r=(.*)&g=(.*)&m=(.*)$", "o");
TPMERegexp authxxxx_re("^&p=(.*)&n=(.*)&h=(.*)&o=(.*)&r=(.*)$", "o");
while (true)
{
SUdpPacket *p = mUdpQueue.PopFront();
GTime recv_time(p->mRecvTime);
{
GLensReadHolder _lck(this);
if (++mPacketCount % 1000 == 0)
Stamp(FID());
}
XrdXrootdMonHeader *xmh = (XrdXrootdMonHeader*) p->mBuff;
Char_t code = xmh->code;
UChar_t pseq = xmh->pseq;
UShort_t plen = ntohs(xmh->plen);
Int_t stod = ntohl(xmh->stod);
UInt_t in4a = p->Ip4AsUInt();
UShort_t port = p->mPort;
SXrdServerId xsid(in4a, stod, port);
xrd_hash_i xshi;
bool server_not_known;
{
GMutexHolder _lck(m_xrd_servers_mutex);
xshi = m_xrd_servers.find(xsid);
server_not_known = (xshi == m_xrd_servers.end());
}
ZLog::Helper log(*mLog, recv_time, ZLog::L_Info, _eh);
XrdServer *server = 0;
XrdDomain *domain = 0;
if (server_not_known)
{
sockaddr_in sa4;
sockaddr_in6 sa6;
sockaddr *sa = 0;
socklen_t sl;
if (p->mAddrLen == 4)
{
sa4.sin_family = AF_INET;
memcpy(&sa4.sin_addr.s_addr, p->mAddr, p->mAddrLen);
sa = (sockaddr*) &sa4;
sl = sizeof(sa4);
}
else
{
sa6.sin6_family = AF_INET6;
memcpy(sa6.sin6_addr.s6_addr, p->mAddr, p->mAddrLen);
sa = (sockaddr*) &sa6;
sl = sizeof(sa6);
}
Char_t hn_buf[64];
getnameinfo((sockaddr*) sa, sl, hn_buf, 64, 0, 0, NI_DGRAM);
TString fqhn(hn_buf);
fqhn.ToLower();
if (hostname_re.Match(fqhn) != 3)
{
char *foo = (char*) &in4a;
log.Form(ZLog::L_Error, "New server NS lookup problem: %hhu.%hhu.%hhu.%hhu:%hu, fqdn='%s'.",
foo[0], foo[1], foo[2], foo[3], port, hn_buf);
continue;
}
log.Form(ZLog::L_Message, "New server: %s.%s:%hu stod=%d",
hostname_re[1].Data(), hostname_re[2].Data(), port, stod);
server = new XrdServer(GForm("%s.%s : %d : %hu", hostname_re[1].Data(), hostname_re[2].Data(), stod, port),
"", hostname_re[1], hostname_re[2], GTime(stod));
server->m_server_id = xsid;
domain = static_cast<XrdDomain*>(GetElementByName(server->GetDomain()));
if (!domain)
{
domain = new XrdDomain(server->GetDomain());
mQueen->CheckIn(domain);
{
GLensWriteHolder _lck(this);
Add(domain);
}
}
mQueen->CheckIn(server);
{
GLensWriteHolder _lck(domain);
domain->Add(server);
}
{
GMutexHolder _lck(m_xrd_servers_mutex);
xshi = m_xrd_servers.insert(make_pair(xsid, server)).first;
}
}
else
{
server = xshi->second;
domain = static_cast<XrdDomain*>(GetElementByName(server->GetDomain()));
}
{
GLensReadHolder _lck(server);
server->IncPacketCount();
server->SetLastMsgTime(recv_time);
}
{
GLensReadHolder _lck(domain);
domain->IncPacketCount();
}
if (code == 'u' || code == 'd' || code == 't' || code == 'i' || code == 'r')
{
if (server->IsSrvSeqInited())
{
UChar_t srv_seq = server->IncAndGetSrvSeq();
if (pseq != srv_seq)
{
log.Form(ZLog::L_Warning, "Sequence-id mismatch at '%s' srv=%hhu, msg=%hhu; code=%c. Ignoring.",
server->GetName(), srv_seq, pseq, code);
server->InitSrvSeq(pseq);
{
GLensReadHolder _lck(server);
server->IncSeqIdFailCount();
}
{
GLensReadHolder _lck(domain);
domain->IncSeqIdFailCount();
}
{
GLensReadHolder _lck(this);
++mSeqIdFailCount;
Stamp(FID());
}
}
}
else
{
server->InitSrvSeq(pseq);
}
}
if (p->mBuffLen != plen)
{
log.Form(ZLog::L_Warning, "Message size mismatch: got %zd, xrd-len=%hu.", p->mBuffLen, plen);
continue;
}
if (code != 't' && code != 'r')
{
TString msg;
msg.Form("Message from %s.%s:%hu, c=%c, seq=%hhu, len=%hu",
server->GetHost(), server->GetDomain(), port,
xmh->code, pseq, plen);
XrdXrootdMonMap *xmm = (XrdXrootdMonMap*) p->mBuff;
Int_t dict_id = ntohl(xmm->dictid);
(p->mBuff)[plen] = 0;
char *prim = xmm->info;
char *sec = strstr(prim, "\n");
if (sec) {
*(sec++) = 0;
}
if (code == 'u')
{
msg += TString::Format("\n\tUser map -- id=%d, uname=%s", dict_id, prim);
TString uname(prim), host, domain;
Bool_t numeric_host = false;
{
if (username_re.Match(uname) != 5)
{
msg += " ... parse error.";
log.Put(ZLog::L_Error, msg);
continue;
}
if (ip4addr_re.Match(username_re[4]) == 3)
{
msg += TString::Format("@%s", server->GetDomain());
host = ip4addr_re[0];
domain = server->RefDomain();
numeric_host = true;
}
else if (hostname_re.Match(username_re[4]) == 3)
{
host = hostname_re[1];
domain = hostname_re[2];
}
else
{
msg += TString::Format(".%s", server->GetDomain());
host = username_re[4];
domain = server->RefDomain();
}
if (username_re[1] == mNagiosUser && host.BeginsWith(mNagiosHost) && domain.BeginsWith(mNagiosDomain))
{
continue;
}
{
XrdUser *xu = server->FindUser(uname);
if (xu != 0)
{
msg += "\n\tUsername was already taken -- deleting old user!";
disconnect_user_and_close_open_files(xu, server, recv_time);
}
}
if (server->ExistsUserDictId(dict_id))
{
msg += "\n\tUser dict_id already taken ... this session will not be tracked.";
log.Put(ZLog::L_Warning, msg);
continue;
}
}
XrdUser *user = 0;
try
{
TString dn;
{
TString group;
TPMERegexp *a_rep = 0;
if (authinfo_re.Match(sec) == 8)
{
group = authinfo_re[6];
dn = authinfo_re[7];
a_rep = &authinfo_re;
}
else if (authxxxx_re.Match(sec) == 6)
{
group = "<unknown>";
dn = "<unknown>";
a_rep = &authxxxx_re;
}
else
{
msg += GForm("\n\tUnparsable auth-info: '%s'", sec);
}
if (a_rep)
{
TPMERegexp &a_re = *a_rep;
msg += GForm("\n\tDN=%s, VO=%s, Role=%s, Group=%s",
dn.Data(), a_re[4].Data(), a_re[5].Data(), group.Data());
user = new XrdUser(uname, "", dn, a_re[4], a_re[5], group,
a_re[2], host, domain, numeric_host, recv_time);
}
else
{
user = new XrdUser(uname, "", "", "", "", "",
"", host, domain, numeric_host, recv_time);
}
mQueen->CheckIn(user);
}
{
GLensWriteHolder _lck(server);
server->AddUser(user, dict_id);
}
{
GLensWriteHolder _lck(user);
user->SetServer(server);
if ( ! bTraceAllNull && mTraceDN_RE.Match(dn) &&
mTraceHost_RE.Match(host) && mTraceDomain_RE.Match(domain))
{
user->SetTraceMon(true);
}
}
}
catch (Exc_t exc)
{
msg += "\n\tException caught while instantiating XrdUser:\n\t" + exc;
log.Put(ZLog::L_Error, msg);
continue;
}
}
else if (code == 'd')
{
TString uname(prim);
TString path (sec);
msg += TString::Format("\n\tPath map -- id=%d, uname=%s path=%s",
dict_id, uname.Data(), path.Data());
XrdUser *user = server->FindUser(uname);
if (user)
{
if (server->ExistsFileDictId(dict_id))
{
msg += "\n\tFile dict_id already taken ... this file will not be tracked.";
log.Put(ZLog::L_Warning, msg);
continue;
}
try
{
XrdFile *file = new XrdFile(path);
mQueen->CheckIn(file);
{
GLensWriteHolder _lck(user);
user->AddFile(file);
user->SetLastMsgTime(recv_time);
}
{
GLensWriteHolder _lck(file);
file->SetUser(user);
file->SetOpenTime(recv_time);
file->SetLastMsgTime(recv_time);
}
{
GLensWriteHolder _lck(server);
server->AddFile(file, dict_id);
}
on_file_open(file);
}
catch (Exc_t exc)
{
msg += "\n\tException caught while instantiating XrdFile:\n\t" + exc;
log.Put(ZLog::L_Error, msg);
continue;
}
}
else
{
if ( ! uname.BeginsWith(mNagiosUser))
{
msg += "\n\tUser not found ... skipping.";
log.Put(ZLog::L_Warning, msg);
}
continue;
}
}
else if (code == 'i')
{
TString uname(prim);
TString info (sec);
msg += TString::Format("\n\tInfo map -- id=%d, uname=%s info=%s",
dict_id, uname.Data(), info.Data());
XrdUser *user = server->FindUser(uname);
if (user)
{
GLensWriteHolder _lck(user);
user->AppendAppInfo(info);
user->SetLastMsgTime(recv_time);
}
else
{
msg += "\n\tUser not found ... skipping.";
log.Put(ZLog::L_Warning, msg);
}
}
else if (code == '=')
{
GLensWriteHolder _lck(server);
server->UpdateSrvIdTime(recv_time);
msg += TString::Format("\n\tServerId -- uname=%s other=%s", prim, sec);
}
else
{
msg += TString::Format("\n\tOther %c -- id=%u, uname=%s other=%s", code, dict_id, prim, sec);
}
log.Put(msg);
}
else if (code == 't')
{
struct local_cache
{
XrdServer *fSrv;
XrdXrootdMonBuff *fXmb;
XrdFile *fFile;
Int_t fDictId;
Int_t fTi, fTiWEnd;
const Int_t fN;
GTime fTime;
Double_t fTimeStep;
local_cache(XrdServer* s, XrdXrootdMonBuff* b, Int_t plen) :
fSrv(s), fXmb(b),
fFile(0), fDictId(0),
fTi(-1), fTiWEnd(-1),
fN((plen - sizeof(XrdXrootdMonHeader)) / sizeof(XrdXrootdMonTrace))
{}
XrdXrootdMonTrace& trace() { return fXmb->info[fTi]; }
XrdXrootdMonTrace& trace(int idx) { return fXmb->info[idx]; }
UChar_t trace_type() { return trace().arg0.id[0]; }
UChar_t trace_type(int idx) { return trace(idx).arg0.id[0]; }
Int_t full_delta_time()
{
return ntohl(trace(fN-1).arg1.Window) - ntohl(trace(0).arg2.Window);
}
Bool_t next(TString& log_msg, Bool_t verbose=false)
{
if (++fTi < fN)
{
if (trace_type() == XROOTD_MON_WINDOW)
{
fTiWEnd = fTi + 1;
while (fTiWEnd < fN && trace(fTiWEnd).arg0.id[0] != XROOTD_MON_WINDOW)
++fTiWEnd;
if (fTiWEnd >= fN)
return false;
Int_t n_div = fTiWEnd - fTi - 2;
fTime = GTime(ntohl(trace().arg2.Window));
fTimeStep = (n_div >= 1) ?
(GTime(ntohl(trace(fTiWEnd).arg1.Window)) - fTime).ToDouble() / n_div :
0;
if (verbose)
log_msg += GForm("\n\tWindow iB=%2d iE=%2d N=%2d delta_t=%f -- start %s.",
fTi, fTiWEnd, fTiWEnd-fTi-1, fTimeStep, fTime.ToDateTimeLocal(false).Data());
++fTi;
}
else
{
fTime += fTimeStep;
}
return true;
}
return false;
}
XrdFile* update(Int_t newid)
{
if (newid != fDictId)
{
fFile = fSrv->FindFile(newid);
fDictId = newid;
}
return fFile;
}
const char* trace_type_name()
{
const char *c;
switch (trace_type())
{
case XROOTD_MON_APPID: c = "apm"; break;
case XROOTD_MON_CLOSE: c = "cls"; break;
case XROOTD_MON_DISC: c = "dis"; break;
case XROOTD_MON_OPEN: c = "opn"; break;
case XROOTD_MON_WINDOW: c = "win"; break;
default: c = "rdw"; break;
}
return c;
}
XrdUser* find_user()
{
for (Int_t i = 1; i < fN; ++i)
{
XrdXrootdMonTrace &xmt = trace(i);
UChar_t tt = trace_type(i);
if (tt <= 0x7F || tt == XROOTD_MON_OPEN || tt == XROOTD_MON_CLOSE)
{
update(ntohl(xmt.arg2.dictid));
if (fFile) return fFile->GetUser();
}
else if (tt == XROOTD_MON_DISC)
{
return fSrv->FindUser(ntohl(xmt.arg2.dictid));
}
}
return 0;
}
};
local_cache lc(server, (XrdXrootdMonBuff*) p->mBuff, plen);
XrdUser *us = lc.find_user();
Bool_t vrb = us && us->GetTraceMon();
TString msg, msg_vrb;
while (lc.next(msg_vrb, vrb))
{
Int_t ti = lc.fTi;
XrdXrootdMonTrace &xmt = lc.trace();
UChar_t tt = lc.trace_type();
const Char_t *ttn = lc.trace_type_name();
XrdFile *file = 0;
if (tt <= 0x7F)
{
Int_t dict_id = ntohl(xmt.arg2.dictid);
file = lc.update(dict_id);
if (file)
{
us = file->GetUser();
Int_t rwlen = ntohl(xmt.arg1.buflen);
GLensReadHolder _lck(file);
if (rwlen >= 0)
{
file->AddReadSample ( rwlen / One_MB);
}
else
{
file->AddWriteSample(-rwlen / One_MB);
}
file->SetLastMsgTime(lc.fTime);
}
}
else if (tt == XROOTD_MON_READV)
{
Int_t dict_id = ntohl(xmt.arg2.dictid);
file = lc.update(dict_id);
if (file)
{
us = file->GetUser();
Int_t rlen = ntohl(xmt.arg1.buflen);
Int_t nels = ntohs(xmt.arg0.sVal[1]);
GLensReadHolder _lck(file);
file->AddVecReadSample(rlen / One_MB, nels);
file->SetLastMsgTime(lc.fTime);
}
}
else if (tt == XROOTD_MON_OPEN || tt == XROOTD_MON_CLOSE)
{
Int_t dict_id = ntohl(xmt.arg2.dictid);
file = lc.update(dict_id);
if (vrb) msg_vrb += GForm("\n\t%2d: %s, file='%s'", ti, ttn, file ? file->GetName() : "<nil>");
if (file)
{
us = file->GetUser();
if (tt == XROOTD_MON_OPEN)
{
msg += GForm("\n\tOpen file='%s'", file ? file->GetName() : "<nil>");
union { Long64_t val; UChar_t id[8]; } jebo;
jebo.val = xmt.arg0.val;
jebo.id[0] = 0;
GLensReadHolder _lck(file);
file->SetSizeMB(net2host(jebo.val) / One_MB);
if (lc.fTime < file->RefOpenTime())
{
file->SetOpenTime(lc.fTime);
}
file->SetLastMsgTime(lc.fTime);
}
else
{
{
GLensReadHolder _lck(file);
file->SetLastMsgTime(lc.fTime);
ULong64_t x;
x = ntohl(xmt.arg0.rTot[1]);
x <<= xmt.arg0.id[1];
file->SetRTotalMB(x / One_MB);
x = ntohl(xmt.arg1.wTot);
x <<= xmt.arg0.id[2];
file->SetWTotalMB(x / One_MB);
file->SetCloseTime(lc.fTime);
}
msg += GForm("\n\tClose file='%s'", file ? file->GetName() : "<nil>");
{
GLensReadHolder _lck(server);
try
{
server->RemoveFile(file);
}
catch (Exc_t exc)
{
if (*mLog)
mLog->Put(ZLog::L_Error, _eh, exc);
}
}
on_file_close(file, us, server);
}
}
}
else if (tt == XROOTD_MON_DISC)
{
Int_t dict_id = ntohl(xmt.arg2.dictid);
XrdUser *us_from_server = server->FindUser(dict_id);
if (us != us_from_server)
{
log.Form(ZLog::L_Warning, _eh + "us != us_from_server: us=%p ('%s'), us_from_server=%p ('%s')",
us, us ? us->GetName() : "",
us_from_server, us_from_server ? us_from_server->GetName() : "");
if (us_from_server) {
us = us_from_server;
}
}
bool disconn_p = true;
TString extra;
if (xmt.arg0.id[1] & XROOTD_MON_FORCED) {
extra += "(forced)";
}
if (xmt.arg0.id[1] & XROOTD_MON_BOUNDP) {
disconn_p = false;
extra += "(bound-path)";
}
msg += GForm("\n\tDisconnect%s user='%s'", extra.Data(), us ? us->GetName() : "<nil>");
if (vrb) msg_vrb += GForm("\n\t%2d: %s%s, user=%s", ti, ttn, extra.Data(), us ? us->GetName() : "<nil>");
if (disconn_p && us)
{
disconnect_user_and_close_open_files(us, server, lc.fTime);
}
}
}
if ( ! msg.IsNull() || vrb)
{
TString txt;
txt.Form("Trace from %s.%s:%hu, user='%s', N=%d, dt=%d, seq=%hhu, len=%hu.",
server->GetHost(), server->GetDomain(), port, us ? us->GetName() : "<nil>",
lc.fN, lc.full_delta_time(), pseq, plen);
txt += msg;
if (vrb) txt += msg_vrb;
log.Put(txt);
}
if (us)
{
GLensReadHolder _lck(us);
us->SetLastMsgTime(lc.fTime);
}
}
else if (code == 'r')
{
XrdXrootdMonBurr *rb = (XrdXrootdMonBurr*) p->mBuff;
TString txt;
txt.Form("Redirect trace from %s.%s:%hu, seq=%hhu, len=%hu.",
server->GetHost(), server->GetDomain(), port, pseq, plen);
if (false)
{
int rb_to_read = plen - sizeof(XrdXrootdMonHeader) - sizeof(kXR_int64);
int i = 0;
int prev_win = 0;
XrdXrootdMonRedir *rr = rb->info;
while (rb_to_read > 0)
{
int len = 8 * (rr->arg0.Dent + 1);
txt += TString::Format("\n %3d - 0x%02hhx len=%3d: ",
i, rr->arg0.Type, len);
if (rr->arg0.Type == 0)
{
txt += TString::Format("window prev_len=%d, start=%d",
ntohl(rr->arg0.Window), ntohl(rr->arg1.Window));
}
else
{
UInt_t uid = ntohl(rr->arg1.dictid);
XrdUser *user = server->FindUser(uid);
txt += TString::Format("uid=%u, %s\n %s",
uid, user ? user->GetName() : "<unknown>",
(const char*)(&rr->arg1.dictid) + 4);
}
rr = (XrdXrootdMonRedir*) ((char*) rr + len);
rb_to_read -= len;
++i;
}
log.Put(ZLog::L_Fatal, txt);
}
else
{
log.Put(txt);
}
}
p->DecRefCount();
}
}
void XrdMonSucker::StartSucker()
{
static const Exc_t _eh("XrdMonSucker::StartSucker ");
{
GLensReadHolder _lck(this);
if (mSuckerThread)
throw _eh + "already running.";
mSuckerThread = new GThread("XrdMonSucker-Sucker",
(GThread_foo) tl_Suck, this, false);
mCheckerThread = new GThread("XrdMonSucker-Checker",
(GThread_foo) tl_Check, this, false);
}
mSuckerThread->SetNice(0);
mSuckerThread->Spawn();
mLastOldUserCheck = mLastDeadUserCheck =
mLastDeadServCheck = mLastIdentServCheck = GTime::ApproximateTime();
mCheckerThread->SetNice(20);
mCheckerThread->Spawn();
mSource->RegisterConsumer(&mUdpQueue);
{
GLensReadHolder _lck(this);
Stamp(FID());
}
}
void XrdMonSucker::StopSucker()
{
static const Exc_t _eh("XrdMonSucker::StopSucker ");
GThread *thr = 0;
{
GLensReadHolder _lck(this);
if ( ! GThread::IsValidPtr(mSuckerThread))
throw _eh + "not running.";
thr = mSuckerThread;
GThread::InvalidatePtr(mSuckerThread);
}
mSource->UnregisterConsumer(&mUdpQueue);
mCheckerThread->Cancel();
mCheckerThread->Join();
delete mCheckerThread;
thr->Cancel();
thr->Join();
delete thr;
close(mSocket);
{
GLensReadHolder _lck(this);
mSocket = 0;
mSuckerThread = 0;
mCheckerThread = 0;
}
}
void* XrdMonSucker::tl_Check(XrdMonSucker* s)
{
s->Check();
s->mCheckerThread = 0;
return 0;
}
void XrdMonSucker::Check()
{
static const Exc_t _eh("XrdMonSucker::Check ");
while (true)
{
GTime now = GTime::ApproximateTime();
{
bool stamp_p = false;
GLensReadHolder _lck(this);
if ((now - mLastOldUserCheck).GetSec() > mUserKeepSec)
{
mSaturn->ShootMIR( S_CleanUpOldUsers() );
mLastOldUserCheck = now;
stamp_p = true;
}
if ((now - mLastDeadUserCheck).GetSec() > mUserDeadSec/100)
{
mSaturn->ShootMIR( S_CleanUpDeadUsers() );
mLastDeadUserCheck = now;
stamp_p = true;
}
if ((now - mLastDeadServCheck).GetSec() > mServDeadSec/100)
{
mSaturn->ShootMIR( S_CleanUpDeadServers() );
mLastDeadServCheck = now;
stamp_p = true;
}
if ((now - mLastIdentServCheck).GetSec() > mServIdentSec)
{
mSaturn->ShootMIR( S_CleanUpNoIdentServers() );
mLastIdentServCheck = now;
stamp_p = true;
}
if (stamp_p)
{
Stamp(FID());
}
}
GTime::SleepMiliSec(30000);
}
}
void XrdMonSucker::CleanUpOldUsers()
{
static const Exc_t _eh("XrdMonSucker::CleanUpOldUsers ");
assert_MIR_presence(_eh, ZGlass::MC_IsDetached);
GTime now = GTime::ApproximateTime();
ZLog::Helper log(*mLog, now, ZLog::L_Message, _eh);
list<XrdDomain*> domains;
CopyListByGlass<XrdDomain>(domains);
for (list<XrdDomain*>::iterator di = domains.begin(); di != domains.end(); ++di)
{
XrdDomain *d = *di;
list<XrdServer*> servers;
d->CopyListByGlass<XrdServer>(servers, false, true);
Int_t n_wiped = 0;
for (list<XrdServer*>::iterator si = servers.begin(); si != servers.end(); ++si)
{
XrdServer *s = *si;
list<XrdUser*> users;
s->GetPrevUsers()->CopyListByGlass<XrdUser>(users);
for (list<XrdUser*>::iterator ui = users.begin(); ui != users.end(); ++ui)
{
XrdUser *u = *ui;
Int_t delta;
{
GLensReadHolder _lck(u);
delta = (Int_t) (now - u->RefDisconnectTime()).GetSec();
}
if (delta > mUserKeepSec)
{
++n_wiped;
mQueen->RemoveLens(u);
}
else
{
break;
}
}
s->DecEyeRefCount();
}
if (n_wiped > 0)
{
log.SetTime(GTime::ApproximateTime());
log.Form("Removed %d previous users for domain '%s'.", n_wiped, d->GetName());
}
}
}
void XrdMonSucker::CleanUpDeadUsers()
{
static const Exc_t _eh("XrdMonSucker::CleanUpDeadUsers ");
assert_MIR_presence(_eh, ZGlass::MC_IsDetached);
GTime now = GTime::ApproximateTime();
ZLog::Helper log(*mLog, now, ZLog::L_Message, _eh);
list<XrdDomain*> domains;
CopyListByGlass<XrdDomain>(domains);
for (list<XrdDomain*>::iterator di = domains.begin(); di != domains.end(); ++di)
{
XrdDomain *d = *di;
list<XrdServer*> servers;
d->CopyListByGlass<XrdServer>(servers, false, true);
Int_t n_wiped = 0;
for (list<XrdServer*>::iterator si = servers.begin(); si != servers.end(); ++si)
{
XrdServer *s = *si;
list<XrdUser*> users;
s->CopyListByGlass<XrdUser>(users);
for (list<XrdUser*>::iterator ui = users.begin(); ui != users.end(); ++ui)
{
XrdUser *u = *ui;
Int_t delta;
{
GLensReadHolder _lck(u);
delta = (Int_t) (now - u->RefLastMsgTime()).GetSec();
}
if (delta > mUserDeadSec)
{
++n_wiped;
disconnect_user_and_close_open_files(u, s, now);
}
}
s->DecEyeRefCount();
}
if (n_wiped > 0)
{
log.SetTime(GTime::ApproximateTime());
log.Form("Removed %d dead users for domain '%s'.", n_wiped, d->GetName());
}
}
}
void XrdMonSucker::CleanUpDeadServers()
{
static const Exc_t _eh("XrdMonSucker::CleanUpDeadServers ");
assert_MIR_presence(_eh, ZGlass::MC_IsDetached);
GTime now = GTime::ApproximateTime();
ZLog::Helper log(*mLog, now, ZLog::L_Message, _eh);
list<XrdDomain*> domains;
CopyListByGlass<XrdDomain>(domains);
for (list<XrdDomain*>::iterator di = domains.begin(); di != domains.end(); ++di)
{
XrdDomain *domain = *di;
list<XrdServer*> servers;
domain->CopyListByGlass<XrdServer>(servers, false, true);
for (list<XrdServer*>::iterator si = servers.begin(); si != servers.end(); ++si)
{
XrdServer *server = *si;
Int_t delta;
{
GLensReadHolder _lck(server);
delta = (Int_t) (now - server->RefLastMsgTime()).GetSec();
}
if (delta > mServDeadSec)
{
log.SetTime(GTime::ApproximateTime());
log.Form("Removing unactive server '%s'.", server->GetName());
disconnect_server(server, domain, now);
}
server->DecEyeRefCount();
}
}
}
void XrdMonSucker::CleanUpNoIdentServers()
{
static const Exc_t _eh("XrdMonSucker::CleanUpNoIdentServers ");
assert_MIR_presence(_eh, ZGlass::MC_IsDetached);
GTime now = GTime::ApproximateTime();
ZLog::Helper log(*mLog, now, ZLog::L_Message, _eh);
list<XrdDomain*> domains;
CopyListByGlass<XrdDomain>(domains);
for (list<XrdDomain*>::iterator di = domains.begin(); di != domains.end(); ++di)
{
XrdDomain *domain = *di;
list<XrdServer*> servers;
domain->CopyListByGlass<XrdServer>(servers, false, true);
for (list<XrdServer*>::iterator si = servers.begin(); si != servers.end(); ++si)
{
XrdServer *server = *si;
Int_t ident_delta, delta;
{
GLensReadHolder _lck(server);
ident_delta = server->GetAvgSrvIdDelta();
if (ident_delta <= 0) goto done;
delta = (Int_t) (now - server->RefLastMsgTime()).GetSec();
}
if (delta > mServIdentCnt * ident_delta)
{
log.SetTime(GTime::ApproximateTime());
log.Form("Removing unactive server '%s'.", server->GetName());
disconnect_server(server, domain, now);
}
done:
server->DecEyeRefCount();
}
}
}
void XrdMonSucker::EmitTraceRERay()
{
bTraceAllNull = mTraceDN.IsNull() && mTraceHost.IsNull() && mTraceDomain.IsNull();
if ( ! bTraceAllNull)
{
mTraceDN_RE .Reset(mTraceDN, "o");
mTraceHost_RE .Reset(mTraceHost, "o");
mTraceDomain_RE.Reset(mTraceDomain, "o");
}
}