北肙

当你不能够再拥有,唯一可以做的,就是令自己不要忘记。

IBM RCMS Command Line Interface <1st Edition>

IBMPComm.py rcms_funcs.py read_config.py read_ot_excel.py transform_rcms_date.py rcms_menu.py import osfrom functions import read_configfrom functions import rcms_funcsmain_menu = ['\033[0;37m------------IBM RCMS MENU------------\033[0m', '\033[0;37m0. Functions\033[0m', '\t\033[0;31mRr. Read Current Configuration\033[0m', '\t\033[0;31mIi. Interactive Mode\033[0m', '\t\033[0;31mQq. Quit\033[0m', '\t\033[0;31mHh. Help\033[0m', '\033[0;37m1. RCMS\033[0m', '\t\033[0;36m1.1\033[0m Call Read', '\t\033[0;36m1.2\033[0m Call Search & Read', '\t\033[0;36m1.3\033[0m Create Call_Close Form', '\t\033[0;36m1.4\033[0m Product Inventory', '\033[0;37m2. Time Report\033[0m', '\t\033[0;36m2.1\033[0m Cut-off Day Time […]

IBMPComm.py

import os
import re
import subprocess
import logging
import time
class emulator(object):
    def __init__(self, args):
        self.args = args
        # print(self.args)
        self.socket = subprocess.Popen(self.args,
                                       # stdout = open('x3270.log', 'a'),
                                       stdout=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
        # stderr = subprocess.PIPE)
        self.cmdStatus = None
        self.emuStatus = None
        self.drop = ''
        self.data = []
        self.cmd_str = ''
        self.emuStatusList = []
        self.lastCmdStatus = ''
        self.lastEmuStatus = ''
    def exec(self, cmd_str, protect=False):
        self.data = []
        self.cmd_str = cmd_str.encode() + b'\n'
        self.socket.stdin.write(self.cmd_str)
        self.socket.stdin.flush()
        if cmd_str.upper() == 'QUIT' or cmd_str.upper() == 'DISCONNECT':
            return True
        while True:
            line = self.socket.stdout.readline().decode().rstrip('\n').rstrip()
            # print(line)
            if line.startswith('data'):
                self.data.append(line.lstrip("data:"))
            if not line.startswith('data'):
                # status bar
                self.emuStatus = line
                self.cmdStatus = self.socket.stdout.readline().decode().rstrip('\n')
                self.emuStatusList = self.emuStatus.split()
                # print('COMMAND: {0} EXECUTED STATUS: {1}'.format(self.cmd_str, self.cmdStatus))
                '''
                # make sure emulator screen is formatted == 'F'
                # and field is not protected == 'U'
                # but some screen is not allowed to enter string,
                # the Field Protection is default 'P' not 'U'
                keyboard_status = self.emuStatusList[0]
                screen_formatting = self.emuStatusList[1]
                field_protection = self.emuStatusList[2]
                connection_status = self.emuStatusList[3]
                eumlator_mode = self.emuStatusList[4]
                emulator_mode_number = self.emuStatusList[5]
                number_of_rows = self.emuStatusList[6]
                number_of_cols = self.emuStatusList[7]
                cursor_row = self.emuStatusList[8]
                cursor_col = self.emuStatusList[9]
                window_id = self.emuStatusList[10]
                command_execution_time = self.emuStatusList[11]
                '''
                if not self.emuStatusList[3].startswith('C'):
                    raise ConnectionError('Host not Connected')
                while (not self.emuStatusList[1] == 'F') or (not self.emuStatusList[2] == 'U'):
                    time.sleep(0.3)
                    # time.sleep(1)
                    self.socket.stdin.write('enter'.encode() + b'\n')
                    self.socket.stdin.flush()
                    self.emuStatus = self.socket.stdout.readline().decode().rstrip('\n')
                    self.drop = self.socket.stdout.readline().decode().rstrip('\n')
                    self.emuStatusList = self.emuStatus.split()
                    if protect:
                        self.emuStatusList[2] = 'U'
                # The status message consists of 12 blank-separated fields
                # for debug
                # print('EMULATOR STATUS: ', self.emuStatus)
                self.lastCmdStatus = self.cmdStatus
                self.lastEmuStatus = self.emuStatus
                if self.cmdStatus == 'ok':
                    return True
                if self.cmdStatus == 'error':
                    return False
    def get_string(self, row, col, length, protect):
        if row < 1 or row > 24 or col < 1 or col > 80:
            raise IndexError('row or col out of range')
        # python list is start with 0
        row = row - 1
        # strings in data with one more space
        # as python list is start with 0
        # then self.col = col
        # col = col
        strEnd = col + length
        self.exec('PrintText(string)', protect)
        # test content in list data
        """
        j = 1
        for i in self.data:
            print('index{0}:{1}'.format(j, i))
            j += 1
        """
        return self.data[row][col:strEnd]
    '''
    def get_lines(self, row, n):
        if row < 1 or row > 24:
            raise IndexError('row out of range')
        self.row = row - 1
        self.exec('PrintText(string)')
        """
        return_strings = ''
        # must return string instead of list object
        for line in range(self.row, self.row + n):
            # if you want each line printed on the new line
            # modify '{0}' ends with \n
            return_strings += '{0}'.format(self.data[line].rstrip())
        return return_strings
        """
        return self.data[self.row:self.row + n]
    '''
    def send_string(self, row, col, send_str, protect):
        # As s3270 cursor is started with 0
        row = row - 1
        col = col - 1
        self.exec('MoveCursor({0}, {1})'.format(row, col), protect)
        self.exec('string({0})'.format(send_str), protect)
        return self.cmdStatus
    def print_screen(self):
        for line in self.data:
            print(line)
class rcms(object):
    def __init__(self, host):
        self.host = host
        self.args = ["s3270", "-xrm", "s3270.charset: chinese-gb18030"]
        self.x3270 = emulator(self.args)
    def connect(self):
        # Test IBM Host is Connected.
        if os.system('ping -c 3 -i 0.3 {0} >/dev/null 2>&1'.format(self.host)) != 0:
            raise ConnectionError('IBM Host Can Not be Reached!')
        self.x3270.exec('Connect({0})'.format(self.host))
    def disconnect(self, protect=True):
        self.x3270.exec('Disconnect', protect)
    def save_screen(self, filename, protect=False):
        self.x3270.exec('PrintText(html, file, {0})'.format(filename), protect)
    def print_screen(self, protect=False):
        self.x3270.exec('PrintText(string)', protect)
        self.x3270.print_screen()
    def get_string(self, row, col, length, protect=False):
        return self.x3270.get_string(row, col, length, protect)
    '''
    def get_lines(self, row, n):
        for line in range(1, n):
        return self.x3270.get_lines(row, n)
    '''
    def send_string(self, row, col, string_to_send, protect=False):
        string_to_send = '"' + string_to_send + '"'
        self.x3270.send_string(row, col, string_to_send, protect)
    def send_string_directly(self, string_to_send, protect=False):
        # in order to input ','
        # strings must be started with "
        string_to_send = '"' + string_to_send + '"'
        self.x3270.exec('string({0})'.format(string_to_send), protect)
    def send_pf(self, n, protect=False):
        # self.n = n
        self.x3270.exec('pf({0})'.format(n), protect)
    def send_enter(self, protect=False):
        self.x3270.exec('enter', protect)
    def quit(self, protect=True):
        self.x3270.exec('quit', protect)
    def wait_for_string(self, row, col, string_for_wait, protect=False):
        i = 0
        while i <= 5:
            if self.get_string(row, col, len(string_for_wait), protect) == string_for_wait.upper():
                return True
            i += 1
            time.sleep(1)
        return self.get_string(row, col, len(string_for_wait), protect) == string_for_wait
if __name__ == '__main__':
    r = rcms('hostip1.au.ibm.com')
    r.connect()
    r.send_string_directly('rcms')
    r.send_enter()
    r.send_string_directly('cavr71d')
    r.send_enter()
    r.send_string_directly('kiss4osu')
    r.send_enter()
    print('STRING is suppose to be "CMD": ', r.get_string(22, 2, 3))
    print(r.wait_for_string(22, 2, 'CMD'))
    r.send_string_directly('cr,p464cg1')
    r.print_screen()
    input('pause')
    r.send_enter()
    r.send_enter()
    r.print_screen()
    input('pause')
    print(r.get_string(3, 59, 7))
    input('pause')
    r.send_string_directly('ca,action', True)
    r.send_enter(True)
    r.send_enter(True)
    r.print_screen(True)
    r.send_pf(1, True)
    r.print_screen(True)
    r.disconnect(True)

rcms_funcs.py

from functions import IBMPComm
from functions import read_config
from functions import read_ot_excel
from functions import transform_rcms_date
from openpyxl import load_workbook
from datetime import datetime
import os
core_ut_code = ['10', '11', '12', '14', '15', '17', '20', '24',
                '30', '31', '32', '4010', '4011', '4012', '41',
                '4473', '48', '73', '74', '75']
non_ut_code = ['05', '34', '4020', '42', '47',
               '60', '80', '86', '90', '91', '93', '95']
str_mod = ['2145', '2423']
pwr_mod = ['9113', '9119', '9117']
try:
    r = IBMPComm.rcms(read_config.rcms_config['host'])
except ValueError:
    print('Init PComm Failed, IBM Host not reachable!')
rcms_status = False
'''
# Test IBM Host is Connected.
# Already Did in 'IBMPComm.py'
if os.system('ping -c 3 -i 0.3 hostip1.au.ibm.com >/dev/null 2>&1') != 0:
    raise ConnectionError('IBM Host Can Not be Reached!')
'''
def rcms_login():
    global rcms_status
    if rcms_status:
        return True
    try:
        r.connect()
        r.send_string_directly('rcms')
        r.send_enter()
        r.send_string_directly('c' + read_config.rcms_config['serial'])
        r.send_enter()
    except RuntimeError:
        print('Enter RCMS Menu Failed!')
    try:
        password = read_config.rcms_config['password']
        while password == '':
            password = input('Enter Your RCMS Password: ')
    except IOError:
        print('Get Password Failed')
    try:
        r.send_string_directly(password)
        r.send_enter()
        if not r.get_string(22, 2, 3) == 'CMD':
            raise RuntimeError('RCMS Login Failed!')
    except RuntimeError:
        print('RCMS Login Failed!')
    rcms_status = True
    return True
def get_notes_id(ssr_sn=''):
    if not rcms_login():
        rcms_login()
    if ssr_sn == '':
        r.send_string_directly('red')
    else:
        r.send_string_directly('red,0{0}'.format(ssr_sn))
    r.send_enter()
    return r.get_string(4, 45, 18).rstrip()
def call_read(call_number='', close_call=False):
    if not rcms_login():
        rcms_login()
    if call_number == '' or len(call_number) != 7:
        print('Wrong Call Number!')
        return None
    call_number = call_number.upper()
    # return strings to be defined
    call_info = {'account name': '',
                 'customer num': '',
                 'contract end date': '',
                 'RCMS call': '',
                 'last call': '',
                 'contact': '  ',
                 'problem type': '',
                 'prod/model sn': '',
                 # 'prod': '\t',
                 # 'model': '\t',
                 # 'sn': '\t',
                 'desc': '\t',
                 'type': '\t',
                 'st': '\t',
                 'sev': '\t',
                 'recall': '',
                 'ass. CE': '',
                 'create time': '',
                 'update time': '',
                 'sc/ac': '',
                 # 'sc': '',
                 # 'ac': '',
                 'type of maintenance': '',
                 'service level': '',
                 'comments': '\n',
                 'ecurep': '\n',
                 }
    fault_desc = []
    activity_summary = []
    parts_info = []
    r.send_string_directly('cr,' + call_number)
    r.send_enter()
    call_info['contract end date'] += r.get_string(9, 34, 8)
    call_info['customer num'] += r.get_string(6, 24, 6)
    r.send_enter()
    if not r.wait_for_string(3, 59, call_number):
        raise RuntimeError('Call Read Failed!')
    call_info['account name'] += r.get_string(4, 15, 20).strip()
    # call_info['customer num'] += r.get_string(4, 57, 6)
    call_info['RCMS call'] += r.get_string(3, 59, 7)
    call_info['last call'] += r.get_string(5, 59, 18)
    call_info['contact'] += r.get_string(8, 15, 25).strip()
    call_info['problem type'] += r.get_string(9, 16, 1)
    call_info['prod/model sn'] += r.get_string(10, 8, 4) + '/' \
                                  + r.get_string(10, 24, 3) + ' ' + \
                                  r.get_string(10, 32, 7)
    # call_info['prod'] += r.get_string(10, 8, 4)
    # call_info['model'] += r.get_string(10, 24, 3)
    # call_info['sn'] += r.get_string(10, 32, 7)
    call_info['desc'] += r.get_string(11, 8, 30)
    call_info['type of maintenance'] += r.get_string(13, 2, 20).strip()
    call_info['service level'] += r.get_string(14, 2, 78).strip()
    call_info['type'] += r.get_string(15, 11, 3)
    call_info['st'] += r.get_string(15, 19, 3)
    call_info['sev'] += r.get_string(15, 28, 1)
    call_info['recall'] += r.get_string(15, 37, 3).strip()
    call_info['ass. CE'] += r.get_string(16, 11, 30).strip()
    call_info['create time'] += r.get_string(17, 44, 15)
    call_info['update time'] += r.get_string(17, 61, 15)
    for line in range(0, 3):
        call_info['comments'] += '{0}\n'.format(r.get_string(19 + line, 1, 80).rstrip().lstrip(' comments '))
    write_switch = 0
    for page in range(1, int(r.get_string(1, 78, 2).strip())):
        for row in range(4, 22):
            if r.get_string(row, 2, 14) == '**ECUREP END**':
                write_switch = 0
            if write_switch == 1:
                if r.get_string(row, 1, 80).strip() != '':
                    call_info['ecurep'] += '{0}\n'.format(r.get_string(row, 1, 80).strip())
            if r.get_string(row, 2, 16) == '**ECUREP START**':
                write_switch = 1
        r.send_pf(8)
    # put the emulator back to 'CMD' mode
    r.send_pf(6)
    # move to action plan screen
    # get part info & fault desc
    r.send_string_directly('ca,action', True)
    r.send_enter(True)
    r.send_enter(True)
    # call_info['sc'] += r.get_string(11, 5, 2, True)
    # call_info['ac'] += r.get_string(11, 11, 2, True)
    call_info['sc/ac'] += r.get_string(11, 5, 2, True) + r.get_string(11, 11, 2, True)
    for plan in range(0, int(r.get_string(1, 54, 1, True))):
        fault_desc.append(r.get_string(8, 15, 60, True).strip())
        activity_summary.append(r.get_string(9, 20, 50, True).strip())
        i = 0
        while not r.get_string(15 + i, 7, 1, True) == '':
            parts_info.append(r.get_string(15 + i, 7, 32, True))
            i += 1
        r.send_pf(1, True)
    # put the emulator back to 'CMD' mode
    r.send_pf(6, True)
    r.send_pf(6, True)
    r.send_pf(6, True)
    r.send_pf(6, True)
    return_strings = ''
    for line in call_info:
        return_strings += '\033[0;31m{0}:\033[0m {1}\n'.format(line.upper(), call_info[line])
    for plan in range(0, len(fault_desc)):
        return_strings += '\033[0;31mPlan{0} fault desc: \033[0m{1}\n'.format(plan, fault_desc[plan])
    for plan in range(0, len(activity_summary)):
        return_strings += '\033[0;31mPlan{0} act summery: \033[0m{1}\n'.format(plan, activity_summary[plan])
    for line in range(0, len(parts_info)):
        return_strings += '\033[0;31mPart{0} no: \033[0m{1} \033[0;31mQty: \033[0m{2} \033[0;31mDesc: \033[0m{3}\n' \
            .format(line, parts_info[line][0:7], parts_info[line][30:32], parts_info[line][8:29].rstrip())
    if close_call:
        svc_log_template = '../template/svc_log_macos.xlsx'
        cr_time = call_info['create time'].split()[1] + call_info['create time'].split()[0]
        onsite_times = input('Onsite Times (1-3): ')
        for t in range(onsite_times):
            pass
        print('create_time: ', cr_time)
        dir_name = call_info['create time'].split(' ')[1].replace('/', '') \
                   + call_info['RCMS call'] + call_info['customer num'] \
                   + call_info['prod/model sn'].split('/')[0]
        workbook = load_workbook(svc_log_template)
        sheet = workbook.get_sheet_by_name('RCMS')
        sheet['B1'] = cr_time
        sheet['B4'] = call_info['RCMS call']
        sheet['B5'] = call_info['prod/model sn']
        sheet['B6'] = call_info['account name']
        sheet['B13'] = fault_desc
        workbook.save('sssssssss.xlsx')
        workbook.close()
    return return_strings
def call_search(notes_id=get_notes_id()):
    if not rcms_login():
        rcms_login()
    # 15 words limited
    notes_id = notes_id[0:16]
    call_number_list = []
    r.send_string_directly('cs,sdlncn,,,,,,,,,,,,,' + notes_id)
    # for i in range(1, 13):
    #    r.send_string_directly(',')
    r.send_enter()
    pages_of_call = int(r.get_string(1, 78, 2).strip())
    for i in range(0, pages_of_call):
        for j in range(0, 13):
            s = r.get_string(5 + j, 5, 40).rstrip()
            if not s == '':
                call_number_list.append(s)
            j += 1
        r.send_pf(8)
    '''
    return_string = ''
    for line in call_number_list:
        return_string += '{0}\n'.format(line)
    return return_string
    '''
    return call_number_list
def product_inventory(serial_number=''):
    if not rcms_login():
        rcms_login()
    if serial_number == '':
        return None
    elif len(serial_number) != 11:
        raise ValueError('Wrong Product Number! Should be MT+SN (91330600xxx).')
    machine_info = {'type/mod': '',
                    'serial  ': '',
                    'customer': '',
                    'type of maintenance': ''}
    r.send_string_directly('in,' + serial_number)
    r.send_enter()
    machine_info['type/mod'] += r.get_string(5, 9, 4) + '/' + r.get_string(5, 14, 3)
    machine_info['serial  '] += r.get_string(5, 18, 2) + r.get_string(5, 23, 5)
    machine_info['customer'] += r.get_string(5, 31, 6)
    machine_info['type of maintenance'] += r.get_string(5, 59, 20).rstrip()
    return_string = ''
    for line in machine_info:
        return_string += '\033[0;31m{0}: \033[0m{1}\n'.format(line.upper(), machine_info[line])
    r.send_string_directly('menu')
    r.send_enter()
    return return_string
# red,0AVRXXX,120220
# date output format: DDMMYY, such as 130220
# date input format: YYMMDD, such as: 200213
# output format:
# 20/01/13|080_t_090_SC80_120|130_t_140_SC17_180|
def time_report_read_day(tr_date='', ssr_sn=read_config.rcms_config['serial']):
    if not rcms_login():
        rcms_login()
    if tr_date == '' or len(tr_date) != 6:
        return None
    # red command needs SSR's SN started with '0'
    ssr_sn = '0' + ssr_sn
    # change time_report_date to input format
    tr_year = tr_date[0:2]
    tr_month = tr_date[2:4]
    tr_day = tr_date[4:6]
    if int(tr_year) < 0 or int(tr_year) > 99 or int(tr_month) < 1 \
            or int(tr_month) > 12 or int(tr_day) < 1 or int(tr_day) > 31:
        raise ValueError('Wrong Date Format.!')
    tr_date = tr_day + tr_month + tr_year
    time_report_item = []
    r.send_string_directly('red,{0},{1}'.format(ssr_sn, tr_date))
    r.send_enter()
    if not r.get_string(4, 64, 6) == 'report':
        raise RuntimeError('Show Screen is not what expected')
    i = 0
    while not r.get_string(7 + i, 6, 1) == '':
        time_report_item.append(r.get_string(7 + i, 9, 18))
        i += 1
    return time_report_item
# date format 2020/2/13 will be '200213'
# end date is not include
# if you need the data 2020/2/13 -- 2020/2/15
# you should better make 'end_date' equal '200216'
def time_report_summary(start_date='', end_date='', ssr_sn=''):
    if not rcms_login():
        rcms_login()
    if start_date == '':
        start_date = read_config.get_cutoff()[0].replace('20', '', 1)
    if end_date == '':
        end_date = read_config.get_cutoff()[1].replace('20', '', 1)
    if ssr_sn == '':
        ssr_sn = read_config.rcms_config['serial']
    if start_date == '' or end_date == '' or ssr_sn == '':
        return None
    try:
        int(start_date)
        int(end_date)
    except ValueError:
        print('Wrong Date Format!')
    if int(start_date) > int(end_date):
        return None
    service_code_time = {'10': 0, '11': 0, '12': 0, '14': 0,
                         '15': 0, '17': 0, '20': 0, '24': 0,
                         '30': 0, '31': 0, '32': 0, '4010': 0,
                         '4011': 0, '4012': 0, '41': 0, '4473': 0,
                         '48': 0, '73': 0, '74': 0, '75': 0,
                         '05': 0, '34': 0, '4020': 0, '42': 0,
                         '44': 0, '47': 0, '60': 0, '80': 0,
                         '86': 0, '90': 0, '91': 0, '93': 0, '95': 0}
    '''
    core_ut_code = ['10', '11', '12', '14', '15', '17', '20', '24',
                    '30', '31', '32', '4010', '4011', '4012', '41',
                    '4473', '48', '73', '74', '75']
    non_ut_code = ['05', '34', '4020', '42', '47',
                   '60', '80', '86', '90', '91', '93', '95']
    '''
    # exist time report items day by day
    time_report_items = []
    # time report situation each day within cut-off
    time_report_each_day = {}
    start_year = start_date[0:2]
    start_month = start_date[2:4]
    start_day = start_date[4:6]
    start_date = '{0}{1}{2}'.format(start_day, start_month, start_year)
    ssr_sn = '0' + ssr_sn
    end_year = end_date[0:2]
    end_month = end_date[2:4]
    end_day = end_date[4:6]
    end_date = '{0}/{1}/{2}'.format(end_year, end_month, end_day)
    r.send_string_directly('red,{0},{1}'.format(ssr_sn, start_date))
    r.send_enter()
    r.send_pf(12)
    available_days = 0
    while True:
        r.send_pf(1)
        # if someday of time report is not input
        # make the dict of that index a null string ''
        # and if this day is weekend, make that string 'weekend'
        # and as IBM says, if a weekend has no over time
        # whatever the day is adjusted to workday or holiday,
        # it is not necessary to input any code.
        # about over time verify, see other function.
        current_date = r.get_string(4, 7, 8)
        week = r.get_string(4, 16, 9).rstrip()
        if r.get_string(7, 6, 1) == '':
            if not (week == 'Sunday' or week == 'Saturday'):
                time_report_each_day.update({'{0}'.format(current_date): '\033[0;33mNeed to input\033[0m'})
            else:
                time_report_each_day.update({'{0}'.format(current_date): '\033[0;37mWeekend\033[0m'})
        # Get workdays, from Monday to Friday
        if not (week == 'Sunday' or week == 'Saturday'):
            available_days += 1
        i = 0
        while not r.get_string(7 + i, 6, 1) == '':
            # the first line must be 'update()'
            # because the dict 'time_report_each_day' has no this index
            # but the 2rd to the end line must be treat as 'string'
            if i == 0:
                time_report_each_day.update({'{0}'.format(current_date): '{0}'.format(r.get_string(7, 9, 8))})
            else:
                time_report_each_day['{0}'.format(current_date)] = \
                    time_report_each_day['{0}'.format(current_date)] + r.get_string(7 + i, 9, 8)
            # get string like this only when the day has input
            # 170 080  00  40 10  1 N     RFX  24237500BTL70
            time_report_items.append(r.get_string(7 + i, 9, 46))
            i += 1
        if r.get_string(4, 7, 8) == end_date:
            break
    # just for test
    # time_report_each_day = {'20/01/13': '120 040 180 140 235 050 ', '20/01/14': '040 040 '}
    for line in time_report_each_day:
        # len(time_report_each_day[line]) / 6 equal 'number of time group'
        # 120 040 180 040 235 050
        # tot trv tot trv tot trv
        # [0:3] [4:7]
        # [8:11] [12:15]
        # [16:19] [20:23]
        # [24:-1] is ' '
        if time_report_each_day[line] == '\033[0;37mWeekend\033[0m' \
                or time_report_each_day[line] == '\033[0;33mNeed to input\033[0m':
            number_of_time_group = 0
        else:
            number_of_time_group = int(len(time_report_each_day[line]) / 8)
        last_time = 0
        for count in range(0, number_of_time_group):
            stop_time = time_report_each_day[line][0 + count * 8: 3 + count * 8]
            trv_time = time_report_each_day[line][4 + count * 8: 7 + count * 8]
            # test for try..except..else
            try:
                start_time = int(stop_time) - int(trv_time)
            except IOError:
                rcms_logoff()
                print('stop_time or trv_time is empty.')
            else:
                if start_time < last_time:
                    time_report_each_day[line] = '\033[0;31mOverlap\033[0m'
                    break
                else:
                    last_time = int(stop_time)
                    start_time = str('{0:>3d}'.format(start_time))
                    time_report_each_day.update({line: '{0}'.format(
                        time_report_each_day[line].replace(stop_time, start_time, 1))})
                    time_report_each_day.update({line: '{0}'.format(
                        time_report_each_day[line].replace(trv_time, stop_time, 1))})
    time_report_each_day_str = '|  Date  |sta|stp|\n'
    for line in time_report_each_day:
        time_report_each_day_str += '{0}: {1}\n'.format(line, time_report_each_day[line])
    """
    time_report_items = time_report_read_day(start_date, ssr_sn).copy()
    # time_report_items.append(time_report_read_day(start_date, ssr_sn))
    available_days = 0
    while not r.get_string(4, 7, 8) == end_date:
        week = r.get_string(4, 16, 9).rstrip()
        if not (week == 'Sunday' or week == 'Saturday'):
            print('week: ', week)
            available_days += 1
        r.send_pf(1)
        # for debug
        # r.print_screen()
        i = 0
        while not r.get_string(7 + i, 6, 1) == '':
            time_report_items.append(r.get_string(7 + i, 9, 18))
            i += 1
    """
    for lines in time_report_items:
        """
        lines[0:3]                      stop
        lines[4:7]                      tot
        lines[9:11]                     trv
        lines[13:15]                    SC
        lines[16:18]                    AC
        lines[33:39] + lines[41:46]     Type+SN
        """
        # Based on SC, compute tot time.
        sc = lines[13:15]
        tot = int(lines[4:7]) / 10
        if sc != '40' and sc != '44':
            service_code_time['{0}'.format(sc)] += tot
        else:
            service_code_time['{0}{1}'.format(sc, lines[16:18])] += tot
    # compute UT code time
    ut_sum = 0
    core_ut_time_list = {}
    for index in core_ut_code:
        ut_sum += service_code_time['{0}'.format(index)]
        if service_code_time['{0}'.format(index)] != 0:
            core_ut_time_list.update({""'{0}'"".format(index): service_code_time['{0}'.format(index)]})
    # compute NON-UT code time
    non_ut_sum = 0
    non_ut_time_list = {}
    for index in non_ut_code:
        non_ut_sum += service_code_time['{0}'.format(index)]
        if service_code_time['{0}'.format(index)] != 0:
            non_ut_time_list.update({""'{0}'"".format(index): service_code_time['{0}'.format(index)]})
    available_hrs = available_days * 8
    return_strings = 'Time Report Detail: \n{6}' \
                     'Core UT Code Time List: \t{0}\n' \
                     'Core UT Time Summary: \t{1}\n' \
                     'Other & NON-UT Code Time List: \t{2}\n' \
                     'Other & NON-UT Time Summary: \t{3}\n' \
                     'Available Hours: \t{4}\n\n' \
                     'UT = {5}%'.format(core_ut_time_list,
                                        ut_sum,
                                        non_ut_time_list,
                                        non_ut_sum,
                                        available_hrs,
                                        ut_sum / available_hrs * 100,
                                        time_report_each_day_str)
    return return_strings
def is_code_utilization(svc_code):
    if svc_code in core_ut_code:
        print("'{0}' is Core UT Code.".format(svc_code))
        return True
    else:
        print("'{0}' is Non-UT Code.".format(svc_code))
        return False
def rcms_logoff():
    r.disconnect()
    r.quit()
# rep_date = 200219, year=20, month=02, day=19
def get_rep_timemap(rep_date, ssr_sn=read_config.rcms_config['serial']):
    try:
        int(rep_date)
    except ValueError:
        print('Wrong Time Report Date Format!')
    rep_year = rep_date[0:2]
    rep_month = rep_date[2:4]
    rep_day = rep_date[4:6]
    if not rcms_login():
        rcms_login()
    rep_date = []
    ut_code_time = []
    non_ut_code_time = []
    ut_code_timemap = set([])
    non_ut_code_timemap = set([])
    r.send_string_directly('red,0' + ssr_sn + ',' + str(rep_day) + str(rep_month) + str(rep_year))
    r.send_enter()
    print('Day: ', r.get_string(4, 7, 8))
    row = 7
    i = 0
    while r.get_string(row + i, 6, 1):
        if r.get_string(row + i, 22, 2) in core_ut_code \
                or r.get_string(row + i, 22, 2) + r.get_string(row + i, 25, 2) in core_ut_code:
            ut_code_time.append([r.get_string(row + i, 9, 3), r.get_string(row + i, 13, 3)])
        else:
            non_ut_code_time.append([r.get_string(row + i, 9, 3), r.get_string(row + i, 13, 3)])
        i += 1
    print('ut_code_time: ', ut_code_time)
    print('non_ut_code_time: ', non_ut_code_time)
    for timepoint in ut_code_time:
        print(timepoint)
        # ut_code_timemap.add(transform_rcms_date.stop_tans_to_set(timepoint[0], timepoint[1]))
        ut_code_timemap = ut_code_timemap | transform_rcms_date.stop_tans_to_set(timepoint[0], timepoint[1])
    temp = list(ut_code_timemap)
    temp.sort()
    print(temp)
    return ut_code_timemap
def verify_ot_oneday(rep_date, ssr_sn=read_config.rcms_config['serial']):
    try:
        int(rep_date)
    except ValueError:
        print('Wrong Time Report Date Format!')
    rep_year = rep_date[0:2]
    rep_month = rep_date[2:4]
    rep_day = rep_date[4:6]
    ut_code_timemap = get_rep_timemap(rep_date, ssr_sn)
    ot_timemap = read_ot_excel.get_one_day_ot_time(rep_date)
    if ut_code_timemap & ot_timemap == ot_timemap:
        print('ot if fine')
        return True
    else:
        print('ot need input')
        return False

read_config.py

import os.path
from datetime import date
# import os
config_files = {'rcms.conf', 'cut_off_date.conf', 'over_time.conf'}
if __name__ == '__main__':
    os.chdir('../config')
else:
    os.chdir('./config')
config_files_dir = os.getcwd() + '/'
rcms_avail_args = {'host', 'serial', 'full_name', 'password'}
# Args in RCMS config file
# Just like {'host': 'hostname'}
rcms_config = {}
# RCMS Cut-Off day
# It's just someday be split with '-'
# like ['2020', '01', '13']
cutoff_config = []
ot_file_loc = ''
for file in config_files:
    if not os.path.isfile(config_files_dir + file):
        raise FileNotFoundError('Config File "{0}" not Found!'.format(file))
    with open(file, 'r') as f:
        if f.read() == '':
            raise ValueError('Config "{0}" File Empty!'.format(file))
    '''
    if open(file, 'r').read() == '':
        raise Warning('Config "{0}" File Empty!'.format(file))
    open(file, 'r').close()
    '''
    # Read Config Files
    # Put the Strings to Different List
    # Named 'rcms_config' and 'cutoff_config'
    try:
        with open(file, 'r') as f:
            for line in f.readlines():
                if (not line.lstrip().startswith('#')) and (not line.strip() == ''):
                    if file == 'rcms.conf':
                        if line.split('=')[0].strip() in rcms_avail_args:
                            rcms_config[line.split('=')[0].strip()] = line.split('=')[1].strip()
                    elif file == 'cut_off_date.conf':
                        tmpDate = line.strip('\n').split('-')
                        if len(tmpDate) == 3:
                            try:
                                co_year = int(tmpDate[0])
                                co_month = int(tmpDate[1])
                                co_day = int(tmpDate[2])
                            except ValueError:
                                print("Found Strings in 'cut_off_date.conf'!")
                            else:
                                if co_year > 2099 or co_year < 1999 or co_month < 1 \
                                        or co_month > 12 or co_day < 1 or co_day > 31:
                                    raise Warning("Date in 'cut_off_date.conf' Out of Range!")
                                else:
                                    cutoff_config.append(line.strip('\n').split('-'))
                        else:
                            raise Warning("Wrong Format of 'cut_off_date.conf'!")
                    elif file == 'over_time.conf':
                        ot_file_loc = line.rstrip('\n')
    except FileExistsError:
        print('Config File Read Error!')
if rcms_config == {} or cutoff_config == []:
    raise ValueError('Nothing Found in Config Files!')
def get_cutoff():
    cur_date = str(date.today())
    # cur_date = '2020-02-12'
    cur_year = cur_date.split('-')[0]
    cur_month = cur_date.split('-')[1]
    cur_day = cur_date.split('-')[2]
    cur_cutoff_date = []
    for d in range(len(cutoff_config)):
        if cutoff_config[d][0] == cur_year and cutoff_config[d][1] == cur_month:
            if int(cur_day) > int(cutoff_config[d][2]):
                cur_cutoff_date.append(cutoff_config[d][0] + cutoff_config[d][1]
                                       + str(int(cutoff_config[d][2]) + 1))
                cur_cutoff_date.append(cutoff_config[d+1][0] + cutoff_config[d+1][1] + cutoff_config[d+1][2])
                return cur_cutoff_date
            else:
                cur_cutoff_date.append(cutoff_config[d-1][0] + cutoff_config[d-1][1]
                                       + str(int(cutoff_config[d-1][2]) + 1))
                cur_cutoff_date.append(cutoff_config[d][0] + cutoff_config[d][1] + cutoff_config[d][2])
                return cur_cutoff_date
if __name__ == '__main__':
    print('rcms config: ', rcms_config)
    print('cut off day config: ', cutoff_config)
    print(get_cutoff())
    print('ot file location: ', ot_file_loc)

read_ot_excel.py

from functions import read_config
import os
from openpyxl import load_workbook
from datetime import date
import datetime
from functions import transform_rcms_date
import time
ot_files_dir = ''
ot_files = []
ot_items = []
try:
    ot_files_dir = read_config.ot_file_loc
except RuntimeError:
    print('Read OT Request Form Location Failed!')
for file in os.listdir(ot_files_dir):
    if file.endswith('xlsx') or file.endswith('xls'):
        if not file.startswith('~') and not file.startswith('.'):
            ot_files.append(file)
try:
    os.chdir(ot_files_dir)
except IsADirectoryError:
    print("'{0}' May not Exist.".format(ot_files_dir))
for file in ot_files:
    workbook = load_workbook(file, read_only=True, data_only=True)
    sheet = workbook.get_sheet_by_name('Sheet1')
    max_row = sheet.max_row
    row = 3
    while True:
        row += 1
        if row > max_row:
            break
        if not str(sheet.cell(row, 6).value).startswith('20'):
            continue
        '''
        if not sheet.cell(row, 2).value == '张志杰':
            continue
        '''
        ot_date = sheet.cell(row, 6).value
        ot_start_time = sheet.cell(row, 7).value
        ot_end_time = sheet.cell(row, 8).value
        ot_time = sheet.cell(row, 9).value
        ot_customer = sheet.cell(row, 4).value
        ot_content = sheet.cell(row, 5).value
        ot_items.append([ot_date, ot_start_time, ot_end_time, ot_time, ot_customer, ot_content])
# Change time format
# 18:00 to 180, 18:30 to 185
for line in ot_items:
    try:
        line[0] = line[0].strftime('%y%m%d')
        line[1] = line[1].strftime('%H') + str(int(line[2].strftime('%M')) // 6)
        line[2] = line[2].strftime('%H') + str(int(line[2].strftime('%M')) // 6)
        if line[2] == '000':
            line[2] = '240'
        if type(line[3]) == datetime.time:
            line[3] = '{:0>3d}'.format(int(line[3].strftime('%H')) * 10)
        else:
            raise ValueError('Wrong Format of Date in OT Excel!')
    except ValueError:
        print('Wrong Format of Date in OT Excel!')
ot_items.sort()
def get_ot_cycle(ot_month=''):
    try:
        int(ot_month)
    except ValueError:
        print('Wrong Month Format!')
    cur_year = int(date.today().strftime('%y'))
    cur_month = int(date.today().strftime('%m'))
    if not ot_month:
        ot_month = cur_month
    try:
        int(ot_month)
    except ValueError:
        print("'{0}' is Not Number!".format(ot_month))
    if int(ot_month) > 12 or int(ot_month) < 1:
        print('Wrong Month!')
        return None
    '''
    If current month is 2 ad ot_month input '6'
    Then program think this month is in the last year
    '''
    ot_month = int(ot_month)
    if 2 <= ot_month <= cur_month:
        last_month = ot_month - 1
        last_year = cur_year
    elif ot_month == 1:
        last_year = cur_year - 1
        last_month = ot_month + 11
    else:
        last_year = cur_year - 1
        last_month = ot_month - 1
        cur_year -= 1
    sta_date = '{:0>2d}{:0>2d}{:0>2d}'.format(last_year, last_month, 21)
    end_date = '{:0>2d}{:0>2d}{:0>2d}'.format(cur_year, ot_month, 20)
    ot_cycle = [str(sta_date), str(end_date)]
    return ot_cycle
def get_ot_in_cycle(start_stop_date=[]):
    sta_date = start_stop_date[0]
    stp_date = start_stop_date[1]
    ot_items_in_cycle = []
    for item in ot_items:
        if sta_date <= item[0] <= stp_date:
            if item not in ot_items_in_cycle:
                ot_items_in_cycle.append(item)
    return ot_items_in_cycle
def get_ot_date_list(start_stop_date=[]):
    if not start_stop_date:
        return None
    ot_lists = get_ot_in_cycle(start_stop_date)
    ot_set = set([])
    for ot_list in ot_lists:
        ot_set.add(ot_list[0])
    ot_date_list = list(ot_set)
    ot_date_list.sort()
    return ot_date_list
def get_one_day_ot_time(overtime_date):
    try:
        int(overtime_date)
    except ValueError:
        print('Wrong Time Report Date Format!')
    ot_month = overtime_date[2:4]
    ot_day = overtime_date[4:6]
    cur_month = int(date.today().strftime('%m'))
    ot_month = int(ot_month)
    if int(ot_day) > 20:
        ot_month_tmp = ot_month + 1
    else:
        ot_month_tmp = ot_month
    ot_cycle = get_ot_cycle(ot_month_tmp)
    ot_lists = get_ot_in_cycle(ot_cycle)
    ot_date_list = get_ot_date_list(ot_cycle)
    tmp_date = '20' + str('{:0>2d}'.format(ot_month)) + str(ot_day)
    ot_time_set = set([])
    for ot_list in ot_lists:
        if ot_list[0] == tmp_date:
            ot_time_set = ot_time_set | transform_rcms_date.sta_trans_to_set(ot_list[1], ot_list[2])
            # ot_time_set.add(transform_rcms_date.sta_trans_to_set(ot_list[1], ot_list[2]))
            '''
            time_point = 0
            while int(ot_list[1]) + time_point <= int(ot_list[2]):
                ot_time_set.add('{:0>3d}'.format(int(ot_list[1]) + time_point))
                time_point += 5
            '''
    ot_time_list = list(ot_time_set)
    ot_time_list.sort()
    print(ot_time_list)
    return ot_time_set

transform_rcms_date.py


def sta_trans_to_set(sta_date, end_date):
    try:
        int(sta_date)
        int(end_date)
    except ValueError:
        print('Wrong Date Format!')
    sta_date = int(sta_date)
    end_date = int(end_date)
    dates = set([])
    while sta_date <= end_date:
        dates.add(sta_date)
        sta_date += 5
    return dates
def stop_tans_to_set(stop_date, tot):
    try:
        int(stop_date)
        int(tot)
    except ValueError:
        print('Wrong Date Format!')
    stop_date = int(stop_date)
    tot = int(tot)
    sta_date = stop_date - tot
    dates = set([])
    while sta_date <= stop_date:
        dates.add(sta_date)
        sta_date += 5
    return dates
def is_rep_date_overlap(rep_time_set, ot_time_set):
    if type(rep_time_set) != set:
        raise ValueError('Input Should be Set!')
    if type(ot_time_set) != set:
        raise ValueError('Input Should be Set!')
    if rep_time_set & ot_time_set == ot_time_set:
        return True
    else:
        return False
if __name__ == '__main__':
    rep_time1 = [['120', '040'], ['180', '040'], ['220', '040']]
    ot_time1 = [['180', '220']]
    rep_time2 = [['120', '040'], ['170', '040'], ['230', '060']]
    ot_time2 = [['180', '220']]
    rep_time3 = [['120', '040'], ['180', '040'], ['220', '030']]
    ot_time3 = [['180', '220']]
    rep_set = set([])
    ot_set = set([])
    for line in rep_time3:
        rep_set.update(stop_tans_to_set(line[0], line[1]))
    ot_set = sta_trans_to_set(ot_time3[0][0], ot_time3[0][1])
    print(rep_set)
    print(ot_set)
    print(is_rep_date_overlap(rep_set, ot_set))

rcms_menu.py

import os
from functions import read_config
from functions import rcms_funcs

main_menu = ['\033[0;37m------------IBM RCMS MENU------------\033[0m',
'\033[0;37m0. Functions\033[0m',
'\t\033[0;31mRr. Read Current Configuration\033[0m',
'\t\033[0;31mIi. Interactive Mode\033[0m',
'\t\033[0;31mQq. Quit\033[0m',
'\t\033[0;31mHh. Help\033[0m',
'\033[0;37m1. RCMS\033[0m',
'\t\033[0;36m1.1\033[0m Call Read',
'\t\033[0;36m1.2\033[0m Call Search & Read',
'\t\033[0;36m1.3\033[0m Create Call_Close Form',
'\t\033[0;36m1.4\033[0m Product Inventory',
'\033[0;37m2. Time Report\033[0m',
'\t\033[0;36m2.1\033[0m Cut-off Day Time Report Summary',
'\t\033[0;36m2.2\033[0m Time Report Summary for Others',
'\t\033[0;36m2.3\033[0m One-key Fill SC80/95',
'\t\033[0;36m2.4\033[0m Over Time Verify (from 21 to 20)',
'\t\033[0;36m2.5\033[0m UT Code or Not',
'\033[0;37m3. PMH\033[0m',
'\t\033[0;36m3.1\033[0m Create PMH',
'\n']

right_number = ['r', 'i', 'q', 'h',
'R', 'I', 'Q', 'H',
'1.1', '1.2', '1.3', '1.4',
'2.1', '2.2', '2.3', '2.4', '2.5', '2.6'
'3.1']

svc_code_list = ['10', '11', '12', '14', '15', '17', '20', '24', '30',
'31', '32', '4010', '4011', '4012', '41', '4473', '48',
'73', '74', '75', '05', '34', '4020', '42', '47', '60',
'80', '86', '90', '91', '93', '95']

last_call_number = ''
last_call_number_list = []

while True:
for line in main_menu:
print(line)

i = input('CMD==> ')
while i not in right_number:
print('[ERROR]: Input Error!')
i = input('CMD==> ')

if i.upper() == 'R':
print('\033[0;37mRCMS Config:\033[0m')
try:
for line in read_config.rcms_config:
print('\t\033[0;36m{0}:\033[0m {1}'.format(line, read_config.rcms_config[line]))
except RuntimeError:
print('Read RCMS Config Failed!')

print('\033[0;37mCurrent Cut-Off Day:\033[0m')
try:
print('\t{0} -- {1}'.format(read_config.get_cutoff()[0], read_config.get_cutoff()[1]))
except RuntimeError:
print('Read Cut-Off Config Failed!')

input('\nPress Enter to be Continue!')

elif i.upper() == 'I':
print('Interactive Mode not Support Yet!')
input('\nPress Enter to be Continue!')

elif i.upper() == 'H':
print("Mail to '[email protected]' for Any Help.")
input('\nPress Enter to be Continue!')

elif i.upper() == 'Q':
try:
rcms_funcs.rcms_logoff()
except RuntimeError:
print('Maybe RCMS is Already Logoff.')
print('Quit!')
exit(0)

# Call Read
elif i == '1.1':
call_number = input("Call Number (Last '{0}'): ".format(last_call_number))
if call_number == '':
call_number = last_call_number
print(rcms_funcs.call_read(call_number))
last_call_number = call_number
input('\nPress Enter to be Continue!')

# Call Search & Read
elif i == '1.2':
call_number_list = []
if last_call_number_list:
print('Last Call Number List: ')
for line in last_call_number_list:
print(line)

input_str = input('Index Number to Read (1-99), or Enter to go on search: ')
if input_str:
print(rcms_funcs.call_read(last_call_number_list[int(input_str) - 1][3:10]))

notes_id = rcms_funcs.get_notes_id()
input_str = input("Notes ID go on Search (Default '{0}'): ".format(notes_id))
if input_str:
notes_id = input_str
call_number_list = rcms_funcs.call_search(notes_id)
for line in call_number_list:
print(line)
input_str = input('Index Number to Read (1-99), or Enter to be Continue: ')
if input_str:
print(rcms_funcs.call_read(call_number_list[int(input_str) - 1][3:10]))

last_call_number_list = call_number_list.copy()
input('\nPress Enter to be Continue.')

# Create Call_Close Form
elif i == '1.3':
pass

# Product Inventory
elif i == '1.4':
print(rcms_funcs.product_inventory(input('Machine Model & SN (913306001DH): ')))
input('\nPress Enter to be Continue.')

# Cut-off Day Time Report Summary
elif i == '2.1':
start_date = read_config.get_cutoff()[0].replace('20', '', 1)
end_date = read_config.get_cutoff()[1].replace('20', '', 1)
ssr_sn = read_config.rcms_config['serial']

print(rcms_funcs.time_report_summary(input("Start Date (Default '{0}'): ".format(start_date)),
input("End Date (Default '{0}'): ".format(end_date)),
input("SSR Serial Number (Default '{0}'): ".format(ssr_sn))))
input('\nPress Enter to be Continue.')

# One-key Fill SC80/95
elif i == '2.2':
pass

# List Time Report Free Day
elif i == '2.3':
pass

# Over Time Verify
elif i == '2.4':
pass

elif i == '2.5':
svc_code = input('Code: ')
while svc_code not in svc_code_list:
print('Wrong Code, Try Again!')
svc_code = input('Code: ')
rcms_funcs.is_code_utilization(svc_code)

input('\nPress Enter to be Continue!')

# Create PMH
elif i == '3.1':
pass

elif i == '4':
pass

else:
print('[ERROR')
continue

Leave a Reply

Your email address will not be published. Required fields are marked *