#include "AEVDistAnRep.h"
#include "AEVEventBatch.h"
#include "AEVJobRep.h"
#include "AEVDemoDriver.h"
#include "AEVDistAnRep.c7"
#include "AEVSite.h"
#include "AEVSiteViz.h"
#include "AEVProcessViz.h"
#include <Glasses/ZQueen.h>
#include <Glasses/Eventor.h>
#include <Stones/ZComet.h>
#include <TPRegexp.h>
#include <TMath.h>
ClassImp(AEVDistAnRep);
// Set the default values of the animation and replay parameters.
void AEVDistAnRep::_init()
{
  // Tube animation timings (used as 1/time velocities and as a minimum wait).
  mConnectTime = 2.5;
  mRotTime     = 2;
  mTravelTime  = 1.25;
  mWaitTime    = 0.05;

  // Replay control: nothing studied yet and no replay running.
  bInReplay       = false;
  mProcDuration   = 0;
  mReplayDuration = 60;
  mReplayMaxWait  = 5;
}
// Calls the parent implementation and then creates any of the three
// name-map containers (Sites, EventBatches, ProcVizes) that is not yet
// linked. Each new container gets its element FID set and MIR-active
// switched off.
void AEVDistAnRep::AdEnlightenment()
{
  PARENT_GLASS::AdEnlightenment();

  // Holder of AEVSite elements.
  if(mSites == 0) {
    assign_link<ZNameMap>(mSites, FID(), "Sites", GForm("Container of %s", GetName()));
    mSites->SetElementFID(AEVSite::FID());
    mSites->SetMIRActive(false);
  }

  // Holder of AEVEventBatch elements.
  if(mEvBatches == 0) {
    assign_link<ZNameMap>(mEvBatches, FID(), "EventBatches", GForm("Container of %s", GetName()));
    mEvBatches->SetElementFID(AEVEventBatch::FID());
    mEvBatches->SetMIRActive(false);
  }

  // Holder of AEVProcessViz elements.
  if(mProcVizes == 0) {
    assign_link<ZNameMap>(mProcVizes, FID(), "ProcVizes", GForm("Container of %s", GetName()));
    mProcVizes->SetElementFID(AEVProcessViz::FID());
    mProcVizes->SetMIRActive(false);
  }
}
// Register an event-batch both as a direct element of this object and in
// the mEvBatches name-map.
void AEVDistAnRep::AddEventBatch(AEVEventBatch* evb)
{
  // Direct element of this object.
  Add(evb);

  // Entry in the event-batch name-map.
  mEvBatches->Add(evb);
}
// Create one AEVEventBatch per (name, event-count) entry of cmap.
//
// Each batch is checked in with the queen and registered via AddEventBatch().
// When the demo driver has a map-viz, placement on the map is attempted;
// batches that are not placed are laid out in a row along x (spacing 1.5,
// centred around x = 0) at y = 2.
//
// Returns the sum of all event counts in cmap.
Int_t AEVDistAnRep::create_evbatches(map<string, int>& cmap)
{
  const int   n_entries = cmap.size();
  const float dx        = 1.5;
  const float y         = 2;
  float       x         = -dx*n_entries/2;
  int         totevents = 0;

  for(map<string, int>::iterator i = cmap.begin(); i != cmap.end(); ++i) {
    const char* name   = i->first.c_str();
    Int_t       events = i->second;
    printf("%-12s %d\n", name, events);
    totevents += events;

    AEVEventBatch* eb = new AEVEventBatch(name, GForm("EvBatch for %s", name));
    eb->Reinit(events);
    // NOTE(review): -1 presumably marks "not known" for both values -- confirm.
    eb->SetNWorkers(-1);
    eb->SetDataSizeMB(-1);
    mQueen->CheckIn(eb);
    AddEventBatch(eb);

    bool placed = false;
    if(mDemoDriver->GetMapViz() != 0) {
      placed = place_evbatch_on_map(eb);
    }
    if(!placed) {
      // Fallback layout: next free slot in the row.
      eb->SetPos(x, y, 0);
      x += dx;
    }
  }

  // size() is a size_t, hence %zu.
  printf("Sites=%zu Events=%d\n", cmap.size(), totevents);
  return totevents;
}
// Intentionally empty in this file: the body performs no work.
void AEVDistAnRep::fix_evbatches()
{
  
}
// Try to position an event-batch at the site-viz carrying the same name.
//
// The transformation returned by ZNode::BtoA(this, site-viz) is unscaled;
// the unscaled transformation is assigned to the batch and the extracted
// scale factors are applied separately via SetScales().
//
// Returns false when no site-viz of that name exists or when no
// transformation could be obtained, true otherwise.
Bool_t AEVDistAnRep::place_evbatch_on_map(AEVEventBatch* eb)
{
  AEVSiteViz* site_viz = mDemoDriver->GetMapViz()->FindSiteViz(eb->GetName());
  if(site_viz != 0) {
    auto_ptr<ZTrans> trans( ZNode::BtoA(this, site_viz) );
    if(trans.get() != 0) {
      Double_t sx, sy, sz;
      trans->Unscale(sx, sy, sz);
      eb->SetTrans(*trans);
      eb->SetScales(sx, sy, sz);
      return true;
    }
  }
  return false;
}
void AEVDistAnRep::ResetJob()
{
  mBatchState.Reinit(mBatchState.GetNAll());
  mEventState.Reinit(mEventState.GetNAll());
  Stepper<AEVEventBatch> s(*mEvBatches);
  while(s.step())
    s->Reinit();
  mProcVizes->ClearList();
}
void AEVDistAnRep::ResetDistAn()
{
  ClearList();
  mSites->ClearList();
  mEvBatches->ClearList();
  mProcVizes->ClearList();
  mDemoDriver->GetMapViz()->ClearSiteVizes();
  mDemoDriver->GetTheatre()->ClearAmphitheatre();
  mBatchState.Reinit(0);
  mEventState.Reinit(0);
  Stamp(FID());
}
// Initialise this object for a job by querying the monitoring client.
//
// job - job representation to take id and name from; may be null when a job
//       id has already been stored by a previous call.
//
// The cluster "ALIEN_QUERY" is asked for the "sitelist" parameter of the job;
// exactly one result is expected. For every site in that list the parameter
// "<site>:param" is fetched and parsed as comma-separated "key:value" pairs
// (location, longitude, latitude, entries, datasize). A site and an
// event-batch are created, checked in and placed on the map for each site
// that returned data.
//
// Throws Exc_t on a null argument without a stored job id, on an empty
// sitelist and on more than one sitelist result.
void AEVDistAnRep::InitJob(AEVJobRep* job)
{
  static const Exc_t _eh("AEVDistAnRep::InitJob ");
  if(job == 0 && mJobId == "")
    throw(_eh + "null argument.");
  if(job != 0) {
    mJobId   = job->GetName();
    mJobName = job->GetJobname();
  }
  Int_t total_entries = 0;
  Int_t n_batches     = 0; // batches actually created; sites without data are skipped
  {
    GLensWriteHolder wrlck(this);
    ResetDistAn();
  }

  // Fetch the list of sites taking part in the job.
  AEVMlClient* mona_client = mDemoDriver->GetMonaClient();
  mona_client->SetCluster("ALIEN_QUERY");
  mona_client->SetNode(mJobId);
  mona_client->SetParam("sitelist");
  mona_client->SetFLSort(AEVMlClient::SM_Ascending);
  lMonaEntry_t l;
  mona_client->GetFLValues(l);
  if(l.empty())    throw(_eh + "empty sitelist.");
  if(l.size() > 1) throw(_eh + "got more than one result for sitelist.");

  mQueryTime = l.front().fTime;
  TPMERegexp sites("\\s*:|,\\s*");
  Int_t n_sites = sites.Split(l.front().fValue);
  printf("n_sites=%d: %s\n", n_sites, l.front().fValue.Data());

  for(int s=0; s<n_sites; ++s) {
    // Fetch the parameter string of this site.
    mona_client->SetParam(sites[s] + ":param");
    l.clear();
    mona_client->GetFLValues(l);
    if(l.empty()) {
      printf("no data for %s\n", sites[s].Data());
      continue;
    }
    if(l.size() > 1) {
      printf("multiple entries for %s, using first\n", sites[s].Data());
    }
    TPMERegexp params("\\s*,\\s*");
    Int_t n_params = params.Split(l.front().fValue);
    printf("  site=%s, n_params=%d: %s\n", sites[s].Data(), n_params,
           l.front().fValue.Data());

    AEVSite*       site    = new AEVSite(sites[s]);
    AEVEventBatch* evbatch = new AEVEventBatch(sites[s], GForm("EvBatch for %s", sites[s].Data()));

    // Apply the individual "key:value" pairs.
    TPMERegexp keyval("\\s*:\\s*");
    for(int p=0; p<n_params; ++p) {
      Int_t n = keyval.Split(params[p]);
      if(n != 2) {
        printf("split problems with %s\n", params[p].Data());
        continue;
      }
      printf("    '%s' = '%s'\n", keyval[0].Data(), keyval[1].Data());
      if(keyval[0] == "location") {
        site->SetLocation(keyval[1]);
      }
      // NOTE(review): "longitude" feeds SetLatitude() and "latitude" feeds
      // SetLongitude(). FakeInitJob() assigns them straight. Kept as found;
      // confirm whether the monitoring data really delivers them swapped.
      else if(keyval[0] == "longitude") {
        site->SetLatitude(atof(keyval[1]));
      }
      else if(keyval[0] == "latitude") {
        site->SetLongitude(atof(keyval[1]));
      }
      else if(keyval[0] == "entries") {
        Int_t ent = atoi(keyval[1]);
        total_entries += ent;
        evbatch->Reinit(ent);
      }
      else if(keyval[0] == "datasize") {
        // Parsed as floating point (as the replay code does) so that byte
        // counts beyond the range of a 32-bit long survive.
        Double_t siz = atof(keyval[1])/1024.0/1024.0;
        evbatch->SetDataSizeMB(siz);
      }
      else {
        printf("unknown paramater '%s'\n", keyval[0].Data());
      }
    }

    GLensWriteHolder wrlck(this);
    mQueen->CheckIn(site);    mSites->Add(site);
    mQueen->CheckIn(evbatch); AddEventBatch(evbatch);
    ++n_batches;
    if(mDemoDriver->GetMapViz()->ImportSite(site) == false)
      ISerr(_eh + "site '" + sites[s] + "' not placed on map.");
    place_evbatch_on_map(evbatch);
  }

  GLensWriteHolder wrlck(this);
  // Size the batch state by the batches that exist, not by the length of the
  // site list: skipped sites have no batch that could ever complete.
  mBatchState.Reinit(n_batches);
  mEventState.Reinit(total_entries);
  Stamp(FID());
}
namespace
{
// Static description of one site used by FakeInitJob().
struct SiteData
{
  const char* name;      // site name; also used as the event-batch name
  const char* location;  // value passed to AEVSite::SetLocation()
  float       longitude; // value passed to AEVSite::SetLongitude()
  float       latitude;  // value passed to AEVSite::SetLatitude()
};

// Built-in table of sites for the fake job.
SiteData SiteDatas[] = {
  { "Muenster", "Muenster", 7.633, 51.967 },
  { "ITEP", "Moscow", 37.42, 55.45 },
  
  { "RMKI", "Budapest", 19.5, 47.30 },
  { "CERN", "Geneva", 6.15, 46.217 },
  { "CNAF", "Bologna", 11.333, 44.5 },
  { "Bari", "Bari", 16.867, 41.117 },
  { "Catania", "Catania", 15.1, 37.517 },
  { "Prague", "Prague", 14.26, 50.10 },
  { "FZK", "Karlsruhe", 8.4, 49.05 },
  { "Houston", "Houston", -95.33, 29.7500 },
  { "PNPI", "St. Petersburg", 30.2500, 58.8833 },
};

// Number of entries in SiteDatas; derived from the array so that it cannot
// get out of step when entries are added or removed.
const Int_t SiteDatasN = sizeof(SiteDatas) / sizeof(SiteDatas[0]);
}
// Initialise this object for a job without contacting the monitoring
// service: sites are taken from the built-in SiteDatas table and the number
// of entries, data size and processing average of every batch are drawn from
// a TRandom generator.
//
// job - job representation to take id and name from; may be null when a job
//       id has already been stored by a previous call.
//
// Throws Exc_t on a null argument without a stored job id.
void AEVDistAnRep::FakeInitJob(AEVJobRep* job)
{
  static const Exc_t _eh("AEVDistAnRep::FakeInitJob ");
  if(job == 0 && mJobId == "")
    throw(_eh + "null argument.");
  if(job != 0) {
    mJobId   = job->GetName();
    mJobName = job->GetJobname();
  }
  Int_t total_entries = 0;
  {
    GLensWriteHolder wrlck(this);
    ResetDistAn();
  }

  TRandom gen(0);
  Int_t n_sites = SiteDatasN;
  for(int s=0; s<n_sites; ++s) {
    const SiteData& sd = SiteDatas[s];

    AEVSite*       site    = new AEVSite(sd.name);
    AEVEventBatch* evbatch = new AEVEventBatch
      (sd.name, GForm("EvBatch for %s", sd.name));
    site->SetLocation(sd.location);
    site->SetLatitude(sd.latitude);
    site->SetLongitude(sd.longitude);
    {
      // Random workload: 13 + 47*Rndm() entries, about 100 MB per entry
      // varied by +-20%, processing average of 2 + 2*Rndm() rounded.
      Int_t ent = Int_t(13 + 47*gen.Rndm());
      total_entries += ent;
      evbatch->Reinit(ent);
      Double_t siz = ent*100*(1 + 0.4*(gen.Rndm()-0.5));
      evbatch->SetDataSizeMB(siz);
      evbatch->SetProcAvg(TMath::Nint(2 + 2*gen.Rndm()));
    }

    GLensWriteHolder wrlck(this);
    mQueen->CheckIn(site);    mSites->Add(site);
    mQueen->CheckIn(evbatch); AddEventBatch(evbatch);
    if(mDemoDriver->GetMapViz()->ImportSite(site) == false)
      ISerr(_eh + "site '" + sd.name + "' not placed on map.");
    place_evbatch_on_map(evbatch);
  }

  GLensWriteHolder wrlck(this);
  mBatchState.Reinit(n_sites);
  mEventState.Reinit(total_entries);
  Stamp(FID());
}
// Fetch the complete process history of the current job into mHistory.
//
// All parameters of the job node in cluster "ALIEN_PROCESS" are requested
// for the interval from mQueryTime until now. Processing start, end and
// duration are taken from the first and last history entry, or set to zero
// when nothing was returned.
//
// dump_p - when true, print a summary line and every history entry.
//
// Throws Exc_t when no job id is set.
void AEVDistAnRep::StudyJobHistory(Bool_t dump_p)
{
  static const Exc_t _eh("AEVDistAnRep::StudyJobHistory ");
  if(mJobId == "")
    throw(_eh + "no job specified.");

  AEVMlClient* mona_client = mDemoDriver->GetMonaClient();
  mona_client->SetCluster("ALIEN_PROCESS");
  mona_client->SetNode(mJobId);
  mona_client->SetParam("*");
  // The elapsed time is divided by 3600 to express it in hours.
  mona_client->SetFromHrs(mQueryTime.TimeUntilNow().ToDouble()/3600);
  mona_client->SetToHrs(0);

  mHistory.clear();
  mona_client->GetValues(mHistory);

  if(mHistory.empty()) {
    mProcStart    = 0l;
    mProcEnd      = 0l;
    mProcDuration = 0;
  } else {
    mProcStart    = mHistory.front().fTime;
    mProcEnd      = mHistory.back().fTime;
    mProcDuration = (mProcEnd - mProcStart).ToDouble();
  }
  Stamp(FID());

  if(dump_p) {
    // size() is a size_t, hence %zu.
    printf("%sjobname='%s' jobid='%s' entries=%zu\n", _eh.Data(),
           mJobName.Data(), mJobId.Data(), mHistory.size());
    for(lMonaEntry_i i=mHistory.begin(); i!=mHistory.end(); ++i)
      printf("  %-32s %12s %s\n", i->fParam.Data(), i->fValue.Data(), i->fDateStr.Data());
  }
}
// Return the process-viz registered under the name 'proc', creating it when
// it does not exist yet.
//
// A new process-viz is connected between the site-viz of 'site' and the
// parent of the mona-viz, bound to the event-batch of 'site', set up as an
// animated "Red2Yellow" tube, added to mProcVizes and started with a connect
// animation. The processing counter of the event-batch is incremented.
//
// Throws Exc_t when either the site-viz or the event-batch of 'site' cannot
// be found.
AEVProcessViz* AEVDistAnRep::find_or_crete_procviz(const TString& proc,
                                                   const TString& site)
{
  static const Exc_t _eh("AEVDistAnRep::find_or_crete_procviz ");

  AEVProcessViz* proc_viz = (AEVProcessViz*) mProcVizes->GetElementByName(proc);
  if(proc_viz != 0)
    return proc_viz;

  // Both end-points must be known before anything gets created.
  AEVSiteViz* site_viz = mDemoDriver->GetMapViz()->FindSiteViz(site);
  if(site_viz == 0)
    throw(_eh + "site-viz for '" + site + "' not found.");
  AEVEventBatch* batch = (AEVEventBatch*) mEvBatches->GetElementByName(site);
  if(batch == 0)
    throw(_eh + "event-batch for '" + site + "' not found.");

  proc_viz = new AEVProcessViz(proc);
  mQueen->CheckIn(proc_viz);
  proc_viz->SetParent(this);
  proc_viz->SetNodeA(site_viz);
  proc_viz->SetNodeB(mDemoDriver->GetMonaViz()->GetParent());
  proc_viz->SetBatch(batch);

  // Tube appearance and animation parameters.
  mDemoDriver->setup_tube(proc_viz, "Red2Yellow");
  proc_viz->SetTexUScale(5);
  proc_viz->SetDtexU(-0.5);
  proc_viz->SetDefVelocity(1 / mTravelTime);
  proc_viz->SetMinWaitTime(mWaitTime);
  mDemoDriver->animate_tube(proc_viz);

  {
    GLensWriteHolder wrlck(this);
    mProcVizes->Add(proc_viz);
  }
  proc_viz->AnimateConnect(1 / mConnectTime);
  {
    GLensWriteHolder wrlck(batch);
    batch->IncProcessing();
  }

  return proc_viz;
}
// Replay the entries stored in mHistory, consuming them front to back.
//
// Time between consecutive entries is scaled by mReplayDuration/mProcDuration
// and capped at mReplayMaxWait; the wait is done on mReplayCond so that
// StopReplay() can interrupt it. Entries are expected to carry parameter
// names of the form "site:x:y:param"; "status", "entries" and "datasize" are
// understood. Exc_t exceptions raised while handling one entry are printed
// and the replay proceeds with the next entry.
//
// Returns the number of history entries left unprocessed; the same value is
// shot back as MIR result when the current MIR requests one.
//
// Throws Exc_t when a replay is already running or the history is empty.
Int_t AEVDistAnRep::ReplayJobFromHistory()
{
  static const Exc_t _eh("AEVDistAnRep::ReplayJobFromHistory ");

  GMutexHolder replay_lock(mReplayCond);
  {
    GLensWriteHolder wrlck(this);
    if(bInReplay)
      throw(_eh + "already playing.");
    if(mHistory.empty())
      throw(_eh + "job history empty.");
    SetInReplay(true);
    mReplayTime = mHistory.front().fTime;
  }

  // "site:a:b:param" -> [1] = process id "site:a:b", [2] = parameter name.
  TPMERegexp proc_parname_re("([^:]+:[^:]+:[^:]+):([^:]+)");
  // "site:a:b"       -> [1] = site,                 [2] = sub-process "a:b".
  TPMERegexp site_subproc_re("([^:]+):([^:]+:[^:]+)");

  while(true) {
    if(mHistory.empty()) {
      GLensWriteHolder wrlck(this);
      printf("history empty, returning\n");
      SetInReplay(false);
      break;
    }

    // Work on a copy; the entry is popped off the list below.
    AEVMlClient::MonaEntry me = mHistory.front();

    if(me.fTime > mReplayTime) {
      // NOTE(review): the duration is handed to GTime(sec, 0); if that
      // constructor takes integral seconds the fractional part is lost --
      // confirm against GTime.
      Double_t sleep_sec = (me.fTime - mReplayTime).ToDouble() *
        mReplayDuration / mProcDuration;
      if(sleep_sec > mReplayMaxWait)
        sleep_sec = mReplayMaxWait;
      // The value is in seconds (it is passed on as seconds right below).
      printf("sleeping for %.3lf s\n", sleep_sec);
      mReplayCond.TimedWait(GTime(sleep_sec, 0));

      GLensWriteHolder wrlck(this);
      if(bInReplay == false) {
        printf("replay stopped, returning\n");
        break;
      }
      mReplayTime = me.fTime;
      mReplayPos  = (mReplayTime - mProcStart).ToDouble()*
        mReplayDuration/mProcDuration;
      Stamp(FID());
    }

    mHistory.pop_front();
    printf("processing: '%s'\n", me.StdFormat());
    mDemoDriver->set_blurp(me.fDateStr);

    try {
      // Job-level status entries carry no per-process information.
      if(me.fParam == "status") {
        continue;
      }

      Int_t m1, m2;
      m1 = proc_parname_re.Match(me.fParam);
      if(m1 != 3) {
        printf("  m1=%d trouble, skipping\n", m1);
        continue;
      }
      printf("  proc_id='%s', subparam='%s'\n",
             proc_parname_re[1].Data(), proc_parname_re[2].Data());
      m2 = site_subproc_re.Match(proc_parname_re[1]);
      if(m2 != 3) {
        printf("  m2=%d trouble, skipping\n", m2);
        continue;
      }
      printf("  site='%s', subproc='%s'\n",
             site_subproc_re[1].Data(), site_subproc_re[2].Data());

      TString proc    = proc_parname_re[1];
      TString parname = proc_parname_re[2];
      TString site    = site_subproc_re[1];

      AEVProcessViz* pv = find_or_crete_procviz(proc, site);
      AEVEventBatch* eb = pv->GetBatch();

      if(parname == "status") {
        printf("  status = %s\n", me.fValue.Data());
        // Only "DONE" needs action here; the process-viz was already
        // created/connected by find_or_crete_procviz() above.
        if(me.fValue == "DONE") {
          { GLensWriteHolder wrlck(eb);
            eb->DecProcessing();
            if(eb->IsDone()) {
              eb->Finalize();
              mBatchState.IncNOK(1);
              Stamp(FID());
            }
          }
          { GLensWriteHolder wrlck(pv);
            pv->AnimateDisconnect(1 / mConnectTime, true);
          }
        }
      }
      else if(parname == "entries") {
        // The history carries running totals; apply only the increment.
        Int_t ents  = atoi(me.fValue.Data());
        Int_t delta = ents - pv->GetEntsDone();
        printf("  entries = %d, delta = %d\n", ents, delta);
        if(delta != 0) {
          eb->mEvState.IncNOK(delta);
          eb->Stamp(AEVEventBatch::FID());
          mEventState.IncNOK(delta);
          Stamp(FID());
          pv->SetEntsDone(ents);
          {
            GLensWriteHolder tube_lck(pv);
            pv->MakeTraveler();
          }
        }
      }
      else if(parname == "datasize") {
        // Value is divided by 1024 twice and stored as megabytes.
        Float_t megs  = atof(me.fValue.Data())/1024/1024;
        Float_t delta = megs - pv->GetMegsDone();
        printf("  megs = %lf\n", megs);
        if(delta != 0) {
          eb->mDataDoneMB = megs;
          eb->Stamp(AEVEventBatch::FID());
          pv->SetMegsDone(megs);
        }
      }
      else {
        printf("  unknown parameter name\n");
      }
    } catch(const Exc_t& exc) {
      // Best effort: report the problem and carry on with the next entry.
      printf("exception: '%s'.\nproceeding ...\n", exc.Data());
    }
  }

  GLensWriteHolder wrlck(this);
  Int_t n_left = mHistory.size();
  SetInReplay(false);

  ZMIR* mir = get_MIR();
  if(mir && mir->HasResultReq()) {
    TBufferFile b(TBuffer::kWrite);
    b << n_left;
    mSaturn->ShootMIRResult(b);
  }
  return n_left;
}
namespace {
// Book-keeping of FakeReplayJobFromHistory() for one event-batch.
struct EvBatchInfo {
  AEVEventBatch*         event_batch;     // the batch being processed
  vector<AEVProcessViz*> proc_vector;     // process-vizes in order of creation
  Int_t                  all_proc_count;  // number of process-vizes created so far
  Int_t                  done_proc_count; // number of them already disconnected

