Amaze UI Logo

收集文章: 1998

浏览人数: 440810



Python专题-命令行(shell)编程 原创

Python专题-命令行(shell)编程

像shell脚本一样运行

  • 查看解释器路径

$ which python3
/usr/local/bin/python3
  • py文件第一行

#!/usr/local/bin/python3
  • 可执行权限

$ chmod u+x xxx.py 
$ ./xxx.py

os模块

os模块负责程序与操作系统的交互,提供了访问操作系统底层的接口。

  • os.path

os.path.isdir(name) # 判断是不是一个目录os.path.isfile(name) # 判断是不是一个文件os.path.exists(name) # 判断是否存在文件或目录os.path.getsize(name) # 获得文件大小,目录返回0os.path.abspath(name) # 获得绝对路径os.path.normpath(path) # 规范path字符串表示os.path.split(name) # 分割文件名与目录os.path.splitext() # 分离文件名与扩展名os.path.join(path,name) # 连接目录与文件名或目录os.path.basename(path) # 返回文件名os.path.dirname(path) # 返回文件路径
  • os.walk

os.walk(dir_name)# 返回一个生成器# 每个元素为元组 (dirpath,dirnames,filenames)
  • os.system

用来执行shell命令

# 解释器环境>>> os.system('ls')
Applications	Downloads	Movies		Public
#!/usr/bin/python3# test.pyimport os

os.system('ls')# ./test.py# 没有任何输出
  • os.popen

#!/usr/local/bin/python3# test.pyimport os

result = os.popen("ls -al").read()
print(result)
  • 其他

os.listdir(dirname) # 列出dirname下的目录和文件os.getcwd():# 获得当前工作目录os.curdir # 返回当前目录('.')os.chdir(dirname) # 改变工作目录到dirnameos.remove(name) # 删除文件os.rename(src, dst) # 重命名文件os.mkdir(dirname) # 创建目录os.rmdir(dirname) # 删除目录

sys模块

sys模块负责程序与python解释器的交互,提供了一系列的函数和变量,用于操控python的运行时环境。

sys.argv # 命令行参数sys.exit(n) # 退出,0代表正常sys.platform # 操作系统平台名称
import sys

print(sys.argv)# $ python3 test.py 123# ['test.py', '123']

具体例子

重定向/管道/文件接受输入

#!/usr/bin/env python3import fileinputwith fileinput.input() as f_input:    for line in f_input:
        print(line, end='')
$ ls | ./test.py          # Prints a directory listing to stdout.
$ ./test.py /etc/passwd   # Reads /etc/passwd to stdout.
$ ./test.py < /etc/passwd # Reads /etc/passwd to stdout.

脚本报错

import sys

sys.stderr.write('It failed!\n')raise SystemExit(1)# 或raise SystemExit('It failed!')

命令行选项

sys.argv只能按照顺序传递参数

import argparse

parser = argparse.ArgumentParser(description='Example with long option names')
parser.add_argument('--noarg', action="store_true", default=False,
                    help="set a boolean value")
parser.add_argument('--witharg', action="store", dest="witharg")
parser.add_argument('--witharg2', action="store", dest="witharg2", type=int)

args = parser.parse_args()# $ ./test.py -h# usage: test.py [-h] [--noarg] [--witharg WITHARG] [--witharg2 WITHARG2]# Example with long option names# optional arguments:#   -h, --help           show this help message and exit#   --noarg              set a boolean value#   --witharg WITHARG#   --witharg2 WITHARG2

密码提示

import getpass

user = getpass.getuser() #自动获取当前用户名# input("username: ") passwd = getpass.getpass()

print(user)
print(passwd)

执行外部shell命令

# ['cmd', 'arg1', 'arg2', ...]import subprocess

out_bytes = subprocess.check_output(['netstat','-a'])
out_text = out_bytes.decode('utf-8')
print(out_text)
# 'cmd string', shell=True# 复杂命令,例如管道等更适用import subprocess

out_bytes = subprocess.check_output('grep python | wc > out', 
    shell=True)

复制、移动、删除文件(夹)

通过文件名查找文件

import osimport sysdef findfile(start, name):
    for relpath, dirs, files in os.walk(start):        if name in files:
            full_path = os.path.join(relpath, name)
            print(os.path.normpath(os.path.abspath(full_path)))            
if __name__ == '__main__':
    findfile(sys.argv[1], sys.argv[2])

时间与日期

