Automate your blogging workflow by syncing your Obsidian notes directly to Hashnode using the GraphQL API and a Python script.


A raw reflection on breaking the cycle of failure through lifestyle redesign, awareness, and building systems that support growth instead of sabotaging it.

Exploring the three major memory management approaches in programming: Garbage Collection, Manual Memory Management, and Rust's Ownership system.
I prefer using Obsidian as my primary editor and note-taking tool due to its intuitive user interface and the ability to store all data locally, ensuring privacy and ease of access.
Additionally, I find that sharing knowledge in the form of blogs enhances my understanding of concepts while also allowing me to contribute to the community, which I find personally fulfilling.
After researching the Hashnode GraphQL API, which enables managing blogs on your account, I decided to integrate it with a specific folder in my Obsidian workspace. This setup would:
I got my HASHNODE_API_KEY and HASHNODE_PUBLICATION_ID from the Hashnode developer site and wrote a Python script to automate the posting.
import os
import requests
import frontmatter
import logging
from constants import HASHNODE_API_KEY, HASHNODE_PUBLICATION_ID, OBSIDIAN_BLOG_FOLDER
class HashNodeBlogSync:
    """Publish or update Markdown notes from a local folder on Hashnode.

    Each ``.md`` file in ``obsidian_folder`` is parsed with ``frontmatter``.
    A file whose frontmatter has a ``hashnode_post_id`` is sent as an
    ``updatePost`` mutation; any other file is sent as ``publishPost`` and
    the returned post id is written back into the file's frontmatter.
    """

    # Seconds to wait for the Hashnode API before giving up; without a
    # timeout a stalled connection would block the sync indefinitely.
    _REQUEST_TIMEOUT_SECONDS = 30

    _UPDATE_MUTATION = """
        mutation UpdatePost($input: UpdatePostInput!) {
            updatePost(input: $input) {
                post {
                    id
                    title
                    url
                }
            }
        }
    """

    _PUBLISH_MUTATION = """
        mutation PublishPost($input: PublishPostInput!) {
            publishPost(input: $input) {
                post {
                    id
                    title
                    url
                }
            }
        }
    """

    def __init__(
        self,
        obsidian_folder,
        hashnode_personal_access_token,
        publication_id,
    ):
        """Store the sync configuration and set up logging.

        Args:
            obsidian_folder: Directory that is scanned for ``.md`` files.
            hashnode_personal_access_token: Token sent in the
                ``Authorization`` header of every request.
            publication_id: Publication id used when publishing new posts.
        """
        self.obsidian_folder = obsidian_folder
        self.hashnode_token = hashnode_personal_access_token
        self.publication_id = publication_id
        self.base_url = "https://gql.hashnode.com"
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        self.logger = logging.getLogger(__name__)

    def _parse_markdown_file(self, filepath):
        """Load a Markdown file and return its post data as a dict.

        The returned dict has the keys ``title``, ``content``,
        ``canonicalUrl``, ``coverImageOptions``, ``existing_post_id`` and
        ``tags``.

        Raises:
            Exception: Any error raised while opening or parsing the file
                is logged and re-raised unchanged.
        """
        try:
            with open(filepath, "r", encoding="utf-8") as file:
                post = frontmatter.load(file)

            # Frontmatter may hold `tags:` with no value (None) or a single
            # bare string; normalise both so we never iterate over None or
            # over the characters of a string.
            raw_tags = post.get("tags") or []
            if isinstance(raw_tags, str):
                raw_tags = [raw_tags]

            return {
                "title": post.get("title", "Untitled"),
                "content": post.content.strip(),
                "canonicalUrl": post.get("canonical_url", ""),
                "coverImageOptions": {
                    "coverImageURL": post.get("cover_image", ""),
                },
                "existing_post_id": post.get("hashnode_post_id"),
                "tags": [{"name": tag, "slug": tag} for tag in raw_tags],
            }
        except Exception as e:
            self.logger.error(f"Error parsing Markdown file {filepath}: {e}")
            raise

    def _build_mutation(self, blog_data):
        """Return ``(query, variables, result_key)`` for ``blog_data``.

        ``result_key`` is ``"updatePost"`` when ``blog_data`` carries an
        ``existing_post_id`` and ``"publishPost"`` otherwise; it is the key
        under ``data`` where the response holds the post.
        """
        existing_id = blog_data.get("existing_post_id")
        if existing_id:
            query = self._UPDATE_MUTATION
            result_key = "updatePost"
            payload = {
                "id": existing_id,
                "title": blog_data["title"],
                "contentMarkdown": blog_data["content"],
            }
        else:
            query = self._PUBLISH_MUTATION
            result_key = "publishPost"
            payload = {
                "title": blog_data["title"],
                "publicationId": self.publication_id,
                "contentMarkdown": blog_data["content"],
            }

        if blog_data.get("tags"):
            payload["tags"] = blog_data["tags"]

        # Read the keys that _parse_markdown_file actually produces; the
        # snake_case names are accepted too for hand-built dicts.
        canonical_url = blog_data.get("canonicalUrl") or blog_data.get(
            "canonical_url"
        )
        if canonical_url:
            # NOTE(review): Hashnode's post input types are understood to
            # name the canonical link `originalArticleURL` - confirm against
            # the current schema.
            payload["originalArticleURL"] = canonical_url

        cover_options = blog_data.get("coverImageOptions") or {}
        cover_url = cover_options.get("coverImageURL") or blog_data.get(
            "cover_image"
        )
        if cover_url:
            payload["coverImageOptions"] = {"coverImageURL": cover_url}

        return query, {"input": payload}, result_key

    @staticmethod
    def _extract_post(result, result_key):
        """Return the ``post`` dict from a decoded GraphQL response.

        Raises:
            RuntimeError: If the response carries an ``errors`` list or has
                no ``data.<result_key>.post`` entry.
        """
        errors = result.get("errors")
        if errors:
            messages = "; ".join(
                str(err.get("message", err)) if isinstance(err, dict) else str(err)
                for err in errors
            )
            raise RuntimeError(f"Hashnode GraphQL error: {messages}")

        post = ((result.get("data") or {}).get(result_key) or {}).get("post")
        if not post:
            raise RuntimeError(
                f"Hashnode response is missing {result_key}.post: {result}"
            )
        return post

    def publish_to_hashnode(self, blog_data):
        """Create or update a post on Hashnode and return its id.

        Args:
            blog_data: Dict in the shape returned by
                ``_parse_markdown_file``.

        Returns:
            The ``id`` of the published or updated post.

        Raises:
            requests.exceptions.HTTPError: On a non-success HTTP status
                (the response body is logged first).
            RuntimeError: If the GraphQL response reports errors or does
                not contain the post.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.hashnode_token}",
        }
        query, variables, result_key = self._build_mutation(blog_data)

        response = requests.post(
            self.base_url,
            json={"query": query, "variables": variables},
            headers=headers,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            self.logger.error(f"HTTP Error: {response.text}")
            raise

        # GraphQL reports query-level failures in the response body, so the
        # body must be checked even after the HTTP status check passes.
        post = self._extract_post(response.json(), result_key)
        post_id = post["id"]
        url = post["url"]
        action = "updated" if result_key == "updatePost" else "published"
        self.logger.info(
            f"Blog post {action}: {blog_data['title']} ({post_id}) at {url}"
        )
        return post_id

    def sync_blog_files(self):
        """Sync every ``.md`` file in the folder, logging per-file failures.

        A failure on one file is logged and does not stop the remaining
        files from being processed.
        """
        # Sorted so runs process files in a stable, predictable order.
        for filename in sorted(os.listdir(self.obsidian_folder)):
            if not filename.endswith(".md"):
                continue
            filepath = os.path.join(self.obsidian_folder, filename)
            try:
                blog_data = self._parse_markdown_file(filepath)
                post_id = self.publish_to_hashnode(blog_data)

                # Only newly published posts need their id written back.
                if blog_data.get("existing_post_id"):
                    continue
                with open(filepath, "r", encoding="utf-8") as file:
                    post = frontmatter.load(file)
                post["hashnode_post_id"] = post_id
                # Binary mode is kept from the original: frontmatter.dump
                # is handed a bytes file object here.
                with open(filepath, "wb") as file:
                    frontmatter.dump(post, file)
            except Exception as e:
                self.logger.error(f"Error syncing {filepath}: {e}")
def main():
    """Run one sync pass over the configured Obsidian blog folder.

    Builds a ``HashNodeBlogSync`` from the values imported from
    ``constants`` and prints a line before and after the sync.
    """
    syncer = HashNodeBlogSync(
        obsidian_folder=OBSIDIAN_BLOG_FOLDER,
        hashnode_personal_access_token=HASHNODE_API_KEY,
        publication_id=HASHNODE_PUBLICATION_ID,
    )

    print(f"Starting Hashnode Blog Sync for {OBSIDIAN_BLOG_FOLDER}")
    syncer.sync_blog_files()
    print("Sync completed.")
if __name__ == "__main__":
    main()

What the script does:

- Scans all `.md` files in your Obsidian blog folder and extracts frontmatter metadata
- If `hashnode_post_id` exists in the frontmatter, it updates the existing post
- Stores the `hashnode_post_id` back in the frontmatter for future updates

Rather than fully automating the process by setting up a cron job to run the code at regular intervals, I opted for a more manual approach.
This gives you control over when to publish while still making the process as simple as running a single command.
Thank you for reading! I hope you found something new and interesting.