Source code for tartufo.util

# -*- coding: utf-8 -*-
import json
import os
import pathlib
import platform
import stat
import sys
import tempfile
from types import ModuleType
import uuid
from datetime import datetime
from functools import lru_cache, partial
from hashlib import blake2s
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    List,
    Optional,
    NoReturn,
    Tuple,
    TYPE_CHECKING,
    Pattern,
)

import click
import git
import pygit2

from tartufo import types

if TYPE_CHECKING:
    from tartufo.scanner import Issue  # pylint: disable=cyclic-import
    from tartufo.scanner import ScannerBase  # pylint: disable=cyclic-import


DATETIME_FORMAT: str = "%Y-%m-%d %H:%M:%S"


def del_rw(_func: Callable, name: str, _exc: Exception) -> None:
    """Make a file writable, then delete it.

    Intended to be passed to `shutil.rmtree` as its error handler, so that a
    removal which failed on a read-only file can be forced through.

    :param _func: The function that originally failed (unused)
    :param name: The path of the file to make writable and remove
    :param _exc: The exception raised by the original removal attempt (unused)
    """
    writable_mode = stat.S_IWRITE
    # Clear the read-only bit first; the removal is only attempted afterwards.
    os.chmod(name, writable_mode)
    os.remove(name)
def get_version() -> str:
    """Look up the installed version of this package.

    The version is read from package metadata via ``importlib.metadata``,
    falling back to the ``importlib_metadata`` backport when the standard
    library module cannot be imported.

    :returns: The version string, or an empty string if no metadata module
        object is available
    """
    metadata: Optional[ModuleType]
    try:
        from importlib import metadata  # type: ignore # pylint: disable=import-outside-toplevel
    except ImportError:
        # Python < 3.8
        import importlib_metadata as metadata  # type: ignore # pylint: disable=import-outside-toplevel
    if not metadata:
        return ""
    return metadata.version(__package__)  # type: ignore


def _exclusion_label_and_reason(item: Any, key: str) -> Tuple[Any, Any]:
    """Split an exclusion entry into its label and the reason it was excluded.

    Dictionary entries supply both values; any other entry is used as the
    label itself and is paired with a placeholder reason.
    """
    if isinstance(item, dict):
        return item.get(key), item.get("reason")
    return item, "Unknown reason"


def echo_report_result(scanner: "ScannerBase", now: str):
    """Print a scan report: issues first, then the active configuration.

    :param scanner: The scanner to run and whose options are reported
    :param now: The timestamp to show in the report heading
    """
    click.echo(f"Tartufo Scan Results (Time: {now})")
    for found in scanner.scan():
        click.echo(str(found))
    if scanner.issue_count == 0:
        click.echo("All clear. No secrets detected.")

    click.echo("\nConfiguration:")
    version = get_version()
    settings = scanner.global_options
    click.echo(f" version: {version}")
    if settings.entropy:
        click.echo(" entropy: Enabled")
        click.echo(f" sensitivity: {settings.entropy_sensitivity}")
    else:
        click.echo(" entropy: Disabled")
    regex_state = "Enabled" if settings.regex else "Disabled"
    click.echo(f" regex: {regex_state}")

    click.echo("\nExcluded paths:")
    # `or ()` skips the loop when the option is empty or unset.
    for entry in settings.exclude_path_patterns or ():
        path_pattern, reason = _exclusion_label_and_reason(entry, "path-pattern")
        click.echo(f" {path_pattern}: {reason}")

    click.echo("\nExcluded signatures:")
    for entry in settings.exclude_signatures or ():
        signature, reason = _exclusion_label_and_reason(entry, "signature")
        click.echo(f" {signature}: {reason}")

    click.echo("\nExcluded entropy patterns:")
    for rule in scanner.excluded_entropy:
        # Every optional component is rendered as an empty string when absent.
        pattern = rule.pattern.pattern if rule.pattern else ""
        path_pattern = rule.path_pattern.pattern if rule.path_pattern else ""
        m_scope = rule.re_match_scope.value if rule.re_match_scope else ""
        m_type = rule.re_match_type.value if rule.re_match_type else ""
        reason = rule.name
        click.echo(
            f" {pattern} (path={path_pattern}, scope={m_scope}, type={m_type}): {reason}"
        )
