Building a Real-Time Binance Data Pipeline with Kafka and PostgreSQL

This project demonstrates a simple real-time data pipeline that streams live cryptocurrency prices from the Binance API, publishes them to a Kafka topic (hosted on Confluent Cloud), consumes them with a Kafka consumer, and stores the results in a PostgreSQL database (hosted on Aiven).

It’s a hands-on learning project for integrating streaming platforms with databases, ideal for practicing Data Engineering fundamentals.

Project Architecture

Producer (kafka-producer.py)

  • Connects to the Binance API.
  • Publishes live price data to a Kafka topic (binance).
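
A minimal sketch of that flow, assuming the public Binance endpoint /api/v3/ticker/24hr, the confluent-kafka client, and the .env variables described in the Setup section. The BTCUSDT symbol and the fixed 5-second polling interval are illustrative, and the actual kafka-producer.py may differ in details:

import json
import os
import time

import requests
from confluent_kafka import Producer
from dotenv import load_dotenv

load_dotenv()

producer = Producer({
    "bootstrap.servers": os.getenv("BOOTSTRAP_SERVERS"),
    "security.protocol": os.getenv("SECURITY_PROTOCOL"),
    "sasl.mechanisms": os.getenv("SASL_MECHANISM"),
    "sasl.username": os.getenv("SASL_USERNAME"),
    "sasl.password": os.getenv("SASL_PASSWORD"),
})

while True:
    # Fetch the 24h ticker statistics for one symbol from the public Binance API.
    response = requests.get(
        "https://api.binance.com/api/v3/ticker/24hr",
        params={"symbol": "BTCUSDT"},
        timeout=10,
    )
    ticker = response.json()

    # Publish the raw JSON payload to the topic defined in .env.
    producer.produce(os.getenv("TOPIC_NAME"), value=json.dumps(ticker))
    producer.flush()

    time.sleep(5)  # simple fixed polling interval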

Consumer (kafka-consumer.py)

  • Subscribes to the Kafka topic.
  • Parses each message.
  • Inserts records into PostgreSQL.
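
A condensed sketch of that loop, assuming confluent-kafka and psycopg2. The consumer group id is illustrative, and mapping the ticker's lastPrice field to the closeprice column is an assumption based on the schema shown later; the actual kafka-consumer.py may differ in details:

import json
import os

import psycopg2
from confluent_kafka import Consumer
from dotenv import load_dotenv

load_dotenv()

consumer = Consumer({
    "bootstrap.servers": os.getenv("BOOTSTRAP_SERVERS"),
    "security.protocol": os.getenv("SECURITY_PROTOCOL"),
    "sasl.mechanisms": os.getenv("SASL_MECHANISM"),
    "sasl.username": os.getenv("SASL_USERNAME"),
    "sasl.password": os.getenv("SASL_PASSWORD"),
    "group.id": "binance-consumer",   # illustrative consumer group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe([os.getenv("TOPIC_NAME")])

conn = psycopg2.connect(
    host=os.getenv("DBHOST"),
    port=os.getenv("DBPORT"),
    dbname=os.getenv("DBNAME"),
    user=os.getenv("DBUSER"),
    password=os.getenv("DBPASSWORD"),
    sslmode="require",  # Aiven enforces SSL connections
)

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue

    # Parse the JSON payload produced by kafka-producer.py.
    data = json.loads(msg.value())

    # Insert one row per message; column order matches the schema shown below.
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO binance_24h
                (symbol, pricechange, pricechangepercentage, openprice,
                 closeprice, highprice, lowprice, volume)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """,
            (
                data["symbol"], data["priceChange"], data["priceChangePercent"],
                data["openPrice"], data["lastPrice"], data["highPrice"],
                data["lowPrice"], data["volume"],
            ),
        )
    conn.commit()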

PostgreSQL Database

  • Hosted on Aiven.
  • Stores parsed records for querying and analysis.

Setup

1. Environment Variables

Create a .env file with your Kafka and PostgreSQL credentials:

# Kafka
BOOTSTRAP_SERVERS=pkc-xxxxx.confluent.cloud:9092
SECURITY_PROTOCOL=SASL_SSL
SASL_MECHANISM=PLAIN
SASL_USERNAME=<Confluent_API_Key>
SASL_PASSWORD=<Confluent_API_Secret>
TOPIC_NAME=binance

# Postgres (Aiven)
DBHOST=pg-xxxxxx.aivencloud.com
DBPORT=17154
DBNAME=defaultdb
DBUSER=avnadmin       # IMPORTANT: must use the exact user from Aiven credentials
DBPASSWORD=<your_password>
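
Both scripts can load these values at startup with python-dotenv. A minimal example (the variable names on the left are just illustrative):

import os

from dotenv import load_dotenv

load_dotenv()  # reads key=value pairs from .env into the process environment

bootstrap_servers = os.getenv("BOOTSTRAP_SERVERS")
topic_name = os.getenv("TOPIC_NAME")
db_user = os.getenv("DBUSER")  # note: DBUSER, not USER (see Issues below)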

Make sure to add .env to .gitignore so credentials aren’t pushed to GitHub. Example .gitignore:

.env
__pycache__/
*.pyc
.vscode/

2. Installing Dependencies

Package management for this project follows the standard pip workflow.

First, install the dependencies:

   pip install confluent-kafka psycopg2-binary python-dotenv requests

Save them to requirements.txt:

   pip freeze > requirements.txt

Example requirements.txt:

   confluent-kafka==2.5.3
   psycopg2-binary==2.9.9
   python-dotenv==1.0.1
   requests==2.32.3

Re-install them anytime with:

   pip install -r requirements.txt

This ensures anyone cloning the project can recreate the same environment easily.

3. Run the Pipeline

Start the producer:

python3 kafka-producer.py

Start the consumer:

python3 kafka-consumer.py

PostgreSQL Table Schema

The consumer script ensures the table exists. The schema is:

CREATE TABLE IF NOT EXISTS binance_24h (
    symbol                  TEXT    NOT NULL,
    pricechange             NUMERIC NOT NULL,
    pricechangepercentage   NUMERIC NOT NULL,
    openprice               NUMERIC NOT NULL,
    closeprice              NUMERIC NOT NULL,
    highprice               NUMERIC NOT NULL,
    lowprice                NUMERIC NOT NULL,
    volume                  NUMERIC NOT NULL
);
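
One way the consumer can ensure the table exists is to execute this DDL once at startup. A sketch assuming psycopg2 and an already-open connection (the ensure_table name is illustrative; connection setup is shown in the consumer sketch above):

def ensure_table(conn):
    """Create binance_24h if it is missing (conn is an open psycopg2 connection)."""
    with conn.cursor() as cur:
        # Idempotent: safe to run on every consumer start.
        cur.execute("""
            CREATE TABLE IF NOT EXISTS binance_24h (
                symbol                TEXT    NOT NULL,
                pricechange           NUMERIC NOT NULL,
                pricechangepercentage NUMERIC NOT NULL,
                openprice             NUMERIC NOT NULL,
                closeprice            NUMERIC NOT NULL,
                highprice             NUMERIC NOT NULL,
                lowprice              NUMERIC NOT NULL,
                volume                NUMERIC NOT NULL
            );
        """)
    conn.commit()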

Issues Encountered & Solutions

1. Authentication Failure (Password & User Mismatch)

  • Issue:
  FATAL: password authentication failed for user "dev_user"
  • Cause: Postgres on Aiven requires the exact generated username (avnadmin, etc.). The credentials were originally stored in a variable named USER, which clashed with the shell's built-in $USER (by default, python-dotenv does not override variables that already exist in the environment), so the wrong username was sent to the database.
  • Solution:

    • Set the exact username from the Aiven credentials (avnadmin) in .env.
    • Renamed the variable from USER to DBUSER so it no longer clashes with the system variable.

2. Postgres Column Mismatch Error

  • Issue:
  column "price" of relation "binance_24h" does not exist
  • Cause: Table schema did not match Binance API JSON keys.
  • Solution:

    • Created a schema whose columns map one-to-one onto the Binance API response fields (priceChange, priceChangePercent, etc.).
    • Updated the SQL INSERT query so the column order lines up with the JSON keys (see the mapping sketch below).
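
For reference, the JSON-key-to-column mapping that resolves the mismatch looks roughly like this. Pairing lastPrice with closeprice and priceChangePercent with pricechangepercentage is inferred from the schema above rather than taken from the original code:

# Binance 24h ticker JSON key -> binance_24h column
FIELD_TO_COLUMN = {
    "symbol": "symbol",
    "priceChange": "pricechange",
    "priceChangePercent": "pricechangepercentage",
    "openPrice": "openprice",
    "lastPrice": "closeprice",
    "highPrice": "highprice",
    "lowPrice": "lowprice",
    "volume": "volume",
}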

Lessons Learned

  • Avoid using environment variable names like USER that clash with system defaults.
  • Schema consistency between producer → consumer → database is crucial.
  • Use pip freeze > requirements.txt + pip install -r requirements.txt for reproducible environments.

Next Steps

  • Extend the pipeline to include more symbols from Binance.
  • Add error handling and retries (a rough starting point is sketched after this list).
  • Visualize the stored data in a dashboard (e.g., Power BI, Grafana).
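
As a starting point for the error-handling item, a small retry helper could wrap the database insert or the API call. Everything below is illustrative and not part of the current scripts:

import time

def with_retries(fn, attempts=3, delay_seconds=2):
    """Call fn(), retrying a few times with a fixed delay between failures."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as exc:  # narrow this to specific Kafka/psycopg2 errors in practice
            if attempt == attempts:
                raise
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay_seconds}s")
            time.sleep(delay_seconds)

# Usage idea (names are hypothetical):
# with_retries(lambda: cur.execute(insert_sql, values))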
