1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00

Fixing Security Vulnerabilities (#1941)

* Fixing Security Vulnerabilities

* Fixing Pylint Error

* Fixing Security Vulnerabilities windows

* format with black

* using returncode to locate problems

* fix pylint error
This commit is contained in:
Linlang
2025-05-28 20:43:58 +08:00
committed by GitHub
parent 3e72593b8c
commit 3525514704

View File

@@ -5,6 +5,7 @@ from pathlib import Path
__version__ = "0.9.6.99"
__version__bak = __version__ # This version is backup for QlibConfig.reset_qlib_version
import os
import re
from typing import Union
from ruamel.yaml import YAML
import logging
@@ -80,34 +81,41 @@ def _mount_nfs_uri(provider_uri, mount_path, auto_mount: bool = False):
LOG = get_module_logger("mount nfs", level=logging.INFO)
if mount_path is None:
raise ValueError(f"Invalid mount path: {mount_path}!")
if not re.match(r"^[a-zA-Z0-9.:/\-_]+$", provider_uri):
raise ValueError(f"Invalid provider_uri format: {provider_uri}")
# FIXME: the C["provider_uri"] is modified in this function
# If it is not modified, we can pass only provider_uri or mount_path instead of C
mount_command = "sudo mount.nfs %s %s" % (provider_uri, mount_path)
mount_command = ["sudo", "mount.nfs", provider_uri, mount_path]
# If the provider uri looks like this 172.23.233.89//data/csdesign'
# It will be a nfs path. The client provider will be used
if not auto_mount: # pylint: disable=R1702
if not Path(mount_path).exists():
raise FileNotFoundError(
f"Invalid mount path: {mount_path}! Please mount manually: {mount_command} or Set init parameter `auto_mount=True`"
f"Invalid mount path: {mount_path}! Please mount manually: {' '.join(mount_command)} or Set init parameter `auto_mount=True`"
)
else:
# Judging system type
sys_type = platform.system()
if "windows" in sys_type.lower():
# system: window
exec_result = os.popen(f"mount -o anon {provider_uri} {mount_path}")
result = exec_result.read()
if "85" in result:
LOG.warning(f"{provider_uri} on Windows:{mount_path} is already mounted")
elif "53" in result:
raise OSError("not find network path")
elif "error" in result or "错误" in result:
raise OSError("Invalid mount path")
elif provider_uri in result:
LOG.info("window success mount..")
try:
subprocess.run(
["mount", "-o", "anon", provider_uri, mount_path],
capture_output=True,
text=True,
check=True,
)
LOG.info("Mount finished.")
except subprocess.CalledProcessError as e:
error_output = (e.stdout or "") + (e.stderr or "")
if e.returncode == 85:
LOG.warning(f"{provider_uri} already mounted at {mount_path}")
elif e.returncode == 53:
raise OSError("Network path not found") from e
elif "error" in error_output.lower() or "错误" in error_output:
raise OSError("Invalid mount path") from e
else:
raise OSError(f"unknown error: {result}")
raise OSError(f"Unknown mount error: {error_output.strip()}") from e
else:
# system: linux/Unix/Mac
# check mount
@@ -119,12 +127,13 @@ def _mount_nfs_uri(provider_uri, mount_path, auto_mount: bool = False):
_is_mount = False
while _check_level_num:
with subprocess.Popen(
'mount | grep "{}"'.format(_remote_uri),
shell=True,
["mount"],
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as shell_r:
_command_log = shell_r.stdout.readlines()
_command_log = [line for line in _command_log if _remote_uri in line]
if len(_command_log) > 0:
for _c in _command_log:
_temp_mount = _c.decode("utf-8").split(" ")[2]
@@ -152,16 +161,16 @@ def _mount_nfs_uri(provider_uri, mount_path, auto_mount: bool = False):
if not command_res:
raise OSError("nfs-common is not found, please install it by execute: sudo apt install nfs-common")
# manually mount
command_status = os.system(mount_command)
if command_status == 256:
raise OSError(
f"mount {provider_uri} on {mount_path} error! Needs SUDO! Please mount manually: {mount_command}"
)
elif command_status == 32512:
# LOG.error("Command error")
raise OSError(f"mount {provider_uri} on {mount_path} error! Command error")
elif command_status == 0:
LOG.info("Mount finished")
try:
subprocess.run(mount_command, check=True, capture_output=True, text=True)
LOG.info("Mount finished.")
except subprocess.CalledProcessError as e:
if e.returncode == 256:
raise OSError("Mount failed: requires sudo or permission denied") from e
elif e.returncode == 32512:
raise OSError(f"mount {provider_uri} on {mount_path} error! Command error") from e
else:
raise OSError(f"Mount failed: {e.stderr}") from e
else:
LOG.warning(f"{_remote_uri} on {_mount_path} is already mounted")