>>> import datetime>>> today = datetime.date.today()>>> today + datetime.timedelta(days=1)
datetime.date(2017, 7, 9)>>> now = datetime.datetime.now() 
>>> now.second56>>> now.microsecond591835>>> now.month7
from datetime import datetime

theDate = '2016-08-09'dateObj = datetime.strptime(theDate,'%Y-%m-%d')
dateStr = datetime.strftime(dateObj,'%m/%d/%Y')

配置文件

# 配置文件样例
[debug]
log_errors=true
show_warnings=False

[server]
port: 8080
nworkers: 32

日志

限制内存和CPU使用量

综合示例

读取日志信息,存入数据库。多线程执行插入操作。500为步长,500条数据插入后 commit一次。测试过程中,共计30w条数据,耗时2.52s(测试主机,Mac Pro,I7 4核 16G)

#!/usr/local/bin/python3import pymysqlimport osimport sysimport threadingimport queueimport datetimeclass MysqlUtil:
    def __init__(self, host="127.0.0.1", username="***", 
                 password="***", db_name="***"):
        self.host = host
        self.username = username
        self.password = password
        self.db_name = db_name    def __str__(self):
        return 'this is mysql database connection management'

    def get_mysql_con(self):
        try:
            db = pymysql.connect(self.host, self.username, 
                                 self.password, self.db_name)        except Exception as e:
            print("连接失败")            raise SystemError(e)        return db    def close_mysql_con(self, db):
        db.close()    def search_fields_by_date(self, fields=("*"), created_time="2017-07-08"):
        db = self.get_mysql_con()
        cursor = db.cursor()
        sql = 'SELECT %s FROM WHERE CREATED_TIME LIKE "%s%"' \
              % (','.join(fields), created_time)
        cursor.execute(sql)
        search_list = cursor.fetchall()
        cursor.close()        return search_list    def insert_log(self, created_time, level, msg):
        '''
        插入一条日志记录
        :param created_time: 日志时间
        :param level: 日志级别
        :param msg: 具体信息
        :return: None
        '''
        db = self.get_mysql_con()
        cursor = db.cursor()
        sql = 'INSERT INTO LOG (CREATED_TIME, LEVEL, MSG) ' \              'values ("%s","%s","%s")'
        try:
            cursor.execute(sql, (created_time, level, msg))
            db.commit()        except:
            db.rollback()        finally:
            cursor.close()    def insert_logs(self, logs):
        '''
        一次插入多条日志记录
        :param logs: 多条日志记录
        :return: None
        '''

        db = self.get_mysql_con()
        cursor = db.cursor()
        values = []        for log in logs:
            escape_log = [db.escape_string(field) for field in log]
            values.append('("' + '","'.join(escape_log) + '")')
        sql = 'INSERT INTO LOG (CREATED_TIME, LEVEL, MSG) ' \              'values %s' % ','.join(values)        try:
            cursor.execute(sql)
            db.commit()        except Exception as e:
            db.rollback()            raise e        finally:
            cursor.close()def walk_dir(root_dir, msg_queue, step):
    '''
    :param root_dir: 扫描的日志根目录
    :param msg_queue: 同步队列
    :param step: 步长
    :return: None
    '''
    for (dirpath, dirnames, filenames) in os.walk(root_dir):
        full_paths = (os.path.join(dirpath, name) 
                      for name in filenames if name.endswith(".txt"))
        logs = []        for path in full_paths:
            print(path)            with open(path, encoding="utf-8") as f:                for line in f:                    if len(line) > 30:
                        logs.append(line)                    if len(logs) >= step:
                        msg_queue.put(logs)
                        logs = []        if len(logs) > 0:
            msg_queue.put(logs)def start_insert_thread(dbUtil, msg_queue):
    while not msg_queue.empty():
        logs = msg_queue.get()
        values = [(log[:23], log[25:30].strip(), 
                   log[30:].strip()) for log in logs]
        dbUtil.insert_logs(values)if __name__ == "__main__":
    log_dir = sys.argv[1]
    dbUtil = MysqlUtil(username="***", password="***", db_name="***")
    step = 500
    msg_queue = queue.Queue()

    time_start = datetime.datetime.now()
    walk_dir(log_dir, msg_queue, step)
    t_list = []    for index in range(4):
        t_list.append(threading.Thread(target=start_insert_thread, 
                                       args=(dbUtil, msg_queue)))    for t in t_list:
        t.setDaemon(True)
        t.start()    for t in t_list:
        t.join()

    print(datetime.datetime.now() - time_start)


反对(0) 支持(0)
  python


评论