xrootd
|
00001 00002 // // 00003 // XrdClientPhyConnection // 00004 // Author: Fabrizio Furano (INFN Padova, 2004) // 00005 // Adapted from TXNetFile (root.cern.ch) originally done by // 00006 // Alvise Dorigo, Fabrizio Furano // 00007 // INFN Padova, 2003 // 00008 // // 00009 // Class handling physical connections to xrootd servers // 00010 // // 00012 00013 // $Id$ 00014 00015 #ifndef _XrdClientPhyConnection 00016 #define _XrdClientPhyConnection 00017 00018 #include "XrdClient/XrdClientPSock.hh" 00019 #include "XrdClient/XrdClientMessage.hh" 00020 #include "XrdClient/XrdClientUnsolMsg.hh" 00021 #include "XrdClient/XrdClientInputBuffer.hh" 00022 #include "XrdClient/XrdClientUrlInfo.hh" 00023 #include "XrdClient/XrdClientThread.hh" 00024 #include "XrdSys/XrdSysPthread.hh" 00025 #include "XrdSys/XrdSysSemWait.hh" 00026 00027 #include <time.h> // for time_t data type 00028 00029 enum ELoginState { 00030 kNo = 0, 00031 kYes = 1, 00032 kPending = 2 00033 }; 00034 00035 enum ERemoteServerType { 00036 kSTError = -1, // Some error occurred: server type undetermined 00037 kSTNone = 0, // Remote server type un-recognized 00038 kSTRootd = 1, // Remote server type: old rootd server 00039 kSTBaseXrootd = 2, // Remote server type: xrootd dynamic load balancer 00040 kSTDataXrootd = 3 // Remote server type: xrootd data server 00041 }; 00042 00043 class XrdClientSid; 00044 class XrdSecProtocol; 00045 00046 class XrdClientPhyConnection: public XrdClientUnsolMsgSender { 00047 00048 private: 00049 time_t fLastUseTimestamp; 00050 enum ELoginState fLogged; // only 1 login/auth is needed for physical 00051 XrdSecProtocol *fSecProtocol; // authentication protocol 00052 00053 XrdClientInputBuffer 00054 fMsgQ; // The queue used to hold incoming messages 00055 00056 int fRequestTimeout; 00057 bool fMStreamsGoing; 00058 XrdSysRecMutex fRwMutex; // Lock before using the physical channel 00059 // (for reading and/or writing) 00060 00061 XrdSysRecMutex fMutex; 00062 XrdSysRecMutex fMultireadMutex; // Used to arbitrate between multiple 00063 // threads reading msgs from the same conn 00064 00065 XrdClientThread *fReaderthreadhandler[64]; // The thread which is going to pump 00066 // out the data from the socket 00067 00068 int fReaderthreadrunning; 00069 00070 XrdClientUrlInfo fServer; 00071 00072 XrdClientSock *fSocket; 00073 00074 UnsolRespProcResult HandleUnsolicited(XrdClientMessage *m); 00075 00076 XrdSysSemWait fReaderCV; 00077 00078 short fLogConnCnt; // Number of logical connections using this phyconn 00079 00080 XrdClientSid *fSidManager; 00081 00082 public: 00083 long fServerProto; // The server protocol 00084 ERemoteServerType fServerType; 00085 long fTTLsec; 00086 00087 XrdClientPhyConnection(XrdClientAbsUnsolMsgHandler *h, XrdClientSid *sid); 00088 ~XrdClientPhyConnection(); 00089 00090 XrdClientMessage *BuildMessage(bool IgnoreTimeouts, bool Enqueue); 00091 bool CheckAutoTerm(); 00092 00093 bool Connect(XrdClientUrlInfo RemoteHost, bool isUnix = 0); 00094 00095 //-------------------------------------------------------------------------- 00103 //-------------------------------------------------------------------------- 00104 bool Connect( XrdClientUrlInfo RemoteHost, bool isUnix , int fd ); 00105 00106 void CountLogConn(int d = 1); 00107 void Disconnect(); 00108 00109 ERemoteServerType 00110 DoHandShake(ServerInitHandShake &xbody, 00111 int substreamid = 0); 00112 00113 bool ExpiredTTL(); 00114 short GetLogConnCnt() const { return fLogConnCnt; } 00115 int GetReaderThreadsCnt() { XrdSysMutexHelper l(fMutex); return fReaderthreadrunning; } 00116 00117 long GetTTL() { return fTTLsec; } 00118 00119 XrdSecProtocol *GetSecProtocol() const { return fSecProtocol; } 00120 int GetSocket() { return fSocket ? fSocket->fSocket : -1; } 00121 00122 // Tells to the sock to rebuild the list of interesting selectors 00123 void ReinitFDTable() { if (fSocket) fSocket->ReinitFDTable(); } 00124 00125 int SaveSocket() { fTTLsec = 0; return fSocket ? (fSocket->SaveSocket()) : -1; } 00126 void SetInterrupt() { if (fSocket) fSocket->SetInterrupt(); } 00127 void SetSecProtocol(XrdSecProtocol *sp) { fSecProtocol = sp; } 00128 00129 void StartedReader(); 00130 00131 bool IsAddress(const XrdOucString &addr) { 00132 return ( (fServer.Host == addr) || 00133 (fServer.HostAddr == addr) ); 00134 } 00135 00136 ELoginState IsLogged(); 00137 00138 bool IsPort(int port) { return (fServer.Port == port); }; 00139 bool IsUser(const XrdOucString &usr) { return (fServer.User == usr); }; 00140 bool IsValid(); 00141 00142 00143 void LockChannel(); 00144 00145 // see XrdClientSock for the meaning of the parameters 00146 int ReadRaw(void *buffer, int BufferLength, int substreamid = -1, 00147 int *usedsubstreamid = 0); 00148 00149 XrdClientMessage *ReadMessage(int streamid); 00150 bool ReConnect(XrdClientUrlInfo RemoteHost); 00151 void SetLogged(ELoginState status) { fLogged = status; } 00152 inline void SetTTL(long ttl) { fTTLsec = ttl; } 00153 void StartReader(); 00154 void Touch(); 00155 void UnlockChannel(); 00156 int WriteRaw(const void *buffer, int BufferLength, int substreamid = 0); 00157 00158 int TryConnectParallelStream(int port, int windowsz, int sockid) { return ( fSocket ? fSocket->TryConnectParallelSock(port, windowsz, sockid) : -1); } 00159 int EstablishPendingParallelStream(int tmpid, int newid) { return ( fSocket ? fSocket->EstablishParallelSock(tmpid, newid) : -1); } 00160 void RemoveParallelStream(int substreamid) { if (fSocket) fSocket->RemoveParallelSock(substreamid); } 00161 // Tells if the attempt to establish the parallel streams is ongoing or was done 00162 // and mark it as ongoing or done 00163 bool TestAndSetMStreamsGoing(); 00164 00165 int GetSockIdHint(int reqsperstream) { return ( fSocket ? fSocket->GetSockIdHint(reqsperstream) : 0); } 00166 int GetSockIdCount() {return ( fSocket ? fSocket->GetSockIdCount() : 0); } 00167 void PauseSelectOnSubstream(int substreamid) { if (fSocket) fSocket->PauseSelectOnSubstream(substreamid); } 00168 void RestartSelectOnSubstream(int substreamid) { if (fSocket) fSocket->RestartSelectOnSubstream(substreamid); } 00169 00170 // To prohibit/re-enable a socket descriptor from being looked at by the reader threads 00171 virtual void BanSockDescr(int sockdescr, int sockid) { if (fSocket) fSocket->BanSockDescr(sockdescr, sockid); } 00172 virtual void UnBanSockDescr(int sockdescr) { if (fSocket) fSocket->UnBanSockDescr(sockdescr); } 00173 00174 void ReadLock() { fMultireadMutex.Lock(); } 00175 void ReadUnLock() { fMultireadMutex.UnLock(); } 00176 00177 int WipeStreamid(int streamid) { return fMsgQ.WipeStreamid(streamid); } 00178 }; 00179 00180 00181 00182 00183 // 00184 // Class implementing a trick to automatically unlock an XrdClientPhyConnection 00185 // 00186 class XrdClientPhyConnLocker { 00187 private: 00188 XrdClientPhyConnection *phyconn; 00189 00190 public: 00191 XrdClientPhyConnLocker(XrdClientPhyConnection *phyc) { 00192 // Constructor 00193 phyconn = phyc; 00194 phyconn->LockChannel(); 00195 } 00196 00197 ~XrdClientPhyConnLocker(){ 00198 // Destructor. 00199 phyconn->UnlockChannel(); 00200 } 00201 00202 }; 00203 00204 00205 #endif