"""Utility functions for copying and archiving files and directory trees.XXX The functions here don't copy the resource fork or other metadata on Mac."""importosimportsysimportstatimportfnmatchimportcollectionsimporterrnotry:importzlibdelzlib_ZLIB_SUPPORTED=TrueexceptImportError:_ZLIB_SUPPORTED=Falsetry:importbz2delbz2_BZ2_SUPPORTED=TrueexceptImportError:_BZ2_SUPPORTED=Falsetry:importlzmadellzma_LZMA_SUPPORTED=TrueexceptImportError:_LZMA_SUPPORTED=Falsetry:frompwdimportgetpwnamexceptImportError:getpwnam=Nonetry:fromgrpimportgetgrnamexceptImportError:getgrnam=None__all__=["copyfileobj","copyfile","copymode","copystat","copy","copy2","copytree","move","rmtree","Error","SpecialFileError","ExecError","make_archive","get_archive_formats","register_archive_format","unregister_archive_format","get_unpack_formats","register_unpack_format","unregister_unpack_format","unpack_archive","ignore_patterns","chown","which","get_terminal_size","SameFileError"]# disk_usage is added later, if available on the platformclassError(OSError):passclassSameFileError(Error):"""Raised when source and destination are the same file."""classSpecialFileError(OSError):"""Raised when trying to do a kind of operation (e.g. copying) which is not supported on a special file (e.g. a named pipe)"""classExecError(OSError):"""Raised when a command could not be executed"""classReadError(OSError):"""Raised when an archive cannot be read"""classRegistryError(Exception):"""Raised when a registry operation with the archiving and unpacking registries fails"""
def copyfileobj(fsrc, fdst, length=16*1024):
    """Copy the contents of file-like object fsrc to file-like object fdst.

    Data is transferred in chunks of at most `length` (default 16 KiB) and
    copying stops at the first empty read.  Neither object is closed.
    """
    chunk = fsrc.read(length)
    while chunk:
        fdst.write(chunk)
        chunk = fsrc.read(length)
def _samefile(src, dst):
    """Return True if `src` and `dst` refer to the same file."""
    # Macintosh, Unix.
    if hasattr(os.path, 'samefile'):
        try:
            return os.path.samefile(src, dst)
        except OSError:
            # One of the paths could not be examined; treat as different.
            return False

    # All other platforms: check for same pathname.
    return (os.path.normcase(os.path.abspath(src)) ==
            os.path.normcase(os.path.abspath(dst)))