def echo_result(
    options: "types.GlobalOptions",
    scanner: "ScannerBase",
    repo_path: str,
    output_dir: Optional[pathlib.Path],
) -> None:
    """Run the scan and print its issues in the configured output format.

    :param options: Global options object; selects the output format and
        controls the quiet/verbose behavior of the default text format
    :param scanner: ScannerBase that yields the issues and holds the
        exclusions that are echoed alongside them
    :param repo_path: The path to the repository the issues were found in
    :param output_dir: The directory that issue details were written out to
    """
    now = datetime.now().isoformat("T", "microseconds")
    chosen_format = options.output_format

    if chosen_format == types.OutputFormat.Json.value:
        # Everything except "found_issues" is known up front. The issue list
        # comes from a generator, so the JSON document is streamed: emit the
        # static members without their closing "}", append each issue as it
        # is produced, then close the list and the object by hand.
        header = {
            "scan_time": now,
            "project_path": repo_path,
            "output_dir": str(output_dir) if output_dir else None,
            "excluded_paths": [str(path.pattern) for path in scanner.excluded_paths],
            "excluded_signatures": [
                str(signature) for signature in scanner.excluded_signatures
            ],
            "exclude_entropy_patterns": [
                str(pattern) for pattern in options.exclude_entropy_patterns
            ],
        }
        prefix = json.dumps(header)[:-1]
        click.echo(f'{prefix}, "found_issues": [', nl=False)
        separator = ""
        for issue in scanner.scan():
            # NOTE(review): this branch only runs for the JSON format, so the
            # comparison below can only be true if Json and Compact share a
            # value -- confirm whether compact JSON output was intended.
            compact = options.output_format == types.OutputFormat.Compact.value
            encoded_issue = json.dumps(issue.as_dict(compact=compact))
            click.echo(f"{separator}{encoded_issue}", nl=False)
            separator = ", "
        click.echo("]}")
        return

    if chosen_format == types.OutputFormat.Compact.value:
        for issue in scanner.scan():
            click.echo(
                f"[{issue.issue_type.value}] {issue.chunk.file_path}: {issue.matched_string} "
                f"({issue.signature}, {issue.issue_detail})"
            )
        return

    if chosen_format == types.OutputFormat.Report.value:
        echo_report_result(scanner, now)
        return

    # Default text output.
    for issue in scanner.scan():
        click.echo(str(issue))
    if scanner.issue_count == 0 and not options.quiet:
        click.echo(f"Time: {now}\nAll clear. No secrets detected.")
    # NOTE(review): block nesting was reconstructed from flattened source; the
    # verbose listing is assumed to be independent of the issue count.
    if options.verbose > 0:
        click.echo("\nExcluded paths:")
        click.echo("\n".join([str(path) for path in scanner.excluded_paths]))
        click.echo("\nExcluded signatures:")
        click.echo("\n".join(scanner.excluded_signatures))
        click.echo("\nExcluded entropy patterns:")
        click.echo("\n".join(str(path) for path in scanner.excluded_entropy))
def write_outputs(
    issues: Generator["Issue", None, None], output_dir: pathlib.Path
) -> List[str]:
    """Write each issue to its own JSON file in the given directory.

    Every file is named with a freshly generated UUID and holds the JSON
    encoding of the issue's ``as_dict()`` result.

    :param issues: The issues to be written out
    :param output_dir: The directory where the files should be written
    :returns: The paths of the files that were written, in the order the
        issues were consumed
    """
    written: List[str] = []
    for found in issues:
        destination = output_dir.joinpath(f"{uuid.uuid4()}.json")
        payload = json.dumps(found.as_dict())
        destination.write_text(payload)
        written.append(str(destination))
    return written
def clone_git_repo(
    git_url: str, target_dir: Optional[pathlib.Path] = None
) -> Tuple[pathlib.Path, str]:
    """Clone a remote git repository and return its filesystem path.

    When no target directory is given, the clone goes into a newly created
    temporary directory. If the clone then fails, that temporary directory is
    removed again so failed attempts do not accumulate on disk. A
    caller-supplied ``target_dir`` is never removed.

    :param git_url: The URL of the git repository to be cloned
    :param target_dir: Where to clone the repository to
    :returns: Filesystem path of local clone and name of remote source
    :raises types.GitRemoteException: If there was an error cloning the repository
    """
    import shutil  # pylint: disable=import-outside-toplevel

    created_temp_dir = not target_dir
    project_path = tempfile.mkdtemp() if created_temp_dir else str(target_dir)

    try:
        repo = git.Repo.clone_from(git_url, project_path)
        origin = repo.remotes[0].name
    except git.GitCommandError as exc:
        if created_temp_dir:
            # Best-effort cleanup: a failure to delete must not mask the
            # clone error that is about to be reported.
            shutil.rmtree(project_path, ignore_errors=True)
        raise types.GitRemoteException(exc.stderr.strip()) from exc
    return pathlib.Path(project_path), origin
# Pick the message styling helpers once, at import time, based on whether
# stdout is an interactive terminal.
if not sys.stdout.isatty():
    # Not a TTY: leave colour out entirely and hand the message back as-is.
    def _style_func(msg: str, *_: Any, **__: Any) -> str:
        # Accepts (and ignores) the same arguments as click.style so callers
        # can use either variant interchangeably without typing issues.
        return msg

    style_ok = style_error = style_warning = partial(_style_func)
else:
    style_ok = partial(click.style, fg="bright_green")
    style_error = partial(click.style, fg="red", bold=True)
    style_warning = partial(click.style, fg="bright_yellow")
