python自动化巡检服务器
# python自动化巡检服务器
需求:通过 SSH 远程连接到服务器执行命令收集结果,进行自动化多主机巡检
# 1.安装python依赖包
pip install paramiko
pip install pyinstaller
# 2.代码编写
import os
import paramiko
from datetime import datetime
# 定义服务器列表
servers = [
{"hostname": "10.0.0.50", "port": 22, "username": "zzl", "password": "J@LE*_UOWK95upDH"},
{"hostname": "10.0.0.51", "port": 22, "username": "zzl", "password": "%@w(264EHi91XRnB"},
{"hostname": "10.0.0.52", "port": 22, "username": "zzl", "password": "5EPAmZeF9!7&yncT"},
# 添加更多服务器...
]
# 定义检测命令
commands = {
"firewall": "systemctl status iptables || echo 'iptables service not found'", # 防火墙检测命令
"disk": "df -h", # 磁盘检测命令
"memory": "free -h", # 内存检测命令
"cpu": "top -bn1 | grep 'Cpu(s)'", # CPU 检测命令
"ssh_status": "systemctl status sshd || echo 'SSH service not found'", # SSH状态检测
"uptime": "uptime", # 系统负载检测
"user_activity": "last", # 用户活动和登录记录检测
"system_time": "timedatectl", # 系统时间检测
"network_connections": "netstat -tunpl", # 网络连接检测
"jar_processes": "ps aux | grep jar | grep -v grep | grep -v 'sa-agent'" # 检测运行中的jar进程
}
def ssh_execute_and_save_output(servers, commands):
try:
# 动态生成日期目录
current_date = datetime.now()
year = current_date.strftime("%Y") # 年份,如 "2025"
month = current_date.strftime("%m") # 月份,如 "04"
day = current_date.strftime("%d") # 日期,如 "24"
# 构造目标目录路径
output_dir = os.path.join(os.getcwd(), year, month, day)
os.makedirs(output_dir, exist_ok=True)
# 初始化文件句柄
file_handles = {}
for item in commands.keys():
file_name = f"{item}_检测.txt"
file_path = os.path.join(output_dir, file_name)
file_handles[item] = open(file_path, "w", encoding="utf-8")
# 遍历每个服务器
for server in servers:
hostname = server["hostname"]
port = server["port"]
username = server["username"]
password = server["password"]
print(f"正在连接到服务器: {hostname}")
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname, port=port, username=username, password=password)
# 遍历每个检测项
for item, command in commands.items():
print(f"在服务器 {hostname} 上执行命令: {command}")
stdin, stdout, stderr = ssh.exec_command(command)
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
# 写入文件
file_handles[item].write(f"{hostname}\n")
if output:
file_handles[item].write(f"{output}\n")
if error:
file_handles[item].write(f"错误输出:\n{error}\n")
file_handles[item].write("\n" + "-" * 50 + "\n") # 分隔线
# 关闭当前服务器连接
ssh.close()
print(f"完成服务器 {hostname} 的检测\n")
# 关闭所有文件句柄
for file in file_handles.values():
file.close()
print(f"所有检测已完成,结果保存在目录: {output_dir}")
except Exception as e:
print(f"发生错误: {e}")
# 示例调用
if __name__ == "__main__":
ssh_execute_and_save_output(servers, commands)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
执行结果展示

# 3.打包exe
将代码打包exe文件,脱离python环境也能运行
pyinstaller --onefile demo2_ssh.py
1