def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst and return dst.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    Raises SameFileError if src and dst are the same file and
    SpecialFileError if either of them is a named pipe.
    """
    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    for fn in [src, dst]:
        try:
            st = os.stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                raise SpecialFileError("`%s` is a named pipe" % fn)

    if not follow_symlinks and os.path.islink(src):
        os.symlink(os.readlink(src), dst)
    else:
        with open(src, 'rb') as fsrc:
            with open(dst, 'wb') as fdst:
                copyfileobj(fsrc, fdst)
    return dst


def copymode(src, dst, *, follow_symlinks=True):
    """Copy mode bits from src to dst.

    If follow_symlinks is not set, symlinks aren't followed if and only
    if both `src` and `dst` are symlinks.  If `lchmod` isn't available
    (e.g. Linux) this method does nothing.
    """
    if not follow_symlinks and os.path.islink(src) and os.path.islink(dst):
        if hasattr(os, 'lchmod'):
            stat_func, chmod_func = os.lstat, os.lchmod
        else:
            return
    elif hasattr(os, 'chmod'):
        stat_func, chmod_func = os.stat, os.chmod
    else:
        return

    st = stat_func(src)
    chmod_func(dst, stat.S_IMODE(st.st_mode))


if hasattr(os, 'listxattr'):
    def _copyxattr(src, dst, *, follow_symlinks=True):
        """Copy extended filesystem attributes from `src` to `dst`.

        Overwrite existing attributes.

        If `follow_symlinks` is false, symlinks won't be followed.
        """
        try:
            names = os.listxattr(src, follow_symlinks=follow_symlinks)
        except OSError as e:
            if e.errno not in (errno.ENOTSUP, errno.ENODATA):
                raise
            return
        for name in names:
            try:
                value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
                os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
            except OSError as e:
                if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA):
                    raise
else:
    def _copyxattr(*args, **kwargs):
        """No-op used when the platform has no os.listxattr."""
        pass


def copystat(src, dst, *, follow_symlinks=True):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    If the optional flag `follow_symlinks` is not set, symlinks aren't
    followed if and only if both `src` and `dst` are symlinks.
    """
    def _nop(*args, ns=None, follow_symlinks=None):
        pass

    # follow symlinks (aka don't not follow symlinks)
    follow = follow_symlinks or not (os.path.islink(src) and os.path.islink(dst))
    if follow:
        # use the real function if it exists
        def lookup(name):
            return getattr(os, name, _nop)
    else:
        # use the real function only if it exists
        # *and* it supports follow_symlinks
        def lookup(name):
            fn = getattr(os, name, _nop)
            if fn in os.supports_follow_symlinks:
                return fn
            return _nop

    st = lookup("stat")(src, follow_symlinks=follow)
    mode = stat.S_IMODE(st.st_mode)
    lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
                    follow_symlinks=follow)
    try:
        lookup("chmod")(dst, mode, follow_symlinks=follow)
    except NotImplementedError:
        # if we got a NotImplementedError, it's because
        #   * follow_symlinks=False,
        #   * lchmod() is unavailable, and
        #   * either
        #       * fchmodat() is unavailable or
        #       * fchmodat() doesn't implement AT_SYMLINK_NOFOLLOW.
        #         (it returned ENOSUP.)
        # therefore we're out of options--we simply cannot chmod the
        # symlink.  give up, suppress the error.
        # (which is what shutil always did in this circumstance.)
        pass
    if hasattr(st, 'st_flags'):
        try:
            lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
        except OSError as why:
            # Only "operation not supported" is tolerated here.
            for err in 'EOPNOTSUPP', 'ENOTSUP':
                if hasattr(errno, err) and why.errno == getattr(errno, err):
                    break
            else:
                raise
    _copyxattr(src, dst, follow_symlinks=follow)


def copy(src, dst, *, follow_symlinks=True):
    """Copy data and mode bits ("cp src dst"). Return the file's destination.

    The destination may be a directory.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".

    If source and destination are the same file, a SameFileError will be
    raised.
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copymode(src, dst, follow_symlinks=follow_symlinks)
    return dst


def copy2(src, dst, *, follow_symlinks=True):
    """Copy data and all stat info ("cp -p src dst"). Return the file's
    destination.

    The destination may be a directory.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copystat(src, dst, follow_symlinks=follow_symlinks)
    return dst


def ignore_patterns(*patterns):
    """Function that can be used as copytree() ignore parameter.

    Patterns is a sequence of glob-style patterns
    that are used to exclude files"""
    def _ignore_patterns(path, names):
        ignored_names = []
        for pattern in patterns:
            ignored_names.extend(fnmatch.filter(names, pattern))
        return set(ignored_names)
    return _ignore_patterns


