8 #ifndef SRC_XRDCL_XRDCLECHANDLER_HH_
9 #define SRC_XRDCL_XRDCLECHANDLER_HH_
48 std::vector<uint32_t> cksums;
52 cksums.reserve( nbpages );
54 size_t size = chunk->
length;
55 char *buffer =
reinterpret_cast<char*
>( chunk->
buffer );
57 for(
size_t pg = 0; pg < nbpages; ++pg )
60 if( pgsize > size ) pgsize = size;
62 cksums.push_back( crcval );
70 response->
Set( pages );
106 if(
objcfg->plgr.empty() )
109 if( !st.
IsOK() )
return st;
112 writer->Open( handler, timeout );
121 if(
objcfg->plgr.empty() )
124 if( !st.
IsOK() )
return st;
127 reader->Open( handler, timeout );
140 (void)url; (void)mode;
141 return Open( flags, handler, timeout );
158 std::string commit = redir.GetPath()
159 +
"?xrdec.objid=" + objcfg->obj
160 +
"&xrdec.close=true&xrdec.size=" + std::to_string( curroff );
163 std::string ckstype = cksHelper->GetType();
165 auto st = cksHelper->GetCheckSum( cksval, ckstype );
168 handler->HandleResponse( new XRootDStatus( st ), nullptr );
171 commit +=
"&xrdec.cksum=" + cksval;
188 handler->HandleResponse( st, rsp );
190 return XRootDStatus();
204 if( !objcfg->nomtfile )
205 return fs.Stat( redir.GetPath(), handler, timeout );
207 if( !force && statcache )
209 auto rsp = StatRsp( statcache->GetSize() );
210 Schedule( handler, rsp );
217 statcache->SetSize( writer->GetSize() );
218 auto rsp = StatRsp( statcache->GetSize() );
219 Schedule( handler, rsp );
226 statcache->SetSize( reader->GetSize() );
227 auto rsp = StatRsp( statcache->GetSize() );
228 Schedule( handler, rsp );
246 reader->Read( offset, size, buffer, handler, timeout );
273 cksHelper->Update( buffer, size );
277 writer->Write( size, buffer, handler );
288 std::vector<uint32_t> &cksums,
290 uint16_t timeout = 0 )
292 if(! cksums.empty() )
294 const char *data =
static_cast<const char*
>( buffer );
295 std::vector<uint32_t> local_cksums;
297 if (data)
delete data;
298 if (local_cksums != cksums)
301 return Write(offset, size, buffer, handler, timeout);
309 return writer || reader;
318 std::unique_ptr<LocationInfo> ptr( infoAll );
319 if( !st.
IsOK() )
return st;
322 std::unique_ptr<LocationInfo> ptr1( info );
325 for(
size_t i = 0; i < infoAll->
GetSize(); ++i )
327 auto &location = infoAll->
At( i );
332 if( info->
GetSize() < objcfg->nbchunks )
334 unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
335 shuffle (info->
Begin(), info->
End(), std::default_random_engine(seed));
336 for(
size_t i = 0; i < objcfg->nbchunks; ++i )
338 auto &location = info->
At( i );
339 objcfg->plgr.emplace_back(
"root://" + location.GetAddress() +
'/' );
348 std::unique_ptr<LocationInfo> ptr( info );
349 if( !st.
IsOK() )
return st;
351 if( info->
GetSize() < objcfg->nbdata )
354 uint64_t verNumMax = 0;
355 std::vector<uint64_t> verNums;
356 std::vector<std::string> xattrkeys;
357 std::vector<XrdCl::XAttr> xattrvals;
358 xattrkeys.push_back(
"xrdec.strpver");
359 for(
size_t i = 0; i < info->
GetSize(); ++i )
363 st = fs_i->
GetXAttr(path, xattrkeys, xattrvals, 0);
364 if (st.
IsOK() && ! xattrvals[0].value.empty())
366 std::stringstream sstream(xattrvals[0].value);
369 verNums.push_back(verNum);
370 if (verNum > verNumMax)
374 verNums.push_back(0);
379 for(
size_t i = 0; i < info->
GetSize(); ++i )
381 if ( verNums.at(i) == 0 || verNums.at(i) != verNumMax )
385 auto &location = info->
At( i );
386 objcfg->plgr.emplace_back(
"root://" + location.GetAddress() +
'/' );
388 if (n < objcfg->nbdata )
411 std::unique_ptr<XrdEc::StrmWriter>
writer;
429 std::vector<std::string> && plgr ) :
430 nbdta( nbdta ), nbprt( nbprt ), chsz( chsz ), plgr(
std::move( plgr ) )
449 objcfg->
plgr = std::move( plgr );
450 return new EcHandler( url, objcfg,
nullptr );
Definition: XrdClAnyObject.hh:33
void Set(Type object, bool own=true)
Definition: XrdClAnyObject.hh:59
void Get(Type &object)
Retrieve the object being held.
Definition: XrdClAnyObject.hh:78
Binary blob representation.
Definition: XrdClBuffer.hh:34
void FromString(const std::string str)
Fill the buffer from a string.
Definition: XrdClBuffer.hh:205
static PostMaster * GetPostMaster()
Get default post master.
Definition: XrdClEcHandler.hh:78
static void Schedule(ResponseHandler *handler, AnyObject *rsp)
Definition: XrdClEcHandler.hh:402
std::unique_ptr< XrdEc::Reader > reader
Definition: XrdClEcHandler.hh:412
XRootDStatus LoadPlacement(const std::string &path)
Definition: XrdClEcHandler.hh:344
static AnyObject * StatRsp(uint64_t size)
Definition: XrdClEcHandler.hh:393
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:148
std::unique_ptr< XrdEc::ObjCfg > objcfg
Definition: XrdClEcHandler.hh:410
FileSystem fs
Definition: XrdClEcHandler.hh:409
XRootDStatus PgWrite(uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
Definition: XrdClEcHandler.hh:285
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:134
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:266
uint64_t curroff
Definition: XrdClEcHandler.hh:413
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:199
bool IsOpen() const
Definition: XrdClEcHandler.hh:307
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:238
XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:253
URL redir
Definition: XrdClEcHandler.hh:408
std::unique_ptr< XrdEc::StrmWriter > writer
Definition: XrdClEcHandler.hh:411
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:95
std::unique_ptr< StatInfo > statcache
Definition: XrdClEcHandler.hh:415
virtual ~EcHandler()
Definition: XrdClEcHandler.hh:91
EcHandler(const URL &redir, XrdEc::ObjCfg *objcfg, std::unique_ptr< CheckSumHelper > cksHelper)
Definition: XrdClEcHandler.hh:80
std::unique_ptr< CheckSumHelper > cksHelper
Definition: XrdClEcHandler.hh:414
XRootDStatus LoadPlacement()
Definition: XrdClEcHandler.hh:314
Definition: XrdClEcHandler.hh:27
EcPgReadResponseHandler(ResponseHandler *a)
Definition: XrdClEcHandler.hh:32
XrdCl::ResponseHandler * realHandler
Definition: XrdClEcHandler.hh:29
void HandleResponse(XRootDStatus *status, AnyObject *rdresp)
Definition: XrdClEcHandler.hh:35
Plugin factory.
Definition: XrdClEcHandler.hh:423
uint64_t chsz
Definition: XrdClEcHandler.hh:464
EcPlugInFactory(uint8_t nbdta, uint8_t nbprt, uint64_t chsz, std::vector< std::string > &&plgr)
Constructor.
Definition: XrdClEcHandler.hh:428
std::vector< std::string > plgr
Definition: XrdClEcHandler.hh:465
uint8_t nbdta
Definition: XrdClEcHandler.hh:462
virtual FilePlugIn * CreateFile(const std::string &u)
Create a file plug-in for the given URL.
Definition: XrdClEcHandler.hh:444
uint8_t nbprt
Definition: XrdClEcHandler.hh:463
virtual FileSystemPlugIn * CreateFileSystem(const std::string &url)
Create a file system plug-in for the given URL.
Definition: XrdClEcHandler.hh:456
virtual ~EcPlugInFactory()
Destructor.
Definition: XrdClEcHandler.hh:437
An interface for file plug-ins.
Definition: XrdClPlugInInterface.hh:39
An interface for file system plug-ins.
Definition: XrdClPlugInInterface.hh:284
Send file/filesystem queries to an XRootD cluster.
Definition: XrdClFileSystem.hh:203
XRootDStatus Query(QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus GetXAttr(const std::string &path, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Definition: XrdClJobManager.hh:92
const std::string & GetAddress() const
Get address.
Definition: XrdClXRootDResponses.hh:86
Path location info.
Definition: XrdClXRootDResponses.hh:44
uint32_t GetSize() const
Get number of locations.
Definition: XrdClXRootDResponses.hh:152
Iterator Begin()
Get the location begin iterator.
Definition: XrdClXRootDResponses.hh:168
Location & At(uint32_t index)
Get the location at index.
Definition: XrdClXRootDResponses.hh:160
void Add(const Location &location)
Add a location.
Definition: XrdClXRootDResponses.hh:200
@ ServerOnline
server node where the file is online
Definition: XrdClXRootDResponses.hh:53
Iterator End()
Get the location end iterator.
Definition: XrdClXRootDResponses.hh:184
Plugin factory.
Definition: XrdClPlugInInterface.hh:549
JobManager * GetJobManager()
Get the job manager object used by the post master.
Handle an async response.
Definition: XrdClXRootDResponses.hh:1117
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClXRootDResponses.hh:1146
Call the user callback.
Definition: XrdClResponseJob.hh:31
Object stat info.
Definition: XrdClXRootDResponses.hh:400
void SetSize(uint64_t size)
Set size.
URL representation.
Definition: XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:212
Request status.
Definition: XrdClXRootDResponses.hh:219
bool enable_plugins
Definition: XrdEcConfig.hh:77
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
Definition: XrdEcReader.hh:58
Definition: XrdEcStrmWriter.hh:53
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
Definition: XrdClAnyObject.hh:26
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
Definition: XrdClFileOperations.hh:591
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
Definition: XrdClFileOperations.hh:273
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
static const int PageSize
Definition: XrdSysPageSize.hh:36
Definition: XrdOucJson.hh:4517
Mode
Access mode.
Definition: XrdClFileSystem.hh:122
Describe a data chunk for vector read.
Definition: XrdClXRootDResponses.hh:908
void * buffer
buffer holding the chunk data
Definition: XrdClXRootDResponses.hh:941
uint32_t length
length of the chunk
Definition: XrdClXRootDResponses.hh:940
uint64_t offset
offset in the file
Definition: XrdClXRootDResponses.hh:939
Flags
Open flags, may be or'd when appropriate.
Definition: XrdClFileSystem.hh:76
@ Delete
Definition: XrdClFileSystem.hh:80
@ Read
Open only for reading.
Definition: XrdClFileSystem.hh:95
@ New
Definition: XrdClFileSystem.hh:86
@ Write
Open only for writing.
Definition: XrdClFileSystem.hh:97
@ None
Nothing.
Definition: XrdClFileSystem.hh:77
@ Update
Open for reading and writing.
Definition: XrdClFileSystem.hh:96
Definition: XrdClXRootDResponses.hh:947
@ OpaqueFile
Implementation dependent.
Definition: XrdClFileSystem.hh:58
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:123
Definition: XrdEcObjCfg.hh:34
std::vector< std::string > plgr
Definition: XrdEcObjCfg.hh:92