+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

2019-07(6)

2019-08(116)

2019-09(98)

2019-10(17)

2019-11(5)

Python3 Hadoop 操作函数

发布于2020-09-10 22:57     阅读(95)     评论(0)     点赞(21)     收藏(2)


0

1

2

3

4

5

6

7

8

9

判断Python版本

import sys

# Only the major version number matters for the 2-vs-3 check.
major_version = sys.version_info[0]
print('[Info] python3' if major_version >= 3 else '[Info] python2')

获取已导入模块(如 foo)的源文件所在目录(需先 import os):

import os

import foo

# Directory that contains the imported module's source file.
# abspath() guards against a relative __file__ on older interpreters.
print(os.path.dirname(os.path.abspath(foo.__file__)))

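操作 Hadoop(HDFS)及本地文件的工具函数源码如下。它依赖 TensorFlow 1.x 的 tf.gfile 接口(TensorFlow 2.x 中对应 tf.io.gfile 或 tf.compat.v1.gfile),列举 HDFS 文件时还需调用 hdfs 命令行工具: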

#!/usr/bin/env python
# -- coding: utf-8 --
"""
Copyright (c) 2020. All rights reserved.
Created by C. L. Wang on 2020/9/9
"""

import os
import re
import time
import traceback

import tensorflow as tf

HDFS_FILE_SEPARATOR = '/'  # component separator for hdfs:// paths (os.sep is used for local paths)


def convert_str_to_timestamp(time_str, format_str="%Y-%m-%d_%H:%M:%S"):
    """Parse a local-time string into an integer Unix timestamp.

    Args:
        time_str: time string to parse, e.g. "2020-09-10_22:57:00".
        format_str: ``time.strptime`` format describing ``time_str``.

    Returns:
        The timestamp as an ``int`` (interpreted in local time via
        ``time.mktime``), or None if either argument is None or the
        string cannot be parsed/converted (a message is printed).
    """
    if time_str is None or format_str is None:
        return None
    try:
        return int(time.mktime(time.strptime(time_str, format_str)))
    except (TypeError, ValueError, OverflowError) as e:
        # strptime raises ValueError/TypeError on bad input; mktime raises
        # OverflowError/ValueError for unrepresentable times.
        print("Convert string : [%s] , format : [%s], to timestamp failed, detail error is [%s] . " % (time_str, format_str, str(e)))
        return None


def convert_timestamp_to_str(timestamp, format_str="%Y-%m-%d_%H:%M:%S"):
    """Format a Unix timestamp as a local-time string.

    Args:
        timestamp: seconds since the epoch (converted with ``time.localtime``).
        format_str: ``time.strftime`` format for the output.

    Returns:
        The formatted string, or None if either argument is None or the
        conversion fails (a message is printed).
    """
    if timestamp is None or format_str is None:
        return None
    try:
        return time.strftime(format_str, time.localtime(timestamp))
    except (TypeError, ValueError, OverflowError, OSError) as e:
        # localtime raises OverflowError/OSError for out-of-range values and
        # TypeError for non-numeric input; strftime raises ValueError/TypeError.
        print("Convert timestamp : [%s] , to time string, format : [%s] failed, detail error is [%s] . " % (str(timestamp), format_str, str(e)))
        return None

class FileMeta(object):
    """Simple record describing one listed file.

    Instances are created empty and populated field by field by the
    listing helpers.
    """

    def __init__(self):
        # Path of the file as reported by the listing backend.
        self.file_name = None
        # File size (os.path.getsize for local files, the size column of
        # `hdfs dfs -ls` for HDFS files).
        self.length = 0
        # Last-modification time as a Unix timestamp.
        self.last_modify_timestamp = 0

    def __str__(self):
        lines = [
            "FileMeta:",
            "file_name: %s" % self.file_name,
            "length: %d" % self.length,
            "last_modify_timestamp: %d" % self.last_modify_timestamp,
        ]
        return "\n".join(lines) + "\n"


def copy_file(old_path, new_path, overwrite=False):
    """Copy ``old_path`` to ``new_path`` using ``tf.gfile.Copy``.

    Args:
        old_path: source file path (local or hdfs://).
        new_path: destination file path; its parent directory must already
            exist (a bare file name refers to the current directory).
        overwrite: passed through to ``tf.gfile.Copy``.

    Returns:
        True on success; False if the source or the destination's parent
        directory is missing, or if the copy raised (messages are printed).
    """
    if not exists(old_path):
        print("Old path[%s] does not exist when copy file." % old_path)
        return False
    parent_dir = os.path.dirname(new_path)
    # A destination without a directory component has dirname "" (current
    # directory); checking exists("") would wrongly report it as missing.
    if parent_dir and not exists(parent_dir):
        print("Parent dir does not exist for new_path[%s] when copy file." % new_path)
        return False
    try:
        tf.gfile.Copy(old_path, new_path, overwrite)
    except Exception as e:
        print("Error occurred when copy file from[%s] to[%s]" % (old_path, new_path))
        print(str(e))
        return False
    return True


def read_file(path):
    """Read the entire content of ``path`` through ``tf.gfile``.

    Args:
        path: file path (local or hdfs://).

    Returns:
        ``(True, content)`` on success, or ``(False, None)`` if opening or
        reading raised (the traceback is printed).
    """
    try:
        # The context manager closes the handle even when read() raises.
        with tf.gfile.Open(path, mode="r") as fp:
            content = fp.read()
    except Exception:
        print('Error occurred when read_file[%s].' % path)
        __print_exception()
        return False, None
    return True, content


def is_directory(path):
    """Return whether ``path`` is a directory according to ``tf.gfile.IsDirectory``.

    Exceptions from the underlying call are not caught here.
    """
    result = tf.gfile.IsDirectory(path)
    return result


def append_file(path, content):
    """Append ``content`` to ``path``, creating the file if it does not exist.

    Returns:
        The result of ``write_file``; False if checking existence or writing
        raised (the traceback is printed).
    """
    try:
        # Existing files are opened in append mode, new ones in write mode.
        open_mode = "a+" if tf.gfile.Exists(path) else "w+"
        return write_file(path, content, open_mode)
    except Exception:
        print('Error occurred when append file [%s] . ' % path)
        __print_exception()
        return False


def exists(path):
    """Return whether ``path`` exists according to ``tf.gfile.Exists``.

    Any exception raised by the check is printed and reported as False.
    """
    found = False
    try:
        found = tf.gfile.Exists(path)
    except Exception:
        print('Error occurred when check path[%s] exists. ' % path)
        __print_exception()
    return found


def __print_exception():
    print('%s' % (traceback.format_exc()))
    return


def write_file(path, content, mode="w+"):
    """Write ``content`` to ``path`` through ``tf.gfile``, creating parent dirs as needed.

    Args:
        path: destination file path (local or hdfs://).
        content: data passed to the file handle's ``write``.
        mode: open mode passed to ``tf.gfile.Open`` (e.g. "w+" or "a+").

    Returns:
        True on success. False if the parent path exists but is not a
        directory, if the parent could not be created, or if a
        ``tf.errors.OpError`` was raised (messages are printed). Other
        exception types propagate to the caller.
    """
    dir_path = os.path.dirname(path)
    try:
        # A bare file name has no parent component to validate or create.
        if dir_path:
            if exists(dir_path):
                if not tf.gfile.IsDirectory(dir_path):
                    print('Parent path[%s] of file[%s] is not directory.'
                          % (dir_path, path))
                    return False
            elif not make_dirs(dir_path):
                # make_dirs has already printed the error and traceback.
                return False
        # The context manager closes the handle even when write() raises.
        with tf.gfile.Open(path, mode=mode) as fp:
            fp.write(content)
    except tf.errors.OpError:
        print('Error occurred when write_file[%s]' % path)
        __print_exception()
        return False
    return True


def rename(src_path, dest_path, overwrite=True):
    """Rename ``src_path`` to ``dest_path`` via ``tf.gfile.Rename``.

    Returns:
        True on success, False if the rename raised (the traceback is printed).
    """
    succeeded = True
    try:
        tf.gfile.Rename(src_path, dest_path, overwrite=overwrite)
    except Exception:
        print('Error occurred when rename file from [%s] to [%s].'
              % (src_path, dest_path))
        __print_exception()
        succeeded = False
    return succeeded


def make_dirs(path):
    """Create directory ``path`` via ``tf.gfile.MakeDirs``.

    Returns:
        True on success, False if the call raised (the traceback is printed).
    """
    try:
        tf.gfile.MakeDirs(path)
    except Exception:
        print('Error occurred when make dirs [%s]. ' % path)
        __print_exception()
        return False
    else:
        return True


def remove_file(path):
    """Delete the single file ``path`` via ``tf.gfile.Remove``.

    Returns:
        True on success, False if the call raised (the traceback is printed).
    """
    removed = True
    try:
        tf.gfile.Remove(path)
    except Exception:
        removed = False
        print('Error occurred when remove file [%s].' % path)
        __print_exception()
    return removed


def remove_dir(path):
    """Delete ``path`` and everything under it via ``tf.gfile.DeleteRecursively``.

    Returns:
        True on success, False if the call raised (the traceback is printed).
    """
    try:
        tf.gfile.DeleteRecursively(path)
    except Exception:
        print('Error occurred when remove dir [%s].' % path)
        __print_exception()
        return False
    else:
        return True


def is_hdfs_path(path):
    """Return True when ``path`` starts with the (case-sensitive) ``hdfs://`` scheme."""
    hdfs_scheme_prefix = 'hdfs://'
    return path.startswith(hdfs_scheme_prefix)


def _call_cmds(cmd_list):
    from subprocess import check_output
    try:
        return check_output(cmd_list)
    except Exception as e:
        print('Call cmd [%s] failed. ErrorMsg:[%s].' % (' '.join(cmd_list), str(e)))
        return None


def _convert_modify_time_str_to_timestamp(time_str):
    """Parse a "YYYY-MM-DD HH:MM" modification time (as joined from `hdfs dfs -ls` columns)."""
    hdfs_ls_time_format = "%Y-%m-%d %H:%M"
    return convert_str_to_timestamp(time_str, hdfs_ls_time_format)


def _list_hdfs_files_recursively(dirname, hdfs_bin='/usr/lib/hadoop-2.6.0/bin/hdfs'):
    """List every non-directory entry under an HDFS path via `hdfs dfs -ls -R`.

    Args:
        dirname: HDFS path to list.
        hdfs_bin: path of the `hdfs` command-line client; defaults to the
            Hadoop 2.6.0 location this module originally hard-coded.

    Returns:
        A list of FileMeta sorted by file name. Lines that are not 8-field
        listing rows, and directory rows, are skipped.

    Raises:
        Exception: if the `hdfs` command could not be run or failed.
    """
    content = _call_cmds([hdfs_bin, 'dfs', '-ls', '-R', dirname])
    if content is None:
        error_msg = "Path [%s] is empty, not found files. " % dirname
        raise Exception(error_msg)
    # check_output returns bytes on Python 3; decode before splitting on str.
    if isinstance(content, bytes):
        content = content.decode('utf-8')
    file_meta_list = []
    for line in content.splitlines():
        # Columns: permissions, replication, owner, group, size, date, time,
        # path. maxsplit=7 keeps paths that contain spaces in one field.
        items = line.split(None, 7)
        if len(items) != 8:
            continue
        # Directory rows show '-' as replication and start with 'd'.
        if items[1] == '-' or items[0].startswith('d'):
            continue
        meta = FileMeta()
        meta.length = int(items[4])
        last_modified_time = ' '.join([items[5], items[6]])
        timestamp = _convert_modify_time_str_to_timestamp(last_modified_time)
        # Fall back to 0 so later numeric comparisons and sorting never see None.
        meta.last_modify_timestamp = timestamp if timestamp is not None else 0
        meta.file_name = items[7]
        file_meta_list.append(meta)
    file_meta_list.sort(key=lambda x: x.file_name)
    return file_meta_list


def _list_local_files_recursively(path):
    """Recursively collect FileMeta entries for the local files at/under ``path``.

    ``path`` may be a single file or a directory; sub-directories are
    descended into. Entries that no longer exist are reported and skipped.

    Returns:
        A list of FileMeta sorted by file name.

    Raises:
        Exception: wrapping any error raised while listing (the message
        includes ``path`` and the original error text).
    """
    try:
        if is_directory(path):
            candidates = sorted(os.path.join(path, name) for name in os.listdir(path))
        else:
            candidates = [path]
        metas = []
        for candidate in candidates:
            if not os.path.exists(candidate):
                print("File [%s] is not exist." % candidate)
                continue
            if not os.path.isfile(candidate):
                # Not a regular file: treat it as a directory and recurse.
                metas.extend(_list_local_files_recursively(candidate))
                continue
            meta = FileMeta()
            meta.file_name = candidate
            meta.length = os.path.getsize(candidate)
            meta.last_modify_timestamp = int(os.path.getmtime(candidate))
            metas.append(meta)
        metas.sort(key=lambda m: m.file_name)
        return metas
    except Exception as e:
        error_msg = 'Error occurred when list dir: %s, error message is :%s, please check or try again. ' % (
            path, str(e))
        raise Exception(error_msg)


def list_files_recursively(path):
    """List all files under ``path`` as FileMeta, dispatching on HDFS vs. local.

    Raises:
        Exception: if ``path`` does not exist.
    """
    if exists(path):
        lister = _list_hdfs_files_recursively if is_hdfs_path(path) else _list_local_files_recursively
        return lister(path)
    error_msg = "Input Path: %s is not exist, please check input config or data source config is invalid " % path
    raise Exception(error_msg)


def _is_temp_file(path):
    """Return True if any component of ``path`` is "_temporary" or "_SUCCESS".

    HDFS paths are split on HDFS_FILE_SEPARATOR, local paths on os.sep.
    """
    separator = HDFS_FILE_SEPARATOR if is_hdfs_path(path) else os.sep
    temp_names = {"_temporary", "_SUCCESS"}
    return any(component in temp_names for component in path.split(separator))


def _compile_pattern(filter_pattern):
    if not filter_pattern:
        return None
    try:
        return re.compile(filter_pattern)
    except Exception as e:
        error_msg = 'Input param filter_pattern : [%s] is a malformed regular expression , ErrorMsg: [%s]. ' % (filter_pattern, str(e))
        raise Exception(error_msg)
    pass


def list_files_recursively_with_filter(path, begin_time=None, end_time=None, latest_file_count=0, filter_pattern=None):
    """List files under ``path`` with temp-file, regex, time-window and count filters.

    Args:
        path: HDFS or local path to list recursively.
        begin_time: optional lower bound (inclusive) on modification time,
            in convert_str_to_timestamp's default "%Y-%m-%d_%H:%M:%S" format.
            An unparseable value is ignored (treated as no bound).
        end_time: optional upper bound (inclusive), same format as begin_time.
        latest_file_count: if > 0, keep only this many most recently
            modified files.
        filter_pattern: optional regular expression; files whose name
            matches it (``re.search``) are EXCLUDED.

    Returns:
        A list of FileMeta sorted by last_modify_timestamp, oldest first.

    Raises:
        Exception: if ``path`` does not exist, ``filter_pattern`` is not a
            valid regex, or end_time is earlier than begin_time.
    """
    file_meta_list = [meta for meta in list_files_recursively(path)
                      if not _is_temp_file(meta.file_name)]

    regex = _compile_pattern(filter_pattern)
    if regex is not None:
        file_meta_list = [meta for meta in file_meta_list
                          if regex.search(meta.file_name) is None]

    # convert_str_to_timestamp returns None for None/unparseable input;
    # a non-positive timestamp means "no bound" below.
    begin_timestamp = convert_str_to_timestamp(begin_time) or 0
    end_timestamp = convert_str_to_timestamp(end_time) or 0

    if 0 < end_timestamp < begin_timestamp:
        error_msg = 'End time : [%s] is less than begin time : [%s], get file list failed. ' % (end_time, begin_time)
        raise Exception(error_msg)

    # Apply each bound independently so that begin == end still filters.
    if begin_timestamp > 0:
        file_meta_list = [meta for meta in file_meta_list
                          if meta.last_modify_timestamp >= begin_timestamp]
    if end_timestamp > 0:
        file_meta_list = [meta for meta in file_meta_list
                          if meta.last_modify_timestamp <= end_timestamp]

    file_meta_list.sort(key=lambda meta: meta.last_modify_timestamp)
    if 0 < latest_file_count < len(file_meta_list):
        # The list is sorted oldest-first, so the tail holds the newest files.
        file_meta_list = file_meta_list[-latest_file_count:]
    return file_meta_list


def list_directory(path):
    """List the direct entries of ``path`` via ``tf.gfile.ListDirectory``.

    Returns:
        ``(True, entries)`` when the path exists; ``(False, None)`` when it
        does not, or when listing raised (the traceback is printed).
    """
    try:
        if not exists(path):
            return False, None
        return True, tf.gfile.ListDirectory(path)
    except Exception:
        print('Error occured when list path[%s] . ' % path)
        __print_exception()
        return False, None

原文链接:https://blog.csdn.net/u012515223/article/details/108486126

0

1

2

3

4

5

6

7

8

9



所属网站分类: 技术文章 > 博客

作者:python是我的菜

链接: https://www.pythonheidong.com/blog/article/516296/69f308d1bfe31d49a09e/

来源: python黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

21 0
收藏该文
已收藏

评论内容:(最多支持255个字符)