def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
             ignore_dangling_symlinks=False):
    """Recursively copy a directory tree and return the destination.

    The destination directory must not already exist.
    If exception(s) occur, an Error is raised with a list of reasons.

    If the optional symlinks flag is true, symbolic links in the
    source tree result in symbolic links in the destination tree; if
    it is false, the contents of the files pointed to by symbolic
    links are copied. If the file pointed by the symlink doesn't
    exist, an exception will be added in the list of errors raised in
    an Error exception at the end of the copy process.

    You can set the optional ignore_dangling_symlinks flag to true if you
    want to silence this exception. The flag applies to every directory
    level of the tree. Notice that this has no effect on platforms that
    don't support os.symlink.

    The optional ignore argument is a callable. If given, it
    is called with the `src` parameter, which is the directory
    being visited by copytree(), and `names` which is the list of
    `src` contents, as returned by os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable will be
    called once for each directory that is copied. It returns a
    list of names relative to the `src` directory that should
    not be copied.

    The optional copy_function argument is a callable that will be used
    to copy each file. It will be called with the source path and the
    destination path as arguments. By default, copy2() is used, but any
    function that supports the same signature (like copy()) can be used.
    """
    names = os.listdir(src)
    if ignore is not None:
        ignored_names = ignore(src, names)
    else:
        ignored_names = set()

    os.makedirs(dst)
    errors = []
    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if os.path.islink(srcname):
                if symlinks:
                    linkto = os.readlink(srcname)
                    # We can't just leave it to `copy_function` because legacy
                    # code with a custom `copy_function` may rely on copytree
                    # doing the right thing.
                    os.symlink(linkto, dstname)
                    copystat(srcname, dstname, follow_symlinks=not symlinks)
                else:
                    # ignore dangling symlink if the flag is on.
                    # os.path.exists() follows the link itself, so a relative
                    # target is resolved against the link's directory rather
                    # than the current working directory.
                    if not os.path.exists(srcname) and ignore_dangling_symlinks:
                        continue
                    # otherwise let the copy occurs. copy2 will raise an error
                    if os.path.isdir(srcname):
                        copytree(srcname, dstname, symlinks, ignore,
                                 copy_function, ignore_dangling_symlinks)
                    else:
                        copy_function(srcname, dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks, ignore, copy_function,
                         ignore_dangling_symlinks)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy_function(srcname, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error as err:
            errors.extend(err.args[0])
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
    return dst


# version vulnerable to race conditions
def _rmtree_unsafe(path, onerror):
    """Path-based implementation of rmtree(); failures go to `onerror`."""
    try:
        if os.path.islink(path):
            # symlinks to directories are forbidden, see bug #1669
            raise OSError("Cannot call rmtree on a symbolic link")
    except OSError:
        onerror(os.path.islink, path, sys.exc_info())
        # can't continue even if onerror hook returns
        return
    names = []
    try:
        names = os.listdir(path)
    except OSError:
        onerror(os.listdir, path, sys.exc_info())
    for name in names:
        fullname = os.path.join(path, name)
        try:
            mode = os.lstat(fullname).st_mode
        except OSError:
            # Unknown type: fall through to unlink(), which reports the error.
            mode = 0
        if stat.S_ISDIR(mode):
            _rmtree_unsafe(fullname, onerror)
        else:
            try:
                os.unlink(fullname)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())
    try:
        os.rmdir(path)
    except OSError:
        onerror(os.rmdir, path, sys.exc_info())


# Version using fd-based APIs to protect against races
def _rmtree_safe_fd(topfd, path, onerror):
    """Remove the contents of the directory open as `topfd`.

    `path` is only used to build the names reported to `onerror`; all
    file system operations are relative to `topfd`.  The directory itself
    is left for the caller to remove.
    """
    names = []
    try:
        names = os.listdir(topfd)
    except OSError as err:
        err.filename = path
        onerror(os.listdir, path, sys.exc_info())
    for name in names:
        fullname = os.path.join(path, name)
        try:
            orig_st = os.stat(name, dir_fd=topfd, follow_symlinks=False)
            mode = orig_st.st_mode
        except OSError:
            mode = 0
        if stat.S_ISDIR(mode):
            try:
                dirfd = os.open(name, os.O_RDONLY, dir_fd=topfd)
            except OSError:
                onerror(os.open, fullname, sys.exc_info())
            else:
                try:
                    if os.path.samestat(orig_st, os.fstat(dirfd)):
                        _rmtree_safe_fd(dirfd, fullname, onerror)
                        try:
                            os.rmdir(name, dir_fd=topfd)
                        except OSError:
                            onerror(os.rmdir, fullname, sys.exc_info())
                    else:
                        try:
                            # This can only happen if someone replaces
                            # a directory with a symlink after the call to
                            # stat.S_ISDIR above.
                            raise OSError("Cannot call rmtree on a symbolic "
                                          "link")
                        except OSError:
                            onerror(os.path.islink, fullname, sys.exc_info())
                finally:
                    os.close(dirfd)
        else:
            try:
                os.unlink(name, dir_fd=topfd)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())


_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                     os.supports_dir_fd and
                     os.listdir in os.supports_fd and
                     os.stat in os.supports_follow_symlinks)


def rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.
    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Called from inside an except block: re-raise that exception.
            raise
    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        try:
            orig_st = os.lstat(path)
        except Exception:
            onerror(os.lstat, path, sys.exc_info())
            return
        try:
            fd = os.open(path, os.O_RDONLY)
        except Exception:
            # Report the call that actually failed.
            onerror(os.open, path, sys.exc_info())
            return
        try:
            if os.path.samestat(orig_st, os.fstat(fd)):
                _rmtree_safe_fd(fd, path, onerror)
                try:
                    os.rmdir(path)
                except OSError:
                    onerror(os.rmdir, path, sys.exc_info())
            else:
                try:
                    # symlinks to directories are forbidden, see bug #1669
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(os.path.islink, path, sys.exc_info())
        finally:
            os.close(fd)
    else:
        return _rmtree_unsafe(path, onerror)


# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _use_fd_functions


def _basename(path):
    """Return the last component of `path`, ignoring trailing separators."""
    # A basename() variant which first strips the trailing slash, if present.
    # Thus we always get the last component of the path, even for directories.
    sep = os.path.sep + (os.path.altsep or '')
    return os.path.basename(path.rstrip(sep))


def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    Note: when dst is a directory that is the same file as src (possible
    on a case insensitive filesystem) the rename is performed and None is
    returned.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.
    """
    real_dst = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            return

        real_dst = os.path.join(dst, _basename(src))
        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        if os.path.islink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, real_dst)
            os.unlink(src)
        elif os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
        else:
            copy_function(src, real_dst)
            os.unlink(src)
    return real_dst


def _destinsrc(src, dst):
    """Return True if the absolute path of `dst` is `src` or lies below it."""
    src = os.path.abspath(src)
    dst = os.path.abspath(dst)
    # Terminate both with a separator so "/a/bc" is not taken as inside "/a/b".
    if not src.endswith(os.path.sep):
        src += os.path.sep
    if not dst.endswith(os.path.sep):
        dst += os.path.sep
    return dst.startswith(src)


def _get_gid(name):
    """Returns a gid, given a group name."""
    if getgrnam is None or name is None:
        return None
    try:
        result = getgrnam(name)
    except KeyError:
        result = None
    if result is not None:
        return result[2]
    return None


def _get_uid(name):
    """Returns an uid, given a user name."""
    if getpwnam is None or name is None:
        return None
    try:
        result = getpwnam(name)
    except KeyError:
        result = None
    if result is not None:
        return result[2]
    return None


def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", "xz", or None.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    The output tar file will be named 'base_name' +  ".tar", possibly plus
    the appropriate compression extension (".gz", ".bz2", or ".xz").

    Returns the output filename.  Raises ValueError for an unknown or
    unsupported 'compress' value.
    """
    if compress is None:
        tar_compression = ''
    elif _ZLIB_SUPPORTED and compress == 'gzip':
        tar_compression = 'gz'
    elif _BZ2_SUPPORTED and compress == 'bzip2':
        tar_compression = 'bz2'
    elif _LZMA_SUPPORTED and compress == 'xz':
        tar_compression = 'xz'
    else:
        raise ValueError("bad value for 'compress', or compression format not "
                         "supported : {0}".format(compress))

    import tarfile  # late import for breaking circular dependency

    compress_ext = '.' + tar_compression if compress else ''
    archive_name = base_name + '.tar' + compress_ext
    archive_dir = os.path.dirname(archive_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _set_uid_gid(tarinfo):
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        with tarfile.open(archive_name, 'w|%s' % tar_compression) as tar:
            tar.add(base_dir, filter=_set_uid_gid)

    return archive_name


def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
    """Create a zip file from all the files under 'base_dir'.

    The output zip file will be named 'base_name' + ".zip".  Returns the
    name of the output zip file.
    """
    import zipfile  # late import for breaking circular dependency

    zip_filename = base_name + ".zip"
    archive_dir = os.path.dirname(base_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    if logger is not None:
        logger.info("creating '%s' and adding '%s' to it",
                    zip_filename, base_dir)

    if not dry_run:
        with zipfile.ZipFile(zip_filename, "w",
                             compression=zipfile.ZIP_DEFLATED) as zf:
            path = os.path.normpath(base_dir)
            if path != os.curdir:
                zf.write(path, path)
                if logger is not None:
                    logger.info("adding '%s'", path)
            for dirpath, dirnames, filenames in os.walk(base_dir):
                for name in sorted(dirnames):
                    path = os.path.normpath(os.path.join(dirpath, name))
                    zf.write(path, path)
                    if logger is not None:
                        logger.info("adding '%s'", path)
                for name in filenames:
                    path = os.path.normpath(os.path.join(dirpath, name))
                    if os.path.isfile(path):
                        zf.write(path, path)
                        if logger is not None:
                            logger.info("adding '%s'", path)

    return zip_filename


# Registry of archive creators: name -> (function, extra_args, description)
_ARCHIVE_FORMATS = {
    'tar':   (_make_tarball, [('compress', None)], "uncompressed tar file"),
}

if _ZLIB_SUPPORTED:
    _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
                                 "gzip'ed tar-file")
    _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file")

if _BZ2_SUPPORTED:
    _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
                                 "bzip2'ed tar-file")

