aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2021-05-07 14:20:23 +0200
committerFlorian Dold <florian@dold.me>2021-05-07 14:20:23 +0200
commit0a654d8b17e1b77f418cd61797ebf7f083f1cd93 (patch)
treead088b4ce620f4be406ba6e218ca40fbddf59a5a
parentd8ec4d322b5ecb23e992dfa6061dc015c6b4abbf (diff)
downloadwww_shared-0a654d8b17e1b77f418cd61797ebf7f083f1cd93.tar.gz
www_shared-0a654d8b17e1b77f418cd61797ebf7f083f1cd93.zip
backport copytree
-rw-r--r--sitegen/myshutil.py1468
-rw-r--r--sitegen/site.py2
2 files changed, 1469 insertions, 1 deletions
diff --git a/sitegen/myshutil.py b/sitegen/myshutil.py
new file mode 100644
index 0000000..9ab702e
--- /dev/null
+++ b/sitegen/myshutil.py
@@ -0,0 +1,1468 @@
1"""
2Backported shutil library from Python 3.9
3
4(C) 2001-2021 Python Software Foundation
5
6This file is licensed under the PSF license agreement.
7"""
8
9
10"""Utility functions for copying and archiving files and directory trees.
11
12XXX The functions here don't copy the resource fork or other metadata on Mac.
13
14"""
15
16import os
17import sys
18import stat
19import fnmatch
20import collections
21import errno
22
23try:
24 import zlib
25 del zlib
26 _ZLIB_SUPPORTED = True
27except ImportError:
28 _ZLIB_SUPPORTED = False
29
30try:
31 import bz2
32 del bz2
33 _BZ2_SUPPORTED = True
34except ImportError:
35 _BZ2_SUPPORTED = False
36
37try:
38 import lzma
39 del lzma
40 _LZMA_SUPPORTED = True
41except ImportError:
42 _LZMA_SUPPORTED = False
43
44try:
45 from pwd import getpwnam
46except ImportError:
47 getpwnam = None
48
49try:
50 from grp import getgrnam
51except ImportError:
52 getgrnam = None
53
54_WINDOWS = os.name == 'nt'
55posix = nt = None
56if os.name == 'posix':
57 import posix
58elif _WINDOWS:
59 import nt
60
61COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
62_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
63_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS
64
65# CMD defaults in Windows 10
66_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"
67
68__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
69 "copytree", "move", "rmtree", "Error", "SpecialFileError",
70 "ExecError", "make_archive", "get_archive_formats",
71 "register_archive_format", "unregister_archive_format",
72 "get_unpack_formats", "register_unpack_format",
73 "unregister_unpack_format", "unpack_archive",
74 "ignore_patterns", "chown", "which", "get_terminal_size",
75 "SameFileError"]
76 # disk_usage is added later, if available on the platform
77
78class Error(OSError):
79 pass
80
81class SameFileError(Error):
82 """Raised when source and destination are the same file."""
83
84class SpecialFileError(OSError):
85 """Raised when trying to do a kind of operation (e.g. copying) which is
86 not supported on a special file (e.g. a named pipe)"""
87
88class ExecError(OSError):
89 """Raised when a command could not be executed"""
90
91class ReadError(OSError):
92 """Raised when an archive cannot be read"""
93
94class RegistryError(Exception):
95 """Raised when a registry operation with the archiving
96 and unpacking registries fails"""
97
98class _GiveupOnFastCopy(Exception):
99 """Raised as a signal to fallback on using raw read()/write()
100 file copy when fast-copy functions fail to do so.
101 """
102
103def _fastcopy_fcopyfile(fsrc, fdst, flags):
104 """Copy a regular file content or metadata by using high-performance
105 fcopyfile(3) syscall (macOS).
106 """
107 try:
108 infd = fsrc.fileno()
109 outfd = fdst.fileno()
110 except Exception as err:
111 raise _GiveupOnFastCopy(err) # not a regular file
112
113 try:
114 posix._fcopyfile(infd, outfd, flags)
115 except OSError as err:
116 err.filename = fsrc.name
117 err.filename2 = fdst.name
118 if err.errno in {errno.EINVAL, errno.ENOTSUP}:
119 raise _GiveupOnFastCopy(err)
120 else:
121 raise err from None
122
123def _fastcopy_sendfile(fsrc, fdst):
124 """Copy data from one regular mmap-like fd to another by using
125 high-performance sendfile(2) syscall.
126 This should work on Linux >= 2.6.33 only.
127 """
128 # Note: copyfileobj() is left alone in order to not introduce any
129 # unexpected breakage. Possible risks by using zero-copy calls
130 # in copyfileobj() are:
131 # - fdst cannot be open in "a"(ppend) mode
132 # - fsrc and fdst may be open in "t"(ext) mode
133 # - fsrc may be a BufferedReader (which hides unread data in a buffer),
134 # GzipFile (which decompresses data), HTTPResponse (which decodes
135 # chunks).
136 # - possibly others (e.g. encrypted fs/partition?)
137 global _USE_CP_SENDFILE
138 try:
139 infd = fsrc.fileno()
140 outfd = fdst.fileno()
141 except Exception as err:
142 raise _GiveupOnFastCopy(err) # not a regular file
143
144 # Hopefully the whole file will be copied in a single call.
145 # sendfile() is called in a loop 'till EOF is reached (0 return)
146 # so a bufsize smaller or bigger than the actual file size
147 # should not make any difference, also in case the file content
148 # changes while being copied.
149 try:
150 blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB
151 except OSError:
152 blocksize = 2 ** 27 # 128MiB
153 # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
154 # see bpo-38319.
155 if sys.maxsize < 2 ** 32:
156 blocksize = min(blocksize, 2 ** 30)
157
158 offset = 0
159 while True:
160 try:
161 sent = os.sendfile(outfd, infd, offset, blocksize)
162 except OSError as err:
163 # ...in oder to have a more informative exception.
164 err.filename = fsrc.name
165 err.filename2 = fdst.name
166
167 if err.errno == errno.ENOTSOCK:
168 # sendfile() on this platform (probably Linux < 2.6.33)
169 # does not support copies between regular files (only
170 # sockets).
171 _USE_CP_SENDFILE = False
172 raise _GiveupOnFastCopy(err)
173
174 if err.errno == errno.ENOSPC: # filesystem is full
175 raise err from None
176
177 # Give up on first call and if no data was copied.
178 if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
179 raise _GiveupOnFastCopy(err)
180
181 raise err
182 else:
183 if sent == 0:
184 break # EOF
185 offset += sent
186
187def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
188 """readinto()/memoryview() based variant of copyfileobj().
189 *fsrc* must support readinto() method and both files must be
190 open in binary mode.
191 """
192 # Localize variable access to minimize overhead.
193 fsrc_readinto = fsrc.readinto
194 fdst_write = fdst.write
195 with memoryview(bytearray(length)) as mv:
196 while True:
197 n = fsrc_readinto(mv)
198 if not n:
199 break
200 elif n < length:
201 with mv[:n] as smv:
202 fdst.write(smv)
203 else:
204 fdst_write(mv)
205
206def copyfileobj(fsrc, fdst, length=0):
207 """copy data from file-like object fsrc to file-like object fdst"""
208 # Localize variable access to minimize overhead.
209 if not length:
210 length = COPY_BUFSIZE
211 fsrc_read = fsrc.read
212 fdst_write = fdst.write
213 while True:
214 buf = fsrc_read(length)
215 if not buf:
216 break
217 fdst_write(buf)
218
219def _samefile(src, dst):
220 # Macintosh, Unix.
221 if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
222 try:
223 return os.path.samestat(src.stat(), os.stat(dst))
224 except OSError:
225 return False
226
227 if hasattr(os.path, 'samefile'):
228 try:
229 return os.path.samefile(src, dst)
230 except OSError:
231 return False
232
233 # All other platforms: check for same pathname.
234 return (os.path.normcase(os.path.abspath(src)) ==
235 os.path.normcase(os.path.abspath(dst)))
236
237def _stat(fn):
238 return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn)
239
240def _islink(fn):
241 return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn)
242
243def copyfile(src, dst, *, follow_symlinks=True):
244 """Copy data from src to dst in the most efficient way possible.
245
246 If follow_symlinks is not set and src is a symbolic link, a new
247 symlink will be created instead of copying the file it points to.
248
249 """
250 sys.audit("shutil.copyfile", src, dst)
251
252 if _samefile(src, dst):
253 raise SameFileError("{!r} and {!r} are the same file".format(src, dst))
254
255 file_size = 0
256 for i, fn in enumerate([src, dst]):
257 try:
258 st = _stat(fn)
259 except OSError:
260 # File most likely does not exist
261 pass
262 else:
263 # XXX What about other special files? (sockets, devices...)
264 if stat.S_ISFIFO(st.st_mode):
265 fn = fn.path if isinstance(fn, os.DirEntry) else fn
266 raise SpecialFileError("`%s` is a named pipe" % fn)
267 if _WINDOWS and i == 0:
268 file_size = st.st_size
269
270 if not follow_symlinks and _islink(src):
271 os.symlink(os.readlink(src), dst)
272 else:
273 with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
274 # macOS
275 if _HAS_FCOPYFILE:
276 try:
277 _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
278 return dst
279 except _GiveupOnFastCopy:
280 pass
281 # Linux
282 elif _USE_CP_SENDFILE:
283 try:
284 _fastcopy_sendfile(fsrc, fdst)
285 return dst
286 except _GiveupOnFastCopy:
287 pass
288 # Windows, see:
289 # https://github.com/python/cpython/pull/7160#discussion_r195405230
290 elif _WINDOWS and file_size > 0:
291 _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
292 return dst
293
294 copyfileobj(fsrc, fdst)
295
296 return dst
297
298def copymode(src, dst, *, follow_symlinks=True):
299 """Copy mode bits from src to dst.
300
301 If follow_symlinks is not set, symlinks aren't followed if and only
302 if both `src` and `dst` are symlinks. If `lchmod` isn't available
303 (e.g. Linux) this method does nothing.
304
305 """
306 sys.audit("shutil.copymode", src, dst)
307
308 if not follow_symlinks and _islink(src) and os.path.islink(dst):
309 if hasattr(os, 'lchmod'):
310 stat_func, chmod_func = os.lstat, os.lchmod
311 else:
312 return
313 else:
314 stat_func, chmod_func = _stat, os.chmod
315
316 st = stat_func(src)
317 chmod_func(dst, stat.S_IMODE(st.st_mode))
318
319if hasattr(os, 'listxattr'):
320 def _copyxattr(src, dst, *, follow_symlinks=True):
321 """Copy extended filesystem attributes from `src` to `dst`.
322
323 Overwrite existing attributes.
324
325 If `follow_symlinks` is false, symlinks won't be followed.
326
327 """
328
329 try:
330 names = os.listxattr(src, follow_symlinks=follow_symlinks)
331 except OSError as e:
332 if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
333 raise
334 return
335 for name in names:
336 try:
337 value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
338 os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
339 except OSError as e:
340 if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
341 errno.EINVAL):
342 raise
343else:
344 def _copyxattr(*args, **kwargs):
345 pass
346
347def copystat(src, dst, *, follow_symlinks=True):
348 """Copy file metadata
349
350 Copy the permission bits, last access time, last modification time, and
351 flags from `src` to `dst`. On Linux, copystat() also copies the "extended
352 attributes" where possible. The file contents, owner, and group are
353 unaffected. `src` and `dst` are path-like objects or path names given as
354 strings.
355
356 If the optional flag `follow_symlinks` is not set, symlinks aren't
357 followed if and only if both `src` and `dst` are symlinks.
358 """
359 sys.audit("shutil.copystat", src, dst)
360
361 def _nop(*args, ns=None, follow_symlinks=None):
362 pass
363
364 # follow symlinks (aka don't not follow symlinks)
365 follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
366 if follow:
367 # use the real function if it exists
368 def lookup(name):
369 return getattr(os, name, _nop)
370 else:
371 # use the real function only if it exists
372 # *and* it supports follow_symlinks
373 def lookup(name):
374 fn = getattr(os, name, _nop)
375 if fn in os.supports_follow_symlinks:
376 return fn
377 return _nop
378
379 if isinstance(src, os.DirEntry):
380 st = src.stat(follow_symlinks=follow)
381 else:
382 st = lookup("stat")(src, follow_symlinks=follow)
383 mode = stat.S_IMODE(st.st_mode)
384 lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
385 follow_symlinks=follow)
386 # We must copy extended attributes before the file is (potentially)
387 # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
388 _copyxattr(src, dst, follow_symlinks=follow)
389 try:
390 lookup("chmod")(dst, mode, follow_symlinks=follow)
391 except NotImplementedError:
392 # if we got a NotImplementedError, it's because
393 # * follow_symlinks=False,
394 # * lchown() is unavailable, and
395 # * either
396 # * fchownat() is unavailable or
397 # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
398 # (it returned ENOSUP.)
399 # therefore we're out of options--we simply cannot chown the
400 # symlink. give up, suppress the error.
401 # (which is what shutil always did in this circumstance.)
402 pass
403 if hasattr(st, 'st_flags'):
404 try:
405 lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
406 except OSError as why:
407 for err in 'EOPNOTSUPP', 'ENOTSUP':
408 if hasattr(errno, err) and why.errno == getattr(errno, err):
409 break
410 else:
411 raise
412
413def copy(src, dst, *, follow_symlinks=True):
414 """Copy data and mode bits ("cp src dst"). Return the file's destination.
415
416 The destination may be a directory.
417
418 If follow_symlinks is false, symlinks won't be followed. This
419 resembles GNU's "cp -P src dst".
420
421 If source and destination are the same file, a SameFileError will be
422 raised.
423
424 """
425 if os.path.isdir(dst):
426 dst = os.path.join(dst, os.path.basename(src))
427 copyfile(src, dst, follow_symlinks=follow_symlinks)
428 copymode(src, dst, follow_symlinks=follow_symlinks)
429 return dst
430
431def copy2(src, dst, *, follow_symlinks=True):
432 """Copy data and metadata. Return the file's destination.
433
434 Metadata is copied with copystat(). Please see the copystat function
435 for more information.
436
437 The destination may be a directory.
438
439 If follow_symlinks is false, symlinks won't be followed. This
440 resembles GNU's "cp -P src dst".
441 """
442 if os.path.isdir(dst):
443 dst = os.path.join(dst, os.path.basename(src))
444 copyfile(src, dst, follow_symlinks=follow_symlinks)
445 copystat(src, dst, follow_symlinks=follow_symlinks)
446 return dst
447
448def ignore_patterns(*patterns):
449 """Function that can be used as copytree() ignore parameter.
450
451 Patterns is a sequence of glob-style patterns
452 that are used to exclude files"""
453 def _ignore_patterns(path, names):
454 ignored_names = []
455 for pattern in patterns:
456 ignored_names.extend(fnmatch.filter(names, pattern))
457 return set(ignored_names)
458 return _ignore_patterns
459
460def _copytree(entries, src, dst, symlinks, ignore, copy_function,
461 ignore_dangling_symlinks, dirs_exist_ok=False):
462 if ignore is not None:
463 ignored_names = ignore(os.fspath(src), [x.name for x in entries])
464 else:
465 ignored_names = set()
466
467 os.makedirs(dst, exist_ok=dirs_exist_ok)
468 errors = []
469 use_srcentry = copy_function is copy2 or copy_function is copy
470
471 for srcentry in entries:
472 if srcentry.name in ignored_names:
473 continue
474 srcname = os.path.join(src, srcentry.name)
475 dstname = os.path.join(dst, srcentry.name)
476 srcobj = srcentry if use_srcentry else srcname
477 try:
478 is_symlink = srcentry.is_symlink()
479 if is_symlink and os.name == 'nt':
480 # Special check for directory junctions, which appear as
481 # symlinks but we want to recurse.
482 lstat = srcentry.stat(follow_symlinks=False)
483 if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
484 is_symlink = False
485 if is_symlink:
486 linkto = os.readlink(srcname)
487 if symlinks:
488 # We can't just leave it to `copy_function` because legacy
489 # code with a custom `copy_function` may rely on copytree
490 # doing the right thing.
491 os.symlink(linkto, dstname)
492 copystat(srcobj, dstname, follow_symlinks=not symlinks)
493 else:
494 # ignore dangling symlink if the flag is on
495 if not os.path.exists(linkto) and ignore_dangling_symlinks:
496 continue
497 # otherwise let the copy occur. copy2 will raise an error
498 if srcentry.is_dir():
499 copytree(srcobj, dstname, symlinks, ignore,
500 copy_function, dirs_exist_ok=dirs_exist_ok)
501 else:
502 copy_function(srcobj, dstname)
503 elif srcentry.is_dir():
504 copytree(srcobj, dstname, symlinks, ignore, copy_function,
505 dirs_exist_ok=dirs_exist_ok)
506 else:
507 # Will raise a SpecialFileError for unsupported file types
508 copy_function(srcobj, dstname)
509 # catch the Error from the recursive copytree so that we can
510 # continue with other files
511 except Error as err:
512 errors.extend(err.args[0])
513 except OSError as why:
514 errors.append((srcname, dstname, str(why)))
515 try:
516 copystat(src, dst)
517 except OSError as why:
518 # Copying file access times may fail on Windows
519 if getattr(why, 'winerror', None) is None:
520 errors.append((src, dst, str(why)))
521 if errors:
522 raise Error(errors)
523 return dst
524
525def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
526 ignore_dangling_symlinks=False, dirs_exist_ok=False):
527 """Recursively copy a directory tree and return the destination directory.
528
529 dirs_exist_ok dictates whether to raise an exception in case dst or any
530 missing parent directory already exists.
531
532 If exception(s) occur, an Error is raised with a list of reasons.
533
534 If the optional symlinks flag is true, symbolic links in the
535 source tree result in symbolic links in the destination tree; if
536 it is false, the contents of the files pointed to by symbolic
537 links are copied. If the file pointed by the symlink doesn't
538 exist, an exception will be added in the list of errors raised in
539 an Error exception at the end of the copy process.
540
541 You can set the optional ignore_dangling_symlinks flag to true if you
542 want to silence this exception. Notice that this has no effect on
543 platforms that don't support os.symlink.
544
545 The optional ignore argument is a callable. If given, it
546 is called with the `src` parameter, which is the directory
547 being visited by copytree(), and `names` which is the list of
548 `src` contents, as returned by os.listdir():
549
550 callable(src, names) -> ignored_names
551
552 Since copytree() is called recursively, the callable will be
553 called once for each directory that is copied. It returns a
554 list of names relative to the `src` directory that should
555 not be copied.
556
557 The optional copy_function argument is a callable that will be used
558 to copy each file. It will be called with the source path and the
559 destination path as arguments. By default, copy2() is used, but any
560 function that supports the same signature (like copy()) can be used.
561
562 """
563 sys.audit("shutil.copytree", src, dst)
564 with os.scandir(src) as itr:
565 entries = list(itr)
566 return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
567 ignore=ignore, copy_function=copy_function,
568 ignore_dangling_symlinks=ignore_dangling_symlinks,
569 dirs_exist_ok=dirs_exist_ok)
570
571if hasattr(os.stat_result, 'st_file_attributes'):
572 # Special handling for directory junctions to make them behave like
573 # symlinks for shutil.rmtree, since in general they do not appear as
574 # regular links.
575 def _rmtree_isdir(entry):
576 try:
577 st = entry.stat(follow_symlinks=False)
578 return (stat.S_ISDIR(st.st_mode) and not
579 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
580 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
581 except OSError:
582 return False
583
584 def _rmtree_islink(path):
585 try:
586 st = os.lstat(path)
587 return (stat.S_ISLNK(st.st_mode) or
588 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
589 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
590 except OSError:
591 return False
592else:
593 def _rmtree_isdir(entry):
594 try:
595 return entry.is_dir(follow_symlinks=False)
596 except OSError:
597 return False
598
599 def _rmtree_islink(path):
600 return os.path.islink(path)
601
602# version vulnerable to race conditions
603def _rmtree_unsafe(path, onerror):
604 try:
605 with os.scandir(path) as scandir_it:
606 entries = list(scandir_it)
607 except OSError:
608 onerror(os.scandir, path, sys.exc_info())
609 entries = []
610 for entry in entries:
611 fullname = entry.path
612 if _rmtree_isdir(entry):
613 try:
614 if entry.is_symlink():
615 # This can only happen if someone replaces
616 # a directory with a symlink after the call to
617 # os.scandir or entry.is_dir above.
618 raise OSError("Cannot call rmtree on a symbolic link")
619 except OSError:
620 onerror(os.path.islink, fullname, sys.exc_info())
621 continue
622 _rmtree_unsafe(fullname, onerror)
623 else:
624 try:
625 os.unlink(fullname)
626 except OSError:
627 onerror(os.unlink, fullname, sys.exc_info())
628 try:
629 os.rmdir(path)
630 except OSError:
631 onerror(os.rmdir, path, sys.exc_info())
632
633# Version using fd-based APIs to protect against races
634def _rmtree_safe_fd(topfd, path, onerror):
635 try:
636 with os.scandir(topfd) as scandir_it:
637 entries = list(scandir_it)
638 except OSError as err:
639 err.filename = path
640 onerror(os.scandir, path, sys.exc_info())
641 return
642 for entry in entries:
643 fullname = os.path.join(path, entry.name)
644 try:
645 is_dir = entry.is_dir(follow_symlinks=False)
646 except OSError:
647 is_dir = False
648 else:
649 if is_dir:
650 try:
651 orig_st = entry.stat(follow_symlinks=False)
652 is_dir = stat.S_ISDIR(orig_st.st_mode)
653 except OSError:
654 onerror(os.lstat, fullname, sys.exc_info())
655 continue
656 if is_dir:
657 try:
658 dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
659 except OSError:
660 onerror(os.open, fullname, sys.exc_info())
661 else:
662 try:
663 if os.path.samestat(orig_st, os.fstat(dirfd)):
664 _rmtree_safe_fd(dirfd, fullname, onerror)
665 try:
666 os.rmdir(entry.name, dir_fd=topfd)
667 except OSError:
668 onerror(os.rmdir, fullname, sys.exc_info())
669 else:
670 try:
671 # This can only happen if someone replaces
672 # a directory with a symlink after the call to
673 # os.scandir or stat.S_ISDIR above.
674 raise OSError("Cannot call rmtree on a symbolic "
675 "link")
676 except OSError:
677 onerror(os.path.islink, fullname, sys.exc_info())
678 finally:
679 os.close(dirfd)
680 else:
681 try:
682 os.unlink(entry.name, dir_fd=topfd)
683 except OSError:
684 onerror(os.unlink, fullname, sys.exc_info())
685
686_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
687 os.supports_dir_fd and
688 os.scandir in os.supports_fd and
689 os.stat in os.supports_follow_symlinks)
690
691def rmtree(path, ignore_errors=False, onerror=None):
692 """Recursively delete a directory tree.
693
694 If ignore_errors is set, errors are ignored; otherwise, if onerror
695 is set, it is called to handle the error with arguments (func,
696 path, exc_info) where func is platform and implementation dependent;
697 path is the argument to that function that caused it to fail; and
698 exc_info is a tuple returned by sys.exc_info(). If ignore_errors
699 is false and onerror is None, an exception is raised.
700
701 """
702 sys.audit("shutil.rmtree", path)
703 if ignore_errors:
704 def onerror(*args):
705 pass
706 elif onerror is None:
707 def onerror(*args):
708 raise
709 if _use_fd_functions:
710 # While the unsafe rmtree works fine on bytes, the fd based does not.
711 if isinstance(path, bytes):
712 path = os.fsdecode(path)
713 # Note: To guard against symlink races, we use the standard
714 # lstat()/open()/fstat() trick.
715 try:
716 orig_st = os.lstat(path)
717 except Exception:
718 onerror(os.lstat, path, sys.exc_info())
719 return
720 try:
721 fd = os.open(path, os.O_RDONLY)
722 except Exception:
723 onerror(os.open, path, sys.exc_info())
724 return
725 try:
726 if os.path.samestat(orig_st, os.fstat(fd)):
727 _rmtree_safe_fd(fd, path, onerror)
728 try:
729 os.rmdir(path)
730 except OSError:
731 onerror(os.rmdir, path, sys.exc_info())
732 else:
733 try:
734 # symlinks to directories are forbidden, see bug #1669
735 raise OSError("Cannot call rmtree on a symbolic link")
736 except OSError:
737 onerror(os.path.islink, path, sys.exc_info())
738 finally:
739 os.close(fd)
740 else:
741 try:
742 if _rmtree_islink(path):
743 # symlinks to directories are forbidden, see bug #1669
744 raise OSError("Cannot call rmtree on a symbolic link")
745 except OSError:
746 onerror(os.path.islink, path, sys.exc_info())
747 # can't continue even if onerror hook returns
748 return
749 return _rmtree_unsafe(path, onerror)
750
751# Allow introspection of whether or not the hardening against symlink
752# attacks is supported on the current platform
753rmtree.avoids_symlink_attacks = _use_fd_functions
754
755def _basename(path):
756 """A basename() variant which first strips the trailing slash, if present.
757 Thus we always get the last component of the path, even for directories.
758
759 path: Union[PathLike, str]
760
761 e.g.
762 >>> os.path.basename('/bar/foo')
763 'foo'
764 >>> os.path.basename('/bar/foo/')
765 ''
766 >>> _basename('/bar/foo/')
767 'foo'
768 """
769 path = os.fspath(path)
770 sep = os.path.sep + (os.path.altsep or '')
771 return os.path.basename(path.rstrip(sep))
772
773def move(src, dst, copy_function=copy2):
774 """Recursively move a file or directory to another location. This is
775 similar to the Unix "mv" command. Return the file or directory's
776 destination.
777
778 If the destination is a directory or a symlink to a directory, the source
779 is moved inside the directory. The destination path must not already
780 exist.
781
782 If the destination already exists but is not a directory, it may be
783 overwritten depending on os.rename() semantics.
784
785 If the destination is on our current filesystem, then rename() is used.
786 Otherwise, src is copied to the destination and then removed. Symlinks are
787 recreated under the new name if os.rename() fails because of cross
788 filesystem renames.
789
790 The optional `copy_function` argument is a callable that will be used
791 to copy the source or it will be delegated to `copytree`.
792 By default, copy2() is used, but any function that supports the same
793 signature (like copy()) can be used.
794
795 A lot more could be done here... A look at a mv.c shows a lot of
796 the issues this implementation glosses over.
797
798 """
799 sys.audit("shutil.move", src, dst)
800 real_dst = dst
801 if os.path.isdir(dst):
802 if _samefile(src, dst):
803 # We might be on a case insensitive filesystem,
804 # perform the rename anyway.
805 os.rename(src, dst)
806 return
807
808 # Using _basename instead of os.path.basename is important, as we must
809 # ignore any trailing slash to avoid the basename returning ''
810 real_dst = os.path.join(dst, _basename(src))
811
812 if os.path.exists(real_dst):
813 raise Error("Destination path '%s' already exists" % real_dst)
814 try:
815 os.rename(src, real_dst)
816 except OSError:
817 if os.path.islink(src):
818 linkto = os.readlink(src)
819 os.symlink(linkto, real_dst)
820 os.unlink(src)
821 elif os.path.isdir(src):
822 if _destinsrc(src, dst):
823 raise Error("Cannot move a directory '%s' into itself"
824 " '%s'." % (src, dst))
825 if (_is_immutable(src)
826 or (not os.access(src, os.W_OK) and os.listdir(src)
827 and sys.platform == 'darwin')):
828 raise PermissionError("Cannot move the non-empty directory "
829 "'%s': Lacking write permission to '%s'."
830 % (src, src))
831 copytree(src, real_dst, copy_function=copy_function,
832 symlinks=True)
833 rmtree(src)
834 else:
835 copy_function(src, real_dst)
836 os.unlink(src)
837 return real_dst
838
839def _destinsrc(src, dst):
840 src = os.path.abspath(src)
841 dst = os.path.abspath(dst)
842 if not src.endswith(os.path.sep):
843 src += os.path.sep
844 if not dst.endswith(os.path.sep):
845 dst += os.path.sep
846 return dst.startswith(src)
847
848def _is_immutable(src):
849 st = _stat(src)
850 immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE]
851 return hasattr(st, 'st_flags') and st.st_flags in immutable_states
852
853def _get_gid(name):
854 """Returns a gid, given a group name."""
855 if getgrnam is None or name is None:
856 return None
857 try:
858 result = getgrnam(name)
859 except KeyError:
860 result = None
861 if result is not None:
862 return result[2]
863 return None
864
865def _get_uid(name):
866 """Returns an uid, given a user name."""
867 if getpwnam is None or name is None:
868 return None
869 try:
870 result = getpwnam(name)
871 except KeyError:
872 result = None
873 if result is not None:
874 return result[2]
875 return None
876
877def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
878 owner=None, group=None, logger=None):
879 """Create a (possibly compressed) tar file from all the files under
880 'base_dir'.
881
882 'compress' must be "gzip" (the default), "bzip2", "xz", or None.
883
884 'owner' and 'group' can be used to define an owner and a group for the
885 archive that is being built. If not provided, the current owner and group
886 will be used.
887
888 The output tar file will be named 'base_name' + ".tar", possibly plus
889 the appropriate compression extension (".gz", ".bz2", or ".xz").
890
891 Returns the output filename.
892 """
893 if compress is None:
894 tar_compression = ''
895 elif _ZLIB_SUPPORTED and compress == 'gzip':
896 tar_compression = 'gz'
897 elif _BZ2_SUPPORTED and compress == 'bzip2':
898 tar_compression = 'bz2'
899 elif _LZMA_SUPPORTED and compress == 'xz':
900 tar_compression = 'xz'
901 else:
902 raise ValueError("bad value for 'compress', or compression format not "
903 "supported : {0}".format(compress))
904
905 import tarfile # late import for breaking circular dependency
906
907 compress_ext = '.' + tar_compression if compress else ''
908 archive_name = base_name + '.tar' + compress_ext
909 archive_dir = os.path.dirname(archive_name)
910
911 if archive_dir and not os.path.exists(archive_dir):
912 if logger is not None:
913 logger.info("creating %s", archive_dir)
914 if not dry_run:
915 os.makedirs(archive_dir)
916
917 # creating the tarball
918 if logger is not None:
919 logger.info('Creating tar archive')
920
921 uid = _get_uid(owner)
922 gid = _get_gid(group)
923
924 def _set_uid_gid(tarinfo):
925 if gid is not None:
926 tarinfo.gid = gid
927 tarinfo.gname = group
928 if uid is not None:
929 tarinfo.uid = uid
930 tarinfo.uname = owner
931 return tarinfo
932
933 if not dry_run:
934 tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
935 try:
936 tar.add(base_dir, filter=_set_uid_gid)
937 finally:
938 tar.close()
939
940 return archive_name
941
942def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
943 """Create a zip file from all the files under 'base_dir'.
944
945 The output zip file will be named 'base_name' + ".zip". Returns the
946 name of the output zip file.
947 """
948 import zipfile # late import for breaking circular dependency
949
950 zip_filename = base_name + ".zip"
951 archive_dir = os.path.dirname(base_name)
952
953 if archive_dir and not os.path.exists(archive_dir):
954 if logger is not None:
955 logger.info("creating %s", archive_dir)
956 if not dry_run:
957 os.makedirs(archive_dir)
958
959 if logger is not None:
960 logger.info("creating '%s' and adding '%s' to it",
961 zip_filename, base_dir)
962
963 if not dry_run:
964 with zipfile.ZipFile(zip_filename, "w",
965 compression=zipfile.ZIP_DEFLATED) as zf:
966 path = os.path.normpath(base_dir)
967 if path != os.curdir:
968 zf.write(path, path)
969 if logger is not None:
970 logger.info("adding '%s'", path)
971 for dirpath, dirnames, filenames in os.walk(base_dir):
972 for name in sorted(dirnames):
973 path = os.path.normpath(os.path.join(dirpath, name))
974 zf.write(path, path)
975 if logger is not None:
976 logger.info("adding '%s'", path)
977 for name in filenames:
978 path = os.path.normpath(os.path.join(dirpath, name))
979 if os.path.isfile(path):
980 zf.write(path, path)
981 if logger is not None:
982 logger.info("adding '%s'", path)
983
984 return zip_filename
985
986_ARCHIVE_FORMATS = {
987 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),
988}
989
990if _ZLIB_SUPPORTED:
991 _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
992 "gzip'ed tar-file")
993 _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file")
994
995if _BZ2_SUPPORTED:
996 _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
997 "bzip2'ed tar-file")
998
999if _LZMA_SUPPORTED:
1000 _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
1001 "xz'ed tar-file")
1002
1003def get_archive_formats():
1004 """Returns a list of supported formats for archiving and unarchiving.
1005
1006 Each element of the returned sequence is a tuple (name, description)
1007 """
1008 formats = [(name, registry[2]) for name, registry in
1009 _ARCHIVE_FORMATS.items()]
1010 formats.sort()
1011 return formats
1012
1013def register_archive_format(name, function, extra_args=None, description=''):
1014 """Registers an archive format.
1015
1016 name is the name of the format. function is the callable that will be
1017 used to create archives. If provided, extra_args is a sequence of
1018 (name, value) tuples that will be passed as arguments to the callable.
1019 description can be provided to describe the format, and will be returned
1020 by the get_archive_formats() function.
1021 """
1022 if extra_args is None:
1023 extra_args = []
1024 if not callable(function):
1025 raise TypeError('The %s object is not callable' % function)
1026 if not isinstance(extra_args, (tuple, list)):
1027 raise TypeError('extra_args needs to be a sequence')
1028 for element in extra_args:
1029 if not isinstance(element, (tuple, list)) or len(element) !=2:
1030 raise TypeError('extra_args elements are : (arg_name, value)')
1031
1032 _ARCHIVE_FORMATS[name] = (function, extra_args, description)
1033
1034def unregister_archive_format(name):
1035 del _ARCHIVE_FORMATS[name]
1036
1037def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
1038 dry_run=0, owner=None, group=None, logger=None):
1039 """Create an archive file (eg. zip or tar).
1040
1041 'base_name' is the name of the file to create, minus any format-specific
1042 extension; 'format' is the archive format: one of "zip", "tar", "gztar",
1043 "bztar", or "xztar". Or any other registered format.
1044
1045 'root_dir' is a directory that will be the root directory of the
1046 archive; ie. we typically chdir into 'root_dir' before creating the
1047 archive. 'base_dir' is the directory where we start archiving from;
1048 ie. 'base_dir' will be the common prefix of all files and
1049 directories in the archive. 'root_dir' and 'base_dir' both default
1050 to the current directory. Returns the name of the archive file.
1051
1052 'owner' and 'group' are used when creating a tar archive. By default,
1053 uses the current owner and group.
1054 """
1055 sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
1056 save_cwd = os.getcwd()
1057 if root_dir is not None:
1058 if logger is not None:
1059 logger.debug("changing into '%s'", root_dir)
1060 base_name = os.path.abspath(base_name)
1061 if not dry_run:
1062 os.chdir(root_dir)
1063
1064 if base_dir is None:
1065 base_dir = os.curdir
1066
1067 kwargs = {'dry_run': dry_run, 'logger': logger}
1068
1069 try:
1070 format_info = _ARCHIVE_FORMATS[format]
1071 except KeyError:
1072 raise ValueError("unknown archive format '%s'" % format) from None
1073
1074 func = format_info[0]
1075 for arg, val in format_info[1]:
1076 kwargs[arg] = val
1077
1078 if format != 'zip':
1079 kwargs['owner'] = owner
1080 kwargs['group'] = group
1081
1082 try:
1083 filename = func(base_name, base_dir, **kwargs)
1084 finally:
1085 if root_dir is not None:
1086 if logger is not None:
1087 logger.debug("changing back to '%s'", save_cwd)
1088 os.chdir(save_cwd)
1089
1090 return filename
1091
1092
1093def get_unpack_formats():
1094 """Returns a list of supported formats for unpacking.
1095
1096 Each element of the returned sequence is a tuple
1097 (name, extensions, description)
1098 """
1099 formats = [(name, info[0], info[3]) for name, info in
1100 _UNPACK_FORMATS.items()]
1101 formats.sort()
1102 return formats
1103
1104def _check_unpack_options(extensions, function, extra_args):
1105 """Checks what gets registered as an unpacker."""
1106 # first make sure no other unpacker is registered for this extension
1107 existing_extensions = {}
1108 for name, info in _UNPACK_FORMATS.items():
1109 for ext in info[0]:
1110 existing_extensions[ext] = name
1111
1112 for extension in extensions:
1113 if extension in existing_extensions:
1114 msg = '%s is already registered for "%s"'
1115 raise RegistryError(msg % (extension,
1116 existing_extensions[extension]))
1117
1118 if not callable(function):
1119 raise TypeError('The registered function must be a callable')
1120
1121
1122def register_unpack_format(name, extensions, function, extra_args=None,
1123 description=''):
1124 """Registers an unpack format.
1125
1126 `name` is the name of the format. `extensions` is a list of extensions
1127 corresponding to the format.
1128
1129 `function` is the callable that will be
1130 used to unpack archives. The callable will receive archives to unpack.
1131 If it's unable to handle an archive, it needs to raise a ReadError
1132 exception.
1133
1134 If provided, `extra_args` is a sequence of
1135 (name, value) tuples that will be passed as arguments to the callable.
1136 description can be provided to describe the format, and will be returned
1137 by the get_unpack_formats() function.
1138 """
1139 if extra_args is None:
1140 extra_args = []
1141 _check_unpack_options(extensions, function, extra_args)
1142 _UNPACK_FORMATS[name] = extensions, function, extra_args, description
1143
1144def unregister_unpack_format(name):
1145 """Removes the pack format from the registry."""
1146 del _UNPACK_FORMATS[name]
1147
1148def _ensure_directory(path):
1149 """Ensure that the parent directory of `path` exists"""
1150 dirname = os.path.dirname(path)
1151 if not os.path.isdir(dirname):
1152 os.makedirs(dirname)
1153
1154def _unpack_zipfile(filename, extract_dir):
1155 """Unpack zip `filename` to `extract_dir`
1156 """
1157 import zipfile # late import for breaking circular dependency
1158
1159 if not zipfile.is_zipfile(filename):
1160 raise ReadError("%s is not a zip file" % filename)
1161
1162 zip = zipfile.ZipFile(filename)
1163 try:
1164 for info in zip.infolist():
1165 name = info.filename
1166
1167 # don't extract absolute paths or ones with .. in them
1168 if name.startswith('/') or '..' in name:
1169 continue
1170
1171 target = os.path.join(extract_dir, *name.split('/'))
1172 if not target:
1173 continue
1174
1175 _ensure_directory(target)
1176 if not name.endswith('/'):
1177 # file
1178 data = zip.read(info.filename)
1179 f = open(target, 'wb')
1180 try:
1181 f.write(data)
1182 finally:
1183 f.close()
1184 del data
1185 finally:
1186 zip.close()
1187
1188def _unpack_tarfile(filename, extract_dir):
1189 """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir`
1190 """
1191 import tarfile # late import for breaking circular dependency
1192 try:
1193 tarobj = tarfile.open(filename)
1194 except tarfile.TarError:
1195 raise ReadError(
1196 "%s is not a compressed or uncompressed tar file" % filename)
1197 try:
1198 tarobj.extractall(extract_dir)
1199 finally:
1200 tarobj.close()
1201
1202_UNPACK_FORMATS = {
1203 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
1204 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"),
1205}
1206
1207if _ZLIB_SUPPORTED:
1208 _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [],
1209 "gzip'ed tar-file")
1210
1211if _BZ2_SUPPORTED:
1212 _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
1213 "bzip2'ed tar-file")
1214
1215if _LZMA_SUPPORTED:
1216 _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [],
1217 "xz'ed tar-file")
1218
1219def _find_unpack_format(filename):
1220 for name, info in _UNPACK_FORMATS.items():
1221 for extension in info[0]:
1222 if filename.endswith(extension):
1223 return name
1224 return None
1225
1226def unpack_archive(filename, extract_dir=None, format=None):
1227 """Unpack an archive.
1228
1229 `filename` is the name of the archive.
1230
1231 `extract_dir` is the name of the target directory, where the archive
1232 is unpacked. If not provided, the current working directory is used.
1233
1234 `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
1235 or "xztar". Or any other registered format. If not provided,
1236 unpack_archive will use the filename extension and see if an unpacker
1237 was registered for that extension.
1238
1239 In case none is found, a ValueError is raised.
1240 """
1241 sys.audit("shutil.unpack_archive", filename, extract_dir, format)
1242
1243 if extract_dir is None:
1244 extract_dir = os.getcwd()
1245
1246 extract_dir = os.fspath(extract_dir)
1247 filename = os.fspath(filename)
1248
1249 if format is not None:
1250 try:
1251 format_info = _UNPACK_FORMATS[format]
1252 except KeyError:
1253 raise ValueError("Unknown unpack format '{0}'".format(format)) from None
1254
1255 func = format_info[1]
1256 func(filename, extract_dir, **dict(format_info[2]))
1257 else:
1258 # we need to look at the registered unpackers supported extensions
1259 format = _find_unpack_format(filename)
1260 if format is None:
1261 raise ReadError("Unknown archive format '{0}'".format(filename))
1262
1263 func = _UNPACK_FORMATS[format][1]
1264 kwargs = dict(_UNPACK_FORMATS[format][2])
1265 func(filename, extract_dir, **kwargs)
1266
1267
1268if hasattr(os, 'statvfs'):
1269
1270 __all__.append('disk_usage')
1271 _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
1272 _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
1273 _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
1274 _ntuple_diskusage.free.__doc__ = 'Free space in bytes'
1275
1276 def disk_usage(path):
1277 """Return disk usage statistics about the given path.
1278
1279 Returned value is a named tuple with attributes 'total', 'used' and
1280 'free', which are the amount of total, used and free space, in bytes.
1281 """
1282 st = os.statvfs(path)
1283 free = st.f_bavail * st.f_frsize
1284 total = st.f_blocks * st.f_frsize
1285 used = (st.f_blocks - st.f_bfree) * st.f_frsize
1286 return _ntuple_diskusage(total, used, free)
1287
1288elif _WINDOWS:
1289
1290 __all__.append('disk_usage')
1291 _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
1292
1293 def disk_usage(path):
1294 """Return disk usage statistics about the given path.
1295
1296 Returned values is a named tuple with attributes 'total', 'used' and
1297 'free', which are the amount of total, used and free space, in bytes.
1298 """
1299 total, free = nt._getdiskusage(path)
1300 used = total - free
1301 return _ntuple_diskusage(total, used, free)
1302
1303
1304def chown(path, user=None, group=None):
1305 """Change owner user and group of the given path.
1306
1307 user and group can be the uid/gid or the user/group names, and in that case,
1308 they are converted to their respective uid/gid.
1309 """
1310 sys.audit('shutil.chown', path, user, group)
1311
1312 if user is None and group is None:
1313 raise ValueError("user and/or group must be set")
1314
1315 _user = user
1316 _group = group
1317
1318 # -1 means don't change it
1319 if user is None:
1320 _user = -1
1321 # user can either be an int (the uid) or a string (the system username)
1322 elif isinstance(user, str):
1323 _user = _get_uid(user)
1324 if _user is None:
1325 raise LookupError("no such user: {!r}".format(user))
1326
1327 if group is None:
1328 _group = -1
1329 elif not isinstance(group, int):
1330 _group = _get_gid(group)
1331 if _group is None:
1332 raise LookupError("no such group: {!r}".format(group))
1333
1334 os.chown(path, _user, _group)
1335
1336def get_terminal_size(fallback=(80, 24)):
1337 """Get the size of the terminal window.
1338
1339 For each of the two dimensions, the environment variable, COLUMNS
1340 and LINES respectively, is checked. If the variable is defined and
1341 the value is a positive integer, it is used.
1342
1343 When COLUMNS or LINES is not defined, which is the common case,
1344 the terminal connected to sys.__stdout__ is queried
1345 by invoking os.get_terminal_size.
1346
1347 If the terminal size cannot be successfully queried, either because
1348 the system doesn't support querying, or because we are not
1349 connected to a terminal, the value given in fallback parameter
1350 is used. Fallback defaults to (80, 24) which is the default
1351 size used by many terminal emulators.
1352
1353 The value returned is a named tuple of type os.terminal_size.
1354 """
1355 # columns, lines are the working values
1356 try:
1357 columns = int(os.environ['COLUMNS'])
1358 except (KeyError, ValueError):
1359 columns = 0
1360
1361 try:
1362 lines = int(os.environ['LINES'])
1363 except (KeyError, ValueError):
1364 lines = 0
1365
1366 # only query if necessary
1367 if columns <= 0 or lines <= 0:
1368 try:
1369 size = os.get_terminal_size(sys.__stdout__.fileno())
1370 except (AttributeError, ValueError, OSError):
1371 # stdout is None, closed, detached, or not a terminal, or
1372 # os.get_terminal_size() is unsupported
1373 size = os.terminal_size(fallback)
1374 if columns <= 0:
1375 columns = size.columns
1376 if lines <= 0:
1377 lines = size.lines
1378
1379 return os.terminal_size((columns, lines))
1380
1381
1382# Check that a given file can be accessed with the correct mode.
1383# Additionally check that `file` is not a directory, as on Windows
1384# directories pass the os.access check.
1385def _access_check(fn, mode):
1386 return (os.path.exists(fn) and os.access(fn, mode)
1387 and not os.path.isdir(fn))
1388
1389
1390def which(cmd, mode=os.F_OK | os.X_OK, path=None):
1391 """Given a command, mode, and a PATH string, return the path which
1392 conforms to the given mode on the PATH, or None if there is no such
1393 file.
1394
1395 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
1396 of os.environ.get("PATH"), or can be overridden with a custom search
1397 path.
1398
1399 """
1400 # If we're given a path with a directory part, look it up directly rather
1401 # than referring to PATH directories. This includes checking relative to the
1402 # current directory, e.g. ./script
1403 if os.path.dirname(cmd):
1404 if _access_check(cmd, mode):
1405 return cmd
1406 return None
1407
1408 use_bytes = isinstance(cmd, bytes)
1409
1410 if path is None:
1411 path = os.environ.get("PATH", None)
1412 if path is None:
1413 try:
1414 path = os.confstr("CS_PATH")
1415 except (AttributeError, ValueError):
1416 # os.confstr() or CS_PATH is not available
1417 path = os.defpath
1418 # bpo-35755: Don't use os.defpath if the PATH environment variable is
1419 # set to an empty string
1420
1421 # PATH='' doesn't match, whereas PATH=':' looks in the current directory
1422 if not path:
1423 return None
1424
1425 if use_bytes:
1426 path = os.fsencode(path)
1427 path = path.split(os.fsencode(os.pathsep))
1428 else:
1429 path = os.fsdecode(path)
1430 path = path.split(os.pathsep)
1431
1432 if sys.platform == "win32":
1433 # The current directory takes precedence on Windows.
1434 curdir = os.curdir
1435 if use_bytes:
1436 curdir = os.fsencode(curdir)
1437 if curdir not in path:
1438 path.insert(0, curdir)
1439
1440 # PATHEXT is necessary to check on Windows.
1441 pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
1442 pathext = [ext for ext in pathext_source.split(os.pathsep) if ext]
1443
1444 if use_bytes:
1445 pathext = [os.fsencode(ext) for ext in pathext]
1446 # See if the given file matches any of the expected path extensions.
1447 # This will allow us to short circuit when given "python.exe".
1448 # If it does match, only test that one, otherwise we have to try
1449 # others.
1450 if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
1451 files = [cmd]
1452 else:
1453 files = [cmd + ext for ext in pathext]
1454 else:
1455 # On other platforms you don't have things like PATHEXT to tell you
1456 # what file suffixes are executable, so just pass on cmd as-is.
1457 files = [cmd]
1458
1459 seen = set()
1460 for dir in path:
1461 normdir = os.path.normcase(dir)
1462 if not normdir in seen:
1463 seen.add(normdir)
1464 for thefile in files:
1465 name = os.path.join(dir, thefile)
1466 if _access_check(name, mode):
1467 return name
1468 return None
diff --git a/sitegen/site.py b/sitegen/site.py
index 5e009dc..d50ec47 100644
--- a/sitegen/site.py
+++ b/sitegen/site.py
@@ -17,7 +17,7 @@
17# 17#
18# SPDX-License-Identifier: 0BSD 18# SPDX-License-Identifier: 0BSD
19import os 19import os
20import shutil 20import sitegen.myshutil as shutil
21import os.path 21import os.path
22import sys 22import sys
23import re 23import re