I won't know whether this script works until I run it and see the errors. However, the comments won't start to generate until after all the posts have been created, so I can't debug that part until I've already created too much content.
import sqlite3
import requests
from pythorhead import Lemmy
import schedule
import time
import logging
from config import *
# Root logger setup: INFO and above goes both to the file debug.log and to a
# default StreamHandler.
logging.basicConfig(
    handlers=[logging.FileHandler("debug.log"), logging.StreamHandler()],
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
    level=logging.INFO,
)
def initialize_database():
    """Open the SQLite database at DB_FILE and make sure both tables exist.

    Two tables are created when missing:

    * ``posts`` -- keyed by the GitHub issue URL, stores the Lemmy post id,
      name and body.
    * ``comments`` -- keyed by the GitHub comment id, stores the Lemmy
      comment id plus the comment author and body.

    Returns:
        The open ``sqlite3.Connection``; closing it is left to the caller.
    """
    posts_ddl = (
        "\n"
        "CREATE TABLE IF NOT EXISTS posts (\n"
        "github_url TEXT PRIMARY KEY,\n"
        "lemmy_post_id INTEGER,\n"
        "lemmy_post_name TEXT,\n"
        "lemmy_post_body TEXT\n"
        ")\n"
    )
    comments_ddl = (
        "\n"
        "CREATE TABLE IF NOT EXISTS comments (\n"
        "github_comment_id INTEGER PRIMARY KEY,\n"
        "lemmy_comment_id INTEGER,\n"
        "comment_user TEXT,\n"
        "comment_body TEXT\n"
        ")\n"
    )

    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    for ddl in (posts_ddl, comments_ddl):
        cursor.execute(ddl)
    conn.commit()
    return conn
def initialize_lemmy_instance():
    """Create a Lemmy client for LEMMY_INSTANCE_URL and log it in.

    Returns:
        The ``Lemmy`` client object after ``log_in`` has been called with
        LEMMY_USERNAME / LEMMY_PASSWORD.
    """
    client = Lemmy(LEMMY_INSTANCE_URL)
    # NOTE(review): the result of log_in() is not inspected, so the info
    # message below is emitted whether or not the login succeeded.
    client.log_in(LEMMY_USERNAME, LEMMY_PASSWORD)
    logging.info("Initialized Lemmy instance")
    return client
def discover_community(lemmy, community_name):
    """Look up a community through the Lemmy client and log the result.

    Args:
        lemmy: Client object exposing ``discover_community(name)``.
        community_name: Name of the community to look up.

    Returns:
        Whatever ``lemmy.discover_community`` returned for that name.
    """
    found_id = lemmy.discover_community(community_name)
    logging.info(f"Discovered community {community_name} with ID {found_id}")
    return found_id
def fetch_github_issues(repo):
    """Download the issue list of a GitHub repository.

    Args:
        repo: Repository in ``owner/name`` form; interpolated into the
            ``/repos/{repo}/issues`` endpoint under GITHUB_API_BASE.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If GitHub answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within 30 seconds.
    """
    url = f"{GITHUB_API_BASE}/repos/{repo}/issues"
    headers = {"Accept": "application/vnd.github+json"}
    # Without a timeout a stalled connection would block the scheduler forever.
    response = requests.get(url, headers=headers, timeout=30)
    # Fail loudly on error statuses (e.g. rate limiting) instead of handing an
    # error payload to callers that expect a list of issues.
    response.raise_for_status()
    logging.info(f"Fetched issues from {url}")
    # NOTE(review): no query parameters are sent, so GitHub's defaults for
    # state and pagination apply -- confirm that a single page is enough.
    return response.json()
def extract_issue_info(issue, repo):
    """Derive the Lemmy post fields from one GitHub issue.

    Args:
        issue: Mapping with the keys ``html_url``, ``state``, ``title``,
            ``number`` and ``body``.
        repo: Repository in ``owner/name`` form, used to pick the
            ``[BE]`` / ``[UI]`` tag.

    Returns:
        A ``(issue_url, issue_title, issue_body)`` tuple. The title has the
        form ``"[Closed][BE] <title> #<number>"`` (``[Closed]`` only for
        closed issues); the body is passed through unchanged.
    """
    issue_url = issue["html_url"]
    issue_state = "[Closed]" if issue["state"] == "closed" else ""
    # A plain `"lemmy" in repo` test is also true for a "...lemmy-ui" repo, so
    # the UI suffix must be ruled out before tagging a repo as backend.
    is_backend = "lemmy" in repo and not repo.lower().endswith("-ui")
    repo_abbr = "[BE]" if is_backend else "[UI]"
    issue_title = f"{issue_state}{repo_abbr} {issue['title']} #{issue['number']}"
    issue_body = issue["body"]
    return issue_url, issue_title, issue_body
def post_issues_to_lemmy(lemmy, community_id, repo):
    """Create a Lemmy post for every GitHub issue that has not been posted yet.

    Issues whose URL is already present in the ``posts`` table are skipped.
    Each newly created post is recorded in that table and committed
    immediately, so an interrupted run does not re-post earlier issues.

    Args:
        lemmy: Logged-in Lemmy client exposing ``post.create``.
        community_id: Id of the community to post into.
        repo: Repository in ``owner/name`` form whose issues are mirrored.
    """
    issues = fetch_github_issues(repo)
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        for issue in issues:
            issue_url, issue_title, issue_body = extract_issue_info(issue, repo)
            cursor.execute("SELECT lemmy_post_id FROM posts WHERE github_url=?", (issue_url,))
            if cursor.fetchone():
                continue  # already mirrored on an earlier run

            response = lemmy.post.create(community_id, issue_title, url=issue_url, body=issue_body)
            # Guard against a falsy result (presumably returned by the client
            # when the request fails) so one bad issue does not abort the run.
            if not response:
                logging.error(f"Failed to post issue {issue_title} to community {community_id}")
                continue

            post = response["post_view"]["post"]
            cursor.execute(
                "INSERT INTO posts (github_url, lemmy_post_id, lemmy_post_name, lemmy_post_body) VALUES (?, ?, ?, ?)",
                # .get(): a post created from an issue without a body may not
                # carry a "body" key at all.
                (issue_url, post["id"], post["name"], post.get("body")),
            )
            conn.commit()
            logging.info(f"Posted issue {issue_title} to community {community_id}")
    finally:
        # Always release the connection, even when a request or query raises.
        conn.close()