if _LZMA_SUPPORTED:
    _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
                                 "xz'ed tar-file")


def get_archive_formats():
    """Returns a list of supported formats for archiving.

    Each element of the returned sequence is a tuple (name, description)
    """
    formats = [(name, registry[2]) for name, registry in
               _ARCHIVE_FORMATS.items()]
    formats.sort()
    return formats


def register_archive_format(name, function, extra_args=None, description=''):
    """Registers an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. If provided, extra_args is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_archive_formats() function.

    Raises TypeError if function is not callable or extra_args is malformed.
    """
    if extra_args is None:
        extra_args = []
    if not callable(function):
        raise TypeError('The %s object is not callable' % function)
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')
    for element in extra_args:
        if not isinstance(element, (tuple, list)) or len(element) != 2:
            raise TypeError('extra_args elements are : (arg_name, value)')

    _ARCHIVE_FORMATS[name] = (function, extra_args, description)


def unregister_archive_format(name):
    """Removes the archive format `name` from the registry.

    Raises KeyError if the format is not registered.
    """
    del _ARCHIVE_FORMATS[name]


def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "gztar",
    "bztar", or "xztar".  Or any other registered format.

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.

    Raises ValueError if 'format' is not a registered archive format.
    """
    # Validate the format before touching the working directory, so that an
    # unknown format cannot leave the process stranded in root_dir.
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format)

    save_cwd = os.getcwd()
    if root_dir is not None:
        if logger is not None:
            logger.debug("changing into '%s'", root_dir)
        base_name = os.path.abspath(base_name)
        if not dry_run:
            os.chdir(root_dir)

    if base_dir is None:
        base_dir = os.curdir

    kwargs = {'dry_run': dry_run, 'logger': logger}

    func = format_info[0]
    for arg, val in format_info[1]:
        kwargs[arg] = val

    if format != 'zip':
        kwargs['owner'] = owner
        kwargs['group'] = group

    try:
        filename = func(base_name, base_dir, **kwargs)
    finally:
        if root_dir is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename


def get_unpack_formats():
    """Returns a list of supported formats for unpacking.

    Each element of the returned sequence is a tuple
    (name, extensions, description)
    """
    formats = [(name, info[0], info[3]) for name, info in
               _UNPACK_FORMATS.items()]
    formats.sort()
    return formats


def _check_unpack_options(extensions, function, extra_args):
    """Checks what gets registered as an unpacker."""
    # first make sure no other unpacker is registered for this extension
    existing_extensions = {}
    for name, info in _UNPACK_FORMATS.items():
        for ext in info[0]:
            existing_extensions[ext] = name

    for extension in extensions:
        if extension in existing_extensions:
            msg = '%s is already registered for "%s"'
            raise RegistryError(msg % (extension,
                                       existing_extensions[extension]))

    if not callable(function):
        raise TypeError('The registered function must be a callable')


def register_unpack_format(name, extensions, function, extra_args=None,
                           description=''):
    """Registers an unpack format.

    `name` is the name of the format. `extensions` is a list of extensions
    corresponding to the format.

    `function` is the callable that will be
    used to unpack archives. The callable will receive archives to unpack.
    If it's unable to handle an archive, it needs to raise a ReadError
    exception.

    If provided, `extra_args` is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_unpack_formats() function.

    Raises RegistryError if one of the extensions is already registered and
    TypeError if `function` is not callable.
    """
    if extra_args is None:
        extra_args = []
    _check_unpack_options(extensions, function, extra_args)
    _UNPACK_FORMATS[name] = extensions, function, extra_args, description


def unregister_unpack_format(name):
    """Removes the unpack format from the registry."""
    del _UNPACK_FORMATS[name]


def _ensure_directory(path):
    """Ensure that the parent directory of `path` exists"""
    dirname = os.path.dirname(path)
    if not os.path.isdir(dirname):
        os.makedirs(dirname)


def _unpack_zipfile(filename, extract_dir):
    """Unpack zip `filename` to `extract_dir`

    Members with an absolute name or with '..' in their name are skipped.
    Raises ReadError if `filename` is not a zip file.
    """
    import zipfile  # late import for breaking circular dependency

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    with zipfile.ZipFile(filename) as archive:
        for info in archive.infolist():
            name = info.filename

            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name:
                continue

            target = os.path.join(extract_dir, *name.split('/'))
            if not target:
                continue

            _ensure_directory(target)
            if not name.endswith('/'):
                # file
                data = archive.read(info.filename)
                try:
                    with open(target, 'wb') as f:
                        f.write(data)
                finally:
                    # Release the member's contents before reading the next.
                    del data


def _unpack_tarfile(filename, extract_dir):
    """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir`

    Raises ReadError if `filename` cannot be opened as a tar file.
    """
    import tarfile  # late import for breaking circular dependency
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise ReadError(
            "%s is not a compressed or uncompressed tar file" % filename)
    try:
        tarobj.extractall(extract_dir)
    finally:
        tarobj.close()


# Registry of unpackers:
# name -> (extensions, function, extra_args, description)
_UNPACK_FORMATS = {
    'tar':   (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
    'zip':   (['.zip'], _unpack_zipfile, [], "ZIP file"),
}

if _ZLIB_SUPPORTED:
    _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [],
                                "gzip'ed tar-file")

if _BZ2_SUPPORTED:
    _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
                                "bzip2'ed tar-file")

if _LZMA_SUPPORTED:
    _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [],
                                "xz'ed tar-file")


def _find_unpack_format(filename):
    """Return the name of the format registered for `filename`'s extension,
    or None if there is none."""
    for name, info in _UNPACK_FORMATS.items():
        for extension in info[0]:
            if filename.endswith(extension):
                return name
    return None


def unpack_archive(filename, extract_dir=None, format=None):
    """Unpack an archive.

    `filename` is the name of the archive.

    `extract_dir` is the name of the target directory, where the archive
    is unpacked. If not provided, the current working directory is used.

    `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
    or "xztar".  Or any other registered format.  If not provided,
    unpack_archive will use the filename extension and see if an unpacker
    was registered for that extension.

    A ValueError is raised for an unknown `format`; a ReadError is raised
    when no unpacker is registered for the filename extension.
    """
    if extract_dir is None:
        extract_dir = os.getcwd()

    if format is not None:
        try:
            format_info = _UNPACK_FORMATS[format]
        except KeyError:
            raise ValueError("Unknown unpack format '{0}'".format(format))

        func = format_info[1]
        func(filename, extract_dir, **dict(format_info[2]))
    else:
        # we need to look at the registered unpackers supported extensions
        format = _find_unpack_format(filename)
        if format is None:
            raise ReadError("Unknown archive format '{0}'".format(filename))

        func = _UNPACK_FORMATS[format][1]
        kwargs = dict(_UNPACK_FORMATS[format][2])
        func(filename, extract_dir, **kwargs)


if hasattr(os, 'statvfs'):

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
    _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
    _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
    _ntuple_diskusage.free.__doc__ = 'Free space in bytes'

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        Returned value is a named tuple with attributes 'total', 'used' and
        'free', which are the amount of total, used and free space, in bytes.
        """
        st = os.statvfs(path)
        free = st.f_bavail * st.f_frsize
        total = st.f_blocks * st.f_frsize
        used = (st.f_blocks - st.f_bfree) * st.f_frsize
        return _ntuple_diskusage(total, used, free)

