+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

2019-07(1)

2019-08(109)

2019-09(120)

2019-10(17)

2019-11(1)

windows本地磁盘、内存、CPU、进程监控

发布于2020-06-25 21:36     阅读(327)     评论(0)     点赞(24)     收藏(5)


0

1

2

3

4

# -*- coding: utf-8 -*-
# !/usr/bin/env python
# Version = 1.0.0
# __auth__ = 'wangDaoYuan'
import time
import psutil
import collections
import smtplib
from datetime import datetime
from email.mime.text import MIMEText
from email.utils import formataddr


# 警戒值 *%
SEIZURE_RATE = 95
# 需要监控的进程列表
L_PROCESS_NAME = ['QQ.exe', 'MySQLNotifier.exe', 'System Idle Process', 'smss.exe']


class Email(object):
    def __init__(self):
        self.error_time = '2020, 1, 1, 0, 0, 0, 0'
        self.interval_time = 20 * 60  # 连续报错的情况下,发送邮件的时间间隔/s

    def sendmail(self, mail_msg):
        """
        发送预警邮件
        """
        if isinstance(self.error_time, str):
            interval = (datetime.now()-datetime(*eval(self.error_time))).seconds
        else:
            interval = (datetime.now() - self.error_time).seconds

        if not self.error_time or interval >= self.interval_time:
            receivers = ['mm_xhn@163.com', ]  # 接收邮件的邮箱

            # 第三方 SMTP 服务
            mail_post = 465
            mail_host = "smtp.qq.com"  # 设置服务器
            mail_user = "************"  # 发件人邮箱账号
            mail_pass = "*************"  # 发件人邮箱密码口令

            try:
                msg = MIMEText(mail_msg, 'html', 'utf-8')
                msg['From'] = formataddr(["磁盘监控", mail_user])  # 括号里的对应发件人邮箱昵称、发件人邮箱账号
                msg['To'] = formataddr(["Administrator", mail_user])  # 括号里的对应收件人邮箱昵称、收件人邮箱账号
                msg['Subject'] = "磁盘监控"  # 邮件标题

                server = smtplib.SMTP_SSL(mail_host, mail_post)  # 发件人邮箱中的SMTP服务器,端口是25
                server.login(mail_user, mail_pass)  # 括号中对应的是发件人邮箱账号、邮箱密码
                server.sendmail(mail_user, receivers, msg.as_string())  # 括号中对应的是发件人邮箱账号、收件人邮箱账号、发送邮件
                server.quit()  # 关闭连接
                self.error_time = datetime.now()
                print("邮件发送成功")
            except smtplib.SMTPException:
                print("Error: 无法发送邮件")
        else:
            print('时间间隔过短,未发送邮件通知')


class Control(object):
    """本地系统监控"""
    def __init__(self):
        self.disk_used = collections.OrderedDict()  # 信息列表
        self.count = 0  # 记录CPU占用率连续超过警戒值的次数
        self.obj = Email()
    
    def get_disk_info(self):
        """磁盘、内存监控"""
        # 磁盘
        try:
            disk_info = psutil.disk_usage('C:\\')

            self.disk_used['C盘占用率:'] = f'{disk_info.percent}%'
            self.disk_used['C盘剩余空间:'] = f'{disk_info.free // 1024 // 1024 // 1024}GB'

            # 内存
            mem = psutil.virtual_memory()

            self.disk_used['内存使用率:'] = f'{mem.percent}%'
            self.disk_used['空闲内存数:'] = f'{mem.free / 1024 / 1024 / 1024:.2f}GB'
        except Exception as e:
            print(f'{e}:查询磁盘信息失败,请重试')
            return self.disk_used

        if disk_info.percent < SEIZURE_RATE:
            if mem.percent < SEIZURE_RATE:
                return self.disk_used
            else:
                self.obj.sendmail(mail_msg="""
                        <p>  你好</p>
                        <p>  内存占用率已达95%以上,请注意及时清理!</p>
                        """)
                print('WARNING:内存占用率已超95%')
                return self.disk_used
        else:
            self.obj.sendmail(mail_msg="""
                        <p>  你好</p>
                        <p>  C磁盘占用率已达95%以上,请注意及时清理!</p>
                        """)
            print('WARNING:C盘占用率已超95%')
            return self.disk_used

    def get_cpu_info(self):
        """CPU监控"""
        try:
            self.disk_used['CPU使用率:'] = f'{psutil.cpu_percent()}%'
        except Exception as e:
            print(f'{e}:查询CPU使用率失败,请重试')
            return
        # 连续三次使用率超过警戒值则发出预警
        if float(self.disk_used['CPU使用率:'].replace('%', '')) > SEIZURE_RATE:
            self.count += 1
            if self.count > 2:
                self.obj.sendmail(mail_msg="""
                        <p>  你好</p>
                        <p>  CPU使用率已达95%以上,请注意及时处理!</p>
                        """)
                print('WARNING:CPU使用率过高,请注意及时处理')
        else:
            self.count = 0

    def get_process_info(self):
        """进程监控"""
        try:
            for pid in psutil.pids():
                p = psutil.Process(pid)
                # print(p.name)
                if p.name() in L_PROCESS_NAME:
                    self.disk_used[f'进程{p.name()}:'] = f'状态:{p.status()};PID:{pid}'
                    if p.status() != 'running':
                        self.obj.sendmail(mail_msg="""
                                            <p>  你好</p>
                                            <p>  你监控的进程挂了,请注意及时处理!</p>
                                            """)
                        print(f'WARNING:你监控的{p.name()}这个进程挂了,请注意及时处理')
        except Exception as e:
            print(f'{e}:查询进程失败,请重试')


if __name__ == '__main__':
    control = Control()
    while True:
        control.get_process_info()
        control.get_cpu_info()
        ret = control.get_disk_info()

        for k, v in ret.items():
            print(f'{k}{v}。')
        print(datetime.now())
        time.sleep(30)


0

1

2

3

4



所属网站分类: 技术文章 > 博客

作者:你太美丽

链接: https://www.pythonheidong.com/blog/article/430510/c575f6545252332070ec/

来源: python黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

24 0
收藏该文
已收藏

评论内容:(最多支持255个字符)