extract-tweets.py
import datetime
import glob
import json
from json import JSONDecodeError
import os
import re
import traceback
import twitter
import utils
# # Uncomment to test import into the Elasticsearch database.
# from elastic_search_utils import ElasticSearchTwitterImporter
# Number of iterations in a day that must produce no new tweets before a notification is sent to the admin.
EMPTY_ITERATION_NOTIF_THRESHOLD = 3
now = datetime.datetime.now()
config_filename = 'config.json'
with open(config_filename, 'r') as config_file:
config = json.load(config_file)
db_dir = config['db_dir']
twitter_db_dir = f"{db_dir}/twitter"
twitter_html_dir = config['twitter']['html_dir']
run_dir = config['run_dir']
rejected_expressions = []
if "black_list" in config["twitter"]:
with open(config['twitter']['black_list'], 'r', encoding='utf-8') as rejected_expressions_file:
for line in rejected_expressions_file:
rejected_expressions.append(line.strip())
processed_databases = set()
empty_extractions = []
# # Uncomment to test import into the Elasticsearch database.
# es_importer = {
# 'ja': ElasticSearchTwitterImporter(config['elastic_search']['host'], config['elastic_search']['port'], twitter_html_dir, 'ja'),
# 'en': ElasticSearchTwitterImporter(config['elastic_search']['host'], config['elastic_search']['port'], twitter_html_dir, 'en')
# }
def extract_date_from(db_filename):
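    """Return the date part of a database filename, i.e. the text between '_' and '.txt'.

    For example, given a file named tweets_2021-05-17.txt (cf. the tweets_*.txt glob in
    the main loop below), this returns "2021-05-17".
    """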
bn = os.path.basename(db_filename)
return bn[bn.index("_") + 1:bn.index(".txt")]
def get_tweet_text(status):
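    """Return the tweet text, preferring full_text when the status uses extended tweet mode."""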
text = status.full_text if status.tweet_mode == 'extended' else status.text
return text
def search_black_list(tweet_text):
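    """Return the first expression from rejected_expressions found in tweet_text, or None.

    For example, if the black list contained the (hypothetical) entry "free prizes", any
    tweet whose text contains "free prizes" would be skipped by the caller.
    """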
for expr in rejected_expressions:
if expr in tweet_text:
return expr
return None
def write_tweet_data(tweet_id, tweet_text):
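    """Write the json, metadata, and html files of a tweet.

    Files go under <twitter_html_dir>/<country_code>/orig/<YYYY>/<MM>/<DD>/<HH-MM>/, and any
    newly created html file is also appended to the new-twitter-html-files list in run_dir.
    """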
print(f"write_tweet_data tweet_id={tweet_id}")
try:
tweet = tweets[tweet_id]
timestamp = now.strftime('%Y-%m-%d-%H-%M')
tweet_timestamp = datetime.datetime.fromtimestamp(tweet['status'].created_at_in_seconds)
country_code_dir = utils.get_country_code_dir(tweet['country_code'])
timestamp_path = tweet_timestamp.strftime('%Y/%m/%d/%H-%M')
html_orig_tweet_dir = os.path.join(twitter_html_dir, country_code_dir, "orig", timestamp_path)
os.makedirs(html_orig_tweet_dir, exist_ok=True)
temp_path = html_orig_tweet_dir
while temp_path != twitter_html_dir:
            # An error might occur when the folder owner differs from the user running the
            # script. The permissions of such folders should already be correct, so there is
            # no need to update them.
try:
os.chmod(temp_path, 0o775)
            except OSError:
break
temp_path = os.path.dirname(temp_path)
print(f"tweet_path={html_orig_tweet_dir}")
        # Even if the json file already exists, it's updated.
json_filename = os.path.join(html_orig_tweet_dir, f"{tweet_id}.json")
with open(json_filename, 'w', encoding='utf-8') as json_file:
json.dump(tweet['json'], json_file)
        # Even if the metadata file already exists, it's updated.
metadata_filename = os.path.join(html_orig_tweet_dir, f"{tweet_id}.metadata")
with open(metadata_filename, 'w', encoding='utf-8') as metadata_file:
metadata = {
"id": tweet_id,
"country": tweet["country"],
"country_code": tweet["country_code"],
"count": tweet["count"]
}
json.dump(metadata, metadata_file, ensure_ascii=False)
        # Write the html file only if it doesn't exist yet.
html_filename = os.path.join(html_orig_tweet_dir, f"{tweet_id}.html")
if not os.path.exists(html_filename):
with open(html_filename, 'w', encoding='utf-8') as html_file:
html_str = (
"<!DOCTYPE html>\n"
f"<html lang=\"{tweet['status'].lang}\">\n"
"<head><meta charset=\"utf-8\"/></head>\n"
"<body>\n"
f"<p>{tweet_text}</p>\n"
"</body>\n"
"</html>\n"
)
html_file.write(html_str)
new_html_filename = os.path.join(run_dir, 'new-html-files', f'new-twitter-html-files-{timestamp}.txt')
with open(new_html_filename, 'a', encoding='utf-8') as new_html_file:
new_html_file.write(html_filename)
new_html_file.write("\n")
            # # Uncomment to test import into the Elasticsearch database.
# es_index = config['elastic_search']['twitter_index_basename'] + '-' + tweet['status'].lang
# es_importer[tweet['status'].lang].update_record(html_filename[:-4]+"txt", index=es_index, is_data_stream=True)
if country_code_dir in tweet_count_per_country:
tweet_count_per_country[country_code_dir] += 1
else:
tweet_count_per_country[country_code_dir] = 1
    except Exception:
        print(f"An error has occurred in write_tweet_data(tweet_id={tweet_id}): {traceback.format_exc()}")
def write_stats_file(filename, tweet_count_per_country):
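    """Write the per-country tweet counts as json, e.g. {"jp": 12, "us": 3} (hypothetical
    counts); nothing is written when the counts are empty."""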
if len(tweet_count_per_country) > 0:
with open(filename, 'w', encoding='utf-8') as stats_file:
json.dump(tweet_count_per_country, stats_file)
def process_tweet(tweet_id, tweet_count, tweet_lang, tweet_country, tweet_json_str):
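    """Parse a tweet's json string, skip it if it is blacklisted or its country is unknown,
    then record it in tweets and write its files."""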
tweet_json = json.loads(tweet_json_str)
tweet_status = twitter.models.Status.NewFromJsonDict(tweet_json)
tweet_text = get_tweet_text(tweet_status)
# Skip tweets that contain rejected words.
rejected_expr = search_black_list(tweet_text)
if rejected_expr is not None:
print(f"Tweet {tweet_id} has been skipped because it contains rejected expression: {rejected_expr}.")
return
tweet_country_code = None
try:
tweet_country_code = utils.convert_country_to_iso_3166_alpha_2(tweet_country)
    except LookupError:
        undefined_countries.add(tweet_country)
        print(f"Tweet {tweet_id} has been ignored because it refers to an undefined country: {tweet_country}.")
        return
tweets[tweet_id] = {
"count": tweet_count,
"lang": tweet_lang,
"country": tweet_country,
"country_code": tweet_country_code,
        "json": tweet_json,
"status": tweet_status
}
write_tweet_data(tweet_id, tweet_text)
#
# The heuristic should be appropriate most of the time.
#
def is_twitter_db_admin_notif_required(empty_extractions, now):
"Returns True if EMPTY_ITERATION_NOTIF_THRESHOLD iterations have been recorded as empty today with no prior empty iterations on the previous day (to prevent unneeded repeated notifs)."
today = now.strftime('%Y-%m-%d')
yesterday = (now - datetime.timedelta(1)).strftime('%Y-%m-%d')
counts_per_date = {}
for timestamp in empty_extractions:
date_str = timestamp[:10]
counts_per_date[date_str] = counts_per_date.get(date_str, 0) + 1
return counts_per_date.get(today, 0) == EMPTY_ITERATION_NOTIF_THRESHOLD and counts_per_date.get(yesterday, 0) == 0
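# A worked example (hypothetical timestamps), with EMPTY_ITERATION_NOTIF_THRESHOLD = 3 and
# now falling on 2021-05-17:
#   ['2021-05-17-00-00', '2021-05-17-06-00', '2021-05-17-12-00'] -> True
#   (three empty iterations today, none yesterday)
#   ['2021-05-16-18-00', '2021-05-17-00-00', '2021-05-17-06-00', '2021-05-17-12-00'] -> False
#   (an empty iteration was already recorded yesterday)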
run_filename = os.path.join(run_dir, 'twitter.json')
if os.path.exists(run_filename):
with open(run_filename, 'r') as run_file:
run_data = json.load(run_file)
print(f"run_data={run_data}")
processed_databases = set(run_data)
empty_extractions_filename = os.path.join(run_dir, 'twitter_empty_extractions.json')
if os.path.exists(empty_extractions_filename):
with open(empty_extractions_filename, 'r') as empty_extractions_file:
empty_extractions = json.load(empty_extractions_file)
total_tweets_for_all_db = 0
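# Each database line is expected to match "<id> <count> <lang> <country> <json>",
# e.g. (hypothetical): 123456789 2 ja Japan {"id": 123456789, "text": "..."}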
for db_filename in reversed(sorted(glob.glob(f'{twitter_db_dir}/tweets_*.txt'))):
print(f"Processing {db_filename}")
tweets = {}
tweet_count_per_country = {}
undefined_countries = set()
with open(db_filename, 'r', encoding='latin-1') as twitter_data_file:
for line in twitter_data_file:
            match = re.search(r"(\d+) (\d+) (\w+) ([-)'(\w]+) (\{.*\})", line)
if match:
tweet_id = int(match.group(1))
tweet_count = int(match.group(2))
tweet_lang = match.group(3)
tweet_country = match.group(4)
tweet_json_str = match.group(5)
try:
process_tweet(tweet_id, tweet_count, tweet_lang, tweet_country, tweet_json_str)
except JSONDecodeError as json_err:
print(f"Tweet {tweet_id} had invalid json and has been ignored. json_err={json_err}")
else:
print(f"Invalid line: {line}")
processed_databases.add(extract_date_from(db_filename))
if len(undefined_countries) > 0:
utils.send_mail(config['mail']['from'], config['mail']['to'], config['mail']['cc'], None, "Undefined countries found",
(f"Some tweets are referring to these countries: {sorted(undefined_countries)} but no corresponding ISO-3166-2 Alpha-2 code are defined.\n\n"
"Adjust the country_codes.txt file accordingly to prevent this error from occurring again."))
print(f"tweet_count_per_country={tweet_count_per_country}")
total_tweets = sum(tweet_count_per_country.values())
print(f"Total tweets: {total_tweets}")
total_tweets_for_all_db += total_tweets
stats_file = os.path.join(run_dir, 'twitter-stats', f"twitter-stats-{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M')}.json")
write_stats_file(stats_file, tweet_count_per_country)
# Remove database to save disk space.
if os.path.exists(db_filename):
os.remove(db_filename)
# Remember that the file has been processed.
with open(run_filename, 'w') as run_file:
json.dump(sorted(list(processed_databases)), run_file)
if total_tweets_for_all_db == 0:
empty_extractions.append(now.strftime('%Y-%m-%d-%H-%M'))
with open(empty_extractions_filename, 'w', encoding='utf-8') as empty_extractions_file:
json.dump(empty_extractions, empty_extractions_file)
    if is_twitter_db_admin_notif_required(empty_extractions, now):
        utils.send_mail(config['mail']['from'], config['mail']['to'], config['mail']['cc'], None, "No new data from Twitter databases",
                        (f"There were {EMPTY_ITERATION_NOTIF_THRESHOLD} extract-tweets iterations that did not produce any new tweets. Most likely there is an issue with the updates of the Twitter databases.\n\n"
                         "Contact the person in charge to check this issue."))
        print("Notification sent.")