
mochi-bot-twitter Repository Explanation

rtree/mochi-bot-twitter is a sample Python script repository that integrates OpenAI (ChatGPT) with X (Twitter) and uses Bing Search results to fetch the latest information. It demonstrates how to automatically tweet summaries of up-to-date content.


Overview

  1. Conversation History Management with OpenAI (ChatGPT API)
    • Uses a conversation history (a deque) to store both user inputs and AI responses.
    • Each new user input is appended to this history, which is then sent to OpenAI’s API to generate replies that consider past dialogue.
  2. Incorporating Bing Search for Up-to-Date Information
    • The script extracts relevant keywords from the user’s query and searches them on Bing.
    • It fetches the top few pages or PDF files, processes their contents, and generates a summary using ChatGPT.
  3. Posting Summaries to Twitter (via tweepy)
    • After generating a summarized response, the script can optionally post it as a series of tweets.
    • It supports splitting a long response into multiple tweets in a thread.
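
The conversation history described in item 1 can be sketched as follows. This is a minimal illustration, not the repository's code: the helper names remember and build_messages are hypothetical, and HISTORY_LENGTH is set to 3 here only to make the eviction visible (the script uses 10).

```python
from collections import deque

# Hypothetical small limit for the demo; the repository's script uses 10.
HISTORY_LENGTH = 3

history = deque(maxlen=HISTORY_LENGTH)

def remember(role, content):
    """Append one chat message; a full deque silently drops its oldest entry."""
    history.append({"role": role, "content": content})

def build_messages(system_prompt):
    """Return the message list that would be sent to a chat-completion call."""
    return [{"role": "system", "content": system_prompt}, *history]

remember("user", "first question")
remember("assistant", "first answer")
remember("user", "second question")
remember("assistant", "second answer")  # evicts "first question"

messages = build_messages("You are a helpful assistant.")
print([m["content"] for m in messages])
```

Because the deque has a fixed maxlen, the request stays bounded in size without any explicit trimming logic.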

High-Level Flow

  1. Load Environment Variables (dotenv)
    • Reads the .env file to get OPENAI_API_KEY, BING_API_KEY, TWITTER_API_KEY, and so on.
    • Supplies each service with the credentials it needs; the script itself does not check for missing values.
  2. Conversation History (conversation_history)
    • Maintained using a deque with a maximum length defined by HISTORY_LENGTH.
    • Stores recent messages, ensuring context is preserved in the conversation.
  3. OpenAI API Integration (openai module)
    • A ChatGPT client is created via OpenAI(api_key=OPENAI_API_KEY).
    • For each user message, the script calls client.chat.completions.create(...) with the model named in the GPT_MODEL environment variable (for example, gpt-3.5-turbo).
  4. Bing Search
    • A function search_bing() calls the Bing Search API (https://api.bing.microsoft.com/v7.0/search) using the keywords extracted from the user’s query.
    • Up to SEARCH_RESULTS (default 8) top results are processed; their snippets or page contents are retrieved.
  5. Summarizing Results
    • summarize_results_with_pages_async() fetches each page concurrently, extracts text from HTML or PDF content (truncated to SEARCH_MAX_CONTENT_LENGTH, 5000 characters), falls back to the Bing snippet when a page cannot be used, and concatenates the results.
    • summarize_results_async() then has ChatGPT summarize the concatenated text, producing a final answer that blends the search results with the AI’s own reasoning.
  6. Twitter Posting (Optional)
    • If TWITTER_DO_TWEET is True (main.py sets it unless the test argument is passed), the resulting summary is split on TWITTER_DELIMITER into multiple tweets, each posted as a reply to the previous one (a tweet thread).
    • The code uses the tweepy library for tweeting.
  7. Main Function Execution
    • When python main.py is run, it triggers asyncio.run(run_bot()).
    • A sample user query is defined ("Summarize today’s news ..."), Bing Search is performed if needed, the AI provides a response, and (optionally) tweets are posted.
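
The splitting in step 6 can be sketched as a pure function. This is an illustration under stated assumptions: split_into_thread is a hypothetical helper, not a function from the repository, and unlike the listing below it also strips whitespace and drops empty chunks. The delimiter and the 199-character cap mirror the values used in the script.

```python
DELIMITER = "@@@@@@@@@@"  # same marker as TWITTER_DELIMITER in the script
MAX_LEN = 199             # per-tweet cap applied by the script's slicing

def split_into_thread(summary, delimiter=DELIMITER, max_len=MAX_LEN):
    """Split a summary on the delimiter, drop empty chunks, and cap each chunk."""
    chunks = (part.strip() for part in summary.split(delimiter))
    return [chunk[:max_len] for chunk in chunks if chunk]

summary = f"Economy news.{DELIMITER} Tech news. {DELIMITER}"
thread = split_into_thread(summary)
print(thread)
```

Each element of the returned list would then be posted in order, passing the previous tweet's ID as in_reply_to_tweet_id so the tweets form a thread.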

Code Structure

Below is the core Python script from the repository for reference:

import os
import tweepy
from dotenv import load_dotenv
from openai import OpenAI
import requests
from bs4 import BeautifulSoup
from collections import deque
import asyncio
from datetime import datetime
import lxml
from PyPDF2 import PdfReader
from io import BytesIO
import base64
import sys
import logging

# Load environment variables
load_dotenv()
OPENAI_API_KEY       = os.getenv('OPENAI_API_KEY')
BING_API_KEY         = os.getenv('BING_API_KEY')
HISTORY_LENGTH       = 10
SEARCH_RESULTS       = 8
SEARCH_MAX_CONTENT_LENGTH   = 5000
TWITTER_API_KEY = os.getenv('TWITTER_API_KEY')
TWITTER_API_SECRET = os.getenv('TWITTER_API_SECRET')
TWITTER_ACCESS_TOKEN = os.getenv('TWITTER_ACCESS_TOKEN')
TWITTER_ACCESS_SECRET = os.getenv('TWITTER_ACCESS_SECRET')
TWITTER_BEARER_TOKEN  = os.getenv('TWITTER_BEARER_TOKEN')  # read the bearer token from its own variable, not the access secret
TWITTER_DO_TWEET      = False
TWITTER_DELIMITER     = "@@@@@@@@@@"
REPUTABLE_DOMAINS = [
    "go.jp", "gov",  # Government and public sector
    "scholar.google.com", "ci.nii.ac.jp", "pubmed.ncbi.nlm.nih.gov", "arxiv.org", "jstage.jst.go.jp", "ac.jp",  # Academic and research databases
    "nikkei.com",  # News and business
    "nature.com", "sciencedirect.com", "springer.com", "wiley.com",  # Scientific publishers
    "ieee.org", "researchgate.net",  # Technical and engineering
    "cambridge.org", "oxfordjournals.org",  # Prestigious university publishers
    "jamanetwork.com", "nejm.org", "plos.org"  # Medical and health research
]

#GPT_MODEL            = 'gpt-4-turbo-preview'
GPT_MODEL            = os.getenv('GPT_MODEL')
AINAME               = "もちお"
CHARACTER            = f'You are an AI assistant cat named "{AINAME}", slightly mischievous and cute, speaking politely in Japanese with some playful endings like "だよ".'

###########################
#
# Initialize

# Conversation history
conversation_history = deque(maxlen=HISTORY_LENGTH)
# Logging
log_dir = "./log"
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f"{datetime.today().strftime('%Y-%m-%d')}.log")
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    handlers=[
                        logging.FileHandler(log_file, mode="a"),
                        logging.StreamHandler()
                    ])

# OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

# Twitter client
twclient = tweepy.Client(
    bearer_token=TWITTER_BEARER_TOKEN,
    consumer_key=TWITTER_API_KEY,
    consumer_secret=TWITTER_API_SECRET,
    access_token=TWITTER_ACCESS_TOKEN,
    access_token_secret=TWITTER_ACCESS_SECRET
)

# -------------------------------- Search related ----------------------------

def parse_prompt(discIn):
    p_src = ("You are an assistant that analyzes the user's prompt. "
             "Please extract the main subject, subtopics, and relevant keywords from the recent user input.")
    messages = []
    messages.extend(conversation_history)
    messages.append({"role": "user", "content": p_src})
    response = client.chat.completions.create(
        model=GPT_MODEL,
        messages=messages
    )
    logging.info("= parse_prompt ============================================")
    logging.info(f"response: {response.choices[0].message.content}")
    logging.info("= End of parse_prompt =====================================")

    return response.choices[0].message.content

def should_search(discIn):
    p_src = ("You are a smart assistant. Analyze the conversation history to determine whether the user’s query "
             "requires external, up-to-date information. If yes, return 'Yes' only.")
    messages = []
    messages.extend(conversation_history)
    messages.append({"role": "user", "content": p_src})
    response = client.chat.completions.create(
        model=GPT_MODEL,
        messages=messages
    )
    logging.info("= should_search ============================================")
    logging.info(f"response: {response.choices[0].message.content}")
    logging.info("= End of should_search =====================================")
    return response.choices[0].message.content

def extract_keywords(parsed_text):
    p_src = (f"You are an assistant that extracts concise search keywords from the analyzed prompt. "
             f"Output them separated by single spaces: {parsed_text}")
    messages = []
    messages.extend(conversation_history)
    messages.append({"role": "user", "content": p_src})
    response = client.chat.completions.create(
        model=GPT_MODEL,
        messages=messages
    )
    logging.info("= extract_keywords ============================================")
    logging.info(f"response: {response.choices[0].message.content}")
    logging.info("= End of extract_keywords =====================================")

    return response.choices[0].message.content

def search_bing(query, domains=REPUTABLE_DOMAINS, count=SEARCH_RESULTS):
    url = "https://api.bing.microsoft.com/v7.0/search"
    headers = {"Ocp-Apim-Subscription-Key": BING_API_KEY}
    params = {"q": query, "count": count, "mkt": "en-US", "freshness": "Day", "sortBy": "Date"}
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    search_data = response.json()
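    web_results = search_data.get("webPages", {}).get("value", [])[:count]
    # Store titles and URLs so summarize_results_async() can list its sources
    search_data["titles"] = [result["name"] for result in web_results]
    search_data["urls"] = [result["url"] for result in web_results]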

    logging.info("Bing Search Results:")
    for result in search_data.get("webPages", {}).get("value", [])[:count]:
        logging.info(f"Title: {result['name']}")
        logging.info(f"URL: {result['url']}")
        logging.info(f"Snippet: {result['snippet']}")
        logging.info("---")
    return search_data

def fetch_page_content(url):
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        content_type = response.headers.get("Content-Type", "")

        if "application/pdf" in content_type:
            pdf_reader = PdfReader(BytesIO(response.content))
            pdf_text = "".join(page.extract_text() for page in pdf_reader.pages)
            return pdf_text[:SEARCH_MAX_CONTENT_LENGTH], "PDF"
        elif "text/html" in content_type:
            soup = BeautifulSoup(response.content, "lxml")
            return soup.get_text(separator="\n", strip=True)[:SEARCH_MAX_CONTENT_LENGTH], "HTML"
        elif content_type.startswith("image/"):
            base64_img = base64.b64encode(response.content).decode("utf-8")
            data_url = f"data:{content_type};base64,{base64_img}"
            return data_url, "Image"
        else:
            return None, "Unsupported"
    except Exception as e:
        logging.info(f"Error fetching {url}: {str(e)}")
        return None, "Error"

################################################################################################################

async def fetch_page_content_async(url):
    """
    Asynchronously fetch page content by offloading blocking I/O to a thread.
    Returns (content, type).
    """
    def blocking_fetch():
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            content_type = response.headers.get("Content-Type", "")

            if "application/pdf" in content_type:
                pdf_reader = PdfReader(BytesIO(response.content))
                pdf_text = "".join(page.extract_text() for page in pdf_reader.pages)
                return pdf_text[:SEARCH_MAX_CONTENT_LENGTH], "PDF"
            elif "text/html" in content_type:
                soup = BeautifulSoup(response.content, "lxml")
                text = soup.get_text(separator="\n", strip=True)
                return text[:SEARCH_MAX_CONTENT_LENGTH], "HTML"
            elif content_type.startswith("image/"):
                base64_img = base64.b64encode(response.content).decode("utf-8")
                data_url = f"data:{content_type};base64,{base64_img}"
                return data_url, "Image"
            else:
                return None, "Unsupported"
        except Exception as e:
            logging.info(f"Error fetching {url}: {str(e)}")
            return None, "Error"

    content, ctype = await asyncio.to_thread(blocking_fetch)
    return content, ctype

################################################################################################################
################################################################################################################

async def summarize_results_with_pages_async(search_results):
    """
    Asynchronously fetch content for each search result. 
    Returns a combined string of all results (page or snippet).
    """
    content_list = []
    web_results = search_results.get("webPages", {}).get("value", [])[:SEARCH_RESULTS]

    tasks = [fetch_page_content_async(r["url"]) for r in web_results]
    pages = await asyncio.gather(*tasks, return_exceptions=True)

    for (r, page_result) in zip(web_results, pages):
        title = r["name"]
        snippet = r["snippet"]
        url = r["url"]

        if isinstance(page_result, Exception):
            content_list.append(f"Title: {title}\nURL: {url}\nSnippet:\n{snippet}\n")
            continue

        page_content, content_type = page_result
        if content_type in ("HTML", "PDF") and page_content:
            content_list.append(
                f"Title: {title}\nURL: {url}\nContent:\n{page_content}\n"
            )
        else:
            content_list.append(
                f"Title: {title}\nURL: {url}\nSnippet:\n{snippet}\n"
            )

    return "\n".join(content_list)

async def summarize_results_async(search_results):
    """
    Calls GPT to summarize the combined content from search results.
    """
    snippets = await summarize_results_with_pages_async(search_results)

    p_src = (
        f"{CHARACTER}. You are summarizing these search results as a report. "
        "Please consider the conversation history, grasp the user's intent, "
        "and summarize the following search results in English or Japanese, as needed. "
        "Incorporate your own knowledge if it leads to a higher quality answer. "
        f"For Twitter, please split text around every 180 characters with the delimiter {TWITTER_DELIMITER}, etc. "
        f"Here is the data:\n{snippets}"
    )

    def blocking_chat_completion():
        messages = [{"role": "system", "content": CHARACTER}]
        messages.extend(conversation_history)
        messages.append({"role": "user", "content": p_src})

        return client.chat.completions.create(
            model=GPT_MODEL,
            messages=messages
        )

    response = await asyncio.to_thread(blocking_chat_completion)
    summary = response.choices[0].message.content

    titles = search_results.get("titles", [])
    urls = search_results.get("urls", [])
    sources = "\n".join(f"Source: {t} - {u}" for t, u in zip(titles, urls))

    return f"{summary}\n\n{sources}"

async def search_or_call_openai_async(discIn, img):
    """
    Decide whether external Bing search is needed; if yes, fetch pages concurrently,
    then summarize. Otherwise, call GPT directly.
    """
    if img or any("http" in entry["content"] for entry in discIn):
        logging.info("Skipping search and calling OpenAI directly.")
        return just_call_openai(discIn)
    else:
        yesorno = "Yes"  # Hardcoded example; normally would call should_search(discIn)
        if "Yes" in yesorno:
            logging.info("searching... ---------------------------------------------")
            parsed_result = parse_prompt(discIn)
            keywords = extract_keywords(parsed_result)
            logging.info(f"keyword: {keywords}")
            search_results = search_bing(keywords)

            summary = await summarize_results_async(search_results)
            return summary
        else:
            logging.info("generating... --------------------------------------------")
            return just_call_openai(discIn)

async def ai_respond(discIn, img):
    """
    High-level function to produce an AI response (asynchronous).
    """
    try:
        result = await search_or_call_openai_async(discIn, img)
        return result
    except Exception as e:
        logging.info(f"API Call Error: {str(e)}")
        return f"Error: {str(e)}"

def just_call_openai(discIn):
    messages = [{"role": "system", "content": f"{CHARACTER}"}]
    messages.extend(conversation_history)
    completion = client.chat.completions.create(
        model=GPT_MODEL,
        messages=messages
    )
    return completion.choices[0].message.content

def post_to_twitter(content):
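    # Split on the delimiter and drop empty chunks, which the API would reject
    tweets = [t.strip() for t in content.split(TWITTER_DELIMITER) if t.strip()]
    if not tweets:
        logging.info("Nothing to tweet.")
        return
    # Cap each tweet at 199 characters
    trimmed_tweets = [tweet[:199] for tweet in tweets]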

    first_tweet = twclient.create_tweet(text=trimmed_tweets[0])
    tweet_id = first_tweet.data["id"]
    
    for tweet in trimmed_tweets[1:]:
        reply_tweet = twclient.create_tweet(text=tweet, in_reply_to_tweet_id=tweet_id)
        tweet_id = reply_tweet.data["id"]

    logging.info("Tweet thread posted successfully!")

async def run_bot():
    msg = f"Summarize today's news. Today's date is ({datetime.today().strftime('%Y-%m-%d')}). Focus on economy and technology."
    img_url = None
    logging.info("-User input------------------------------------------------------------------")
    logging.info(f"  Message content: '{msg}'")
    discIn = [{"role": "user", "content": msg}]
    conversation_history.extend(discIn)

    response = await ai_respond(discIn, img_url)
    conversation_history.append({"role": "assistant", "content": response})
    logging.info("-Agent response--------------------------------------------------------------")
    logging.info(f"  Response content: '{response}'")

    if TWITTER_DO_TWEET:
        post_to_twitter(response)

def twtest():
    if TWITTER_DO_TWEET:
        content = "Sample tweet for testing."
        tweets = content.split(TWITTER_DELIMITER)
        trimmed_tweets = [tweet[:199] for tweet in tweets]

        first_tweet = twclient.create_tweet(text=trimmed_tweets[0])
        tweet_id = first_tweet.data["id"]
        
        for tweet in trimmed_tweets[1:]:
            reply_tweet = twclient.create_tweet(text=tweet, in_reply_to_tweet_id=tweet_id)
            tweet_id = reply_tweet.data["id"]
        logging.info("Tweet posted successfully!")

if __name__ == "__main__":
    if not ("test" in sys.argv):
        TWITTER_DO_TWEET = True
    
    asyncio.run(run_bot())
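
The concurrency pattern used by summarize_results_with_pages_async() (blocking fetches moved to threads, gathered together, with a snippet fallback on failure) can be tried in isolation. The sketch below is not the repository's code: fake_fetch and gather_contents are hypothetical names, and the fetch is simulated so that no network access is needed. It assumes Python 3.9 or later for asyncio.to_thread.

```python
import asyncio

def fake_fetch(url):
    """Stand-in for a blocking HTTP fetch (hypothetical; no network access)."""
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return f"page text from {url}"

async def gather_contents(results):
    """Fetch all URLs concurrently; fall back to the snippet when a fetch fails."""
    tasks = [asyncio.to_thread(fake_fetch, r["url"]) for r in results]
    # return_exceptions=True keeps one failed fetch from cancelling the rest
    pages = await asyncio.gather(*tasks, return_exceptions=True)
    combined = []
    for r, page in zip(results, pages):
        body = r["snippet"] if isinstance(page, Exception) else page
        combined.append(f"URL: {r['url']}\n{body}")
    return combined

results = [
    {"url": "https://example.com/a", "snippet": "snippet A"},
    {"url": "https://example.com/bad", "snippet": "snippet B"},
]
combined = asyncio.run(gather_contents(results))
print(combined)
```

asyncio.gather returns results in the order the tasks were passed, which is why zipping them back onto the search results is safe.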

How to Set Up and Use

1. Clone the Repository

   git clone https://github.com/rtree/mochi-bot-twitter.git
   cd mochi-bot-twitter

2. Prepare your .env File

   OPENAI_API_KEY=sk-xxxx
   BING_API_KEY=xxxxxxxx
   TWITTER_API_KEY=xxxxxxxx
   TWITTER_API_SECRET=xxxxxxxx
   TWITTER_ACCESS_TOKEN=xxxxxxxx
   TWITTER_ACCESS_SECRET=xxxxxxxx
   GPT_MODEL=xxxxxxxx
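
The script reads these values with os.getenv, which returns None for a name that is not set, so a missing entry only surfaces later as an API error. A small startup check could catch that earlier. The sketch below is an assumption, not part of the repository: missing_vars and REQUIRED_VARS are hypothetical names, and the list simply mirrors the .env entries above.

```python
import os

# Names taken from the .env example; adjust to the services you actually use.
REQUIRED_VARS = [
    "OPENAI_API_KEY", "BING_API_KEY", "TWITTER_API_KEY", "TWITTER_API_SECRET",
    "TWITTER_ACCESS_TOKEN", "TWITTER_ACCESS_SECRET", "GPT_MODEL",
]

def missing_vars(env=os.environ, required=REQUIRED_VARS):
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not env.get(name)]

# Example with an in-memory mapping instead of the real environment.
sample_env = {"OPENAI_API_KEY": "sk-xxxx", "BING_API_KEY": ""}
print(missing_vars(sample_env))
```

Calling missing_vars() with no arguments after load_dotenv() would check the real environment; an empty list means every listed variable is set.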

3. Install Dependencies

   pip install -r requirements.txt

4. Run the Script

   python main.py
  • Tweeting is enabled by default: main.py sets TWITTER_DO_TWEET = True unless test is passed on the command line. To run without posting, use python main.py test.
  • If everything is set up correctly, the script will:
    • Load environment variables
    • Attempt to fetch the latest news from Bing
    • Summarize the findings with ChatGPT
    • Optionally post the summary to Twitter

Conclusion

With mochi-bot-twitter, you can:

  • Use ChatGPT to generate rich, contextual answers
  • Fetch the latest information via Bing Search
  • Post AI-generated summaries directly to Twitter

Feel free to customize and expand on this example to create your own news aggregator bot or question-answer bot, suited to your specific needs.

View the mochi-bot-twitter repository at https://github.com/rtree/mochi-bot-twitter.
