IBMPComm.py
import os
import re
import subprocess
import logging
import time
class emulator(object):
    """Drive an s3270-style scripting subprocess over its stdin/stdout pipes.

    Every command is written to the child's stdin as a single line.  The reply
    is read back from stdout as zero or more ``data`` lines, followed by one
    status line of blank-separated fields and one result line (``ok`` or
    ``error``).
    """

    def __init__(self, args):
        """Spawn the emulator process.

        :param args: argv list handed to ``subprocess.Popen``.
        """
        self.args = args
        # NOTE(review): stderr is piped but never read anywhere in this class.
        self.socket = subprocess.Popen(self.args,
                                       stdout=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
        self.cmdStatus = None      # result line of the command in flight
        self.emuStatus = None      # raw status line of the command in flight
        self.drop = ''             # result line of the internal 'enter' retries (ignored)
        self.data = []             # payload of the 'data' lines of the last command
        self.cmd_str = ''          # last command as the bytes written to stdin
        self.emuStatusList = []    # emuStatus split on blanks
        self.lastCmdStatus = ''
        self.lastEmuStatus = ''

    def _read_line(self):
        """Return the next reply line without its trailing newline.

        :raises ConnectionError: if the emulator's stdout is at end-of-file,
            i.e. the child process is gone.
        """
        raw = self.socket.stdout.readline()
        if not raw:
            raise ConnectionError('Emulator process closed its output')
        return raw.decode().rstrip('\n')

    def exec(self, cmd_str, protect=False):
        """Send one command and consume its reply.

        :param cmd_str: command text, without the trailing newline.
        :param protect: when true, a protected field is accepted after the
            first internal ``enter`` retry instead of being waited on.
        :returns: ``True`` for ``quit``/``disconnect`` (no reply is read) and
            when the result line is ``ok``; ``False`` when it is ``error``.
        :raises ConnectionError: if the status line does not report a
            connected host, or if the emulator's output ends unexpectedly.
        """
        self.data = []
        self.cmd_str = cmd_str.encode() + b'\n'
        self.socket.stdin.write(self.cmd_str)
        self.socket.stdin.flush()
        if cmd_str.upper() in ('QUIT', 'DISCONNECT'):
            return True
        while True:
            line = self._read_line().rstrip()
            if line.startswith('data'):
                # Remove the literal prefix only.  The text after it (including
                # its leading blank) is kept so that column numbers used by
                # get_string() stay unchanged.
                prefix = 'data:' if line.startswith('data:') else 'data'
                self.data.append(line[len(prefix):])
                continue
            # Anything else is the status line; the result line follows it.
            self.emuStatus = line
            self.cmdStatus = self._read_line()
            # Field positions used below: [1] screen formatting ('F' =
            # formatted), [2] field protection ('U' = unprotected, 'P' =
            # protected), [3] connection status (starts with 'C' when
            # connected).
            self.emuStatusList = self.emuStatus.split()
            if not self.emuStatusList[3].startswith('C'):
                raise ConnectionError('Host not Connected')
            while self.emuStatusList[1] != 'F' or self.emuStatusList[2] != 'U':
                time.sleep(0.3)
                self.socket.stdin.write(b'enter\n')
                self.socket.stdin.flush()
                self.emuStatus = self._read_line()
                self.drop = self._read_line()
                self.emuStatusList = self.emuStatus.split()
                if protect:
                    # Some screens never offer an unprotected field; treat the
                    # field as usable so the loop can end.
                    self.emuStatusList[2] = 'U'
            self.lastCmdStatus = self.cmdStatus
            self.lastEmuStatus = self.emuStatus
            if self.cmdStatus == 'ok':
                return True
            if self.cmdStatus == 'error':
                return False

    def get_string(self, row, col, length, protect):
        """Return ``length`` characters of the screen starting at row/col.

        ``row`` and ``col`` are 1-based.  The screen is re-read with
        ``PrintText(string)`` on every call.

        :raises IndexError: if ``row`` is outside 1..24 or ``col`` outside
            1..80, or if the screen dump has fewer rows than ``row``.
        """
        if row < 1 or row > 24 or col < 1 or col > 80:
            raise IndexError('row or col out of range')
        self.exec('PrintText(string)', protect)
        # Rows are stored 0-based.  Each stored line keeps one leading blank,
        # so the 1-based column can be used as the slice start directly.
        return self.data[row - 1][col:col + length]

    def send_string(self, row, col, send_str, protect):
        """Move the cursor to 1-based ``row``/``col`` and type ``send_str``.

        :returns: the result line of the ``string`` command.
        """
        # The emulator's cursor coordinates start at 0.
        self.exec('MoveCursor({0}, {1})'.format(row - 1, col - 1), protect)
        self.exec('string({0})'.format(send_str), protect)
        return self.cmdStatus

    def print_screen(self):
        """Print the data lines captured by the most recent command."""
        for line in self.data:
            print(line)
class rcms(object):
    """Convenience wrapper that drives one host session through ``emulator``."""

    def __init__(self, host):
        """Remember ``host`` and start the ``s3270`` emulator process."""
        self.host = host
        self.args = ["s3270", "-xrm", "s3270.charset: chinese-gb18030"]
        self.x3270 = emulator(self.args)

    def connect(self):
        """Ping the host and, if it answers, connect the emulator to it.

        :raises ConnectionError: if the ping fails or cannot be started.
        """
        # An argv list keeps the host string away from a shell.
        ping = ['ping', '-c', '3', '-i', '0.3', str(self.host)]
        try:
            ping_status = subprocess.call(ping,
                                          stdout=subprocess.DEVNULL,
                                          stderr=subprocess.DEVNULL)
        except OSError as err:
            raise ConnectionError('IBM Host Can Not be Reached!') from err
        if ping_status != 0:
            raise ConnectionError('IBM Host Can Not be Reached!')
        self.x3270.exec('Connect({0})'.format(self.host))

    def disconnect(self, protect=True):
        """Send ``Disconnect`` to the emulator."""
        self.x3270.exec('Disconnect', protect)

    def save_screen(self, filename, protect=False):
        """Ask the emulator to write the current screen as HTML to ``filename``."""
        self.x3270.exec('PrintText(html, file, {0})'.format(filename), protect)

    def print_screen(self, protect=False):
        """Re-read the current screen and print it."""
        self.x3270.exec('PrintText(string)', protect)
        self.x3270.print_screen()

    def get_string(self, row, col, length, protect=False):
        """Return ``length`` screen characters starting at 1-based row/col."""
        return self.x3270.get_string(row, col, length, protect)

    def send_string(self, row, col, string_to_send, protect=False):
        """Type ``string_to_send`` at 1-based row/col.

        :returns: the emulator's result line for the ``string`` command.
        """
        quoted = '"' + string_to_send + '"'
        return self.x3270.send_string(row, col, quoted, protect)

    def send_string_directly(self, string_to_send, protect=False):
        """Type ``string_to_send`` at the current cursor position."""
        # The text is wrapped in double quotes so that it may contain ','.
        quoted = '"' + string_to_send + '"'
        self.x3270.exec('string({0})'.format(quoted), protect)

    def send_pf(self, n, protect=False):
        """Press program-function key ``n``."""
        self.x3270.exec('pf({0})'.format(n), protect)

    def send_enter(self, protect=False):
        """Press Enter."""
        self.x3270.exec('enter', protect)

    def quit(self, protect=True):
        """Send ``quit`` to the emulator."""
        self.x3270.exec('quit', protect)

    def wait_for_string(self, row, col, string_for_wait, protect=False):
        """Poll the screen until the text at row/col matches.

        The text is accepted either exactly as given or upper-cased.  Up to
        six polls are made one second apart, then one final read decides.

        :returns: ``True`` if the text was seen, otherwise ``False``.
        """
        accepted = (string_for_wait, string_for_wait.upper())
        width = len(string_for_wait)
        for _ in range(6):
            if self.get_string(row, col, width, protect) in accepted:
                return True
            time.sleep(1)
        return self.get_string(row, col, width, protect) in accepted
if __name__ == '__main__':
    # Manual smoke test against a live host.  It needs the s3270 binary and
    # network access, and pauses for the operator between steps.
    # SECURITY(review): the login strings typed below are hard-coded in the
    # source.  If they are real credentials they should be rotated and read
    # from configuration or the environment instead.
    r = rcms('hostip1.au.ibm.com')
    try:
        r.connect()
        r.send_string_directly('rcms')
        r.send_enter()
        r.send_string_directly('cavr71d')
        r.send_enter()
        r.send_string_directly('kiss4osu')
        r.send_enter()
        print('STRING is suppose to be "CMD": ', r.get_string(22, 2, 3))
        print(r.wait_for_string(22, 2, 'CMD'))
        r.send_string_directly('cr,p464cg1')
        r.print_screen()
        input('pause')
        r.send_enter()
        r.send_enter()
        r.print_screen()
        input('pause')
        print(r.get_string(3, 59, 7))
        input('pause')
        r.send_string_directly('ca,action', True)
        r.send_enter(True)
        r.send_enter(True)
        r.print_screen(True)
        r.send_pf(1, True)
        r.print_screen(True)
    finally:
        # Always release the host session, even when a step above failed.
        r.disconnect(True)
rcms_funcs.py
from functions import IBMPComm
from functions import read_config
from functions import read_ot_excel
from functions import transform_rcms_date
from openpyxl import load_workbook
from datetime import datetime
import os
# Service codes counted as core utilisation (UT) time.  Four-character entries
# are a service code followed by an activity code.
core_ut_code = ['10', '11', '12', '14', '15', '17', '20', '24',
                '30', '31', '32', '4010', '4011', '4012', '41',
                '4473', '48', '73', '74', '75']
# Service codes counted as non-UT time.
non_ut_code = ['05', '34', '4020', '42', '47',
               '60', '80', '86', '90', '91', '93', '95']
# NOTE(review): presumably machine-type numbers; neither list is referenced in
# the visible part of this module.
str_mod = ['2145', '2423']
pwr_mod = ['9113', '9119', '9117']

# Shared emulator session used by every function in this module.  Creating it
# starts the s3270 process, which raises OSError when the binary is missing.
try:
    r = IBMPComm.rcms(read_config.rcms_config['host'])
except (ValueError, OSError):
    print('Init PComm Failed, IBM Host not reachable!')
    # Keep the name defined so later use fails on the session, not on a
    # missing global.
    r = None
# Set to True by rcms_login() once the login sequence has been run.
rcms_status = False
# Host reachability is tested by IBMPComm.rcms.connect(), so it is not
# repeated here.
def rcms_login():
    """Log the shared session into RCMS once.

    Connects to the host, opens the ``rcms`` application, sends the user id
    (``'c'`` plus the configured serial) and the password, then checks for
    ``CMD`` at row 22, column 2.  The password is taken from the
    configuration, or prompted for when it is empty or missing.

    :returns: ``True`` when already logged in or when the login succeeded,
        ``False`` when any step failed.  ``rcms_status`` is only set on
        success, so a later call tries again.
    """
    global rcms_status
    if rcms_status:
        return True
    try:
        r.connect()
        r.send_string_directly('rcms')
        r.send_enter()
        r.send_string_directly('c' + read_config.rcms_config['serial'])
        r.send_enter()
    except RuntimeError:
        print('Enter RCMS Menu Failed!')
        return False
    try:
        password = read_config.rcms_config.get('password', '')
        while password == '':
            password = input('Enter Your RCMS Password: ')
    except IOError:
        print('Get Password Failed')
        return False
    try:
        r.send_string_directly(password)
        r.send_enter()
        logged_in = r.get_string(22, 2, 3) == 'CMD'
    except RuntimeError:
        logged_in = False
    if not logged_in:
        print('RCMS Login Failed!')
        return False
    rcms_status = True
    return True
def get_notes_id(ssr_sn=''):
    """Return the text shown at row 4, column 45 of the ``red`` screen.

    :param ssr_sn: serial to look up; when empty, ``red`` is sent without
        arguments.
    :returns: the 18-character field with trailing blanks removed.
    """
    logged_in = rcms_login()
    if not logged_in:
        rcms_login()
    command = 'red' if ssr_sn == '' else 'red,0{0}'.format(ssr_sn)
    r.send_string_directly(command)
    r.send_enter()
    notes_field = r.get_string(4, 45, 18)
    return notes_field.rstrip()
def _strip_leading_label(text, label):
    """Return ``text`` without leading blanks and without one leading ``label``."""
    text = text.lstrip()
    if text.startswith(label):
        text = text[len(label):]
    return text.lstrip()


def _render_call_report(call_info, fault_desc, activity_summary, parts_info):
    """Format the collected call data as ANSI-coloured report text."""
    pieces = []
    for key in call_info:
        pieces.append('\033[0;31m{0}:\033[0m {1}\n'.format(key.upper(), call_info[key]))
    for plan, text in enumerate(fault_desc):
        pieces.append('\033[0;31mPlan{0} fault desc: \033[0m{1}\n'.format(plan, text))
    for plan, text in enumerate(activity_summary):
        pieces.append('\033[0;31mPlan{0} act summery: \033[0m{1}\n'.format(plan, text))
    for number, part in enumerate(parts_info):
        pieces.append(
            '\033[0;31mPart{0} no: \033[0m{1} \033[0;31mQty: \033[0m{2} \033[0;31mDesc: \033[0m{3}\n'
            .format(number, part[0:7], part[30:32], part[8:29].rstrip()))
    return ''.join(pieces)


def _ask_onsite_times():
    """Prompt until the operator enters an onsite count from 1 to 3."""
    while True:
        answer = input('Onsite Times (1-3): ').strip()
        if answer.isdigit() and 1 <= int(answer) <= 3:
            return int(answer)
        print('Please enter a number from 1 to 3.')


def _write_close_form(call_info, fault_desc):
    """Fill the service-log template with the call data and save a copy."""
    svc_log_template = '../template/svc_log_macos.xlsx'
    created = call_info['create time'].split()
    cr_time = created[1] + created[0]
    # NOTE(review): the onsite count is collected but not written anywhere
    # yet, and the output file name below is still a placeholder.
    _ask_onsite_times()
    print('create_time: ', cr_time)
    workbook = load_workbook(svc_log_template)
    try:
        sheet = workbook['RCMS']
        sheet['B1'] = cr_time
        sheet['B4'] = call_info['RCMS call']
        sheet['B5'] = call_info['prod/model sn']
        sheet['B6'] = call_info['account name']
        # A cell cannot hold a list, so the descriptions are stored one per
        # line.
        sheet['B13'] = '\n'.join(fault_desc)
        workbook.save('sssssssss.xlsx')
    finally:
        workbook.close()


def call_read(call_number='', close_call=False):
    """Read one call from RCMS and return it as a formatted report.

    The call is opened with ``cr,<number>``, its header fields, comment lines
    and ECUREP notes are read from the screen, and then the action-plan
    screens (``ca,action``) are read for fault descriptions, activity
    summaries and parts.

    :param call_number: 7-character call number; it is upper-cased.
    :param close_call: when true, also fill the service-log workbook.
    :returns: the report text, or ``None`` when ``call_number`` is not seven
        characters long.
    :raises RuntimeError: if the call number does not appear at row 3,
        column 59 after the call is opened.
    """
    if not rcms_login():
        rcms_login()
    if len(call_number) != 7:
        print('Wrong Call Number!')
        return None
    call_number = call_number.upper()
    # Initial values are prefixes; the screen text is appended to them.  The
    # key order is the order of the report lines.
    call_info = {'account name': '',
                 'customer num': '',
                 'contract end date': '',
                 'RCMS call': '',
                 'last call': '',
                 'contact': ' ',
                 'problem type': '',
                 'prod/model sn': '',
                 'desc': '\t',
                 'type': '\t',
                 'st': '\t',
                 'sev': '\t',
                 'recall': '',
                 'ass. CE': '',
                 'create time': '',
                 'update time': '',
                 'sc/ac': '',
                 'type of maintenance': '',
                 'service level': '',
                 'comments': '\n',
                 'ecurep': '\n',
                 }
    fault_desc = []
    activity_summary = []
    parts_info = []

    r.send_string_directly('cr,' + call_number)
    r.send_enter()
    call_info['contract end date'] += r.get_string(9, 34, 8)
    call_info['customer num'] += r.get_string(6, 24, 6)
    r.send_enter()
    if not r.wait_for_string(3, 59, call_number):
        raise RuntimeError('Call Read Failed!')
    call_info['account name'] += r.get_string(4, 15, 20).strip()
    call_info['RCMS call'] += r.get_string(3, 59, 7)
    call_info['last call'] += r.get_string(5, 59, 18)
    call_info['contact'] += r.get_string(8, 15, 25).strip()
    call_info['problem type'] += r.get_string(9, 16, 1)
    call_info['prod/model sn'] += (r.get_string(10, 8, 4) + '/'
                                   + r.get_string(10, 24, 3) + ' '
                                   + r.get_string(10, 32, 7))
    call_info['desc'] += r.get_string(11, 8, 30)
    call_info['type of maintenance'] += r.get_string(13, 2, 20).strip()
    call_info['service level'] += r.get_string(14, 2, 78).strip()
    call_info['type'] += r.get_string(15, 11, 3)
    call_info['st'] += r.get_string(15, 19, 3)
    call_info['sev'] += r.get_string(15, 28, 1)
    call_info['recall'] += r.get_string(15, 37, 3).strip()
    call_info['ass. CE'] += r.get_string(16, 11, 30).strip()
    call_info['create time'] += r.get_string(17, 44, 15)
    call_info['update time'] += r.get_string(17, 61, 15)
    for line in range(0, 3):
        comment_text = r.get_string(19 + line, 1, 80).rstrip()
        # Remove only the literal label, never characters of the comment.
        call_info['comments'] += '{0}\n'.format(_strip_leading_label(comment_text, 'comments'))

    # Collect the lines between the ECUREP START and END markers.
    # NOTE(review): range(1, pages) reads one screen fewer than the page
    # count shown at row 1, column 78, while call_search() uses
    # range(0, pages) - confirm which is intended.
    write_switch = 0
    for page in range(1, int(r.get_string(1, 78, 2).strip())):
        for row in range(4, 22):
            if r.get_string(row, 2, 14) == '**ECUREP END**':
                write_switch = 0
            if write_switch == 1:
                if r.get_string(row, 1, 80).strip() != '':
                    call_info['ecurep'] += '{0}\n'.format(r.get_string(row, 1, 80).strip())
            if r.get_string(row, 2, 16) == '**ECUREP START**':
                write_switch = 1
        r.send_pf(8)
    # put the emulator back to 'CMD' mode
    r.send_pf(6)

    # Action-plan screens: fault description, activity summary and parts.
    r.send_string_directly('ca,action', True)
    r.send_enter(True)
    r.send_enter(True)
    call_info['sc/ac'] += r.get_string(11, 5, 2, True) + r.get_string(11, 11, 2, True)
    for plan in range(0, int(r.get_string(1, 54, 1, True))):
        fault_desc.append(r.get_string(8, 15, 60, True).strip())
        activity_summary.append(r.get_string(9, 20, 50, True).strip())
        i = 0
        # Stop at the last screen row so a full parts list cannot read past
        # row 24.
        while 15 + i <= 24 and not r.get_string(15 + i, 7, 1, True) == '':
            parts_info.append(r.get_string(15 + i, 7, 32, True))
            i += 1
        r.send_pf(1, True)
    # put the emulator back to 'CMD' mode
    r.send_pf(6, True)
    r.send_pf(6, True)
    r.send_pf(6, True)
    r.send_pf(6, True)

    return_strings = _render_call_report(call_info, fault_desc, activity_summary, parts_info)
    if close_call:
        _write_close_form(call_info, fault_desc)
    return return_strings
def call_search(notes_id=None):
    """Search calls for a notes id and return the matching list lines.

    :param notes_id: id to search for; when ``None`` it is looked up with
        ``get_notes_id()`` at call time, not at import time.
    :returns: list of the non-empty 40-character entries found in rows 5..17
        of every result page.
    """
    if not rcms_login():
        rcms_login()
    if notes_id is None:
        notes_id = get_notes_id()
    # The search field is limited in length, so the id is truncated.
    notes_id = notes_id[0:16]
    call_number_list = []
    r.send_string_directly('cs,sdlncn,,,,,,,,,,,,,' + notes_id)
    r.send_enter()
    pages_of_call = int(r.get_string(1, 78, 2).strip())
    for _ in range(pages_of_call):
        for offset in range(13):
            entry = r.get_string(5 + offset, 5, 40).rstrip()
            if entry != '':
                call_number_list.append(entry)
        # PF8 is pressed after each page has been read.
        r.send_pf(8)
    return call_number_list
def product_inventory(serial_number=''):
    """Look up one machine with ``in,<serial>`` and return a formatted summary.

    :param serial_number: 11-character machine type plus serial number.
    :returns: the summary text, or ``None`` when ``serial_number`` is empty.
    :raises ValueError: if ``serial_number`` is not 11 characters long.
    """
    if not rcms_login():
        rcms_login()
    if serial_number == '':
        return None
    if len(serial_number) != 11:
        raise ValueError('Wrong Product Number! Should be MT+SN (91330600xxx).')

    r.send_string_directly('in,' + serial_number)
    r.send_enter()

    # All fields are read from row 5 of the result screen, in report order.
    type_mod = r.get_string(5, 9, 4) + '/' + r.get_string(5, 14, 3)
    serial = r.get_string(5, 18, 2) + r.get_string(5, 23, 5)
    customer = r.get_string(5, 31, 6)
    maintenance = r.get_string(5, 59, 20).rstrip()
    machine_info = {'type/mod': type_mod,
                    'serial ': serial,
                    'customer': customer,
                    'type of maintenance': maintenance}

    report_lines = ['\033[0;31m{0}: \033[0m{1}\n'.format(label.upper(), value)
                    for label, value in machine_info.items()]
    return_string = ''.join(report_lines)

    r.send_string_directly('menu')
    r.send_enter()
    return return_string
# red,0AVRXXX,120220
# date output format: DDMMYY, such as 130220
# date input format: YYMMDD, such as: 200213
# output format:
# 20/01/13|080_t_090_SC80_120|130_t_140_SC17_180|
def time_report_read_day(tr_date='', ssr_sn=read_config.rcms_config['serial']):
    """Return the time-report lines entered for one day.

    :param tr_date: day as ``YYMMDD``; it is sent to the host as ``DDMMYY``.
    :param ssr_sn: serial number; ``'0'`` is prepended for the ``red`` command.
    :returns: list of the 18-character entries found from row 7 downwards,
        or ``None`` when ``tr_date`` is not six characters long.
    :raises ValueError: if the date parts are not numeric or out of range.
    :raises RuntimeError: if the expected report screen is not shown.
    """
    if not rcms_login():
        rcms_login()
    if len(tr_date) != 6 or tr_date == '':
        return None
    ssr_sn = '0' + ssr_sn

    yy, mm, dd = tr_date[0:2], tr_date[2:4], tr_date[4:6]
    in_range = (0 <= int(yy) <= 99
                and 1 <= int(mm) <= 12
                and 1 <= int(dd) <= 31)
    if not in_range:
        raise ValueError('Wrong Date Format.!')

    r.send_string_directly('red,{0},{1}'.format(ssr_sn, dd + mm + yy))
    r.send_enter()
    if r.get_string(4, 64, 6) != 'report':
        raise RuntimeError('Show Screen is not what expected')

    time_report_item = []
    screen_row = 7
    # An empty column 6 marks the first row without an entry.
    while r.get_string(screen_row, 6, 1) != '':
        time_report_item.append(r.get_string(screen_row, 9, 18))
        screen_row += 1
    return time_report_item
# date format 2020/2/13 will be '200213'
# end date is not include
# if you need the data 2020/2/13 -- 2020/2/15
# you should better make 'end_date' equal '200216'
def _rewrite_as_start_stop(raw_groups, overlap_marker):
    """Rewrite 8-character ``stop tot `` groups as ``start stop `` groups.

    ``start`` is ``stop - tot``.  Each group is rebuilt at its own position,
    so equal numbers in different groups cannot affect each other.

    :returns: the rewritten text; ``overlap_marker`` when a group starts
        before the previous group stopped; ``raw_groups`` unchanged when a
        number cannot be parsed.
    """
    group_count = len(raw_groups) // 8
    rebuilt = []
    last_stop = 0
    for index in range(group_count):
        chunk = raw_groups[index * 8:(index + 1) * 8]
        stop_text, tot_text = chunk[0:3], chunk[4:7]
        try:
            stop = int(stop_text)
            start = stop - int(tot_text)
        except ValueError:
            print('stop_time or trv_time is empty.')
            return raw_groups
        if start < last_stop:
            return overlap_marker
        last_stop = stop
        rebuilt.append('{0:>3d}{1}{2}{3}'.format(start, chunk[3], stop_text, chunk[7:]))
    return ''.join(rebuilt) + raw_groups[group_count * 8:]


def time_report_summary(start_date='', end_date='', ssr_sn=''):
    """Summarise the time reports between two dates.

    Starting at ``start_date`` the report screen is stepped with PF1 until
    the date shown at row 4, column 7 equals ``end_date``.  For every day the
    entered lines are collected, days without entries are marked, and the
    hours are summed per service code.

    :param start_date: first day as ``YYMMDD``; defaults to the start of the
        current cut-off cycle from ``read_config.get_cutoff()``.
    :param end_date: last day as ``YYMMDD``; defaults to the end of that cycle.
    :param ssr_sn: serial number; defaults to the configured one.
    :returns: the report text, or ``None`` when an argument is empty, a date
        is not numeric, or ``start_date`` is after ``end_date``.
    """
    weekend_marker = '\033[0;37mWeekend\033[0m'
    missing_marker = '\033[0;33mNeed to input\033[0m'
    overlap_marker = '\033[0;31mOverlap\033[0m'

    if not rcms_login():
        rcms_login()
    if start_date == '':
        # get_cutoff() yields YYYYMMDD; drop the century.
        start_date = read_config.get_cutoff()[0][2:]
    if end_date == '':
        end_date = read_config.get_cutoff()[1][2:]
    if ssr_sn == '':
        ssr_sn = read_config.rcms_config['serial']
    if start_date == '' or end_date == '' or ssr_sn == '':
        return None
    try:
        first_day = int(start_date)
        last_day = int(end_date)
    except ValueError:
        print('Wrong Date Format!')
        return None
    if first_day > last_day:
        return None

    service_code_time = {'10': 0, '11': 0, '12': 0, '14': 0,
                         '15': 0, '17': 0, '20': 0, '24': 0,
                         '30': 0, '31': 0, '32': 0, '4010': 0,
                         '4011': 0, '4012': 0, '41': 0, '4473': 0,
                         '48': 0, '73': 0, '74': 0, '75': 0,
                         '05': 0, '34': 0, '4020': 0, '42': 0,
                         '44': 0, '47': 0, '60': 0, '80': 0,
                         '86': 0, '90': 0, '91': 0, '93': 0, '95': 0}
    # Every entered report line, across all days.
    time_report_items = []
    # Per day: a marker, or the concatenated 'stop tot ' groups of that day.
    time_report_each_day = {}

    # The command takes DDMMYY; the screen shows dates as YY/MM/DD.
    start_date = '{0}{1}{2}'.format(start_date[4:6], start_date[2:4], start_date[0:2])
    ssr_sn = '0' + ssr_sn
    end_date = '{0}/{1}/{2}'.format(end_date[0:2], end_date[2:4], end_date[4:6])

    r.send_string_directly('red,{0},{1}'.format(ssr_sn, start_date))
    r.send_enter()
    r.send_pf(12)
    available_days = 0
    # NOTE(review): this loop only ends when the screen shows end_date.
    while True:
        r.send_pf(1)
        current_date = r.get_string(4, 7, 8)
        week = r.get_string(4, 16, 9).rstrip()
        is_weekend = week == 'Sunday' or week == 'Saturday'
        if r.get_string(7, 6, 1) == '':
            # Nothing entered: weekends are fine, workdays still need input.
            time_report_each_day[current_date] = weekend_marker if is_weekend else missing_marker
        # Workdays (Monday to Friday) count as available time.
        if not is_weekend:
            available_days += 1
        i = 0
        while not r.get_string(7 + i, 6, 1) == '':
            if i == 0:
                time_report_each_day[current_date] = r.get_string(7, 9, 8)
            else:
                time_report_each_day[current_date] += r.get_string(7 + i, 9, 8)
            # A full line looks like: 170 080 00 40 10 1 N RFX 24237500BTL70
            time_report_items.append(r.get_string(7 + i, 9, 46))
            i += 1
        if r.get_string(4, 7, 8) == end_date:
            break

    for day in list(time_report_each_day):
        entry = time_report_each_day[day]
        if entry == weekend_marker or entry == missing_marker:
            continue
        time_report_each_day[day] = _rewrite_as_start_stop(entry, overlap_marker)

    day_lines = ['| Date |sta|stp|\n']
    for day in time_report_each_day:
        day_lines.append('{0}: {1}\n'.format(day, time_report_each_day[day]))
    time_report_each_day_str = ''.join(day_lines)

    # Line layout: [0:3] stop, [4:7] tot, [13:15] service code,
    # [16:18] activity code.
    for lines in time_report_items:
        sc = lines[13:15]
        tot = int(lines[4:7]) / 10
        # Service codes 40 and 44 are split further by activity code.
        code = sc if sc != '40' and sc != '44' else '{0}{1}'.format(sc, lines[16:18])
        # Codes missing from the table are still accumulated instead of
        # raising KeyError.
        service_code_time[code] = service_code_time.get(code, 0) + tot

    ut_sum = 0
    core_ut_time_list = {}
    for code in core_ut_code:
        ut_sum += service_code_time[code]
        if service_code_time[code] != 0:
            core_ut_time_list[code] = service_code_time[code]

    non_ut_sum = 0
    non_ut_time_list = {}
    for code in non_ut_code:
        non_ut_sum += service_code_time[code]
        if service_code_time[code] != 0:
            non_ut_time_list[code] = service_code_time[code]

    available_hrs = available_days * 8
    # A range made only of weekend days has no available hours.
    ut_percent = ut_sum / available_hrs * 100 if available_hrs else 0
    return_strings = 'Time Report Detail: \n{6}' \
                     'Core UT Code Time List: \t{0}\n' \
                     'Core UT Time Summary: \t{1}\n' \
                     'Other & NON-UT Code Time List: \t{2}\n' \
                     'Other & NON-UT Time Summary: \t{3}\n' \
                     'Available Hours: \t{4}\n\n' \
                     'UT = {5}%'.format(core_ut_time_list,
                                        ut_sum,
                                        non_ut_time_list,
                                        non_ut_sum,
                                        available_hrs,
                                        ut_percent,
                                        time_report_each_day_str)
    return return_strings
def is_code_utilization(svc_code):
    """Report whether ``svc_code`` is listed in ``core_ut_code``.

    A one-line verdict is printed as a side effect.

    :returns: ``True`` for a core UT code, otherwise ``False``.
    """
    if svc_code not in core_ut_code:
        print("'{0}' is Non-UT Code.".format(svc_code))
        return False
    print("'{0}' is Core UT Code.".format(svc_code))
    return True
def rcms_logoff():
    """Disconnect from the host, quit the emulator and clear the login flag."""
    global rcms_status
    r.disconnect()
    r.quit()
    # The session is gone, so rcms_login() must not report it as logged in.
    rcms_status = False
# rep_date = 200219, year=20, month=02, day=19
def get_rep_timemap(rep_date, ssr_sn=read_config.rcms_config['serial']):
    """Return the time points covered by core-UT entries on one day.

    :param rep_date: day as ``YYMMDD`` (for example ``200219``).
    :param ssr_sn: serial number used in the ``red`` command.
    :returns: the union of ``transform_rcms_date.stop_tans_to_set`` over all
        entries of that day whose code is in ``core_ut_code``.

    Progress is printed along the way.
    """
    try:
        int(rep_date)
    except ValueError:
        print('Wrong Time Report Date Format!')
    yy, mm, dd = rep_date[0:2], rep_date[2:4], rep_date[4:6]
    if not rcms_login():
        rcms_login()

    core_entries = []
    other_entries = []
    r.send_string_directly('red,0' + ssr_sn + ',' + str(dd) + str(mm) + str(yy))
    r.send_enter()
    print('Day: ', r.get_string(4, 7, 8))

    screen_row = 7
    # Entries start at row 7; an empty column 6 ends the list.
    while r.get_string(screen_row, 6, 1):
        if (r.get_string(screen_row, 22, 2) in core_ut_code
                or r.get_string(screen_row, 22, 2) + r.get_string(screen_row, 25, 2) in core_ut_code):
            bucket = core_entries
        else:
            bucket = other_entries
        bucket.append([r.get_string(screen_row, 9, 3), r.get_string(screen_row, 13, 3)])
        screen_row += 1
    print('ut_code_time: ', core_entries)
    print('non_ut_code_time: ', other_entries)

    timemap = set()
    for stop_and_tot in core_entries:
        print(stop_and_tot)
        timemap = timemap | transform_rcms_date.stop_tans_to_set(stop_and_tot[0], stop_and_tot[1])
    print(sorted(timemap))
    return timemap
def verify_ot_oneday(rep_date, ssr_sn=read_config.rcms_config['serial']):
    """Check that one day's overtime is fully covered by core-UT entries.

    :param rep_date: day as ``YYMMDD``.
    :param ssr_sn: serial number passed on to ``get_rep_timemap``.
    :returns: ``True`` when every overtime time point from the OT workbook is
        also in the reported core-UT time points, otherwise ``False``.
    """
    try:
        int(rep_date)
    except ValueError:
        print('Wrong Time Report Date Format!')
    reported = get_rep_timemap(rep_date, ssr_sn)
    overtime = read_ot_excel.get_one_day_ot_time(rep_date)
    # The overtime is covered when it is a subset of the reported time.
    covered = overtime <= reported
    if not covered:
        print('ot need input')
        return False
    print('ot if fine')
    return True
read_config.py
import os.path
from datetime import date
# import os
config_files = {'rcms.conf', 'cut_off_date.conf', 'over_time.conf'}
# The configuration directory becomes the working directory of the process.
if __name__ == '__main__':
    os.chdir('../config')
else:
    os.chdir('./config')
config_files_dir = os.getcwd() + '/'
rcms_avail_args = {'host', 'serial', 'full_name', 'password'}
# Settings read from 'rcms.conf', for example {'host': 'hostname'}.
rcms_config = {}
# Cut-off days from 'cut_off_date.conf', each split on '-',
# for example ['2020', '01', '13'].
cutoff_config = []
# Last non-comment line of 'over_time.conf'.
ot_file_loc = ''


def _parse_cutoff_line(line):
    """Return ``[year, month, day]`` strings for one cut-off line.

    :returns: the three parts with surrounding blanks removed, or ``None``
        when a part is not a number (a message is printed).
    :raises Warning: if the line does not have three parts or the date is
        out of range.
    """
    parts = [part.strip() for part in line.strip('\n').split('-')]
    if len(parts) != 3:
        raise Warning("Wrong Format of 'cut_off_date.conf'!")
    try:
        co_year, co_month, co_day = (int(part) for part in parts)
    except ValueError:
        print("Found Strings in 'cut_off_date.conf'!")
        return None
    if co_year > 2099 or co_year < 1999 or co_month < 1 \
            or co_month > 12 or co_day < 1 or co_day > 31:
        raise Warning("Date in 'cut_off_date.conf' Out of Range!")
    return parts


for file in config_files:
    config_path = config_files_dir + file
    if not os.path.isfile(config_path):
        raise FileNotFoundError('Config File "{0}" not Found!'.format(file))
    # Read each file once; the lines serve both the emptiness check and the
    # parsing below.
    try:
        with open(config_path, 'r') as f:
            config_lines = f.readlines()
    except OSError:
        print('Config File Read Error!')
        raise
    if not config_lines:
        raise ValueError('Config "{0}" File Empty!'.format(file))
    for line in config_lines:
        if line.lstrip().startswith('#') or line.strip() == '':
            continue
        if file == 'rcms.conf':
            # Split on the first '=' only, so values may contain '='.
            key, _, value = line.partition('=')
            key = key.strip()
            if key in rcms_avail_args:
                rcms_config[key] = value.strip()
        elif file == 'cut_off_date.conf':
            cutoff_parts = _parse_cutoff_line(line)
            if cutoff_parts is not None:
                cutoff_config.append(cutoff_parts)
        elif file == 'over_time.conf':
            ot_file_loc = line.rstrip('\n')
if rcms_config == {} or cutoff_config == []:
    raise ValueError('Nothing Found in Config Files!')
def get_cutoff(today=None, cutoff_dates=None):
    """Return the cut-off cycle that contains ``today``.

    The entry whose year and month match ``today`` is looked up.  When
    ``today`` is after that cut-off day, the cycle runs from the day after it
    to the next entry; otherwise it runs from the day after the previous
    entry to this one.  Entries are used in list order.

    :param today: ``datetime.date`` to classify; defaults to the current date.
    :param cutoff_dates: list of ``[year, month, day]`` entries; defaults to
        ``cutoff_config``.
    :returns: ``[start, end]`` as ``YYYYMMDD`` strings, or ``None`` when no
        entry matches or the neighbouring entry is missing.
    """
    from datetime import timedelta

    if today is None:
        today = date.today()
    if cutoff_dates is None:
        cutoff_dates = cutoff_config
    entries = [tuple(int(part) for part in entry) for entry in cutoff_dates]

    for index, (year, month, day) in enumerate(entries):
        if (year, month) != (today.year, today.month):
            continue
        if today.day > day:
            begin, end = index, index + 1
        else:
            begin, end = index - 1, index
        if begin < 0 or end >= len(entries):
            return None
        # Real date arithmetic keeps the start zero-padded and correct
        # across month ends.
        start = date(*entries[begin]) + timedelta(days=1)
        return [start.strftime('%Y%m%d'),
                '{0:04d}{1:02d}{2:02d}'.format(*entries[end])]
    return None
if __name__ == '__main__':
    # Running the module directly dumps what was parsed, as a quick check.
    print('rcms config: ', rcms_config)
    print('cut off day config: ', cutoff_config)
    current_cycle = get_cutoff()
    print(current_cycle)
    print('ot file location: ', ot_file_loc)
read_ot_excel.py
from functions import read_config
import os
from openpyxl import load_workbook
from datetime import date
import datetime
from functions import transform_rcms_date
import time
# Directory holding the overtime request workbooks, from 'over_time.conf'.
ot_files_dir = read_config.ot_file_loc
# Workbook file names found in that directory.
ot_files = []
# One entry per overtime row:
# [date, start time, end time, duration, customer, content]
ot_items = []

try:
    for file in os.listdir(ot_files_dir):
        # Skip lock files ('~...') and hidden files.
        if file.endswith(('xlsx', 'xls')) and not file.startswith(('~', '.')):
            ot_files.append(file)
    # The workbooks are opened by bare file name below.
    os.chdir(ot_files_dir)
except OSError:
    print("'{0}' May not Exist.".format(ot_files_dir))
    ot_files = []

for file in ot_files:
    workbook = load_workbook(file, read_only=True, data_only=True)
    try:
        sheet = workbook['Sheet1']
        # Rows are scanned from row 4 on; only rows whose column 6 starts
        # with '20' are used.
        for row in range(4, (sheet.max_row or 0) + 1):
            ot_date = sheet.cell(row, 6).value
            if not str(ot_date).startswith('20'):
                continue
            ot_items.append([ot_date,
                             sheet.cell(row, 7).value,
                             sheet.cell(row, 8).value,
                             sheet.cell(row, 9).value,
                             sheet.cell(row, 4).value,
                             sheet.cell(row, 5).value])
    finally:
        # Read-only workbooks keep the file open until closed.
        workbook.close()

# Change time format
# 18:00 to 180, 18:30 to 185
usable_items = []
for line in ot_items:
    try:
        line[0] = line[0].strftime('%y%m%d')
        line[1] = line[1].strftime('%H') + str(int(line[1].strftime('%M')) // 6)
        line[2] = line[2].strftime('%H') + str(int(line[2].strftime('%M')) // 6)
    except (AttributeError, ValueError):
        # The row cannot be compared with the string dates used later on.
        print('Wrong Format of Date in OT Excel!')
        continue
    if line[2] == '000':
        # An end time of midnight means the end of the day.
        line[2] = '240'
    if isinstance(line[3], datetime.time):
        line[3] = '{:0>3d}'.format(line[3].hour * 10 + line[3].minute // 6)
    else:
        print('Wrong Format of Date in OT Excel!')
    usable_items.append(line)
ot_items[:] = usable_items
# Sort on date, start and end only; the other columns may hold mixed types.
ot_items.sort(key=lambda item: item[:3])
def get_ot_cycle(ot_month='', today=None):
    """Return the overtime cycle that ends on the 20th of ``ot_month``.

    A cycle runs from the 21st of the previous month to the 20th of
    ``ot_month``.  A month later than the current one is taken to belong to
    the previous year.

    :param ot_month: month number 1..12 (int or numeric string); an empty
        value means the current month.
    :param today: ``datetime.date`` used as "now"; defaults to the current
        date.
    :returns: ``[start, end]`` as ``YYMMDD`` strings, or ``None`` when
        ``ot_month`` is not a number or not in 1..12.
    """
    if today is None:
        today = date.today()
    cur_year = today.year % 100
    cur_month = today.month
    if not ot_month:
        ot_month = cur_month
    try:
        ot_month = int(ot_month)
    except (TypeError, ValueError):
        print("'{0}' is Not Number!".format(ot_month))
        return None
    if ot_month > 12 or ot_month < 1:
        print('Wrong Month!')
        return None

    if 2 <= ot_month <= cur_month:
        last_month = ot_month - 1
        last_year = cur_year
    elif ot_month == 1:
        # A January cycle starts on 21 December of the previous year.
        last_year = (cur_year - 1) % 100
        last_month = 12
    else:
        # The month has not been reached this year, so use last year's.
        last_year = (cur_year - 1) % 100
        last_month = ot_month - 1
        cur_year = (cur_year - 1) % 100
    sta_date = '{:0>2d}{:0>2d}{:0>2d}'.format(last_year, last_month, 21)
    end_date = '{:0>2d}{:0>2d}{:0>2d}'.format(cur_year, ot_month, 20)
    return [sta_date, end_date]
def get_ot_in_cycle(start_stop_date=None):
    """Return the overtime items dated within a cycle, without duplicates.

    :param start_stop_date: ``[start, end]`` as ``YYMMDD`` strings, both
        inclusive.
    :returns: the matching entries of ``ot_items`` in their stored order; an
        empty list when no cycle is given.
    """
    if not start_stop_date:
        return []
    sta_date, stp_date = start_stop_date[0], start_stop_date[1]
    ot_items_in_cycle = []
    for item in ot_items:
        if sta_date <= item[0] <= stp_date and item not in ot_items_in_cycle:
            ot_items_in_cycle.append(item)
    return ot_items_in_cycle
def get_ot_date_list(start_stop_date=[]):
    """Return the sorted, distinct dates that have overtime within a cycle.

    :param start_stop_date: ``[start, end]`` passed on to ``get_ot_in_cycle``.
    :returns: sorted list of date strings, or ``None`` when no cycle is given.
    """
    if not start_stop_date:
        return None
    distinct_dates = {entry[0] for entry in get_ot_in_cycle(start_stop_date)}
    return sorted(distinct_dates)
def get_one_day_ot_time(overtime_date):
    """Return the overtime time points recorded for one day.

    :param overtime_date: day as ``YYMMDD``.
    :returns: the union of ``transform_rcms_date.sta_trans_to_set`` over all
        overtime items of that day (empty when there are none).  The sorted
        time points are also printed.
    :raises ValueError: if ``overtime_date`` is not numeric.
    """
    try:
        int(overtime_date)
    except ValueError:
        raise ValueError('Wrong Time Report Date Format!') from None
    ot_month = int(overtime_date[2:4])
    ot_day = int(overtime_date[4:6])
    # Cycles run from the 21st to the 20th, so a day after the 20th belongs
    # to the next month's cycle; December wraps around to January.
    cycle_month = ot_month % 12 + 1 if ot_day > 20 else ot_month
    ot_cycle = get_ot_cycle(cycle_month)
    if ot_cycle is None:
        return set()
    # Items carry their date as YYMMDD, so compare against the given day
    # including its year.
    target_date = overtime_date[0:6]
    ot_time_set = set()
    for ot_item in get_ot_in_cycle(ot_cycle):
        if ot_item[0] == target_date:
            ot_time_set |= transform_rcms_date.sta_trans_to_set(ot_item[1], ot_item[2])
    print(sorted(ot_time_set))
    return ot_time_set
transform_rcms_date.py
def sta_trans_to_set(sta_date, end_date):
    """Return the time points from ``sta_date`` to ``end_date`` in steps of 5.

    Both arguments are converted with ``int()``.  The end point is included
    when it falls on a step.

    :raises ValueError: if an argument is not numeric (a message is printed
        first).
    """
    try:
        first, last = int(sta_date), int(end_date)
    except ValueError:
        print('Wrong Date Format!')
        raise
    return set(range(first, last + 1, 5))
def stop_tans_to_set(stop_date, tot):
    """Return the time points of an interval given by its stop and length.

    The interval starts at ``stop_date - tot`` and is walked in steps of 5 up
    to and including ``stop_date``.  Both arguments are converted with
    ``int()``.

    :raises ValueError: if an argument is not numeric (a message is printed
        first).
    """
    try:
        stop, length = int(stop_date), int(tot)
    except ValueError:
        print('Wrong Date Format!')
        raise
    return set(range(stop - length, stop + 1, 5))
def is_rep_date_overlap(rep_time_set, ot_time_set):
    """Tell whether the reported points cover every overtime point.

    Args:
        rep_time_set: Set of reported points.
        ot_time_set: Set of overtime points.

    Returns:
        True when every element of ot_time_set is also in rep_time_set
        (an empty ot_time_set is therefore always covered), else False.

    Raises:
        ValueError: If either argument is not a set. The exception type is
            kept as ValueError for callers that already catch it.
    """
    # isinstance also accepts set subclasses, which the former
    # ``type(x) != set`` comparison rejected.
    if not isinstance(rep_time_set, set) or not isinstance(ot_time_set, set):
        raise ValueError('Input Should be Set!')
    return ot_time_set <= rep_time_set
if __name__ == '__main__':
    # Manual smoke check: build the reported and overtime point sets from
    # sample [stop, total] / [start, end] pairs and print whether the
    # reported set covers the overtime set. Only the third sample is used;
    # the first two are kept as alternative fixtures.
    rep_time1 = [['120', '040'], ['180', '040'], ['220', '040']]
    ot_time1 = [['180', '220']]
    rep_time2 = [['120', '040'], ['170', '040'], ['230', '060']]
    ot_time2 = [['180', '220']]
    rep_time3 = [['120', '040'], ['180', '040'], ['220', '030']]
    ot_time3 = [['180', '220']]

    rep_set = set()
    for stop_point, total in rep_time3:
        rep_set |= stop_tans_to_set(stop_point, total)
    ot_start, ot_end = ot_time3[0]
    ot_set = sta_trans_to_set(ot_start, ot_end)

    print(rep_set)
    print(ot_set)
    covered = is_rep_date_overlap(rep_set, ot_set)
    print(covered)
rcms_menu.py
import os
from functions import read_config
from functions import rcms_funcs
# Lines of the main menu, printed one per row before each prompt.
main_menu = [
    '\033[0;37m------------IBM RCMS MENU------------\033[0m',
    '\033[0;37m0. Functions\033[0m',
    '\t\033[0;31mRr. Read Current Configuration\033[0m',
    '\t\033[0;31mIi. Interactive Mode\033[0m',
    '\t\033[0;31mQq. Quit\033[0m',
    '\t\033[0;31mHh. Help\033[0m',
    '\033[0;37m1. RCMS\033[0m',
    '\t\033[0;36m1.1\033[0m Call Read',
    '\t\033[0;36m1.2\033[0m Call Search & Read',
    '\t\033[0;36m1.3\033[0m Create Call_Close Form',
    '\t\033[0;36m1.4\033[0m Product Inventory',
    '\033[0;37m2. Time Report\033[0m',
    '\t\033[0;36m2.1\033[0m Cut-off Day Time Report Summary',
    '\t\033[0;36m2.2\033[0m Time Report Summary for Others',
    '\t\033[0;36m2.3\033[0m One-key Fill SC80/95',
    '\t\033[0;36m2.4\033[0m Over Time Verify (from 21 to 20)',
    '\t\033[0;36m2.5\033[0m UT Code or Not',
    '\033[0;37m3. PMH\033[0m',
    '\t\033[0;36m3.1\033[0m Create PMH',
    '\n',
]

# Inputs accepted at the "CMD==>" prompt.
# A missing comma used to fuse '2.6' and '3.1' into the single entry
# '2.63.1', which made menu item 3.1 impossible to select. '2.6' is not
# listed because the menu above offers no such item.
right_number = [
    'r', 'i', 'q', 'h',
    'R', 'I', 'Q', 'H',
    '1.1', '1.2', '1.3', '1.4',
    '2.1', '2.2', '2.3', '2.4', '2.5',
    '3.1',
]

# Service codes accepted by menu item 2.5.
svc_code_list = [
    '10', '11', '12', '14', '15', '17', '20', '24', '30',
    '31', '32', '4010', '4011', '4012', '41', '4473', '48',
    '73', '74', '75', '05', '34', '4020', '42', '47', '60',
    '80', '86', '90', '91', '93', '95',
]
# Call number used by the previous "1.1 Call Read"; offered as the default.
last_call_number = ''
# Result lines of the previous "1.2" search, kept so one can be re-read.
last_call_number_list = []


def _read_call_by_index(call_lines, index_text):
    """Print the call selected by a 1-based index, rejecting bad input."""
    try:
        position = int(index_text)
    except ValueError:
        print('[ERROR]: Input Error!')
        return
    # Checked explicitly: 0 or a negative number would otherwise wrap around
    # and silently read an entry from the end of the list.
    if not 1 <= position <= len(call_lines):
        print('[ERROR]: Input Error!')
        return
    # Characters 3-9 of a result line are passed on as the call number.
    print(rcms_funcs.call_read(call_lines[position - 1][3:10]))


# Interactive main loop: show the menu, read a valid command, dispatch it.
while True:
    for menu_line in main_menu:
        print(menu_line)
    choice = input('CMD==> ')
    while choice not in right_number:
        print('[ERROR]: Input Error!')
        choice = input('CMD==> ')

    # Read Current Configuration
    if choice.upper() == 'R':
        print('\033[0;37mRCMS Config:\033[0m')
        try:
            for key in read_config.rcms_config:
                print('\t\033[0;36m{0}:\033[0m {1}'.format(key, read_config.rcms_config[key]))
        except RuntimeError:
            print('Read RCMS Config Failed!')
        print('\033[0;37mCurrent Cut-Off Day:\033[0m')
        try:
            cutoff = read_config.get_cutoff()
            print('\t{0} -- {1}'.format(cutoff[0], cutoff[1]))
        except RuntimeError:
            print('Read Cut-Off Config Failed!')
        input('\nPress Enter to be Continue!')
    # Interactive Mode (not implemented yet)
    elif choice.upper() == 'I':
        print('Interactive Mode not Support Yet!')
        input('\nPress Enter to be Continue!')
    # Help
    elif choice.upper() == 'H':
        print("Mail to '[email protected]' for Any Help.")
        input('\nPress Enter to be Continue!')
    # Quit
    elif choice.upper() == 'Q':
        try:
            rcms_funcs.rcms_logoff()
        except RuntimeError:
            print('Maybe RCMS is Already Logoff.')
        print('Quit!')
        # The bare exit() helper is injected by the site module and is meant
        # for interactive use; SystemExit works everywhere.
        raise SystemExit(0)
    # Call Read
    elif choice == '1.1':
        # An empty answer re-uses the previous call number.
        call_number = input("Call Number (Last '{0}'): ".format(last_call_number)) or last_call_number
        print(rcms_funcs.call_read(call_number))
        last_call_number = call_number
        input('\nPress Enter to be Continue!')
    # Call Search & Read
    elif choice == '1.2':
        if last_call_number_list:
            print('Last Call Number List: ')
            for result_line in last_call_number_list:
                print(result_line)
            input_str = input('Index Number to Read (1-99), or Enter to go on search: ')
            if input_str:
                _read_call_by_index(last_call_number_list, input_str)
        notes_id = rcms_funcs.get_notes_id()
        # An empty answer keeps the default Notes ID.
        notes_id = input("Notes ID go on Search (Default '{0}'): ".format(notes_id)) or notes_id
        call_number_list = rcms_funcs.call_search(notes_id)
        for result_line in call_number_list:
            print(result_line)
        input_str = input('Index Number to Read (1-99), or Enter to be Continue: ')
        if input_str:
            _read_call_by_index(call_number_list, input_str)
        last_call_number_list = call_number_list.copy()
        input('\nPress Enter to be Continue.')
    # Create Call_Close Form (not implemented yet)
    elif choice == '1.3':
        pass
    # Product Inventory
    elif choice == '1.4':
        print(rcms_funcs.product_inventory(input('Machine Model & SN (913306001DH): ')))
        input('\nPress Enter to be Continue.')
    # Cut-off Day Time Report Summary
    elif choice == '2.1':
        cutoff = read_config.get_cutoff()
        # Drop the first '20' (the century prefix) from each cut-off date.
        start_date = cutoff[0].replace('20', '', 1)
        end_date = cutoff[1].replace('20', '', 1)
        ssr_sn = read_config.rcms_config['serial']
        # Empty answers fall back to the defaults shown in the prompts;
        # previously the empty string itself was forwarded.
        start_input = input("Start Date (Default '{0}'): ".format(start_date)) or start_date
        end_input = input("End Date (Default '{0}'): ".format(end_date)) or end_date
        ssr_input = input("SSR Serial Number (Default '{0}'): ".format(ssr_sn)) or ssr_sn
        print(rcms_funcs.time_report_summary(start_input, end_input, ssr_input))
        input('\nPress Enter to be Continue.')
    # Time Report Summary for Others (not implemented yet)
    elif choice == '2.2':
        pass
    # One-key Fill SC80/95 (not implemented yet)
    elif choice == '2.3':
        pass
    # Over Time Verify (not implemented yet)
    elif choice == '2.4':
        pass
    # UT Code or Not
    elif choice == '2.5':
        svc_code = input('Code: ')
        while svc_code not in svc_code_list:
            print('Wrong Code, Try Again!')
            svc_code = input('Code: ')
        # NOTE(review): the return value is discarded -- presumably the
        # helper prints its own verdict; confirm.
        rcms_funcs.is_code_utilization(svc_code)
        input('\nPress Enter to be Continue!')
    # Create PMH (not implemented yet)
    elif choice == '3.1':
        pass
    # Placeholder: '4' is not an accepted command, so this never runs.
    elif choice == '4':
        pass
    else:
        print('[ERROR]: Input Error!')