Secure Token-based Authentication in Python with PyJWT
Token-based authentication is a widely-used approach for securing web applications, APIs, and other network services. It allows users to authenticate themselves by presenting a valid token, eliminating the need for traditional session-based authentication mechanisms.
In this article, we will explore how to quickly add secure token-based authentication to your Python projects using PyJWT, a Python implementation of the RFC 7519 specification for JSON Web Tokens.
Installation
To get started with PyJWT, you can simply install it using pip:
$ pip install PyJWT
Usage
Once PyJWT is installed, you can start using it to encode and decode JSON Web Tokens. Let’s take a look at a simple example:
import jwt
# Encode a payload into a JWT
encoded = jwt.encode({"some": "payload"}, "secret", algorithm="HS256")
print(encoded)
# Decode a JWT back into the payload
decoded = jwt.decode(encoded, "secret", algorithms=["HS256"])
print(decoded)
In this example, we encode a dictionary payload into a JSON Web Token using a secret key and the HS256 algorithm. We then decode the JWT back into the original payload using the same secret key and algorithm.
Example Implementations
Now that we have a basic understanding of how PyJWT works, let’s explore three example implementations that utilize PyJWT along with other popular Python packages.
1. Token-based Authentication in FastAPI
FastAPI is a modern, fast (high-performance), web framework for building APIs with Python. Combining FastAPI with PyJWT allows you to easily implement token-based authentication in your FastAPI projects.
Here’s an example of how you can use PyJWT and FastAPI to secure your API endpoints:
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
import jwt
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
# Verify token and authenticate user
async def get_current_user(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, "secret", algorithms=["HS256"])
user_id = payload.get("sub")
# Query your database or other data source to retrieve the user based on the user_id
user = get_user_from_database(user_id)
if user:
return user
except jwt.exceptions.DecodeError:
pass
raise HTTPException(status_code=401, detail="Invalid token")
# Protected route that requires authentication
@app.get("/secure")
async def secure_route(current_user: User = Depends(get_current_user)):
return {"message": "You have successfully accessed the protected route"}
# Generate a JWT for a user
@app.post("/token")
def get_token(username: str, password: str):
# Authenticate the user and generate a JWT
user = authenticate(username, password)
if user:
token = jwt.encode({"sub": user.id}, "secret", algorithm="HS256")
return {"access_token": token}
raise HTTPException(status_code=401, detail="Invalid credentials")
In this implementation, we define a /token
route that accepts a username and password, authenticates the user, and generates a JSON Web Token. The get_token
function handles this logic.
We also define a protected /secure
route that requires authentication. The get_current_user
function verifies the token and authenticates the user. If the token is valid and the user exists, the /secure
route will be accessible.
2. Token-based Authentication with SQLAlchemy
SQLAlchemy is a popular Python SQL toolkit and Object-Relational Mapping (ORM) library. By combining SQLAlchemy with PyJWT, you can add token-based authentication to your SQLAlchemy-powered applications.
Here’s an example of how you can use SQLAlchemy and PyJWT to implement token-based authentication in a SQLAlchemy application:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from pydantic import BaseModel
import jwt
Base = declarative_base()
engine = create_engine("sqlite:///database.db")
Session = sessionmaker(bind=engine)
session = Session()
# User model
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String, unique=True)
password = Column(String)
# Token model
class Token(Base):
__tablename__ = "tokens"
id = Column(Integer, primary_key=True)
token = Column(String, unique=True)
user_id = Column(Integer)
# Verify token and authenticate user
def authenticate_token(token: str):
try:
payload = jwt.decode(token, "secret", algorithms=["HS256"])
user_id = payload.get("sub")
# Query your database or other data source to retrieve the user based on the user_id
user = session.query(User).filter_by(id=user_id).first()
if user:
return user
except jwt.exceptions.DecodeError:
pass
raise HTTPException(status_code=401, detail="Invalid token")
# Generate and store a token for a user
def generate_token(user: User):
token = jwt.encode({"sub": user.id}, "secret", algorithm="HS256")
token_entry = Token(token=token, user_id=user.id)
session.add(token_entry)
session.commit()
return token
# Protect a route that requires authentication
@app.get("/secure")
def secure_route(token: str):
user = authenticate_token(token)
return {"message": "Hello, {}! You have successfully accessed the protected route".format(user.username)}
In this implementation, we define a User
model and a Token
model using SQLAlchemy. When a user logs in, we generate a JSON Web Token and store it in the tokens
table. The user_id
column in the tokens
table is used to associate the token with the user.
To protect a route that requires authentication, we decode the token and authenticate the user using the authenticate_token
function. If the token is valid and the user exists, the /secure
route will be accessible.
3. Token Revocation with Redis
Redis is an open-source, in-memory data structure store that can be used as a database, cache, and message broker. By integrating Redis with PyJWT, you can implement token revocation, allowing you to invalidate tokens and enforce logout functionality.
Here’s an example of how you can use Redis and PyJWT to implement token revocation in your Python applications:
import redis
import jwt
# Connect to Redis
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)
# Generate a JWT for a user
def generate_token(user_id: int):
token = jwt.encode({"sub": user_id}, "secret", algorithm="HS256")
redis_client.set(token, 1)
return token
# Verify token and check if it is revoked
def is_token_revoked(token: str):
return redis_client.exists(token) == 1
In this implementation, we connect to a Redis instance using the redis
package. When a user logs in, we generate a JSON Web Token and store it as a key in Redis using the redis_client.set
method.
To check if a token is revoked, we use the redis_client.exists
method to see if the token exists as a key in Redis. If the token exists, it means the token is revoked and should be considered invalid.
Conclusion
In this article, we have explored how to implement secure token-based authentication in Python using PyJWT. We covered three examples that demonstrate how PyJWT can be used with popular Python packages like FastAPI, SQLAlchemy, and Redis.
Token-based authentication offers a secure and scalable way to authenticate users in web applications and APIs. By leveraging PyJWT and other Python packages, developers can easily implement token-based authentication in their projects and provide a seamless and secure user experience.
Leave a Reply