def fetch_github_comments(repo, issue_number):
    """Download the comments of one GitHub issue.

    Args:
        repo: Repository in ``owner/name`` form.
        issue_number: Number of the issue, interpolated into the
            ``/repos/{repo}/issues/{issue_number}/comments`` endpoint.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If GitHub answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within 30 seconds.
    """
    url = f"{GITHUB_API_BASE}/repos/{repo}/issues/{issue_number}/comments"
    headers = {"Accept": "application/vnd.github+json"}
    # Without a timeout a stalled connection would block the scheduler forever.
    response = requests.get(url, headers=headers, timeout=30)
    # Surface 404s / rate limiting here rather than returning an error payload
    # that the caller would try to iterate as a list of comments.
    response.raise_for_status()
    logging.info(f"Fetched comments for issue #{issue_number}")
    return response.json()
def post_comments_to_lemmy(lemmy, post_id, repo, issue_number):
    """Mirror the GitHub comments of one issue onto its Lemmy post.

    Comments whose GitHub id is already present in the ``comments`` table are
    skipped. Each newly created comment is recorded and committed
    immediately.

    Args:
        lemmy: Logged-in Lemmy client exposing ``comment.create``.
        post_id: Id of the Lemmy post that receives the comments.
        repo: Repository in ``owner/name`` form.
        issue_number: Number of the GitHub issue whose comments are fetched.
    """
    github_comments = fetch_github_comments(repo, issue_number)
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        for comment in github_comments:
            github_comment_id = comment["id"]
            cursor.execute("SELECT lemmy_comment_id FROM comments WHERE github_comment_id=?", (github_comment_id,))
            if cursor.fetchone():
                continue  # already mirrored on an earlier run

            comment_user = comment["user"]["login"]
            comment_body = comment["body"]
            response = lemmy.comment.create(post_id, comment_body)
            # Guard against a falsy result (presumably returned by the client
            # when the request fails) so one bad comment does not abort the run.
            if not response:
                logging.error(f"Failed to post comment {github_comment_id} to lemmy post {post_id}")
                continue

            # The created comment is nested under "comment_view", mirroring the
            # "post_view" wrapper used for posts; there is no top-level
            # "comment" key in the response.
            lemmy_comment_id = response["comment_view"]["comment"]["id"]
            cursor.execute(
                "INSERT INTO comments (github_comment_id, lemmy_comment_id, comment_user, comment_body) VALUES (?, ?, ?, ?)",
                (github_comment_id, lemmy_comment_id, comment_user, comment_body),
            )
            conn.commit()
            logging.info(f"Posted comment {github_comment_id} to lemmy post {post_id}")
    finally:
        # Always release the connection, even when a request or query raises.
        conn.close()
# Fetch the GitHub issue URL and Lemmy post ID for each issue already posted
def fetch_issue_data(repo):
    """Return the stored (GitHub URL, Lemmy post id) pairs for one repository.

    Args:
        repo: Repository in ``owner/name`` form; rows are selected when their
            ``github_url`` starts with ``https://github.com/{repo}/issues/``.

    Returns:
        A list of ``(github_url, lemmy_post_id)`` tuples from the ``posts``
        table.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT github_url, lemmy_post_id FROM posts WHERE github_url LIKE ?",
            (f"https://github.com/{repo}/issues/%",),
        )
        return cursor.fetchall()
    finally:
        # The rows are fully materialised by fetchall(), so the connection can
        # be released before returning.
        conn.close()
def extract_issue_number(github_url):
    """Return the integer found after the last ``/`` of *github_url*.

    Raises:
        ValueError: If the text after the last slash is not an integer.
    """
    _, _, last_segment = github_url.rpartition("/")
    return int(last_segment)
def main():
    """Run one full sync pass: post new issues, then their comments.

    For every repository in REPOSITORIES the issues are posted first; the
    comments are then synced for each issue of that repository that is
    recorded in the database.
    """
    logging.info("Running main function")
    # initialize_database() returns an open connection; only the table
    # creation is needed here, so close it straight away instead of leaking it.
    initialize_database().close()
    lemmy = initialize_lemmy_instance()
    community_id = discover_community(lemmy, LEMMY_COMMUNITY_NAME)
    for repo in REPOSITORIES:
        post_issues_to_lemmy(lemmy, community_id, repo)
        for github_url, lemmy_post_id in fetch_issue_data(repo):
            # Call the helper: previously the function object itself was
            # passed on, which produced an invalid comments URL.
            issue_number = extract_issue_number(github_url)
            post_comments_to_lemmy(lemmy, lemmy_post_id, repo, issue_number)
def run_periodically():
    """Run ``main`` once right away, then again every two hours, forever.

    The function never returns: after registering the job it polls the
    scheduler once a minute.
    """
    main()

    every_two_hours = schedule.every(2).hours
    every_two_hours.do(main)

    while True:
        schedule.run_pending()
        time.sleep(60)
# Start the sync loop only when the file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    logging.info("Starting script")
    run_periodically()