From 352551470457146c59de0635d89d8bd0dc639bba Mon Sep 17 00:00:00 2001 From: Linlang <30293408+SunsetWolf@users.noreply.github.com> Date: Wed, 28 May 2025 20:43:58 +0800 Subject: [PATCH] Fixing Security Vulnerabilities (#1941) * Fixing Security Vulnerabilities * Fixing Pylint Error * Fixing Security Vulnerabilities windows * format with black * using returncode to locate problems * fix pylint error --- qlib/__init__.py | 63 +++++++++++++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/qlib/__init__.py b/qlib/__init__.py index c625907e7..69b91de40 100644 --- a/qlib/__init__.py +++ b/qlib/__init__.py @@ -5,6 +5,7 @@ from pathlib import Path __version__ = "0.9.6.99" __version__bak = __version__ # This version is backup for QlibConfig.reset_qlib_version import os +import re from typing import Union from ruamel.yaml import YAML import logging @@ -80,34 +81,41 @@ def _mount_nfs_uri(provider_uri, mount_path, auto_mount: bool = False): LOG = get_module_logger("mount nfs", level=logging.INFO) if mount_path is None: raise ValueError(f"Invalid mount path: {mount_path}!") + if not re.match(r"^[a-zA-Z0-9.:/\-_]+$", provider_uri): + raise ValueError(f"Invalid provider_uri format: {provider_uri}") # FIXME: the C["provider_uri"] is modified in this function # If it is not modified, we can pass only provider_uri or mount_path instead of C - mount_command = "sudo mount.nfs %s %s" % (provider_uri, mount_path) + mount_command = ["sudo", "mount.nfs", provider_uri, mount_path] # If the provider uri looks like this 172.23.233.89//data/csdesign' # It will be a nfs path. The client provider will be used if not auto_mount: # pylint: disable=R1702 if not Path(mount_path).exists(): raise FileNotFoundError( - f"Invalid mount path: {mount_path}! Please mount manually: {mount_command} or Set init parameter `auto_mount=True`" + f"Invalid mount path: {mount_path}! Please mount manually: {' '.join(mount_command)} or Set init parameter `auto_mount=True`" ) else: # Judging system type sys_type = platform.system() if "windows" in sys_type.lower(): # system: window - exec_result = os.popen(f"mount -o anon {provider_uri} {mount_path}") - result = exec_result.read() - if "85" in result: - LOG.warning(f"{provider_uri} on Windows:{mount_path} is already mounted") - elif "53" in result: - raise OSError("not find network path") - elif "error" in result or "错误" in result: - raise OSError("Invalid mount path") - elif provider_uri in result: - LOG.info("window success mount..") - else: - raise OSError(f"unknown error: {result}") - + try: + subprocess.run( + ["mount", "-o", "anon", provider_uri, mount_path], + capture_output=True, + text=True, + check=True, + ) + LOG.info("Mount finished.") + except subprocess.CalledProcessError as e: + error_output = (e.stdout or "") + (e.stderr or "") + if e.returncode == 85: + LOG.warning(f"{provider_uri} already mounted at {mount_path}") + elif e.returncode == 53: + raise OSError("Network path not found") from e + elif "error" in error_output.lower() or "错误" in error_output: + raise OSError("Invalid mount path") from e + else: + raise OSError(f"Unknown mount error: {error_output.strip()}") from e else: # system: linux/Unix/Mac # check mount @@ -119,12 +127,13 @@ def _mount_nfs_uri(provider_uri, mount_path, auto_mount: bool = False): _is_mount = False while _check_level_num: with subprocess.Popen( - 'mount | grep "{}"'.format(_remote_uri), - shell=True, + ["mount"], + text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) as shell_r: _command_log = shell_r.stdout.readlines() + _command_log = [line for line in _command_log if _remote_uri in line] if len(_command_log) > 0: for _c in _command_log: _temp_mount = _c.decode("utf-8").split(" ")[2] @@ -152,16 +161,16 @@ def _mount_nfs_uri(provider_uri, mount_path, auto_mount: bool = False): if not command_res: raise OSError("nfs-common is not found, please install it by execute: sudo apt install nfs-common") # manually mount - command_status = os.system(mount_command) - if command_status == 256: - raise OSError( - f"mount {provider_uri} on {mount_path} error! Needs SUDO! Please mount manually: {mount_command}" - ) - elif command_status == 32512: - # LOG.error("Command error") - raise OSError(f"mount {provider_uri} on {mount_path} error! Command error") - elif command_status == 0: - LOG.info("Mount finished") + try: + subprocess.run(mount_command, check=True, capture_output=True, text=True) + LOG.info("Mount finished.") + except subprocess.CalledProcessError as e: + if e.returncode == 256: + raise OSError("Mount failed: requires sudo or permission denied") from e + elif e.returncode == 32512: + raise OSError(f"mount {provider_uri} on {mount_path} error! Command error") from e + else: + raise OSError(f"Mount failed: {e.stderr}") from e else: LOG.warning(f"{_remote_uri} on {_mount_path} is already mounted")