Aggregate news by topics using OpenAI API, Hacker News API and FastAPI
How to create a useful website in four steps
Description
In this article, we will create a website that aggregates the latest top stories from Hacker News. To accomplish this, we will use the Hacker News API to fetch today's top stories. Additionally, we will make an OpenAI API request to group the news articles by topic, storing the results in JSON format. The website will be served using FastAPI and the Jinja template engine.
Step 1. Get top stories from Hacker News
To see the full code listing, please check the worker.py file in the GitHub repo.
First, let's fetch the story IDs as a list of integers:
import requests

def get_topstories(max_stories=30):
    # Get top stories
    topstories = requests.get("https://hacker-news.firebaseio.com/v0/topstories.json")
    if (code := topstories.status_code) != 200:
        raise ValueError(f"topstories status code: {code}")
    topstories_ids = topstories.json()
    # Filter stories
    return topstories_ids[:max_stories]  # e.g. [3000, 3004, 3051]
As one may note, we limit the number of stories to be analyzed with the max_stories=30 parameter.
The tricky part is performing all 30 requests asynchronously. We will use aiohttp and create a helpers.py file with the functions below:
import aiohttp
import asyncio

BATCH_SIZE = 15

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def process_batch(session, urls):
    tasks = []
    for url in urls:
        task = asyncio.ensure_future(fetch_url(session, url))
        tasks.append(task)
    return await asyncio.gather(*tasks)

async def process_urls(urls, batch_size=BATCH_SIZE):
    async with aiohttp.ClientSession() as session:
        batches = [urls[i : i + batch_size] for i in range(0, len(urls), batch_size)]
        results = []
        for batch in batches:
            batch_results = await process_batch(session, batch)
            results.extend(batch_results)
        return results
Now we can pass a list of URLs into process_urls to process all requests asynchronously.
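To make the batching behaviour concrete, here is a self-contained simulation of the same batch-then-gather pattern. fake_fetch is a hypothetical stand-in for fetch_url, so the snippet runs without any network access:

```python
import asyncio

BATCH_SIZE = 15

async def fake_fetch(url):
    # Stand-in for fetch_url: returns the URL instead of hitting the network
    await asyncio.sleep(0)
    return url

async def process_batch(urls):
    # gather preserves the order of the tasks it is given
    return await asyncio.gather(*(fake_fetch(u) for u in urls))

async def process_urls(urls, batch_size=BATCH_SIZE):
    # Process the URLs in sequential batches, batch_size at a time
    results = []
    for i in range(0, len(urls), batch_size):
        results.extend(await process_batch(urls[i : i + batch_size]))
    return results

urls = [f"https://example.com/item/{i}.json" for i in range(40)]
results = asyncio.run(process_urls(urls))
print(len(results))  # 40 -- order is preserved within and across batches
```

Because batches run one after another, at most BATCH_SIZE requests are in flight at any moment, which keeps us polite towards the API.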
Let's prepare the URLs from list_of_items:
import asyncio
from typing import List

from helpers import process_urls

def get_items(list_of_items: List[int], batch_size=12):
    # Prepare API requests to get all items
    URL_ITEM = "https://hacker-news.firebaseio.com/v0/item/{}.json"
    urls = [URL_ITEM.format(t_s) for t_s in list_of_items]
    # urls is now a list like:
    # ["https://hacker-news.firebaseio.com/v0/item/3001.json",
    #  "https://hacker-news.firebaseio.com/v0/item/4001.json",
    #  ...]
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(process_urls(urls, batch_size))
    return results

list_of_items = get_topstories()
results = get_items(list_of_items)
# results is a list of item dicts with "title", "url", "score", etc.
Next, we will transform the retrieved results into a format that is easy to parse for ChatGPT requests. We will retain both the “title” and “URL” fields since the URL can provide valuable insights for classifying elements.
results_parsed = [
    f"{el['title']} URL: {el['url']}"
    for el in results if el.get("url", None) is not None
]
# The result will be:
# ["The Password Game URL: https://neal.fun/password-game/",
# "FreeBSD Jails Containers URL: https://vermaden.wordpress.com/2023/06/28/freebsd-jails-containers/"
# ...]
Step 2. Make OpenAI API requests and process results
First, let's create a function named get_openai_prompt. It takes a single string of newline-separated article entries and returns system_message and user_message (we will use chat-optimized models):
from typing import Tuple

def get_openai_prompt(topics: str) -> Tuple[dict, dict]:
    system_message = {
        "role": "system",
        "content": (
            "You are an assistant that can group news articles "
            "from hackernews (news.ycombinator.com) into topics"
        ),
    }
    user_message = {
        "role": "user",
        "content": (
            "Group the following news articles into topics\n\n"
            + topics
            + "\n\nUse the following format:\n"
            + "topic_name_1\n- title\turl\n- title\turl\ntopic_name_2\n- title\turl"
        ),
    }
    return system_message, user_message
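As a quick sanity check, here is what the two messages look like for a single made-up article string; the snippet repeats the function from above so it runs standalone:

```python
from typing import Tuple

def get_openai_prompt(topics: str) -> Tuple[dict, dict]:
    # Same function as above, repeated so this snippet is self-contained
    system_message = {
        "role": "system",
        "content": (
            "You are an assistant that can group news articles "
            "from hackernews (news.ycombinator.com) into topics"
        ),
    }
    user_message = {
        "role": "user",
        "content": (
            "Group the following news articles into topics\n\n"
            + topics
            + "\n\nUse the following format:\n"
            + "topic_name_1\n- title\turl\n- title\turl\ntopic_name_2\n- title\turl"
        ),
    }
    return system_message, user_message

sample = "The Password Game URL: https://neal.fun/password-game/"
s_m, u_m = get_openai_prompt(topics=sample)
print(s_m["role"])               # system
print(sample in u_m["content"])  # True
```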
The next step is to request OpenAI via the API, parse the response, and save it as a JSON file:
import datetime
import json
import os
import re

import openai

topics = "\n\n".join(results_parsed)
s_m, u_m = get_openai_prompt(topics=topics)  # system & user messages

# Get an API key here: https://platform.openai.com/account/api-keys
# (never hardcode a real key in your source code)
openai.api_key = os.environ["OPENAI_API_KEY"]

# Get a response from the model
response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[s_m, u_m],
    max_tokens=2200,  # You can increase this number if needed
)

# Get the body of the response
res = response["choices"][0]["message"]["content"].split("\n")

# Parse results
# Sometimes the response may be structured in different ways
current_topic = None
dict_ = {}
titles_returned = {}
for l in res:
    if not l.strip():  # Ignore empty strings
        continue
    if not ("http://" in l.lower() or "https://" in l.lower()):
        # If there is no link in the string, the string is a "topic"
        current_topic = l
        continue
    # Otherwise the current string is a title that contains a link as well
    if current_topic not in dict_:
        dict_[current_topic] = {}
    pattern = r"- (.+?)\s*URL:"
    pattern2 = r"- (.+?)\s*http"
    match = re.search(pattern, l)
    match2 = re.search(pattern2, l)
    if match:
        substring = str(match.group(1))
        titles_returned[substring] = current_topic
    elif match2:
        substring = str(match2.group(1))
        titles_returned[substring] = current_topic
    else:
        print(l)

data = {}
for r in results:
    if "url" not in r or "score" not in r:
        print("Skip")
        continue
    data[r["title"]] = {"url": r["url"], "score": r["score"]}

for k in data:
    if k in titles_returned:
        data[k]["topic"] = titles_returned[k]
        continue
    data[k]["topic"] = "Other"

prefix = datetime.datetime.now().strftime("%Y-%m-%d")
fname = f"data/{prefix}_articles.json"
with open(fname, "w") as f:
    json.dump(data, f)
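Since the model does not always include the "URL:" prefix, the two regex patterns cover both line shapes. A quick illustration on two made-up response lines:

```python
import re

# Two line shapes we may get back from the model (made-up examples)
lines = [
    "- The Password Game URL: https://neal.fun/password-game/",
    "- FreeBSD Jails Containers https://vermaden.wordpress.com/",
]
pattern = r"- (.+?)\s*URL:"    # title followed by "URL:"
pattern2 = r"- (.+?)\s*http"   # title followed directly by a link
titles = []
for l in lines:
    match = re.search(pattern, l) or re.search(pattern2, l)
    if match:
        titles.append(match.group(1))
print(titles)  # ['The Password Game', 'FreeBSD Jails Containers']
```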
The script will generate a JSON file that maps each title to its url, score, and topic.
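Based on the parsing code above, the saved file has one entry per article; an illustrative sample (titles, scores, and topic names are made up for this example):

```python
import json

# Illustrative contents of data/<date>_articles.json
sample = {
    "The Password Game": {
        "url": "https://neal.fun/password-game/",
        "score": 515,
        "topic": "Games",
    },
    "FreeBSD Jails Containers": {
        "url": "https://vermaden.wordpress.com/2023/06/28/freebsd-jails-containers/",
        "score": 120,
        "topic": "Operating Systems",
    },
}
print(sorted(sample))  # ['FreeBSD Jails Containers', 'The Password Game']
```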
We are now ready to use this JSON in our website.
Step 3. Website (FastAPI + Jinja Templates)
To see the full code listing, please check the app/app.py file in the GitHub repo.
Let's create an app.py file in the app folder:
import glob
import json
from collections import defaultdict

import uvicorn
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

@app.get("/")
def get_articles(request: Request):
    # Pick the most recent data file
    fname = sorted(glob.glob("data/*_articles.json"), reverse=True)[0]
    with open(fname, "r") as json_file:
        articles = json.load(json_file)

    # Group articles by topic
    grouped_articles = {}
    for title, article in articles.items():
        topic = article["topic"]
        if topic in grouped_articles:
            grouped_articles[topic][title] = article
        else:
            grouped_articles[topic] = {title: article}

    # Calculate the total score for each topic/group
    topic_scores = defaultdict(int)
    for title, article in articles.items():
        topic_scores[article["topic"]] += article["score"]

    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "articles": grouped_articles,
            "topic_scores": topic_scores,
        },
    )

if __name__ == "__main__":
    uvicorn.run("app:app", host="127.0.0.1", port=5556, reload=True)
It would be ideal to avoid reading the .json file from the file system each time and instead keep it in memory, occasionally updating it. However, for the sake of code simplicity in our specific situation, we have opted for the most basic code that accomplishes its task. We anticipate that the website’s load will be minimal, with less than one request per second (RPS).
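The grouping and scoring logic inside get_articles can be exercised in isolation; a minimal sketch on made-up article data, using defaultdict for the groups for brevity:

```python
from collections import defaultdict

# Made-up articles in the same shape as the saved JSON
articles = {
    "Title A": {"url": "https://a.example", "score": 100, "topic": "AI"},
    "Title B": {"url": "https://b.example", "score": 50, "topic": "AI"},
    "Title C": {"url": "https://c.example", "score": 30, "topic": "Security"},
}

grouped = defaultdict(dict)       # topic -> {title: article}
topic_scores = defaultdict(int)   # topic -> sum of article scores
for title, article in articles.items():
    grouped[article["topic"]][title] = article
    topic_scores[article["topic"]] += article["score"]

print(topic_scores["AI"])  # 150
print(sorted(grouped))     # ['AI', 'Security']
```

The template can then iterate over the groups and, for example, order topics by their total score.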
Now let's prepare the index.html and styles.css files; their full listings are available in the GitHub repo.
Step 4. Run and see the results
To simultaneously run both scripts, app.py for the web server and worker.py for interacting with the external APIs, we can utilize tmux.
To run the server, use the following command:
uvicorn app.app:app --port 5556
To run the worker, use the command below:
while true; do python3 worker.py; ls data/*; sleep 12h; done
Now you can open your favourite browser and test the result at http://localhost:5556, or check the production version at https://betterhacker.news.
Thank you for taking the time to read this. You can find the complete code available on GitHub