elif os.name == 'nt':

    import nt
    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        Returned value is a named tuple with attributes 'total', 'used' and
        'free', which are the amount of total, used and free space, in bytes.
        """
        total, free = nt._getdiskusage(path)
        used = total - free
        return _ntuple_diskusage(total, used, free)


def chown(path, user=None, group=None):
    """Change owner user and group of the given path.

    user and group can be the uid/gid or the user/group names, and in that
    case, they are converted to their respective uid/gid.

    Raises ValueError if neither user nor group is given and LookupError
    if a name cannot be resolved.
    """

    if user is None and group is None:
        raise ValueError("user and/or group must be set")

    _user = user
    _group = group

    # -1 means don't change it
    if user is None:
        _user = -1
    # user can either be an int (the uid) or a string (the system username)
    elif isinstance(user, str):
        _user = _get_uid(user)
        if _user is None:
            raise LookupError("no such user: {!r}".format(user))

    if group is None:
        _group = -1
    elif not isinstance(group, int):
        _group = _get_gid(group)
        if _group is None:
            raise LookupError("no such group: {!r}".format(group))

    os.chown(path, _user, _group)


def get_terminal_size(fallback=(80, 24)):
    """Get the size of the terminal window.

    For each of the two dimensions, the environment variable, COLUMNS
    and LINES respectively, is checked. If the variable is defined and
    the value is a positive integer, it is used.

    When COLUMNS or LINES is not defined, which is the common case,
    the terminal connected to sys.__stdout__ is queried
    by invoking os.get_terminal_size.

    If the terminal size cannot be successfully queried, either because
    the system doesn't support querying, or because we are not
    connected to a terminal, the value given in fallback parameter
    is used. Fallback defaults to (80, 24) which is the default
    size used by many terminal emulators.

    The value returned is a named tuple of type os.terminal_size.
    """
    # columns, lines are the working values
    try:
        columns = int(os.environ['COLUMNS'])
    except (KeyError, ValueError):
        columns = 0

    try:
        lines = int(os.environ['LINES'])
    except (KeyError, ValueError):
        lines = 0

    # only query if necessary
    if columns <= 0 or lines <= 0:
        try:
            size = os.get_terminal_size(sys.__stdout__.fileno())
        except (AttributeError, ValueError, OSError):
            # stdout is None, closed, detached, or not a terminal, or
            # os.get_terminal_size() is unsupported
            size = os.terminal_size(fallback)
        if columns <= 0:
            columns = size.columns
        if lines <= 0:
            lines = size.lines

    return os.terminal_size((columns, lines))


def which(cmd, mode=os.F_OK | os.X_OK, path=None):
    """Given a command, mode, and a PATH string, return the path which
    conforms to the given mode on the PATH, or None if there is no such
    file.

    `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
    of os.environ.get("PATH"), or can be overridden with a custom search
    path.
    """
    # Check that a given file can be accessed with the correct mode.
    # Additionally check that `file` is not a directory, as on Windows
    # directories pass the os.access check.
    def _access_check(fn, mode):
        return (os.path.exists(fn) and os.access(fn, mode)
                and not os.path.isdir(fn))

    # If we're given a path with a directory part, look it up directly rather
    # than referring to PATH directories. This includes checking relative to
    # the current directory, e.g. ./script
    if os.path.dirname(cmd):
        if _access_check(cmd, mode):
            return cmd
        return None

    if path is None:
        path = os.environ.get("PATH", os.defpath)
    if not path:
        return None
    path = path.split(os.pathsep)

    if sys.platform == "win32":
        # The current directory takes precedence on Windows.
        if os.curdir not in path:
            path.insert(0, os.curdir)

        # PATHEXT is necessary to check on Windows.
        pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
        # See if the given file matches any of the expected path extensions.
        # This will allow us to short circuit when given "python.exe".
        # If it does match, only test that one, otherwise we have to try
        # others.
        if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
            files = [cmd]
        else:
            files = [cmd + ext for ext in pathext]
    else:
        # On other platforms you don't have things like PATHEXT to tell you
        # what file suffixes are executable, so just pass on cmd as-is.
        files = [cmd]

    seen = set()
    for directory in path:
        normdir = os.path.normcase(directory)
        if normdir not in seen:
            seen.add(normdir)
            for thefile in files:
                name = os.path.join(directory, thefile)
                if _access_check(name, mode):
                    return name
    return None