# Building a Real-Time Binance Data Pipeline with Kafka and PostgreSQL
This project demonstrates a simple real-time data pipeline that streams live cryptocurrency prices from the Binance API, publishes them to a Kafka topic hosted on Confluent Cloud, consumes them with a Kafka consumer, and stores the results in a PostgreSQL database hosted on Aiven.
It’s a hands-on learning project for integrating streaming platforms with databases, ideal for practicing Data Engineering fundamentals.
## Project Architecture
### Producer (`kafka-producer.py`)
- Connects to the Binance API.
- Publishes live price data to a Kafka topic (`binance`).
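A minimal sketch of the producer flow, assuming the public 24h ticker REST endpoint, a single hardcoded symbol (`BTCUSDT`), and a 5-second polling interval; the actual `kafka-producer.py` may differ:

```python
# kafka-producer.py (sketch) - fetch a 24h ticker from Binance and publish it to Kafka.
import json
import os
import time

import requests
from confluent_kafka import Producer
from dotenv import load_dotenv

load_dotenv()

producer = Producer({
    "bootstrap.servers": os.getenv("BOOTSTRAP_SERVERS"),
    "security.protocol": os.getenv("SECURITY_PROTOCOL"),
    "sasl.mechanisms": os.getenv("SASL_MECHANISM"),
    "sasl.username": os.getenv("SASL_USERNAME"),
    "sasl.password": os.getenv("SASL_PASSWORD"),
})

TOPIC = os.getenv("TOPIC_NAME", "binance")
URL = "https://api.binance.com/api/v3/ticker/24hr"  # public 24h ticker endpoint

while True:
    resp = requests.get(URL, params={"symbol": "BTCUSDT"}, timeout=10)
    resp.raise_for_status()
    ticker = resp.json()
    # Publish the raw JSON payload; the consumer parses and stores it downstream.
    producer.produce(TOPIC, key=ticker["symbol"], value=json.dumps(ticker))
    producer.flush()
    time.sleep(5)  # assumed polling interval
```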
### Consumer (`kafka-consumer.py`)
- Subscribes to the Kafka topic.
- Parses each message.
- Inserts records into PostgreSQL.
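In outline, the consumer loop might look like this (a sketch, not the exact `kafka-consumer.py`; the consumer group id and the `lastPrice` → `closeprice` pairing are assumptions):

```python
# kafka-consumer.py (sketch) - poll the topic, parse each message, insert a row into Postgres.
import json
import os

import psycopg2
from confluent_kafka import Consumer
from dotenv import load_dotenv

load_dotenv()

consumer = Consumer({
    "bootstrap.servers": os.getenv("BOOTSTRAP_SERVERS"),
    "security.protocol": os.getenv("SECURITY_PROTOCOL"),
    "sasl.mechanisms": os.getenv("SASL_MECHANISM"),
    "sasl.username": os.getenv("SASL_USERNAME"),
    "sasl.password": os.getenv("SASL_PASSWORD"),
    "group.id": "binance-consumer",    # assumed consumer group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe([os.getenv("TOPIC_NAME", "binance")])

conn = psycopg2.connect(
    host=os.getenv("DBHOST"),
    port=os.getenv("DBPORT"),
    dbname=os.getenv("DBNAME"),
    user=os.getenv("DBUSER"),       # must be the exact Aiven user (see Issues below)
    password=os.getenv("DBPASSWORD"),
    sslmode="require",              # Aiven Postgres requires TLS
)

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    data = json.loads(msg.value())  # Binance 24h ticker JSON
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO binance_24h
                (symbol, pricechange, pricechangepercentage, openprice,
                 closeprice, highprice, lowprice, volume)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """,
            (data["symbol"], data["priceChange"], data["priceChangePercent"],
             data["openPrice"], data["lastPrice"], data["highPrice"],
             data["lowPrice"], data["volume"]),
        )
    conn.commit()
```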
### PostgreSQL Database
- Hosted on Aiven.
- Stores parsed records for querying and analysis.
## Setup
### 1. Environment Variables
Create a `.env` file with your Kafka and PostgreSQL credentials:
```env
# Kafka
BOOTSTRAP_SERVERS=pkc-xxxxx.confluent.cloud:9092
SECURITY_PROTOCOL=SASL_SSL
SASL_MECHANISM=PLAIN
SASL_USERNAME=<Confluent_API_Key>
SASL_PASSWORD=<Confluent_API_Secret>
TOPIC_NAME=binance

# Postgres (Aiven)
DBHOST=pg-xxxxxx.aivencloud.com
DBPORT=17154
DBNAME=defaultdb
DBUSER=avnadmin  # IMPORTANT: must use the exact user from the Aiven credentials
DBPASSWORD=<your_password>
```
Make sure to add `.env` to `.gitignore` so credentials aren't pushed to GitHub. Example `.gitignore`:
```
.env
__pycache__/
*.pyc
.vscode/
```
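A quick way to confirm the variables load before starting either script (a small sketch using `python-dotenv`; the script name is hypothetical and the list simply mirrors the `.env` above):

```python
# check_env.py (hypothetical helper) - fail fast if any expected variable is missing.
import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory

REQUIRED = [
    "BOOTSTRAP_SERVERS", "SECURITY_PROTOCOL", "SASL_MECHANISM",
    "SASL_USERNAME", "SASL_PASSWORD", "TOPIC_NAME",
    "DBHOST", "DBPORT", "DBNAME", "DBUSER", "DBPASSWORD",
]

missing = [name for name in REQUIRED if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
print("All environment variables loaded.")
```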
### 2. Installing Dependencies
For package management, the workflow was:
Install the dependencies:

```bash
pip install confluent-kafka psycopg2-binary python-dotenv requests
```
Save them into `requirements.txt`:

```bash
pip freeze > requirements.txt
```
Example `requirements.txt`:

```text
confluent-kafka==2.5.3
psycopg2-binary==2.9.9
python-dotenv==1.0.1
requests==2.32.3
```
Re-install them anytime with:

```bash
pip install -r requirements.txt
```
This ensures anyone cloning the project can recreate the same environment easily.
### 3. Run the Pipeline
Start the producer:

```bash
python3 kafka-producer.py
```

Start the consumer:

```bash
python3 kafka-consumer.py
```
## PostgreSQL Table Schema
The consumer script ensures the table exists. The schema is:
```sql
CREATE TABLE IF NOT EXISTS binance_24h (
    symbol                TEXT NOT NULL,
    pricechange           NUMERIC NOT NULL,
    pricechangepercentage NUMERIC NOT NULL,
    openprice             NUMERIC NOT NULL,
    closeprice            NUMERIC NOT NULL,
    highprice             NUMERIC NOT NULL,
    lowprice              NUMERIC NOT NULL,
    volume                NUMERIC NOT NULL
);
```
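A sketch of how the consumer can guarantee the table at startup (the `ensure_table` function name is illustrative; the actual script may inline this):

```python
# Run once at consumer startup so the first INSERT never hits a missing table.
import os

import psycopg2
from dotenv import load_dotenv

load_dotenv()

CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS binance_24h (
    symbol                TEXT NOT NULL,
    pricechange           NUMERIC NOT NULL,
    pricechangepercentage NUMERIC NOT NULL,
    openprice             NUMERIC NOT NULL,
    closeprice            NUMERIC NOT NULL,
    highprice             NUMERIC NOT NULL,
    lowprice              NUMERIC NOT NULL,
    volume                NUMERIC NOT NULL
);
"""

def ensure_table(conn) -> None:
    # CREATE TABLE IF NOT EXISTS is idempotent, so this is safe on every run.
    with conn.cursor() as cur:
        cur.execute(CREATE_TABLE_SQL)
    conn.commit()

if __name__ == "__main__":
    ensure_table(psycopg2.connect(
        host=os.getenv("DBHOST"),
        port=os.getenv("DBPORT"),
        dbname=os.getenv("DBNAME"),
        user=os.getenv("DBUSER"),
        password=os.getenv("DBPASSWORD"),
        sslmode="require",
    ))
```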
## Issues Encountered & Solutions
### 1. Authentication Failure (Password & User Mismatch)
- **Issue:** `FATAL: password authentication failed for user "dev_user"`
- **Cause:** Postgres on Aiven requires the exact generated username (`avnadmin`, etc.). Using a variable named `USER` conflicted with the system `$USER` variable.
- **Solution:**
  - Hardcoded the correct username in `.env`.
  - Alternatively, renamed the variable to `DBUSER` to avoid the conflict with reserved names.
### 2. Postgres Column Mismatch Error
- **Issue:** `column "price" of relation "binance_24h" does not exist`
- **Cause:** The table schema did not match the Binance API JSON keys.
- **Solution:**
  - Created a schema that exactly matches the API response fields (`priceChange`, `priceChangePercent`, etc.).
  - Updated the SQL `INSERT` query to align column names with the JSON keys (see the sketch below).
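One way to keep that alignment explicit is a single mapping from JSON keys to column names (a hypothetical refactor, not necessarily how `kafka-consumer.py` is written; the `lastPrice` → `closeprice` pairing is an assumption):

```python
# Hypothetical helper: derive the INSERT statement from one explicit key-to-column mapping,
# so payload/schema drift shows up in a single place.
FIELD_TO_COLUMN = {
    "symbol": "symbol",
    "priceChange": "pricechange",
    "priceChangePercent": "pricechangepercentage",
    "openPrice": "openprice",
    "lastPrice": "closeprice",   # assumed source of the close price
    "highPrice": "highprice",
    "lowPrice": "lowprice",
    "volume": "volume",
}

def build_insert(record: dict) -> tuple[str, list]:
    """Return a parameterized INSERT plus values ordered to match the mapping above."""
    columns = ", ".join(FIELD_TO_COLUMN.values())
    placeholders = ", ".join(["%s"] * len(FIELD_TO_COLUMN))
    values = [record[key] for key in FIELD_TO_COLUMN]
    return f"INSERT INTO binance_24h ({columns}) VALUES ({placeholders})", values
```

The consumer would then call `cur.execute(*build_insert(data))` for each parsed message.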
## Lessons Learned
- Avoid environment variable names like `USER` that clash with system defaults.
- Schema consistency between producer → consumer → database is crucial.
- Use `pip freeze > requirements.txt` plus `pip install -r requirements.txt` for reproducible environments.
## Next Steps
- Extend the pipeline to include more symbols from Binance.
- Add error handling and retries.
- Visualize the stored data in a dashboard (e.g., Power BI, Grafana).