From e2f6ed7c89da368c2f8ffdc3bce8617da1aedf42 Mon Sep 17 00:00:00 2001
From: Claromes
Date: Sat, 25 May 2024 04:55:45 -0300
Subject: [PATCH] update the base code and export data

---
 .gitignore                      |   6 +-
 waybacktweets/__init__.py       |   0
 waybacktweets/export_tweets.py  |  37 +++++
 waybacktweets/main.py           |  24 ++++
 waybacktweets/request_tweets.py |  37 +++++
 waybacktweets/tweet_parse.py    | 233 ++++++++++++++++++++++++++++++++
 waybacktweets/utils.py          |  70 ++++++++++
 7 files changed, 406 insertions(+), 1 deletion(-)
 create mode 100644 waybacktweets/__init__.py
 create mode 100644 waybacktweets/export_tweets.py
 create mode 100644 waybacktweets/main.py
 create mode 100644 waybacktweets/request_tweets.py
 create mode 100644 waybacktweets/tweet_parse.py
 create mode 100644 waybacktweets/utils.py

diff --git a/.gitignore b/.gitignore
index 0cafc1c..64b33db 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,5 @@
-.venv/
\ No newline at end of file
+.venv/
+*.csv
+*.json
+waybacktweets/__pycache__
+waybacktweets/notes.md
diff --git a/waybacktweets/__init__.py b/waybacktweets/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/waybacktweets/export_tweets.py b/waybacktweets/export_tweets.py
new file mode 100644
index 0000000..babb19b
--- /dev/null
+++ b/waybacktweets/export_tweets.py
@@ -0,0 +1,37 @@
+import datetime
+import re
+
+import pandas as pd
+
+
+def datetime_now():
+    now = datetime.datetime.now()
+
+    formatted_now = now.strftime('%Y%m%d%H%M%S')
+    formatted_now = re.sub(r'\W+', '', formatted_now)
+
+    return formatted_now
+
+
+def response_tweets_csv(data, username):
+    data_transposed = list(zip(*data))
+
+    formatted_datetime = datetime_now()
+    filename = f'{username}_tweets_{formatted_datetime}'
+
+    df = pd.DataFrame(data_transposed,
+                      columns=[
+                          'archived_urlkey', 'archived_timestamp', 'tweet',
+                          'archived_tweet', 'parsed_tweet',
+                          'parsed_archived_tweet', 'archived_mimetype',
+                          'archived_statuscode', 'archived_digest',
+                          'archived_length'
+                      ])
+
+    csv_file_path = f'{filename}.csv'
+    df.to_csv(csv_file_path, index=False)
+
+    json_file_path = f'{filename}.json'
+    df.to_json(json_file_path, orient='records', lines=False)
+
+    print(f'Done. Check the files {filename}.csv and {filename}.json')
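A quick smoke test for the exporter above (a minimal sketch; the row values are hypothetical, and in normal use the ten parallel lists come from parse_archived_tweets):

    from export_tweets import response_tweets_csv

    # one hypothetical snapshot, one list per output column
    data = (
        ['com,twitter)/user/status/123'],         # archived_urlkey
        ['20230101000000'],                       # archived_timestamp
        ['https://twitter.com/user/status/123'],  # tweet
        ['https://web.archive.org/web/20230101000000/https://twitter.com/user/status/123'],
        ['https://twitter.com/user/status/123'],  # parsed_tweet
        ['https://web.archive.org/web/20230101000000/https://twitter.com/user/status/123'],
        ['text/html'],                            # archived_mimetype
        ['200'],                                  # archived_statuscode
        ['ABCDEF1234567890'],                     # archived_digest
        ['5120'],                                 # archived_length
    )
    response_tweets_csv(data, 'user')  # writes user_tweets_<timestamp>.csv/.json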
diff --git a/waybacktweets/main.py b/waybacktweets/main.py
new file mode 100644
index 0000000..c3dacfc
--- /dev/null
+++ b/waybacktweets/main.py
@@ -0,0 +1,24 @@
+from export_tweets import response_tweets_csv
+from request_tweets import get_archived_tweets
+from tweet_parse import parse_archived_tweets
+
+username = 'dfrlab'
+unique = False
+datetime_from = ''
+datetime_to = ''
+
+
+def main():
+    try:
+        archived_tweets = get_archived_tweets(username, unique, datetime_from,
+                                              datetime_to)
+        if archived_tweets:
+            data = parse_archived_tweets(archived_tweets, username)
+
+            response_tweets_csv(data, username)
+    except TypeError as e:
+        print(e)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/waybacktweets/request_tweets.py b/waybacktweets/request_tweets.py
new file mode 100644
index 0000000..3986ff3
--- /dev/null
+++ b/waybacktweets/request_tweets.py
@@ -0,0 +1,37 @@
+import requests
+
+
+def get_archived_tweets(username,
+                        unique=False,
+                        timestamp_from='',
+                        timestamp_to=''):
+    # optional CDX filters; collapse=urlkey deduplicates snapshots by URL key
+    unique = '&collapse=urlkey' if unique else ''
+
+    if timestamp_from:
+        timestamp_from = f'&from={timestamp_from}'
+
+    if timestamp_to:
+        timestamp_to = f'&to={timestamp_to}'
+
+    url = f'https://web.archive.org/cdx/search/cdx?url=https://twitter.com/{username}/status/*&output=json{unique}{timestamp_from}{timestamp_to}'
+    print(f'Getting and parsing archived tweets from {url}')
+
+    try:
+        # an explicit timeout so the Timeout handler below can actually fire
+        response = requests.get(url, timeout=30)
+        # raise_for_status() raises for any 4xx/5xx response, so no separate
+        # status-code range check is needed before returning
+        response.raise_for_status()
+
+        return response.json()
+    except requests.exceptions.Timeout as e:
+        print(f'{e}.\nConnection to web.archive.org timed out.')
+    except requests.exceptions.ConnectionError as e:
+        print(
+            f'{e}.\nFailed to establish a new connection with web.archive.org.'
+        )
+    except requests.exceptions.HTTPError as e:
+        print(
+            f'{e}.\nTemporarily Offline: Internet Archive services are temporarily offline. Please check the Internet Archive [Twitter feed](https://twitter.com/internetarchive/) for the latest information.'
+        )
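For reference, output=json makes the CDX endpoint return a JSON array whose first element is a header row, which is why the parser below starts at index 1. An abridged, illustrative response (hypothetical snapshot values):

    [
      ["urlkey", "timestamp", "original", "mimetype", "statuscode", "digest", "length"],
      ["com,twitter)/user/status/123", "20230101000000",
       "https://twitter.com/user/status/123", "text/html", "200",
       "ABCDEF1234567890", "5120"]
    ]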
diff --git a/waybacktweets/tweet_parse.py b/waybacktweets/tweet_parse.py
new file mode 100644
index 0000000..b9ed287
--- /dev/null
+++ b/waybacktweets/tweet_parse.py
@@ -0,0 +1,233 @@
+from urllib.parse import unquote
+
+from utils import (check_double_status, clean_tweet, delete_tweet_pathnames,
+                   pattern_tweet, semicolon_parse)
+
+
+def parse_archived_tweets(archived_tweets_response, username):
+    archived_urlkey = []
+    archived_timestamp = []
+    tweet = []
+    archived_tweet = []
+    parsed_tweet = []
+    parsed_archived_tweet = []
+    archived_mimetype = []
+    archived_statuscode = []
+    archived_digest = []
+    archived_length = []
+
+    # skip index 0: the CDX JSON response begins with a header row
+    for response in archived_tweets_response[1:]:
+        tweet_remove_char = unquote(response[2]).replace('’', '')
+        cleaned_tweet = pattern_tweet(tweet_remove_char).strip('"')
+
+        wayback_machine_url = f'https://web.archive.org/web/{response[1]}/{tweet_remove_char}'
+
+        original_tweet = delete_tweet_pathnames(
+            clean_tweet(cleaned_tweet, username))
+
+        parsed_wayback_machine_url = f'https://web.archive.org/web/{response[1]}/{original_tweet}'
+
+        double_status = check_double_status(wayback_machine_url,
+                                            original_tweet)
+
+        if double_status:
+            original_tweet = delete_tweet_pathnames(
+                f'https://twitter.com/{original_tweet}')
+        elif '://' not in original_tweet:
+            original_tweet = delete_tweet_pathnames(
+                f'https://{original_tweet}')
+
+        encoded_tweet = semicolon_parse(response[2])
+        encoded_archived_tweet = semicolon_parse(wayback_machine_url)
+        encoded_parsed_tweet = semicolon_parse(original_tweet)
+        encoded_parsed_archived_tweet = semicolon_parse(
+            parsed_wayback_machine_url)
+
+        archived_urlkey.append(response[0])
+        archived_timestamp.append(response[1])
+        tweet.append(encoded_tweet)
+        archived_tweet.append(encoded_archived_tweet)
+        parsed_tweet.append(encoded_parsed_tweet)
+        parsed_archived_tweet.append(encoded_parsed_archived_tweet)
+        archived_mimetype.append(response[3])
+        archived_statuscode.append(response[4])
+        archived_digest.append(response[5])
+        archived_length.append(response[6])
+
+    return (archived_urlkey, archived_timestamp, tweet, archived_tweet,
+            parsed_tweet, parsed_archived_tweet, archived_mimetype,
+            archived_statuscode, archived_digest, archived_length)
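+
+# Example of the mapping above for one hypothetical CDX row:
+#   response = ['com,twitter)/user/status/123', '20230101000000',
+#               'https://twitter.com/user/status/123/photo/1',
+#               'text/html', '200', 'ABCDEF1234567890', '5120']
+# yields:
+#   archived_tweet -> https://web.archive.org/web/20230101000000/https://twitter.com/user/status/123/photo/1
+#   parsed_tweet   -> https://twitter.com/user/status/123 (pathname stripped)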
+
+
+# def embed(tweet):
+#     try:
+#         url = f'https://publish.twitter.com/oembed?url={clean_tweet(tweet)}'
+#         response = requests.get(url)

+#         regex = r'<blockquote(?: [^>]+)?><p[^>]*>(.*?)<\/p>.*?&mdash; (.*?)<\/a>'
+#         regex_author = r'^(.*?)\s*\('

+#         if response.status_code == 200 or response.status_code == 302:
+#             status_code = response.status_code
+#             html = response.json()['html']
+#             author_name = response.json()['author_name']

+#             matches_html = re.findall(regex, html, re.DOTALL)

+#             tweet_content = []
+#             user_info = []
+#             is_RT = []

+#             for match in matches_html:
+#                 tweet_content_match = re.sub(r'<a[^>]*>|<\/a>', '',
+#                                              match[0].strip())
+#                 tweet_content_match = tweet_content_match.replace('<br>', '\n')

+#                 user_info_match = re.sub(r'<a[^>]*>|<\/a>', '',
+#                                          match[1].strip())
+#                 user_info_match = user_info_match.replace(')', '), ')

+#                 match_author = re.search(regex_author, user_info_match)
+#                 author_tweet = match_author.group(1)

+#                 if tweet_content_match:
+#                     tweet_content.append(tweet_content_match)
+#                 if user_info_match:
+#                     user_info.append(user_info_match)

+#                 is_RT_match = False
+#                 if author_name != author_tweet:
+#                     is_RT_match = True

+#                 is_RT.append(is_RT_match)

+#             return status_code, tweet_content, user_info, is_RT
+#         else:
+#             return False
+#     except requests.exceptions.Timeout as e:
+#         print(f'{e}.\nConnection to web.archive.org timed out.')
+#     except requests.exceptions.ConnectionError as e:
+#         print(
+#             f'{e}.\nFailed to establish a new connection with web.archive.org.'
+#         )
+#     except UnboundLocalError as e:
+#         print(e)
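+
+# For reference, an abridged (illustrative) publish.twitter.com/oembed payload;
+# the regex above runs against the 'html' field, and 'author_name' flags retweets:
+#   {
+#     "author_name": "Name",
+#     "html": "<blockquote class=\"twitter-tweet\"><p lang=\"en\" dir=\"ltr\">text</p>&mdash; Name (@user) <a href=\"https://twitter.com/user/status/123\">January 1, 2023</a></blockquote>"
+#   }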
+
+
+# def display_tweet():
+#     if mimetype[i] == 'application/json' or mimetype[
+#             i] == 'text/html' or mimetype[i] == 'unk' or mimetype[
+#                 i] == 'warc/revisit':
+#         if is_RT[0] == True:
+#             st.info('*Retweet*')
+#         st.write(tweet_content[0])
+#         st.write(f'**{user_info[0]}**')

+#         st.divider()
+#     else:
+#         st.warning('MIME Type was not parsed.')

+#         st.divider()


+# def display_not_tweet():
+#     original_link = delete_tweet_pathnames(clean_tweet(tweet_links[i]))

+#     if status:
+#         original_link = delete_tweet_pathnames(
+#             f'https://twitter.com/{tweet_links[i]}')
+#     elif not '://' in tweet_links[i]:
+#         original_link = delete_tweet_pathnames(f'https://{tweet_links[i]}')

+#     response_html = requests.get(original_link)

+#     if mimetype[i] == 'text/html' or mimetype[i] == 'warc/revisit' or mimetype[
+#             i] == 'unk':
+#         if ('.jpg' in tweet_links[i] or '.png'
+#                 in tweet_links[i]) and response_html.status_code == 200:
+#             components.iframe(tweet_links[i], height=500, scrolling=True)
+#         elif '/status/' not in original_link:
+#             st.info("This isn't a status or is not available")
+#         elif status or f'{st.session_state.current_handle}' not in original_link:
+#             st.info(f'Replying to {st.session_state.current_handle}')
+#         else:
+#             components.iframe(clean_link(link), height=500, scrolling=True)

+#     elif mimetype[i] == 'application/json':
+#         try:
+#             response_json = requests.get(link)

+#             if response_json.status_code == 200:
+#                 json_data = response_json.json()

+#                 if 'data' in json_data:
+#                     if 'text' in json_data['data']:
+#                         json_text = json_data['data']['text']
+#                     else:
+#                         json_text = json_data['data']
+#                 else:
+#                     if 'text' in json_data:
+#                         json_text = json_data['text']
+#                     else:
+#                         json_text = json_data

+#                 st.code(json_text)
+#                 st.json(json_data, expanded=False)

+#                 st.divider()
+#             else:
+#                 st.error(response_json.status_code)

+#                 st.divider()
+#         except requests.exceptions.Timeout:
+#             st.error('Connection to web.archive.org timed out.')
+#             st.divider()
+#         except requests.exceptions.ConnectionError:
+#             st.error(
+#                 'Failed to establish a new connection with web.archive.org.')
+#             st.divider()
+#         except UnboundLocalError:
+#             st.empty()
+#     else:
+#         st.warning('MIME Type was not parsed.')
+#         st.divider()


+# try:
+#     links = query_api(handle, saved_at)

+#     parse = parse_links(links)
+#     parsed_links = parse[0]
+#     tweet_links = parse[1]
+#     mimetype = parse[2]
+#     timestamp = parse[3]

+#     if links:
+#         for i in range(tweets_per_page):
+#             if tweet_links[i]:
+#                 link = parsed_links[i]
+#                 tweet = embed(tweet_links[i])

+#                 status = check_double_status(link, tweet_links[i])

+#                 if not not_available:
+#                     attr(i)

+#                     if tweet:
+#                         status_code = tweet[0]
+#                         tweet_content = tweet[1]
+#                         user_info = tweet[2]
+#                         is_RT = tweet[3]

+#                         display_tweet()
+#                     elif not tweet:
+#                         display_not_tweet()

+#                 if not_available:
+#                     if not tweet:
+#                         return_none_count += 1
+#                         attr(i)

+#                         display_not_tweet()

+#     if not links:
+#         print('Unable to query the Wayback Machine API.')
+# except TypeError as e:
+#     print(
+#         f'{e}.\nRefresh this page and try again. If the problem persists [open an issue](https://github.com/claromes/waybacktweets/issues).'
+#     )
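The parser relies on the URL helpers defined below; a minimal sketch of how the main ones behave (hypothetical inputs, e.g. in a REPL):

    from utils import check_double_status, delete_tweet_pathnames, semicolon_parse

    delete_tweet_pathnames('https://twitter.com/user/status/123/photo/1')
    # -> 'https://twitter.com/user/status/123'

    check_double_status(
        'https://web.archive.org/web/20230101000000/https://twitter.com/a/status/1/status/2',
        'a/status/1')
    # -> True: two /status/ segments and no twitter.com in the original

    semicolon_parse('https://example.com/a;b')
    # -> 'https://example.com/a%3Bb'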
diff --git a/waybacktweets/utils.py b/waybacktweets/utils.py
new file mode 100644
index 0000000..c9ca771
--- /dev/null
+++ b/waybacktweets/utils.py
@@ -0,0 +1,70 @@
+import re
+
+
+def clean_tweet(tweet, username):
+    tweet_lower = tweet.lower()
+
+    pattern = re.compile(r'/status/(\d+)')
+    match_lower_case = pattern.search(tweet_lower)
+    match_original_case = pattern.search(tweet)
+
+    if match_lower_case and username in tweet_lower:
+        return f'https://twitter.com/{username}/status/{match_original_case.group(1)}'
+    else:
+        return tweet
+
+
+def clean_wayback_machine_url(wayback_machine_url, archived_timestamp,
+                              username):
+    wayback_machine_url = wayback_machine_url.lower()
+
+    pattern = re.compile(r'/status/(\d+)')
+    match = pattern.search(wayback_machine_url)
+
+    if match and username in wayback_machine_url:
+        return f'https://web.archive.org/web/{archived_timestamp}/https://twitter.com/{username}/status/{match.group(1)}'
+    else:
+        return wayback_machine_url
+
+
+def pattern_tweet(tweet):
+    # Unwrap status URLs wrapped in quotes:
+    # Reply: /status//
+    # Link: /status///
+    # Twimg: /status/https://pbs
+    pattern = re.compile(r'/status/"([^"]+)"')
+
+    match = pattern.search(tweet)
+    if match:
+        return match.group(1).lstrip('/')
+    else:
+        return tweet
+
+
+def delete_tweet_pathnames(tweet):
+    # Delete pathnames (/photos, /likes, /retweet...) after the status ID
+    pattern_username = re.compile(r'https://twitter\.com/([^/]+)/status/\d+')
+    match_username = pattern_username.match(tweet)
+
+    pattern_id = r'https://twitter.com/\w+/status/(\d+)'
+    match_id = re.search(pattern_id, tweet)
+
+    if match_id and match_username:
+        tweet_id = match_id.group(1)
+        username = match_username.group(1)
+        return f'https://twitter.com/{username}/status/{tweet_id}'
+    else:
+        return tweet
+
+
+def check_double_status(wayback_machine_url, original_tweet):
+    # True when the snapshot URL embeds two /status/ segments but the
+    # original URL is not itself a twitter.com URL
+    if wayback_machine_url.count(
+            '/status/') == 2 and 'twitter.com' not in original_tweet:
+        return True
+
+    return False
+
+
+def semicolon_parse(string):
+    # Percent-encode semicolons, which would otherwise break the CSV export
+    return ''.join('%3B' if c == ';' else c for c in string)
--
2.34.1