def fail(msg: str, ctx: click.Context, code: int = 1) -> NoReturn:
    """Report an error on stderr and exit through the Click context.

    :param msg: The message to print out to the user
    :param ctx: A context from a currently executing Click command
    :param code: The exit code to use; must be >= 1
    """
    rendered = style_error(msg)
    click.echo(rendered, err=True)
    ctx.exit(code)
@lru_cache(maxsize=None)
def generate_signature(snippet: str, filename: str) -> str:
    """Compute a stable hash signature for an issue found in a commit.

    The signature is the BLAKE2s hex digest of the snippet and the filename
    joined by ``$$``. These signatures are used for configuring
    excluded/approved issues, such as secrets intentionally embedded in tests.
    Results are memoized per ``(snippet, filename)`` pair.

    :param snippet: A string which was found as a potential issue during a scan
    :param filename: The file where the issue was found
    :returns: The hexadecimal digest identifying this snippet/file pair
    """
    payload = "{}$${}".format(snippet, filename).encode("utf-8")
    digest = blake2s(payload)
    return digest.hexdigest()
def extract_commit_metadata(commit: pygit2.Commit, branch_name: str) -> Dict[str, Any]:
    """Collect a consistent set of metadata from a git commit, for user output.

    The commit timestamp is converted with ``datetime.fromtimestamp`` (no
    timezone argument) and rendered using ``DATETIME_FORMAT``.

    :param commit: The commit to extract the data from
    :param branch_name: What branch the commit was found on
    :returns: A dictionary with the keys ``commit_time``, ``commit_message``,
        ``commit_hash`` and ``branch``
    """
    committed_at = datetime.fromtimestamp(commit.commit_time)
    formatted_time = committed_at.strftime(DATETIME_FORMAT)
    return {
        "commit_time": formatted_time,
        "commit_message": commit.message,
        "commit_hash": commit.hex,
        "branch": branch_name,
    }
def find_strings_by_regex(
    text: str, regex: Pattern, threshold: int = 20
) -> Generator[str, None, None]:
    """Yield the strings ("words") of interest found in the input text.

    Only matches whose length is at least `threshold` are produced. This is
    meant to return longer strings which are likely to be things like
    auto-generated passwords, tokens, hashes, etc.

    :param text: The text string to be analyzed
    :param regex: A pattern which matches all character sequences of interest
    :param threshold: The minimum acceptable length of a matching string
    """
    candidates = (found.group() for found in regex.finditer(text))
    yield from (word for word in candidates if len(word) >= threshold)
def path_contains_git(path: str) -> bool:
    """Report whether a filesystem path contains a git repository.

    The check works by trying to open the path with ``git.Repo``; any
    ``git.GitError`` raised while doing so means "no repository here".

    :param path: The fully qualified path to be checked
    :returns: True if the path could be opened as a repository, else False
    """
    try:
        repository = git.Repo(path)
    except git.GitError:
        return False
    return repository is not None
def process_issues(
    repo_path: str,
    scan: "ScannerBase",
    options: types.GlobalOptions,
) -> None:
    """Handle post-scan processing/reporting of a batch of issues.

    Issues are always echoed to the console. When an output directory is
    configured, a timestamped results folder is created beneath it and every
    issue is also written there as an individual file.

    :param repo_path: The repository that was scanned
    :param scan: The scanner that performed the scan
    :param options: The options to use for determining output
    """
    timestamp = datetime.now().isoformat("T", "microseconds")
    results_dir: Optional[pathlib.Path] = None

    if options.output_dir:
        running_on_windows = platform.system().lower() == "windows"
        if running_on_windows:  # pragma: no cover
            # Make sure we aren't using illegal characters for Windows folder names
            timestamp = timestamp.replace(":", "")
        results_dir = pathlib.Path(options.output_dir).joinpath(
            f"tartufo-scan-results-{timestamp}"
        )
        results_dir.mkdir(parents=True)

    echo_result(options, scan, repo_path, results_dir)

    if not results_dir:
        return
    write_outputs(scan.scan(), results_dir)
    # The confirmation line is suppressed for the JSON output format.
    if options.output_format != types.OutputFormat.Json.value:
        click.echo(f"Results have been saved in {results_dir}")
def is_shallow_clone(repo: pygit2.Repository) -> bool:
    """Report whether a repository is a shallow clone.

    This is used to work around https://github.com/libgit2/libgit2/issues/3058

    Whenever a git repository is a "shallow" clone (it was cloned with
    `--depth N`), git creates a file at `.git/shallow`. Checking for a
    `shallow` entry under the repository path is therefore enough to know
    whether we are interacting with a shallow repository.

    :param repo: The repository to check for "shallowness"
    :returns: True if the `shallow` marker exists under ``repo.path``
    """
    shallow_marker = pathlib.Path(repo.path).joinpath("shallow")
    return shallow_marker.exists()