.venv/
*.csv
*.json
+*.html
waybacktweets/__pycache__
waybacktweets/notes.md
import re
import datetime
+from viz_tweets import *
+
def datetime_now():
    now = datetime.datetime.now()
    # assumed format: a compact, filename-safe timestamp for the export files
    return now.strftime('%Y%m%d%H%M%S')
-def response_tweets_csv(data, username):
+def response_tweets(data, username):
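+    """Exports the parsed tweets as CSV, JSON and HTML files."""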
data_transposed = transpose_matrix(data)
formatted_datetime = datetime_now()
json_file_path = f'{filename}.json'
df.to_json(json_file_path, orient='records', lines=False)
- print(f'Done. Check the files {filename}.csv and {filename}.json')
+ html_file_path = f'{filename}.html'
+
+ json_content = read_json(json_file_path)
+ html_content = generate_html(json_content, username)
+ save_html(html_file_path, html_content)
+
+ print(
+ f'Done. Check the files {filename}.csv, {filename}.json and {filename}.html'
+ )
from request_tweets import *
-from tweet_parse import *
+from parse_tweets import *
from export_tweets import *
username = 'claromes'
if archived_tweets:
data = parse_archived_tweets(archived_tweets, username)
- response_tweets_csv(data, username)
+ response_tweets(data, username)
print(
    '\nNeed help? Open an issue: https://github.com/claromes/waybacktweets/issues.'
)
--- /dev/null
+import requests
+import re
+from urllib.parse import unquote
+from utils import *
+
+
+def embed(tweet):
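+    """Fetches a tweet through Twitter's oEmbed endpoint and parses the
+    returned blockquote HTML. Returns (tweet_content, is_RT, user_info)
+    lists, or None when the request or parsing fails."""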
+ try:
+ url = f'https://publish.twitter.com/oembed?url={tweet}'
+ response = requests.get(url)
+
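+        # group 1 captures the tweet text inside <p>…</p>; group 2 captures
+        # the "Author (@handle) date" attribution at the end of the blockquote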
+ regex = r'<blockquote class="twitter-tweet"(?: [^>]+)?><p[^>]*>(.*?)<\/p>.*?— (.*?)<\/a>'
+ regex_author = r'^(.*?)\s*\('
+
+ if not (400 <= response.status_code <= 511):
+            payload = response.json()
+            html = payload['html']
+            author_name = payload['author_name']
+
+ matches_html = re.findall(regex, html, re.DOTALL)
+
+ tweet_content = []
+ user_info = []
+ is_RT = []
+
+ for match in matches_html:
+ tweet_content_match = re.sub(r'<a[^>]*>|<\/a>', '',
+ match[0].strip())
+ tweet_content_match = tweet_content_match.replace('<br>', '\n')
+
+ user_info_match = re.sub(r'<a[^>]*>|<\/a>', '',
+ match[1].strip())
+ user_info_match = user_info_match.replace(')', '), ')
+
+                match_author = re.search(regex_author, user_info_match)
+                # the author pattern may not match; fall back to an empty string
+                author_tweet = match_author.group(1) if match_author else ''
+
+ if tweet_content_match:
+ tweet_content.append(tweet_content_match)
+ if user_info_match:
+ user_info.append(user_info_match)
+
+ is_RT_match = False
+ if author_name != author_tweet:
+ is_RT_match = True
+
+ is_RT.append(is_RT_match)
+
+ return tweet_content, is_RT, user_info
+    except Exception:
+        # any request or parsing failure means the tweet is not embeddable
+        return None
+
+
+def parse_json_mimetype(tweet):
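+    """Fetches an archived snapshot served as application/json and returns
+    the tweet text, preferring the nested 'data'/'text' keys when present."""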
+ response_json = requests.get(tweet)
+
+ if not (400 <= response_json.status_code <= 511):
+ json_data = response_json.json()
+
+ if 'data' in json_data:
+ if 'text' in json_data['data']:
+ json_text = json_data['data']['text']
+ return json_text
+ else:
+ json_text = json_data['data']
+ return json_text
+ else:
+ if 'text' in json_data:
+ json_text = json_data['text']
+ return json_text
+ else:
+ json_text = json_data
+ return json_text
+
+
+def parse_archived_tweets(archived_tweets_response, username):
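+    """Iterates over the CDX response rows, rebuilds the original and
+    Wayback Machine URLs for each snapshot, and returns the collected
+    fields as parallel lists."""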
+ archived_urlkey = []
+ archived_timestamp = []
+ tweet = []
+ archived_tweet = []
+ parsed_tweet = []
+ parsed_tweet_mimetype_json = []
+ available_tweet_content = []
+ available_tweet_is_RT = []
+ available_tweet_username = []
+ parsed_archived_tweet = []
+ archived_mimetype = []
+ archived_statuscode = []
+ archived_digest = []
+ archived_length = []
+
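+    # the first row of the CDX JSON response is the field-name header, so skip it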
+ for response in archived_tweets_response[1:]:
+ tweet_remove_char = unquote(response[2]).replace('’', '')
+ cleaned_tweet = pattern_tweet(tweet_remove_char).strip('"')
+
+ wayback_machine_url = f'https://web.archive.org/web/{response[1]}/{tweet_remove_char}'
+
+ original_tweet = delete_tweet_pathnames(
+ clean_tweet(cleaned_tweet, username))
+
+ parsed_wayback_machine_url = f'https://web.archive.org/web/{response[1]}/{original_tweet}'
+
+ double_status = check_double_status(wayback_machine_url,
+ original_tweet)
+
+ if double_status:
+ original_tweet = delete_tweet_pathnames(
+ f'https://twitter.com/{original_tweet}')
+
+        elif '://' not in original_tweet:
+ original_tweet = delete_tweet_pathnames(
+ f'https://{original_tweet}')
+
+ encoded_tweet = semicolon_parse(response[2])
+ encoded_archived_tweet = semicolon_parse(wayback_machine_url)
+ encoded_parsed_tweet = semicolon_parse(original_tweet)
+ encoded_parsed_archived_tweet = semicolon_parse(
+ parsed_wayback_machine_url)
+
+        content = embed(encoded_tweet)
+        # embed() may succeed but find no blockquote matches, so check that
+        # the first list is non-empty before indexing into it
+        if content and content[0]:
+            available_tweet_content.append(content[0][0])
+            available_tweet_is_RT.append(content[1][0])
+            available_tweet_username.append(content[2][0])
+
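+        # snapshots archived with an application/json mimetype contain the
+        # tweet payload itself, so read the text straight from the archive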
+ if response[3] == 'application/json':
+ json_mimetype = parse_json_mimetype(encoded_archived_tweet)
+ parsed_tweet_mimetype_json.append(json_mimetype)
+
+ archived_urlkey.append(response[0])
+ archived_timestamp.append(response[1])
+ tweet.append(encoded_tweet)
+ archived_tweet.append(encoded_archived_tweet)
+ parsed_tweet.append(encoded_parsed_tweet)
+ parsed_archived_tweet.append(encoded_parsed_archived_tweet)
+ archived_mimetype.append(response[3])
+ archived_statuscode.append(response[4])
+ archived_digest.append(response[5])
+ archived_length.append(response[6])
+
+    return (archived_urlkey, archived_timestamp, tweet, archived_tweet,
+            parsed_tweet, parsed_tweet_mimetype_json, parsed_archived_tweet,
+            archived_mimetype, archived_statuscode, archived_digest,
+            archived_length, available_tweet_content, available_tweet_is_RT,
+            available_tweet_username)
if timestamp_to:
timestamp_to = f'&to={timestamp_to}'
- url = f'https://web.archive.org/cdx/search/cdx?url=https://twitter.com/{username}/status/*&output=json{unique}{timestamp_from}{timestamp_to}&limit=10'
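+    # CDX JSON rows are [urlkey, timestamp, original, mimetype, statuscode,
+    # digest, length]; parse_archived_tweets indexes them by position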
+ url = f'https://web.archive.org/cdx/search/cdx?url=https://twitter.com/{username}/status/*&output=json{unique}{timestamp_from}{timestamp_to}&limit=100'
print(f'Getting and parsing archived tweets from {url}')
try:
+++ /dev/null
-import requests
-import re
-from urllib.parse import unquote
-from utils import *
-
-
-def embed(tweet):
- try:
- url = f'https://publish.twitter.com/oembed?url={tweet}'
- response = requests.get(url)
-
- regex = r'<blockquote class="twitter-tweet"(?: [^>]+)?><p[^>]*>(.*?)<\/p>.*?— (.*?)<\/a>'
- regex_author = r'^(.*?)\s*\('
-
- if not (400 <= response.status_code <= 511):
- html = response.json()['html']
- author_name = response.json()['author_name']
-
- matches_html = re.findall(regex, html, re.DOTALL)
-
- tweet_content = []
- user_info = []
- is_RT = []
-
- for match in matches_html:
- tweet_content_match = re.sub(r'<a[^>]*>|<\/a>', '',
- match[0].strip())
- tweet_content_match = tweet_content_match.replace('<br>', '\n')
-
- user_info_match = re.sub(r'<a[^>]*>|<\/a>', '',
- match[1].strip())
- user_info_match = user_info_match.replace(')', '), ')
-
- match_author = re.search(regex_author, user_info_match)
- author_tweet = match_author.group(1)
-
- if tweet_content_match:
- tweet_content.append(tweet_content_match)
- if user_info_match:
- user_info.append(user_info_match)
-
- is_RT_match = False
- if author_name != author_tweet:
- is_RT_match = True
-
- is_RT.append(is_RT_match)
-
- return tweet_content, is_RT, user_info
- except:
- return None
-
-
-def parse_json_mimetype(tweet):
- response_json = requests.get(tweet)
-
- if not (400 <= response_json.status_code <= 511):
- json_data = response_json.json()
-
- if 'data' in json_data:
- if 'text' in json_data['data']:
- json_text = json_data['data']['text']
- return json_text
- else:
- json_text = json_data['data']
- return json_text
- else:
- if 'text' in json_data:
- json_text = json_data['text']
- return json_text
- else:
- json_text = json_data
- return json_text
-
-
-def parse_archived_tweets(archived_tweets_response, username):
- archived_urlkey = []
- archived_timestamp = []
- tweet = []
- archived_tweet = []
- parsed_tweet = []
- parsed_tweet_mimetype_json = []
- available_tweet_content = []
- available_tweet_is_RT = []
- available_tweet_username = []
- parsed_archived_tweet = []
- archived_mimetype = []
- archived_statuscode = []
- archived_digest = []
- archived_length = []
-
- for response in archived_tweets_response[1:]:
- tweet_remove_char = unquote(response[2]).replace('’', '')
- cleaned_tweet = pattern_tweet(tweet_remove_char).strip('"')
-
- wayback_machine_url = f'https://web.archive.org/web/{response[1]}/{tweet_remove_char}'
-
- original_tweet = delete_tweet_pathnames(
- clean_tweet(cleaned_tweet, username))
-
- parsed_wayback_machine_url = f'https://web.archive.org/web/{response[1]}/{original_tweet}'
-
- double_status = check_double_status(wayback_machine_url,
- original_tweet)
-
- if double_status:
- original_tweet = delete_tweet_pathnames(
- f'https://twitter.com/{original_tweet}')
-
- elif not '://' in original_tweet:
- original_tweet = delete_tweet_pathnames(
- f'https://{original_tweet}')
-
- encoded_tweet = semicolon_parse(response[2])
- encoded_archived_tweet = semicolon_parse(wayback_machine_url)
- encoded_parsed_tweet = semicolon_parse(original_tweet)
- encoded_parsed_archived_tweet = semicolon_parse(
- parsed_wayback_machine_url)
-
- content = embed(encoded_tweet)
- if content:
- available_tweet_content.append(content[0][0])
- available_tweet_is_RT.append(content[1][0])
- available_tweet_username.append(content[2][0])
-
- if response[3] == 'application/json':
- json_mimetype = parse_json_mimetype(encoded_archived_tweet)
- parsed_tweet_mimetype_json.append(json_mimetype)
-
- archived_urlkey.append(response[0])
- archived_timestamp.append(response[1])
- tweet.append(encoded_tweet)
- archived_tweet.append(encoded_archived_tweet)
- parsed_tweet.append(encoded_parsed_tweet)
- parsed_archived_tweet.append(encoded_parsed_archived_tweet)
- archived_mimetype.append(response[3])
- archived_statuscode.append(response[4])
- archived_digest.append(response[5])
- archived_length.append(response[6])
-
- return archived_urlkey, archived_timestamp, tweet, archived_tweet, parsed_tweet, parsed_tweet_mimetype_json, parsed_archived_tweet, archived_mimetype, archived_statuscode, archived_digest, archived_length, available_tweet_content, available_tweet_is_RT, available_tweet_username
-
-
-# if tweet_links[i]:
-# link = parsed_links[i]
-# tweet = embed(tweet_links[i])
-
-# parse = parse_links(links)
-# parsed_links = parse[0]
-# tweet_links = parse[1]
-# mimetype = parse[2]
-# timestamp = parse[3]
-
-# def display_not_tweet():
-# original_link = delete_tweet_pathnames(clean_tweet(tweet_links[i]))
-
-# if status:
-# original_link = delete_tweet_pathnames(
-# f'https://twitter.com/{tweet_links[i]}')
-# elif not '://' in tweet_links[i]:
-# original_link = delete_tweet_pathnames(f'https://{tweet_links[i]}')
-
-# response_html = requests.get(original_link)
-
-# if mimetype[i] == 'text/html' or mimetype[i] == 'warc/revisit' or mimetype[
-# i] == 'unk':
-# if ('.jpg' in tweet_links[i] or '.png'
-# in tweet_links[i]) and response_html.status_code == 200:
-# components.iframe(tweet_links[i], height=500, scrolling=True)
-# elif '/status/' not in original_link:
-# st.info("This isn't a status or is not available")
-# elif status or f'{st.session_state.current_handle}' not in original_link:
-# st.info(f'Replying to {st.session_state.current_handle}')
-# else:
-# components.iframe(clean_link(link), height=500, scrolling=True)
-
-# elif mimetype[i] == 'application/json':
-# try:
-# response_json = requests.get(link)
-
-# if response_json.status_code == 200:
-# json_data = response_json.json()
-
-# if 'data' in json_data:
-# if 'text' in json_data['data']:
-# json_text = json_data['data']['text']
-# else:
-# json_text = json_data['data']
-# else:
-# if 'text' in json_data:
-# json_text = json_data['text']
-# else:
-# json_text = json_data
-
-# st.code(json_text)
-# st.json(json_data, expanded=False)
-
-# st.divider()
-# else:
-# st.error(response_json.status_code)
-
-# st.divider()
-# except requests.exceptions.Timeout:
-# st.error('Connection to web.archive.org timed out.')
-# st.divider()
-# except requests.exceptions.ConnectionError:
-# st.error(
-# 'Failed to establish a new connection with web.archive.org.')
-# st.divider()
-# except UnboundLocalError:
-# st.empty()
-# else:
-# st.warning('MIME Type was not parsed.')
-# st.divider()
--- /dev/null
+import json
+from html import escape
+
+
+def read_json(json_file_path):
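+    """Loads the exported JSON file and returns the list of tweet records."""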
+ with open(json_file_path, 'r', encoding='utf-8') as f:
+ return json.load(f)
+
+
+def generate_html(json_content, username):
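+    """Builds a static HTML page with one card per archived tweet record."""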
+ html = f'<html>\n<head>\n<title>@{username} archived tweets</title>\n'
+ html += '<style>\n'
+ html += 'body { font-family: monospace; background-color: #f5f8fa; color: #1c1e21; margin: 0; padding: 20px; }\n'
+ html += '.container { display: flex; flex-wrap: wrap; gap: 20px; }\n'
+ html += '.tweet { flex: 0 1 calc(33.33% - 20px); background-color: #fff; border: 1px solid #e1e8ed; border-radius: 10px; padding: 15px; overflow-wrap: break-word; margin: auto; }\n'
+ html += '.tweet strong { font-weight: bold; }\n'
+ html += '.tweet a { color: #1da1f2; text-decoration: none; }\n'
+ html += '.tweet a:hover { text-decoration: underline; }\n'
+ html += 'h1 { text-align: center; }\n'
+ html += '</style>\n'
+ html += '</head>\n<body>\n'
+ html += f'<h1>@{username} archived tweets</h1>\n'
+ html += '<div class="container">\n'
+
+    # escape() keeps tweet text and URLs from breaking the markup or
+    # injecting script into the generated page
+    for tweet in json_content:
+        html += '<div class="tweet">\n'
+        html += f'<p><strong>Archived Timestamp:</strong> {escape(str(tweet["archived_timestamp"]))}</p>\n'
+        html += f'<p><strong>Archived URL Key:</strong> {escape(str(tweet["archived_urlkey"]))}</p>\n'
+        html += f'<p><strong>Tweet:</strong> <a href="{escape(str(tweet["tweet"]))}">{escape(str(tweet["tweet"]))}</a></p>\n'
+        html += f'<p><strong>Archived Tweet:</strong> <a href="{escape(str(tweet["archived_tweet"]))}">{escape(str(tweet["archived_tweet"]))}</a></p>\n'
+        html += f'<p><strong>Parsed Tweet:</strong> <a href="{escape(str(tweet["parsed_tweet"]))}">{escape(str(tweet["parsed_tweet"]))}</a></p>\n'
+        html += f'<p><strong>Parsed Tweet Mimetype JSON:</strong> {escape(str(tweet["parsed_tweet_mimetype_json"]))}</p>\n'
+        html += f'<p><strong>Parsed Archived Tweet:</strong> <a href="{escape(str(tweet["parsed_archived_tweet"]))}">{escape(str(tweet["parsed_archived_tweet"]))}</a></p>\n'
+        html += f'<p><strong>Archived Mimetype:</strong> {escape(str(tweet["archived_mimetype"]))}</p>\n'
+        html += f'<p><strong>Archived Statuscode:</strong> {escape(str(tweet["archived_statuscode"]))}</p>\n'
+        html += f'<p><strong>Archived Digest:</strong> {escape(str(tweet["archived_digest"]))}</p>\n'
+        html += f'<p><strong>Archived Length:</strong> {escape(str(tweet["archived_length"]))}</p>\n'
+        html += f'<p><strong>Available Tweet Content:</strong> {escape(str(tweet["available_tweet_content"]))}</p>\n'
+        html += f'<p><strong>Available Tweet Is Retweet:</strong> {escape(str(tweet["available_tweet_is_RT"]))}</p>\n'
+        html += f'<p><strong>Available Tweet Username:</strong> {escape(str(tweet["available_tweet_username"]))}</p>\n'
+ html += '</div>\n'
+
+ html += '</div>\n'
+ html += '</body>\n</html>'
+
+ return html
+
+
+def save_html(html_file_path, html_content):
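+    """Writes the rendered HTML page to disk."""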
+ with open(html_file_path, 'w', encoding='utf-8') as f:
+ f.write(html_content)
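+
+
+# Hypothetical usage sketch (the file names are assumptions, following the
+# f'{filename}.json' / f'{filename}.html' pattern in export_tweets):
+#   content = read_json('claromes_tweets.json')
+#   page = generate_html(content, 'claromes')
+#   save_html('claromes_tweets.html', page)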