-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathutils.py
258 lines (206 loc) · 8.57 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
import os
import sys
import shlex
import subprocess
import os.path
import time
import shutil
from threading import Timer
import functools
import traceback
import pickle
def start_logger(workdir):
sys.stdout = Logger(workdir, time.strftime("%Y%m%d-%H%M%S"))
logfile = sys.stdout.getLogFile()
return logfile
class Logger(object):
def __init__(self, out_directory, time_str):
self.logfile = os.path.join(out_directory, str('run.' + time_str + '.log'))
self.terminal = sys.stdout
self.log = open(self.logfile, "w")
def write(self, message):
self.terminal.write(message.encode('utf-8'))
self.log.write(message.encode('utf-8'))
self.log.flush()
def flush(self):
pass
def getLogFile(self):
return self.logfile
def general_information(logfile, version):
# Check if output directory exists
print '\n' + '==========> getSeqENA <=========='
print '\n' + 'Program start: ' + time.ctime()
# Tells where the logfile will be stored
print '\n' + 'LOGFILE:'
print logfile
# Print command
print '\n' + 'COMMAND:'
script_path = os.path.abspath(sys.argv[0])
print sys.executable + ' ' + script_path + ' ' + ' '.join(sys.argv[1:])
# Print directory where programme was lunch
print '\n' + 'PRESENT DIRECTORY :'
present_directory = os.path.abspath(os.getcwd())
print present_directory
# Print program version
print '\n' + 'VERSION:'
scriptVersionGit(version, present_directory, script_path)
# Print PATH variable
print '\n' + 'PATH variable:'
print os.environ['PATH']
def scriptVersionGit(version, directory, script_path):
print 'Version ' + version
try:
os.chdir(os.path.dirname(script_path))
command = ['git', 'log', '-1', '--date=local', '--pretty=format:"%h (%H) - Commit by %cn, %cd) : %s"']
run_successfully, stdout, stderr = runCommandPopenCommunicate(command, False, 15, False)
print stdout
command = ['git', 'remote', 'show', 'origin']
run_successfully, stdout, stderr = runCommandPopenCommunicate(command, False, 15, False)
print stdout
os.chdir(directory)
except:
print 'HARMLESS WARNING: git command possibly not found. The GitHub repository information will not be obtained.'
def runTime(start_time):
end_time = time.time()
time_taken = end_time - start_time
hours, rest = divmod(time_taken, 3600)
minutes, seconds = divmod(rest, 60)
print 'Runtime :' + str(hours) + 'h:' + str(minutes) + 'm:' + str(round(seconds, 2)) + 's'
return time_taken
# USADO
def check_create_directory(directory):
if not os.path.isdir(directory):
os.makedirs(directory)
def kill_subprocess_Popen(subprocess_Popen, command):
print 'Command run out of time: ' + str(command)
subprocess_Popen.kill()
def runCommandPopenCommunicate(command, shell_True, timeout_sec_None, print_comand_True):
run_successfully = False
if not isinstance(command, basestring):
command = ' '.join(command)
command = shlex.split(command)
if print_comand_True:
print 'Running: ' + ' '.join(command)
if shell_True:
command = ' '.join(command)
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
else:
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
not_killed_by_timer = True
if timeout_sec_None is None:
stdout, stderr = proc.communicate()
else:
timer = Timer(timeout_sec_None, kill_subprocess_Popen, args=(proc, command,))
timer.start()
stdout, stderr = proc.communicate()
timer.cancel()
not_killed_by_timer = timer.isAlive()
if proc.returncode == 0:
run_successfully = True
else:
if not print_comand_True and not_killed_by_timer:
print 'Running: ' + str(command)
if len(stdout) > 0:
print 'STDOUT'
print stdout.decode("utf-8")
if len(stderr) > 0:
print 'STDERR'
print stderr.decode("utf-8")
return run_successfully, stdout, stderr
# Remove directory
def removeDirectory(directory):
if os.path.isdir(directory):
shutil.rmtree(directory)
# USADO
def getListIDs(fileListIDs):
list_ids = []
with open(fileListIDs, 'rtU') as lines:
for line in lines:
line = line.splitlines()[0]
if len(line) > 0:
list_ids.append(line)
if len(list_ids) == 0:
sys.exit('No runIDs were found in ' + fileListIDs)
return list_ids
# Check programs versions
def checkPrograms(programs_version_dictionary):
print '\n' + 'Checking dependencies...'
programs = programs_version_dictionary
which_program = ['which', '']
listMissings = []
for program in programs:
which_program[1] = program
run_successfully, stdout, stderr = runCommandPopenCommunicate(which_program, False, None, False)
if not run_successfully:
listMissings.append(program + ' not found in PATH.')
else:
if programs[program][0] is None:
print program + ' (impossible to determine programme version) found at: ' + stdout.splitlines()[0]
else:
check_version = [stdout.splitlines()[0], programs[program][0]]
run_successfully, stdout, stderr = runCommandPopenCommunicate(check_version, False, None, False)
if stdout == '':
stdout = stderr
if program == 'bunzip2':
version_line = stdout.splitlines()[0].rsplit(',', 1)[0].split(' ')[-1]
elif program in ['wget', 'gawk']:
version_line = stdout.splitlines()[0].split(' ', 3)[2]
elif program in ['prefetch', 'fastq-dump']:
version_line = stdout.splitlines()[1].split(' ')[-1]
else:
version_line = stdout.splitlines()[0].split(' ')[-1]
replace_characters = ['"', 'v', 'V', '+', ',']
for i in replace_characters:
version_line = version_line.replace(i, '')
print program + ' (' + version_line + ') found'
if programs[program][1] == '>=':
program_found_version = version_line.split('.')
program_version_required = programs[program][2].split('.')
if len(program_version_required) == 3:
if len(program_found_version) == 2:
program_found_version.append(0)
else:
program_found_version[2] = program_found_version[2].split('_')[0]
for i in range(0, len(program_version_required)):
if int(program_found_version[i]) > int(program_version_required[i]):
break
elif int(program_found_version[i]) == int(program_version_required[i]):
continue
else:
listMissings.append('It is required ' + program + ' with version ' + programs[program][1] + ' ' + programs[program][2])
else:
if version_line != programs[program][2]:
listMissings.append('It is required ' + program + ' with version ' + programs[program][1] + ' ' + programs[program][2])
return listMissings
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
func(*args, **kwargs)
except:
print 'Exception in ' + func.__name__
traceback.print_exc()
return wrapped_func
def saveVariableToPickle(variableToStore, outdir, prefix):
pickleFile = os.path.join(outdir, str(prefix + '.pkl'))
with open(pickleFile, 'wb') as writer:
pickle.dump(variableToStore, writer)
def extractVariableFromPickle(pickleFile):
with open(pickleFile, 'rb') as reader:
variable = pickle.load(reader)
return variable
def rchop(string, ending):
if string.endswith(ending):
string = string[:-len(ending)]
return string
def timer(function, name):
@functools.wraps(function)
def wrapper(*args, **kwargs):
print('\n' + 'RUNNING {0}\n'.format(name))
start_time = time.time()
results = list(function(*args, **kwargs)) # guarantees return is a list to allow .insert()
time_taken = runTime(start_time)
print('END {0}'.format(name))
results.insert(0, time_taken)
return results
return wrapper