  EvBatchInfo(AEVEventBatch* eb) :
    event_batch(eb), proc_vector(), all_proc_count(0), done_proc_count(0)
  {}
};
}
// Run a simulated replay: every event-batch is advanced with its own
// FakeProc() in rounds, and the resulting changes of its event state are
// visualised with process-vizes (connect for newly started processes,
// travelers for finished events, disconnect for ended processes).
//
// Between rounds the function waits on mReplayCond in 200 ms slices until
// 1.4 units of the animator's internal time have passed. A stop request
// (bInReplay cleared, see StopReplay()) ends the replay after the current
// round.
//
// Returns the number of events still to do; the same value is shot back as
// MIR result when the current MIR requests one.
//
// Throws Exc_t when a replay is already running or when the site-viz of a
// batch cannot be found.
// NOTE(review): the latter exception leaves bInReplay set -- callers have to
// reset it before another replay can be started.
Int_t AEVDistAnRep::FakeReplayJobFromHistory()
{
  static const Exc_t _eh("AEVDistAnRep::FakeReplayJobFromHistory ");

  GMutexHolder replay_lock(mReplayCond);
  {
    GLensWriteHolder wrlck(this);
    if(bInReplay)
      throw(_eh + "already playing.");
    SetInReplay(true);
  }

  TRandom gen(0);
  Int_t n_todo = 0;

  // Collect the batches and the total amount of outstanding work.
  list<EvBatchInfo> eb_infos;
  Stepper<AEVEventBatch> ebs(*mEvBatches);
  while(ebs.step()) {
    n_todo += ebs->RefEvState().GetNToDo();
    eb_infos.push_back(EvBatchInfo(*ebs));
  }

  bool stop_requested = false;
  while(!stop_requested) {
    if(n_todo == 0) {
      GLensWriteHolder wrlck(this);
      printf("fake seems completed, returning\n");
      mEventState.SetNProc(0);
      SetInReplay(false);
      break;
    }

    n_todo = 0; // recomputed below from the batches
    list<EvBatchInfo>::iterator ebi = eb_infos.begin();
    while(ebi != eb_infos.end()) {
      AEVEventBatch* eb = ebi->event_batch;

      // Advance the batch and work out what changed.
      Int_t ex_nproc = eb->RefEvState().GetNProc();
      Int_t ex_ndone = eb->RefEvState().GetNDone();
      Int_t ex_nok   = eb->RefEvState().GetNOK();
      Int_t ex_nfail = eb->RefEvState().GetNFail();
      eb->FakeProc();
      Int_t delta_nproc = eb->RefEvState().GetNProc() - ex_nproc;
      Int_t delta_ndone = eb->RefEvState().GetNDone() - ex_ndone;
      Int_t delta_nok   = eb->RefEvState().GetNOK()   - ex_nok;
      Int_t delta_nfail = eb->RefEvState().GetNFail() - ex_nfail;

      // Newly started processes: create and connect a process-viz for each.
      Int_t born_procs = delta_nproc;
      while(born_procs-- > 0) {
        TString new_name(GForm("%s-%d", eb->GetName(), ebi->all_proc_count++));
        AEVProcessViz* pv = 0;
        {
          AEVSiteViz* sv = mDemoDriver->GetMapViz()->FindSiteViz(eb->GetName());
          if(sv == 0)
            throw(_eh + "site-viz for '" + eb->GetName() + "' not found.");
          pv = new AEVProcessViz(new_name);
          mQueen->CheckIn(pv);
          pv->SetParent(this);
          pv->SetNodeA(sv);
          pv->SetNodeB(mDemoDriver->GetMonaViz()->GetParent());
          pv->SetBatch(eb);

          mDemoDriver->setup_tube(pv, "Red2Yellow");
          pv->SetTexUScale(5);
          pv->SetDtexU(-0.5);
          pv->SetDefVelocity(1 / mTravelTime);
          pv->SetMinWaitTime(mWaitTime);
          mDemoDriver->animate_tube(pv);
          {
            GLensWriteHolder wrlck(this);
            mProcVizes->Add(pv);
          }
          GLensWriteHolder wrlck(pv);
          pv->AnimateConnect(1 / mConnectTime);
        }
        ebi->proc_vector.push_back(pv);
      }

      // Finished events: send a traveler down a randomly chosen live tube.
      Int_t balls = delta_ndone;
      while(balls-- > 0) {
        int n_live = ebi->all_proc_count - ebi->done_proc_count;
        if(n_live <= 0)
          break; // no live process-viz to pick from
        int pos = ebi->done_proc_count + int(n_live*gen.Rndm());
        AEVProcessViz* pv = ebi->proc_vector[pos];
        GLensWriteHolder tube_lck(pv);
        pv->MakeTraveler();
      }

      // Ended processes: disconnect the oldest live process-vizes.
      Int_t died_procs = delta_nproc;
      while(died_procs++ < 0) {
        if(ebi->done_proc_count >= ebi->all_proc_count)
          break; // nothing left to disconnect
        int pos = ebi->done_proc_count++;
        AEVProcessViz* pv = ebi->proc_vector[pos];
        GLensWriteHolder wrlck(pv);
        pv->AnimateDisconnect(1 / mConnectTime, true);
      }

      if(delta_ndone != 0) {
        mEventState.IncNOK(delta_nok);
        mEventState.IncNFail(delta_nfail);
        mEventState.IncNProc(delta_nproc);
        Stamp(FID());
      }

      if(eb->RefEvState().GetNToDo() == 0) {
        // Batch complete: drop its book-keeping. erase() hands back the next
        // valid iterator, so nothing is stepped before begin().
        ebi = eb_infos.erase(ebi);
        { GLensWriteHolder wrlck(eb);
          eb->Finalize();
          mBatchState.IncNOK(1);
          Stamp(FID());
        }
      } else {
        ++ebi;
      }
      n_todo += eb->RefEvState().GetNToDo();
    }

    // Pause between rounds; wake up early on a stop request.
    {
      Eventor* etor = mDemoDriver->GetAnimator();
      Double_t tstart = etor->GetInternalTime();
      do {
        mReplayCond.TimedWait(GTime::MiliSec(200));
        GLensWriteHolder wrlck(this);
        if(bInReplay == false) {
          printf("replay stopped, returning\n");
          stop_requested = true; // leave the outer loop as well
          break;
        }
      } while (etor->GetInternalTime() - tstart < 1.4);
    }
  }

  GLensWriteHolder wrlck(this);
  SetInReplay(false);

  ZMIR* mir = get_MIR();
  if(mir && mir->HasResultReq()) {
    TBufferFile b(TBuffer::kWrite);
    b << n_todo;
    mSaturn->ShootMIRResult(b);
  }
  return n_todo;
}
// Request a running replay to stop: clears the in-replay flag and signals
// mReplayCond so that a replay waiting on it wakes up.
//
// Throws Exc_t when no replay is in progress.
void AEVDistAnRep::StopReplay()
{
  static const Exc_t _eh("AEVDistAnRep::StopReplay ");

  if(!bInReplay) {
    throw(_eh + "not playing.");
  }

  SetInReplay(false);
  mReplayCond.LockSignal();
}
void AEVDistAnRep::FinalizeJob()
{
  static const Exc_t _eh("AEVDistAnRep::FinalizeJob ");
  if(bInReplay)
    throw(_eh + "replay not finished.");
  { GMutexHolder llck(mProcVizes->RefListMutex());
    Stepper<AEVProcessViz> s(*mProcVizes);
    while(s.step()) {
      if(s->GetConnectionStauts() == WSTube::CS_Connected) {
	AEVEventBatch* eb = s->GetBatch();
	{ GLensWriteHolder wrlck(eb);
	  eb->DecProcessing();
	}
	{ GLensWriteHolder wrlck(*s);
	  s->AnimateDisconnect(1 / mConnectTime, true);
	}
      }
    }
  }
  { GMutexHolder llck(mEvBatches->RefListMutex());
    Stepper<AEVEventBatch> s(*mEvBatches);
    while(s.step()) {
      if(s->RefEvState().GetState() != 'F') {
	GLensWriteHolder wrlck(*s);
	s->Finalize();
	if(s->RefEvState().GetNOK() > s->RefEvState().GetNAll()/2) {
	  mBatchState.IncNOK(1);
	} else {
	  mBatchState.IncNFail(1);
	}
      }
    }
  }
  mBatchState.Finalize();
  mEventState.Finalize();
  Stamp(FID());
}
namespace {
struct ProcInfo {
  TString fSite;
  TString fStatus;
  Int_t   fEntsDone;
  Float_t fMegsDone;
  ProcInfo() : fEntsDone(0), fMegsDone(0) {}
};
}
void AEVDistAnRep::FollowJob()
{
  static const Exc_t _eh("AEVDistAnRep::FollowJob ");
  AEVMlClient* mona_client = mDemoDriver->GetMonaClient();
  mona_client->SetCluster("ALIEN_PROCESS");
  mona_client->SetNode(mJobId);
  mona_client->SetParam("*");
  TPMERegexp proc_parname_re("([^:]+:[^:]+:[^:]+):([^:]+)");
  TPMERegexp site_subproc_re("([^:]+):([^:]+:[^:]+)");
  map<TString, ProcInfo> proc_map;
  Bool_t first_entry = true;
  while(bInReplay) {
    mHistory.clear();
    proc_map.clear();
    mona_client->GetFLValues(mHistory);
    for(lMonaEntry_i mei=mHistory.begin(); mei!=mHistory.end(); ++mei) {
      AEVMlClient::MonaEntry& me = *mei;
      if(me.fParam == "status") {
	if(me.fValue == "STARTED") {
	  continue;
	}
	else if(me.fValue == "DONE") {
	  
	  
	  
	}
	
	
	continue;
      }
      Int_t m1, m2;
      m1 = proc_parname_re.Match(me.fParam);
      if(m1 != 3) {
	printf("  m1=%d trouble, skipping\n", m1);
	continue;
      }
      printf("  proc_id='%s', subparam='%s'\n",
	     proc_parname_re[1].Data(), proc_parname_re[2].Data());
      m2 = site_subproc_re.Match(proc_parname_re[1]);
      if(m2 != 3) {
	printf("  m2=%d trouble, skipping\n", m2);
	continue;
      }
      printf("  site='%s', subproc='%s'\n",
	     site_subproc_re[1].Data(), site_subproc_re[2].Data());
      TString proc    = proc_parname_re[1];
      TString parname = proc_parname_re[2];
      TString site    = site_subproc_re[1];
      TString subproc = site_subproc_re[2];
      proc_map[proc].fSite = site;
      if(parname == "status")
	proc_map[proc].fStatus = me.fValue;
      else if(parname == "entries")
	proc_map[proc].fEntsDone = atoi(me.fValue.Data());
      else if(parname == "datasize")
	proc_map[proc].fMegsDone = atof(me.fValue.Data())/1024/1024;
    } 
    for(map<TString, ProcInfo>::iterator pimi = proc_map.begin();
	pimi != proc_map.end(); ++pimi)
      {
	const TString& proc = pimi->first;
	ProcInfo&      pi   = pimi->second;
	AEVProcessViz* pv = 0;
	if(pi.fStatus == "PROCESSING")
	  pv = find_or_crete_procviz(proc, pi.fSite);
	else
	  pv = (AEVProcessViz*) mProcVizes->GetElementByName(proc);
	AEVEventBatch* eb = (AEVEventBatch*) mEvBatches->GetElementByName(pi.fSite);
	if(pv != 0) {
	  { 
	    Int_t ents  = pi.fEntsDone;
	    Int_t delta = ents - pv->GetEntsDone();
	    if(delta != 0) {
	      printf("  entries = %d, delta = %d\n", ents, delta);
	      eb->mEvState.IncNOK(delta);
	      eb->Stamp(AEVEventBatch::FID());
	      mEventState.IncNOK(delta);
	      Stamp(FID());
	      pv->SetEntsDone(ents);
	      {
		GLensWriteHolder tube_lck(pv);
		pv->MakeTraveler();
	      }
	    }
	  }
	  { 
	    Float_t megs  = pi.fMegsDone;
	    Float_t delta = megs - pv->GetMegsDone();
	    printf("  megs = %lf\n", megs);
	    if(delta != 0) {
	      eb->mDataDoneMB = megs;
	      eb->Stamp(AEVEventBatch::FID());
	      pv->SetMegsDone(megs);
	    }
	  }
	}
	if(pi.fStatus == "DONE") {
	  if(first_entry) {
	    Int_t delta = pi.fEntsDone;
	    eb->mEvState.IncNOK(delta);
	    eb->Stamp(AEVEventBatch::FID());
	    mEventState.IncNOK(delta);
	    Stamp(FID());
	  } else {
	    
	    if(pv != 0) {
	      { GLensWriteHolder wrlck(pv);
		pv->AnimateDisconnect(1 / mConnectTime, true);
	      }
	      { GLensWriteHolder wrlck(eb);
		eb->DecProcessing();
	      }
	    }
	  }
	} 
	{ GLensWriteHolder wrlck(eb);
	  if(eb->IsDone()) {
	    eb->Finalize();
	    mBatchState.IncNOK(1);
	    Stamp(FID());
	  }
	}
      } 
    first_entry = false;
  } 
}
// Move all event-batches that are direct elements of this object into an
// amphitheatre and start its hunt.
//
// amph - target amphitheatre; when null the theatre of the demo driver is
//        used.
//
// Throws Exc_t when no amphitheatre is available.
void AEVDistAnRep::SendEvBatchesToTheatre(Amphitheatre* amph)
{
  static const Exc_t _eh("AEVDistAnRep::SendEvBatchesToTheatre ");

  if(amph == 0) {
    amph = mDemoDriver->GetTheatre();
    if(amph == 0)
      throw(_eh + "no theatre in sight.");
  }

  typedef list<AEVEventBatch*> lpEvBatch_t;
  lpEvBatch_t batches;
  CopyListByGlass<AEVEventBatch>(batches);

  // Hand the batches over while holding the write lock of the theatre.
  amph->WriteLock();
  for(lpEvBatch_t::iterator it = batches.begin(); it != batches.end(); ++it) {
    AEVEventBatch* batch = *it;
    amph->AddGuest(batch);
    RemoveAll(batch);
  }
  amph->WriteUnlock();

  Stamp(FID());
  mSaturn->ShootMIR( amph->S_StartHunt() );
}
// Stub: only the exception header is defined, no processing is performed.
void AEVDistAnRep::Process()
{
  static const Exc_t _eh("AEVDistAnRep::Process ");
  
}
// Stub: both arguments are ignored and the body performs no work.
void AEVDistAnRep::UpdateProcStatus(TList* siteinfos,
				    map<string,int>* siteevmap)
{
  
}
void AEVDistAnRep::StartProc()
{
  mBatchState.SetNProc(mBatchState.GetNAll());
  list<AEVEventBatch*> ebs;
  mEvBatches->CopyListByGlass<AEVEventBatch>(ebs);
  for(list<AEVEventBatch*>::iterator eb=ebs.begin(); eb!=ebs.end(); ++eb) {
    (*eb)->WriteLock();
    const SEvTaskState&	es((*eb)->RefEvState());
    (*eb)->SetProcAvg((Int_t) TMath::Floor(es.GetNAll()*0.1 + 1));
    (*eb)->SetFracFail(0.15);
    (*eb)->WriteUnlock();
  }
  Stamp(FID());
}
void AEVDistAnRep::FakeProc()
{
  map<string, int> changes;
  FakeProc(changes);
}
// Run one fake-processing step on every event-batch that still has events
// to do.
//
// changes - receives, keyed by batch name, the number of events that became
//           done in this step (only for batches that were stepped).
//
// The batch state is updated when a batch runs out of work and the event
// state is updated by the change in OK / failed / processing counts.
void AEVDistAnRep::FakeProc(map<string, int>& changes)
{
  typedef list<AEVEventBatch*> lpEvBatch_t;
  lpEvBatch_t batches;
  mEvBatches->CopyListByGlass<AEVEventBatch>(batches);

  for(lpEvBatch_t::iterator it = batches.begin(); it != batches.end(); ++it) {
    AEVEventBatch* batch = *it;

    batch->WriteLock();
    // Snapshot of the state before the step.
    SEvTaskState before( batch->RefEvState() );
    if(before.GetNToDo() > 0) {
      batch->FakeProc();
      // Snapshot of the state after the step.
      SEvTaskState after( batch->RefEvState() );

      changes[batch->GetName()] = after.GetNDone() - before.GetNDone();

      if(after.GetNToDo() == 0) {
        // Batch has just run out of work.
        mBatchState.IncNOK(1);
        mBatchState.IncNProc(-1);
      }

      mEventState.IncNOK  (after.GetNOK()   - before.GetNOK());
      mEventState.IncNFail(after.GetNFail() - before.GetNFail());
      mEventState.IncNProc(after.GetNProc() - before.GetNProc());
    }
    batch->WriteUnlock();
  }

  Stamp(FID());
}