FastAPI walkthrough

FastAPI is a modern, fast (high-performance), web framework for building APIs with Python 3.7+ based on standard Python type hints.

Features:

  • Fast
  • Fast to Code
  • Fewer bugs
  • Intuitive
  • Easy
  • Short
  • Robust
  • Standard-based
    • OpenAPI
    • JSON Schema

Requirements

  • Python 3.7+ (the X | Y type syntax used in the examples below needs Python 3.10+)
  • Starlette
  • Pydantic

Installation

❯ pip3 install -U pip "fastapi[all]" ujson email-validator orjson "uvicorn[standard]"
❯ pip freeze
anyio==3.6.2
click==8.1.3
fastapi==0.88.0
h11==0.14.0
httptools==0.5.0
idna==3.4
pydantic==1.10.2
python-dotenv==0.21.0
PyYAML==6.0
sniffio==1.3.0
starlette==0.22.0
typing_extensions==4.4.0
uvicorn==0.20.0
uvloop==0.17.0
watchfiles==0.18.1
websockets==10.4

Example

Create it

from typing import Union
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def read_root_sync():
    return {"Hello": "World"}


@app.get("/items/{item_id}")
def read_item_sync(item_id: int, q: Union[str, None] = None):
    """
    Read a single item synchronously
    :param item_id: int
    :param q: an optional string query parameter
    :return:
    """
    return {"item_id": item_id, "q": q}

Example Upgrade

Python Types

def get_full_name(first_name: str, last_name: str) -> str:
    full_name = first_name.title() + ' ' + last_name.title()
    return full_name


print(get_full_name("john", "doe"))

We are using colons (:), not equals (=).

Simple Types

You can use, for example:

  • int
  • float
  • bool
  • bytes

def get_items(item_a: str, item_b: int, item_c: float, item_d: bool, item_e: bytes):
    return item_a, item_b, item_c, item_d, item_e

List

def process_items(items: list[str]):
    for item in items:
        print(item)

Those internal types in the square brackets are called “type parameters”.

In this case, str is the type parameter passed to list
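As a small illustration of type parameters, the sketch below takes a few generic hints apart at runtime using only the standard typing module. The helper name describe is made up for this example, and nothing here is specific to FastAPI.

```python
from typing import get_args, get_origin


def describe(hint):
    """Return (container type, type parameters) for a generic type hint."""
    return get_origin(hint), get_args(hint)


# The part before the brackets is the container; the part inside is the parameters.
list_origin, list_params = describe(list[str])
dict_origin, dict_params = describe(dict[str, float])
tuple_origin, tuple_params = describe(tuple[int, int, str])

print(list_origin, list_params)
print(dict_origin, dict_params)
print(tuple_origin, tuple_params)
```

Tools that read type hints can inspect the parameters in a similar way to decide how each element should be handled.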

Tuple and Set

def process_items(items_t: tuple[int, int, str], items_s: set[bytes]):
    return items_t, items_s

This means:

  • The variable items_t is a tuple with 3 items, an int, another int, and a str.
  • The variable items_s is a set, and each of its items is of type bytes.

Dict

To define a dict, you pass 2 type parameters, separated by commas.

The first type parameter is for the keys of the dict.

The second type parameter is for the values of the dict:

def process_items(prices: dict[str, float]):
    for item_name, item_price in prices.items():
        print(item_name)
        print(item_price)

Union

You can declare that a variable can be any of several types, for example, an int or a str.

In Python 3.10 there’s also an alternative syntax where you can put the possible types separated by a vertical bar (|).

def process_item(item: int | str):
    print(item)

Possibly None

You can declare that a value could have a type, like str, but that it could also be None.

def say_hi(name: str | None = None):
    if name is not None:
        print(f"Hey {name}!")
    else:
        print("Hello World")
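The following sketch, using only the standard library, shows two things that are easy to miss: Optional[str] is the same type as a union with None, and Python itself does not enforce hints at runtime. The variant of say_hi below returns its greeting instead of printing it, purely so the result can be inspected.

```python
from typing import Optional, Union, get_type_hints


def say_hi(name: Optional[str] = None) -> str:
    if name is not None:
        return f"Hey {name}!"
    return "Hello World"


# Optional[str] is shorthand for Union[str, None] (written str | None on 3.10+).
same_type = Optional[str] == Union[str, None]

# Hints are stored on the function; Python does not check them when you call it.
hints = get_type_hints(say_hi)
unchecked = say_hi(42)  # no error, even though 42 is not a str
```

It is libraries such as Pydantic and FastAPI, or your editor and type checker, that actually act on the hints.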

Generic Types

Types that take type parameters in square brackets are called generic types, or generics.

You can use the same builtin types as generics (with square brackets and types inside):

  • list
  • tuple
  • set
  • dict

You can also use the vertical bar (|) to declare unions of types.

Classes as Types

You can also declare a class as the type of a variable.

class Person:
    def __init__(self, name: str):
        self.name = name


def get_person_name(one_person: Person) -> str:
    return one_person.name


p1 = Person(name="Trump")

print(get_person_name(p1))

Pydantic Models

Pydantic is a Python library to perform data validation.

You declare the “shape” of the data as classes with attributes.

And each attribute has a type.

from datetime import datetime

from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = "John Smith"
    signup_ts: datetime | None = None
    friends: list[int] = []


external_data = {
    "id": "123",
    "signup_ts": "2017-06-01 12:22",
    "friends": [1, "2", b"3"],
}

user = User(**external_data)
print(user)

print(user.id)

First Steps

When building APIs, you normally use these specific HTTP methods to perform a specific action.

Normally you use:

  • POST: to create data.
  • GET: to read data.
  • PUT: to update data.
  • DELETE: to delete data.

So, in OpenAPI, each of the HTTP methods is called an “operation”.

We are going to call them “operations” too.

Path parameters

Order matters

Suppose you have a path /users/me that returns data about the current user.

You can also have a path /users/{user_id} that returns data about a specific user by user ID.

Because path operations are evaluated in order, you need to make sure that the path for /users/me is declared before the one for /users/{user_id}. Otherwise, /users/{user_id} would also match /users/me, with the value "me" taken as user_id.
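To see why declaration order matters, here is a deliberately simplified model of first-match routing written with the standard re module. It is not FastAPI's actual router; the helpers compile_path and resolve and the handler names are assumptions made for this sketch.

```python
import re


def compile_path(template):
    """Turn '/users/{user_id}' into a regex with one named group per parameter."""
    pattern = re.sub(r"\{(\w+)\}", r"(?P<\1>[^/]+)", template)
    return re.compile(f"^{pattern}$")


def resolve(routes, path):
    """Return (handler name, path params) for the first route that matches."""
    for template, name in routes:
        match = compile_path(template).match(path)
        if match:
            return name, match.groupdict()
    return None, {}


good_order = [("/users/me", "read_user_me"), ("/users/{user_id}", "read_user")]
bad_order = list(reversed(good_order))

print(resolve(good_order, "/users/me"))  # the fixed path wins
print(resolve(bad_order, "/users/me"))   # the parameterized path swallows "me"
```

With the fixed path declared first, /users/me reaches its own handler; with the order reversed, the parameterized route matches first and receives "me" as user_id.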

Predefined values

from enum import Enum


class ModelName(str, Enum):
    alexnet = "alexnet"
    resnet = "resnet"
    lenet = "lenet"


@app.get("/models/{model_name}")
async def read_model(model_name: ModelName):
    if model_name is ModelName.alexnet:
        return {"model_name": model_name, "message": "Deep Learning FTW"}
    elif model_name.value == 'lenet':
        return {"model_name": model_name, "message": "LeCNN all the images"}
    return {"model_name": model_name, "message": "Have some residuals"}
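The two comparison styles used above (is against a member, and == against .value) both work because of how Enum members behave. A minimal sketch with the standard enum module, independent of FastAPI; the value "vgg" is just a made-up example of something outside the enum:

```python
from enum import Enum


class ModelName(str, Enum):
    alexnet = "alexnet"
    resnet = "resnet"
    lenet = "lenet"


# Looking a member up by value returns the single shared instance,
# which is why an identity check with `is` is safe.
member = ModelName("lenet")

# A value outside the enum is rejected.
try:
    ModelName("vgg")
    rejected = False
except ValueError:
    rejected = True

# Because ModelName also subclasses str, members compare equal to plain strings.
equals_plain_str = member == "lenet"
```

This is the property that lets a path parameter annotated with ModelName accept only the predefined values.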

Query Parameters

When you declare other function parameters that are not part of the path parameters, they are automatically interpreted as “query” parameters.

fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]


@app.get("/items/")
async def read_items_query(skip: int = 0, limit: int = 10):
    return fake_items_db[skip:skip + limit]

The query is the set of key-value pairs that go after the ? in a URL, separated by & characters.

For example, in the URL:

http://127.0.0.1:8000/items/?skip=0&limit=10
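To make the URL above concrete, this sketch parses it with the standard urllib.parse module and applies the same slice as read_items_query. It only imitates by hand the conversion that the int annotations request; it is not how FastAPI is implemented.

```python
from urllib.parse import parse_qs, urlsplit

fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]

url = "http://127.0.0.1:8000/items/?skip=0&limit=10"
raw = parse_qs(urlsplit(url).query)  # every value arrives as a string

# The int annotations on skip and limit are what ask for this conversion.
skip = int(raw["skip"][0])
limit = int(raw["limit"][0])

page = fake_items_db[skip:skip + limit]
```

Here the query parameters are skip with a value of 0 and limit with a value of 10, so the slice returns all three items.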

Defaults of query parameters

As query parameters are not a fixed part of a path, they can be optional and can have default values.

The same way, you can declare optional query parameters, by setting their default to None:

@app.get("/items_query/{item_id}")
async def read_item_query(item_id: int, q: str | None = None):
    if q:
        return {"item_id": item_id, "q": q}
    return {"item_id": item_id}

Multiple path and query parameters

You can declare multiple path parameters and query parameters at the same time; FastAPI knows which is which.

And you don’t have to declare them in any specific order.


@app.get("/users/{user_id}/items/{item_id}")
async def read_user_item(user_id: int, item_id: int, q: str | None = None, short: bool = False):
    item = {"item_id": item_id, "owner_id": user_id}
    if q:
        item.update({"q": q})
    if not short:
        item.update(
            {"description": "This is an amazing item that has a long description"}
        )
    return item

Required query parameters

When you want to make a query parameter required, simply do not declare a default value for it.
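Whether a parameter is required is visible in the function signature itself. The sketch below uses the standard inspect module to separate parameters with and without defaults; the function read_user_item, the parameter needy, and the helper split_by_default are illustrative names, not part of the original examples.

```python
import inspect
from typing import Optional


def read_user_item(item_id: str, needy: str, skip: int = 0, limit: Optional[int] = None):
    return {"item_id": item_id, "needy": needy, "skip": skip, "limit": limit}


def split_by_default(func):
    """Return (names without a default, names with a default)."""
    required, optional = [], []
    for name, param in inspect.signature(func).parameters.items():
        if param.default is inspect.Parameter.empty:
            required.append(name)
        else:
            optional.append(name)
    return required, optional


required, optional = split_by_default(read_user_item)
```

In a route such as /items/{item_id}, item_id would be a path parameter, needy a required query parameter, skip a query parameter with a default of 0, and limit an optional one.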

Request Body

When you need to send data from a client (let’s say, a browser) to your API, you send it as a request body.

A request body is data sent by the client to your API. A response body is the data your API sends to the client.

Your API almost always has to send a response body. But clients don’t necessarily need to send request bodies all the time.

class Item2(BaseModel):
    name: str
    price: float
    description: str | None = None
    tax: float | None = None


@app.post("/items2/")
async def create_item(item: Item2):
    return item

When a model attribute has a default value, it is not required. Otherwise, it is required. Use None to make it just optional.

{
    "name": "Foo",
    "description": "An optional description",
    "price": 45.2,
    "tax": 3.5
}

@app.post("/items2/")
async def create_item(item: Item2):
    item_dict = item.dict()
    if item.tax:
        price_with_tax = item.price + item.tax
        item_dict.update({"price_with_tax": price_with_tax})
    return item_dict

Request body + path parameters

@app.put("/items2/{item_id}")
async def create_item2(item_id: int, item: Item2):
    return {"item_id": item_id, **item.dict()}

Request body + path + query parameters

@app.put("/items22/{item_id}")
async def update_item2(item_id: int, item: Item2, q: str | None = None):
    result = {"item_id": item_id, **item.dict()}
    if q:
        result.update({"q": q})
    return result

The function parameters are recognized as follows:

  • If the parameter is also declared in the path, it will be used as a path parameter.
  • If the parameter is of a singular type (like int, float, str, bool, etc) it will be interpreted as a query parameter.
  • If the parameter is declared to be of the type of a Pydantic model, it will be interpreted as a request body.
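The three rules can be sketched in plain Python. This is a simplified model, not FastAPI's implementation: a dataclass stands in for a Pydantic model, and the helper classify is made up for this example.

```python
import re
from dataclasses import dataclass
from typing import Optional, Union, get_args, get_origin, get_type_hints

SINGULAR = (int, float, str, bool)


@dataclass
class Item:  # stand-in for a Pydantic model
    name: str
    price: float


def classify(path, func):
    """Map each parameter name to 'path', 'query' or 'body'."""
    path_names = set(re.findall(r"\{(\w+)\}", path))
    result = {}
    for name, hint in get_type_hints(func).items():
        if get_origin(hint) is Union:  # unwrap Optional[X] to X
            hint = next(arg for arg in get_args(hint) if arg is not type(None))
        if name in path_names:
            result[name] = "path"
        elif hint in SINGULAR:
            result[name] = "query"
        else:
            result[name] = "body"
    return result


def update_item(item_id: int, item: Item, q: Optional[str] = None):
    ...


kinds = classify("/items/{item_id}", update_item)
```

For this signature the sketch reports item_id as a path parameter, q as a query parameter, and item as the request body.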

Query Parameters and String Validators

FastAPI allows you to declare additional information and validation for your parameters.

Optional

@app.get("/items-optional/")
async def read_items_optional(q: str | None = None):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results

Additional Validation

We are going to enforce that even though q is optional, whenever it is provided, its length doesn’t exceed 50 characters.

from fastapi import Query


@app.get("/items-optional/")
async def read_items_optional(q: str | None = Query(default=None, max_length=50)):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results

Add more validations

You can also add a parameter min_length:

@app.get("/items-optional/")
async def read_items_optional(q: str | None = Query(default=None, min_length=3, max_length=50)):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results

Add regular expressions

@app.get("/items-optional/")
async def read_items_optional(q: str | None = Query(default=None, min_length=3, max_length=50, regex="^fixedquery$")):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results
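The pattern ^fixedquery$ reads as: ^ anchors the start, fixedquery is the literal text, and $ anchors the end, so nothing may come before or after it. A quick check with Python's own re module (the candidate strings are made up, and how FastAPI applies the pattern internally is not shown here):

```python
import re

pattern = re.compile("^fixedquery$")

candidates = ["fixedquery", "fixedquery2", "my fixedquery", "fixed"]
accepted = [value for value in candidates if pattern.match(value)]
```

Only the exact string fixedquery passes; anything longer or shorter is rejected.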

Required

@app.get("/items-required/")
async def read_items_required(q: str = Query(min_length=3, max_length=50, regex="^fixedquery$")):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results

Required with Ellipsis (...)

@app.get("/items-required-ellipsis/")
async def read_items_required_ellipsis(q: str = Query(default=..., max_length=5)):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results

The ... is a special single value that is part of Python, called “Ellipsis”.

It is used by Pydantic and FastAPI to explicitly declare that a value is required.
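A short sketch in plain Python shows what the value is and why it works as a "no default given" marker. The helper describe_default is hypothetical and only mimics the idea; it is not FastAPI or Pydantic code.

```python
# `...` is a literal for the built-in singleton Ellipsis.
is_same_object = ... is Ellipsis
type_name = type(...).__name__


def describe_default(default=...):
    """Treat Ellipsis as 'no default given', so that None stays a usable default."""
    if default is ...:
        return "required"
    return f"optional (default={default!r})"
```

Using ... rather than None as the marker is what makes it possible to tell "no default" apart from "the default is None".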

Required with None

You can declare that a parameter can accept None, but that it’s still required. This would force clients to send a value, even if the value is None.

To do that, you can declare that None is a valid type but still use default=...:

@app.get("/items/")
async def read_items(q: str | None = Query(default=..., min_length=3)):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results

Use Pydantic’s Required instead of Ellipsis (...)

from pydantic import Required  # available in Pydantic 1.x


@app.get("/items/")
async def read_items(q: str = Query(default=Required, min_length=3)):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results

Query parameter list / multiple value

@app.get("/items-query-list")
async def read_items_query_list(q: list[str] | None = Query(default=None)):
    query_items = {"q": q}
    return query_items

To declare a query parameter with a type of list, like in the example above, you need to explicitly use Query, otherwise it would be interpreted as a request body.
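A list-valued query parameter is sent by repeating the key in the URL. The sketch below parses such a URL with the standard urllib.parse module; the exact URL is an assumption based on the route above, and the response shape simply mirrors what read_items_query_list returns.

```python
from urllib.parse import parse_qs, urlsplit

url = "http://127.0.0.1:8000/items-query-list?q=foo&q=bar"

# Repeated keys are collected into one list, in the order they appear.
values = parse_qs(urlsplit(url).query)["q"]

response = {"q": values}
```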

Query parameter list / multiple values with defaults

@app.get("/items/")
async def read_items(q: list[str] = Query(default=["foo", "bar"])):
    query_items = {"q": q}
    return query_items

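Using list

You can also use list directly instead of list[str]. Keep in mind that in this case FastAPI won’t check the contents of the list: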

@app.get("/items/")
async def read_items(q: list = Query(default=[])):
    query_items = {"q": q}
    return query_items

Declare more metadata

@app.get("/items-query-metadata")
async def read_items_query_metadata(
    q: str | None = Query(
        default=None,
        title="Query String Title",
        description="Query String Description",
        min_length=3,
    ),
):
    results = {"items": [{"item_id": "foo"}, {"item_id": "bar"}]}
    if q:
        results.update({"q": q})
    return results

Alias parameters

Imagine that you want the parameter to be item-query.

Like in:

http://127.0.0.1:8000/items/?item-query=foobaritems

But item-query is not a valid Python variable name.
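You can confirm the naming problem in plain Python, and see what an alias amounts to: the name used in the URL differs from the name of the variable that receives the value. This sketch uses only the standard library and does not show FastAPI's own lookup.

```python
from urllib.parse import parse_qs, urlsplit

alias_is_identifier = "item-query".isidentifier()  # hyphens are not allowed
q_is_identifier = "q".isidentifier()

# The alias is the key looked up in the URL; the value is bound to `q`.
url = "http://127.0.0.1:8000/items/?item-query=foobaritems"
q = parse_qs(urlsplit(url).query).get("item-query", [None])[0]
```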

@app.get("/items-query-alias")
async def read_items_query_alias(q: str | None = Query(default=None, alias="item-query")):
    results = {"items": [{"item_id": "foo"}, {"item_id": "bar"}]}
    if q:
        results.update({"q": q})
    return results

Deprecating parameters

@app.get("/items/")
async def read_items(
    q: str | None = Query(
        default=None,
        alias="item-query",
        title="Query string",
        description="Query string for the items to search in the database that have a good match",
        min_length=3,
        max_length=50,
        regex="^fixedquery$",
        deprecated=True,
    ),
):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results

Exclude from OpenAPI and the automatic documentation system

from fastapi import FastAPI, Query

app = FastAPI()


@app.get("/items/")
async def read_items(
    hidden_query: str | None = Query(default=None, include_in_schema=False)
):
    if hidden_query:
        return {"hidden_query": hidden_query}
    else:
        return {"hidden_query": "Not found"}

Path Parameters and numeric validators

from fastapi import Path, Query


@app.get("/items-path-parameters/{item_id}")
async def read_items_path_parameters(
        item_id: int = Path(title="the ID of the item to get"),
        q: str | None = Query(default=None, alias="item-query")):
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    return results

Order the parameters as you need, tricks

If you want to declare the q query parameter without a Query nor any default value, and the path parameter item_id using Path, and have them in a different order, Python has a little special syntax for that.

Pass *, as the first parameter of the function.

Python won’t do anything with that *, but it will know that all the following parameters should be called as keyword arguments (key-value pairs), also known as kwargs. Even if they don’t have a default value.

@app.get("/items/{item_id}")
async def read_items(*, item_id: int = Path(title="The ID of the item to get"), q: str):
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    return results
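The effect of the bare * can be tried without FastAPI. In this sketch the Path(...) default is replaced by a plain 0, purely so the function can be called directly.

```python
def read_items(*, item_id: int = 0, q: str):
    # Without the leading `*`, placing `q` (no default) after `item_id`
    # (with a default) would be a SyntaxError.
    return {"item_id": item_id, "q": q}


by_keyword = read_items(q="foo", item_id=5)

try:
    read_items(5, "foo")  # positional arguments are not accepted
    positional_ok = True
except TypeError:
    positional_ok = False
```

Since FastAPI passes parameters by name, making them keyword-only costs nothing and frees you to order them as you like.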

Number validations

Greater than or equal

@app.get("/items/{item_id}")
async def read_items(
    *, item_id: int = Path(title="The ID of the item to get", ge=1), q: str
):
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    return results

Greater than and less than or equal

from fastapi import FastAPI, Path

app = FastAPI()


@app.get("/items/{item_id}")
async def read_items(
    *,
    item_id: int = Path(title="The ID of the item to get", gt=0, le=1000),
    q: str,
):
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    return results

Floats, greater than and less than

@app.get("/items/{item_id}")
async def read_items(
    *,
    item_id: int = Path(title="The ID of the item to get", ge=0, le=1000),
    q: str,
    size: float = Query(gt=0, lt=10.5)
):
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    return results
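The four keywords map directly onto comparison operators: gt is >, ge is >=, lt is <, and le is <=. The sketch below spells that out with the standard operator module; in_bounds is a hypothetical helper, not a FastAPI function.

```python
import operator

CHECKS = {
    "gt": operator.gt,  # greater than
    "ge": operator.ge,  # greater than or equal
    "lt": operator.lt,  # less than
    "le": operator.le,  # less than or equal
}


def in_bounds(value, **bounds):
    """True if value satisfies every bound, e.g. in_bounds(5, gt=0, le=1000)."""
    return all(CHECKS[name](value, limit) for name, limit in bounds.items())
```

For example, in_bounds(0, gt=0, le=1000) is False because 0 is not strictly greater than 0, while in_bounds(0, ge=0, le=1000) is True.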

Body - Multiple Parameters

Mix Path, Query and body parameters

You can also declare body parameters as optional, by setting the default to None:

class Item3(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None


@app.put("/items-body/{item_id}")
async def update_item3(
        *,  # everything after this is keyword-only
        item_id: int = Path(title="the ID of the item to update", ge=0, le=1000),
        q: str | None = None,
        item: Item3 | None = None,
):
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    if item:
        results.update({"item": item})
    return results

Multiple body parameters

class User3(BaseModel):
    username: str
    full_name: str | None = None


@app.put("/items-body2/{item_id}")
async def update_item3(
        *,  # everything after this is keyword-only
        item_id: int = Path(title="the ID of the item to update", ge=0, le=1000),
        q: str | None = None,
        item: Item3,
        user: User3,
):
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    if item:
        results.update({"item": item})
    return results

Singular values in body

from fastapi import Body


@app.put("/items-importance/{item_id}")
async def update_item(item_id: int, item: Item3, user: User3, importance: int = Body()):
    results = {"item_id": item_id, "item": item, "user": user, "importance": importance}
    return results

Multiple body params and query

@app.put("/items/{item_id}")
async def update_item(
    *,
    item_id: int,
    item: Item3,
    user: User3,
    importance: int = Body(gt=0),
    q: str | None = None
):
    results = {"item_id": item_id, "item": item, "user": user, "importance": importance}
    if q:
        results.update({"q": q})
    return results

Embed a single body parameter

Let’s say you only have a single item body parameter from a Pydantic model Item3. By default, FastAPI expects its body directly:

{
    "name": "Foo",
    "description": "The pretender",
    "price": 42.0,
    "tax": 3.2
}

But if you want it to expect a JSON with a key item and the model contents inside it, as it does when you declare extra body parameters, you can use the special Body parameter embed:

@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item3 = Body(embed=True)):
    results = {"item_id": item_id, "item": item}
    return results

In this case FastAPI will expect a body like:

{
    "item": {
        "name": "Foo",
        "description": "The pretender",
        "price": 42.0,
        "tax": 3.2
    }
}

Body Fields

You can declare validation and metadata inside of Pydantic models using Pydantic’s Field.

from pydantic import BaseModel, Field


class Item4(BaseModel):
    name: str
    description: str | None = Field(
        default=None,
        title="The description of the item",
        max_length=300
    )
    price: float
    tax: float | None = None

Declare model attributes

You can then use Field with model attributes:

class Item4(BaseModel):
    name: str
    description: str | None = Field(
        default=None,
        title="The description of the item",
        max_length=300
    )
    price: float = Field(gt=0, description="The price must be greater than 0")
    tax: float | None = None


@app.put("/items4/{item_id}")
async def update_item4(item_id: int, item: Item4 = Body(embed=True)):
    results = {"item_id": item_id, "item": item}
    return results

Body - Nested Models

With FastAPI, you can define, validate, document, and use arbitrarily deeply nested models (thanks to Pydantic).

List Fields

You can define an attribute to be a subtype. For example, a Python list:

class Item5(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None
    tags: list = []


@app.put("/items5/{item_id}")
async def update_item(item_id: int, item: Item5):
    results = {"item_id": item_id, "item": item}
    return results

Declare a list with a type parameter

class Item(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None
    tags: list[str] = []

Set types

But then we think about it and realize that tags shouldn’t repeat; they should probably be unique strings.

And Python has a special data type for sets of unique items, the set.

class Item(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None
    tags: set[str] = set()

Request body, with duplicate tags:

{
  "name": "string",
  "description": "string",
  "price": 0,
  "tax": 0,
  "tags": [1,2,3,1]
}

Response body, with the tags de-duplicated:
{
  "item_id": 5,
  "item": {
    "name": "string",
    "description": "string",
    "price": 0,
    "tax": 0,
    "tags": [
      "2",
      "1",
      "3"
    ]
  }
}
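What happens to the tags in that request can be reproduced in plain Python. This is only a sketch of the visible effect (coercion to str as the set[str] annotation asks, then de-duplication), not Pydantic's actual code path.

```python
incoming_tags = [1, 2, 3, 1]  # as sent in the request body above

# Coerce each element to str, then let the set drop the duplicate.
tags = {str(tag) for tag in incoming_tags}

# JSON has no set type, so the response carries a list. A set has no
# guaranteed order, which is why the response above is not sorted.
response_tags = sorted(tags)
```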

Nested Models

Each attribute of a Pydantic model has a type.

But that type can itself be another Pydantic model.

So, you can declare deeply nested JSON “objects” with specific attribute names, types and validations.

All that, arbitrarily nested.

class Image(BaseModel):
    url: str
    name: str


class Item6(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None
    tags: set[str] = set()
    image: Image | None = None

Special types and validation

For example, the Image model has a url field. Instead of a str, we can declare it to be a Pydantic HttpUrl:

from pydantic import BaseModel, HttpUrl

class Image(BaseModel):
    url: HttpUrl
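    name: str

The string is then checked to be a valid URL. A request body with an invalid URL, such as:

{
  "name": "string",
  "description": "string",
  "price": 0,
  "tax": 0,
  "tags": [],
  "image": {
    "url": "https",
    "name": "string"
  }
}

is rejected with a validation error:

{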
  "detail": [
    {
      "loc": [
        "body",
        "image",
        "url"
      ],
      "msg": "invalid or missing URL scheme",
      "type": "value_error.url.scheme"
    }
  ]
}

Attributes with lists of submodels

You can also use Pydantic models as subtypes of list, set, etc:

class Item7(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None
    tags: set[str] = set()
    images: list[Image] | None = None

Deeply nested models

class Offer(BaseModel):
    name: str
    description: str | None = None
    price: float
    items: list[Item7]

Here Item7 nests a list of Image models, and Offer nests a list of Item7 models.

Bodies of pure lists

If the top level value of the JSON body you expect is a JSON array (a Python list), you can declare the type in the parameter of the function, the same as in Pydantic models:

@app.post("/images/multiple/")
async def create_multiple_images(images: list[Image]):
    return images

Bodies of arbitrary dicts

You can also declare a body as a dict with keys of one type and values of another, without having to know beforehand what the valid field/attribute names are (as would be the case with Pydantic models).

This is useful if you want to receive keys that you don’t already know.

Another useful case is when you want keys of another type, e.g. int.

In this case, you would accept any dict as long as it has int keys with float values:

@app.post("/index-weights/")
async def create_index_weights(weights: dict[int, float]):
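    return weights

JSON only supports str as keys, so the request body uses the key "1", which Pydantic converts to the int 1:

{ "1" : 3.44 }

For example, with curl:

curl -X 'POST' \
  'http://127.0.0.1:8000/index-weights/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "1": 3.14
}'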
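The key conversion can be checked with the standard json module. The dict comprehension below is a hand-written stand-in for what the dict[int, float] annotation requests, not Pydantic's implementation.

```python
import json

payload = json.loads('{"1": 3.14}')
key_types = {type(key).__name__ for key in payload}  # JSON object keys are always str

# Roughly the conversion that the dict[int, float] annotation asks for.
weights = {int(key): float(value) for key, value in payload.items()}

# Note that JSON does not allow a trailing comma after the last pair.
try:
    json.loads('{"1": 3.14,}')
    trailing_comma_ok = True
except json.JSONDecodeError:
    trailing_comma_ok = False
```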

Declare Request Example Data

You can declare examples of the data your app can receive.

Pydantic schema_extra

class Item8(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None

    class Config:
        schema_extra = {
            "example": {
                "name": "example",
                "description": "example description",
                "price": 10.0,
                "tax": 3.34
            }
        }


@app.put("/items8/{item_id}")
async def update_item8(item_id: int, item: Item8):
    results = {"item_id": item_id, "item": item}
    return results

Field additional arguments

When using Field() with Pydantic models, you can also declare extra info for the JSON Schema by passing any other arbitrary arguments to the function.

You can use this to add an example for each field:

from pydantic import BaseModel, Field

class Item9(BaseModel):
    name: str = Field(example="Foo")
    description: str | None = Field(default=None, example="A very nice Item")
    price: float = Field(example=35.4)
    tax: float | None = Field(default=None, example=3.2)

example and examples in OpenAPI

When using any of:

  • Path()
  • Query()
  • Header()
  • Cookie()
  • Body()
  • Form()
  • File()

you can also declare a data example or a group of examples with additional information that will be added to OpenAPI.

Body with example

@app.put("/items/{item_id}")
async def update_item(
    item_id: int,
    item: Item = Body(
        example={
            "name": "Foo",
            "description": "A very nice Item",
            "price": 35.4,
            "tax": 3.2,
        },
    ),
):
    results = {"item_id": item_id, "item": item}
    return results

Body with multiple examples

As an alternative to the single example, you can pass examples using a dict with multiple examples, each with extra information that will be added to OpenAPI too.

@app.put("/items/{item_id}")
async def update_item(
    *,
    item_id: int,
    item: Item = Body(
        examples={
            "normal": {
                "summary": "A normal example",
                "description": "A **normal** item works correctly.",
                "value": {
                    "name": "Foo",
                    "description": "A very nice Item",
                    "price": 35.4,
                    "tax": 3.2,
                },
            },
            "converted": {
                "summary": "An example with converted data",
                "description": "FastAPI can convert price `strings` to actual `numbers` automatically",
                "value": {
                    "name": "Bar",
                    "price": "35.4",
                },
            },
            "invalid": {
                "summary": "Invalid data is rejected with an error",
                "value": {
                    "name": "Baz",
                    "price": "thirty five point four",
                },
            },
        },
    ),
):
    results = {"item_id": item_id, "item": item}
    return results

Extra Data Types

Up to now, you have been using common data types, like:

  • int
  • float
  • str
  • bool

Other data types

Here are some of the additional data types you can use:

  • UUID:
    • A standard “Universally Unique Identifier”, common as an ID in many databases and systems.
    • In requests and responses will be represented as a str.
  • datetime.datetime:
    • A Python datetime.datetime.
    • In requests and responses will be represented as a str in ISO 8601 format, like: 2008-09-15T15:53:00+05:00.
  • datetime.date:
    • A Python datetime.date.
    • In requests and responses will be represented as a str in ISO 8601 format, like: 2008-09-15.
  • datetime.time:
    • A Python datetime.time.
    • In requests and responses will be represented as a str in ISO 8601 format, like: 14:23:55.003.
  • datetime.timedelta:
    • A Python datetime.timedelta.
    • In requests and responses will be represented as a float of total seconds.
    • Pydantic also allows representing it as a “ISO 8601 time diff encoding”, see the docs for more info.
  • frozenset:
    • In requests and responses, treated the same as a set:
      • In requests, a list will be read, eliminating duplicates and converting it to a set.
      • In responses, the set will be converted to a list.
      • The generated schema will specify that the set values are unique (using JSON Schema’s uniqueItems).
  • bytes:
    • Standard Python bytes.
    • In requests and responses will be treated as str.
    • The generated schema will specify that it’s a str with binary “format”.
  • Decimal:
    • Standard Python Decimal.
    • In requests and responses, handled the same as a float.
  • You can check all the valid Pydantic data types here: Pydantic data types.

Here is an example path operation with parameters using some of the above types:

from datetime import datetime, time, timedelta
from uuid import UUID


@app.put("/items10/{item_id}")
async def read_items(
        item_id: UUID,
        start_datetime: datetime = Body(),
        end_datetime: datetime = Body(),
        repeat_at: time | None = Body(default=None),
        process_after: timedelta = Body(),
):
    # These three fields are required here: the arithmetic below would fail on None.
    start_process = start_datetime + process_after
    duration = end_datetime - start_process
    return {
        "item_id": item_id,
        "start_datetime": start_datetime,
        "end_datetime": end_datetime,
        "repeat_at": repeat_at,
        "process_after": process_after,
        "start_process": start_process,
        "duration": duration,
    }
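The representations listed above can be reproduced with the standard library alone. In this sketch the UUID value and the 90-minute delay are made up, and the dictionary only illustrates the kind of values that end up in the JSON response; FastAPI's own encoder is not used.

```python
from datetime import datetime, timedelta
from uuid import UUID

start = datetime.fromisoformat("2008-09-15T15:53:00+05:00")
process_after = timedelta(minutes=90)
start_process = start + process_after  # datetime + timedelta gives a datetime

item_id = UUID("12345678-1234-5678-1234-567812345678")

as_json_values = {
    "item_id": str(item_id),                         # UUID as str
    "start_process": start_process.isoformat(),      # datetime as ISO 8601 str
    "process_after": process_after.total_seconds(),  # timedelta as float seconds
}
```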

Cookie Parameters

You can define Cookie parameters the same way you define Query and Path parameters.

from fastapi import Cookie


@app.get("/items11/")
async def read_items11(ads_id: str | None = Cookie(default=None)):
    print(ads_id)
    return {"ads_id": ads_id}

Header Parameters

You can define Header parameters the same way you define Query, Path and Cookie parameters.

from fastapi import Header


@app.get("/header/")
async def read_header(user_agent: str | None = Header(default=None)):
    return {"User-Agent": user_agent}

Example response:

{
  "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36"
}

Header has a little extra functionality on top of what Path, Query and Cookie provide.

Most of the standard headers are separated by a “hyphen” character, also known as the “minus symbol” (-).

But a variable like user-agent is invalid in Python.

So, by default, Header will convert the characters in parameter names from underscore (_) to hyphen (-) to extract and document the headers.

Also, HTTP headers are case-insensitive, so you can declare them with standard Python style (also known as “snake_case”).

So, you can use user_agent as you normally would in Python code, instead of needing to capitalize the first letters as User_Agent or something similar.
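The name conversion and the case-insensitive lookup can be sketched in a few lines of plain Python. The helper header_name and the sample header values are made up for illustration; this is not FastAPI's internal code.

```python
def header_name(param_name: str) -> str:
    """Python parameter name -> HTTP header name (hypothetical helper)."""
    return param_name.replace("_", "-")


incoming = {"User-Agent": "curl/7.86.0", "X-Token": "abc"}  # made-up request headers

# Header names are case-insensitive, so compare them lowercased.
lowered = {name.lower(): value for name, value in incoming.items()}

user_agent = lowered.get(header_name("user_agent"))
x_token = lowered.get(header_name("x_token"))
```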

It is possible to receive duplicate headers. That means, the same header with multiple values.

You can define those cases using a list in the type declaration.

You will receive all the values from the duplicate header as a Python list.

@app.get("/header/")
async def read_header(user_agent: str | None = Header(default=None), x_token: list[str] | None = Header(default=None)):
    return {"User-Agent": user_agent, "X-Token values": x_token}

Response Model

You can declare the model used for the response with the parameter response_model in any of the path operations:

  • @app.get()
  • @app.post()
  • @app.put()
  • @app.delete()

Notice that response_model is a parameter of the “decorator” method (get, post, etc.), not of your path operation function, unlike the parameters and body.

@app.get("/response_body/", response_model=Item8)
async def read_response_body():
    return {"item_id": 1, "name": "Foo", "description": "A very nice Item", "price": 10.0, "tax": 3.34}


from pydantic import BaseModel, EmailStr, SecretStr


class UserIn(BaseModel):
    username: str
    email: EmailStr
    password: SecretStr
    full_name: str | None = None


class UserOut(BaseModel):
    username: str
    email: EmailStr
    full_name: str | None = None


@app.post("/users/", response_model=UserOut)
async def create_user(user: UserIn):
    return user


class Item(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float = 10.5


items = {
    "foo": {"name": "Foo", "price": 50.2},
    "bar": {"name": "Bar", "description": "The Bar fighters", "price": 62, "tax": 20.2},
    "baz": {
        "name": "Baz",
        "description": "There goes my baz",
        "price": 50.2,
        "tax": 10.5,
    },
}


@app.get(
    "/items/{item_id}/name",
    response_model=Item,
    response_model_include={"name", "description"},
)
async def read_item_name(item_id: str):
    return items[item_id]


@app.get("/items/{item_id}/public", response_model=Item, response_model_exclude={"tax"})
async def read_item_public_data(item_id: str):
    return items[item_id]


@app.get("/items/{item_id}", response_model=Item, response_model_exclude_unset=True)
async def read_item(item_id: str):
    return items[item_id]

You can also use:

  • response_model_exclude_defaults=True
  • response_model_exclude_none=True

as described in the Pydantic docs for exclude_defaults and exclude_none.
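The difference between the three flags is easier to see side by side. The sketch below imitates them with plain dicts; it is a toy stand-in, not Pydantic's code, and `dump_item` and `ITEM_DEFAULTS` are hypothetical names that mirror the defaults of the Item model above.

```python
# Defaults of the optional fields in the Item model above.
ITEM_DEFAULTS = {"description": None, "tax": 10.5}


def dump_item(stored, exclude_unset=False, exclude_defaults=False, exclude_none=False):
    """Imitate how the three flags trim the response data."""
    if exclude_unset:
        # Keep only the fields that were explicitly provided.
        data = dict(stored)
    else:
        # Otherwise missing fields are filled in with their defaults.
        data = {**ITEM_DEFAULTS, **stored}
    if exclude_defaults:
        # Drop fields whose value equals the default, even if it was set explicitly.
        data = {k: v for k, v in data.items() if k not in ITEM_DEFAULTS or v != ITEM_DEFAULTS[k]}
    if exclude_none:
        data = {k: v for k, v in data.items() if v is not None}
    return data


foo = {"name": "Foo", "price": 50.2}
baz = {"name": "Baz", "description": "There goes my baz", "price": 50.2, "tax": 10.5}

foo_unset = dump_item(foo, exclude_unset=True)        # name, price
baz_unset = dump_item(baz, exclude_unset=True)        # all four fields, tax was set explicitly
baz_defaults = dump_item(baz, exclude_defaults=True)  # tax dropped, it equals the default
foo_none = dump_item(foo, exclude_none=True)          # description dropped, tax kept
```

Note how "baz" keeps its tax with exclude_unset but loses it with exclude_defaults, because the value was set explicitly yet equals the default.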

Extra Models

Continuing with the previous example, it will be common to have more than one related model.

This is especially the case for user models, because:

  • The input model needs to be able to have a password.
  • The output model should not have a password.
  • The database model would probably need to have a hashed password.

class UserBase(BaseModel):
    username: str
    email: EmailStr
    full_name: str | None = None


class UserIn(UserBase):
    password: str


class UserOut(UserBase):
    pass


class UserInDB(UserBase):
    hashed_password: str


def fake_password_hasher(raw_password: str):
    return "supersecret" + raw_password


def fake_save_user(user_in: UserIn):
    hashed_password = fake_password_hasher(user_in.password)
    user_in_db = UserInDB(**user_in.dict(), hashed_password=hashed_password)
    print("User saved! ..not really")
    return user_in_db


@app.post("/users-extra/", response_model=UserOut)
async def create_user(user_in: UserIn):
    user_saved = fake_save_user(user_in)
    print(user_saved)
    return user_saved

curl -X 'POST' \
  'http://127.0.0.1:8000/users-extra/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "username": "string",
  "email": "user@example.com",
  "full_name": "string",
  "password": "string"
}'

Response:

{
  "username": "string",
  "email": "user@example.com",
  "full_name": "string"
}


Union or anyOf

You can declare a response to be the Union of two types, that means, that the response would be any of the two.

It will be defined in OpenAPI with anyOf.

class BaseItem(BaseModel):
    description: str
    type: str


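class CarItem(BaseItem):
    # Annotate the overridden field so the default is accepted by newer Pydantic versions too
    type: str = "car"


class PlaneItem(BaseItem):
    type: str = "plane"
    size: int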


items = {
    "item1": {"description": "All my friends drive a low rider", "type": "car"},
    "item2": {
        "description": "Music is my aeroplane, it's my aeroplane",
        "type": "plane",
        "size": 5,
    },
}


@app.get("/items/{item_id}", response_model=Union[PlaneItem, CarItem])
async def read_item(item_id: str):
    return items[item_id]

List of models

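@app.get("/items/", response_model=list[Item])
async def read_items():
    # response_model=list[Item] needs a list, so return the dict's values, not the dict itself.
    # `items` here is the name -> item data dict from the Response Model section.
    return list(items.values())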

Response with arbitrary dict

@app.get("/keyword-weights/", response_model=dict[str, float])
async def read_keyword_weights():
    return {"foo": 2.3, "bar": 3.4}

Response Status Code

@app.post("/items/", status_code=201)
async def create_item(name: str):
    return {"name": name}

In short:

  • 100 and above are for “Information”. You rarely use them directly. Responses with these status codes cannot have a body.
  • 200 and above are for “Successful” responses. These are the ones you would use the most.
    • 200 is the default status code, which means everything was “OK”.
    • Another example would be 201, “Created”. It is commonly used after creating a new record in the database.
    • A special case is 204, “No Content”. This response is used when there is no content to return to the client, and so the response must not have a body.
  • 300 and above are for “Redirection”. Responses with these status codes may or may not have a body, except for 304, “Not Modified”, which must not have one.
  • 400 and above are for “Client error” responses. These are the second type you would probably use the most.
    • An example is 404, for a “Not Found” response.
    • For generic errors from the client, you can just use 400.
  • 500 and above are for server errors. You almost never use them directly. When something goes wrong at some part in your application code, or server, it will automatically return one of these status codes.
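The standard library's http.HTTPStatus enum carries the same codes and names, so you do not have to memorize the numbers. A small sketch that groups codes into the classes listed above; `status_class` is a hypothetical helper.

```python
from http import HTTPStatus

# The first digit of a status code determines its class.
STATUS_CLASSES = {
    1: "Information",
    2: "Successful",
    3: "Redirection",
    4: "Client error",
    5: "Server error",
}


def status_class(code: int) -> str:
    """Return the class name for a status code, based on its first digit."""
    return STATUS_CLASSES[code // 100]


for status in (HTTPStatus.OK, HTTPStatus.CREATED, HTTPStatus.NO_CONTENT, HTTPStatus.NOT_FOUND):
    print(status.value, status.phrase, "->", status_class(status.value))
```

HTTPStatus members compare equal to plain integers, so `HTTPStatus.CREATED` can be used wherever the value 201 is expected.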

Form Data

When you need to receive form fields instead of JSON, you can use Form.

To use forms, first install python-multipart.

E.g. pip install python-multipart.

For example, in one of the ways the OAuth2 specification can be used (called “password flow”) it is required to send a username and password as form fields.

The spec requires the fields to be exactly named username and password, and to be sent as form fields, not JSON.

Data from forms is normally encoded using the “media type” application/x-www-form-urlencoded.

But when the form includes files, it is encoded as multipart/form-data. You’ll read about handling files in the next chapter.

@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
    return {"username": username}
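To see what application/x-www-form-urlencoded looks like on the wire, the standard library can encode and decode such a body. The field values below are made up for illustration.

```python
from urllib.parse import parse_qs, urlencode

# What a client sends as the request body for the two form fields.
body = urlencode({"username": "johndoe", "password": "s3cret word"})
print(body)  # username=johndoe&password=s3cret+word

# What the server recovers from that body (parse_qs returns a list per field).
fields = {name: values[0] for name, values in parse_qs(body).items()}
print(fields)
```

This is a flat list of key=value pairs, which is why form fields and a JSON body are declared differently.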

Request Files

pip install python-multipart

@app.post("/files/")
async def create_file(file: bytes = File()):
    # whole content will be saved in the memory as bytes
    print(file)
    return {"file_size": len(file)}


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile):
    contents = await file.read()
    print(contents)
    return {"filename": file.filename, "content_type": file.content_type, "file": file}

{
  "filename": "CleanShot 2022-12-05 at 10.32.57.png",
  "content_type": "image/png",
  "file": {
    "filename": "CleanShot 2022-12-05 at 10.32.57.png",
    "content_type": "image/png",
    "file": {},
    "headers": {
      "content-disposition": "form-data; name=\"file\"; filename=\"CleanShot 2022-12-05 at 10.32.57.png\"",
      "content-type": "image/png"
    }
  }
}

UploadFile

UploadFile has the following attributes:

  • filename: A str with the original file name that was uploaded (e.g. myimage.jpg).
  • content_type: A str with the content type (MIME type / media type) (e.g. image/jpeg).
  • file: A SpooledTemporaryFile (a file-like object). This is the actual Python file that you can pass directly to other functions or libraries that expect a “file-like” object.

UploadFile has the following async methods. They all call the corresponding file methods underneath (using the internal SpooledTemporaryFile).

  • write(data): Writes data (str or bytes) to the file.
  • read(size): Reads size (int) bytes/characters of the file.
  • seek(offset): Goes to the byte position offset (int) in the file.
    • E.g., await myfile.seek(0) would go to the start of the file.
    • This is especially useful if you run await myfile.read() once and then need to read the contents again.
  • close(): Closes the file.

As all these methods are async methods, you need to “await” them.

For example, inside of an async path operation function you can get the contents with:

contents = await myfile.read()

If you are inside of a normal def path operation function, you can access the UploadFile.file directly, for example:

contents = myfile.file.read()
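The reason seek(0) matters can be shown with a SpooledTemporaryFile directly, without FastAPI. This sketch uses the synchronous file object, so there is nothing to await; the byte content is made up.

```python
from tempfile import SpooledTemporaryFile

# Data stays in memory until max_size is exceeded, then it is moved to disk.
with SpooledTemporaryFile(max_size=1024) as upload:
    upload.write(b"hello upload")
    upload.seek(0)                # go back to the start before reading
    first_read = upload.read()    # the whole content
    second_read = upload.read()   # empty: the position is now at the end
    upload.seek(0)
    third_read = upload.read()    # the whole content again

print(first_read, second_read, third_read)
```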

Optional File Upload

@app.post("/files/")
async def create_file(file: bytes | None = File(default=None, description="A file read as bytes")):
    # whole content will be saved in the memory as bytes
    if not file:
        return {"message": "No file sent"}
    else:
        return {"file_size": len(file)}


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile | None = File(default=None, description="A file read as UploadFile")):
    if not file:
        return {"message": "No upload file sent"}
    else:
        contents = await file.read()
        print(contents)
        return {"filename": file.filename, "content_type": file.content_type, "file": file}

Multiple File Uploads

@app.post("/multi-files/")
async def create_multi_files(files: list[bytes] = File(description="Multiple files as bytes")):
    return {"file_sizes": [len(file) for file in files]}


@app.post("/upload-multiple-files/")
async def create_upload_files2(files: list[UploadFile] = File(description="Multiple upload files as UploadFile")):
    return {"filenames": [file.filename for file in files]}

Request Forms and Files

@app.post("/token-files/")
async def create_file(
        file: bytes = File(), fileb: UploadFile = File(), token: str = Form()
):
    return {
        "file_size": len(file),
        "token": token,
        "fileb_content_type": fileb.content_type,
    }

curl -X 'POST' \
  'http://127.0.0.1:8000/token-files/' \
  -H 'accept: application/json' \
  -H 'Content-Type: multipart/form-data' \
  -F 'file=@CleanShot 2022-12-05 at 10.32.57.png;type=image/png' \
  -F 'fileb=@CleanShot 2022-12-05 at 10.32.57.png;type=image/png' \
  -F 'token=1111'

{
  "file_size": 19851,
  "token": "1111",
  "fileb_content_type": "image/png"
}

Path Operation Configuration

Tags

@app.post("/items/", response_model=Item, tags=["items"])
async def create_item(item: Item):
    return item


@app.get("/items/", tags=["items"])
async def read_items():
    return [{"name": "Foo", "price": 42}]


@app.get("/users/", tags=["users"])
async def read_users():
    return [{"username": "johndoe"}]


Tags with Enums

class Tags(Enum):
    items = "items"
    users = "users"


@app.get("/items/", tags=[Tags.items])
async def get_items():
    return ["Portal gun", "Plumbus"]


@app.get("/users/", tags=[Tags.users])
async def read_users():
    return ["Rick", "Morty"]

Description from docstring

@app.post("/items/", response_model=Item, summary="Create an item")
async def create_item(item: Item):
    """
    Create an item with all the information:

    - **name**: each item must have a name
    - **description**: a long description
    - **price**: required
    - **tax**: if the item doesn't have tax, you can omit this
    - **tags**: a set of unique tag strings for this item
    """
    return item


Deprecated

@app.get("/elements/", tags=["items"], deprecated=True)
async def read_elements():
    return [{"item_id": "Foo"}]

JSON Compatible Encoder

There are some cases where you might need to convert a data type (like a Pydantic model) to something compatible with JSON (like a dict, list, etc).

For example, if you need to store it in a database.

Let’s imagine that you have a database fake_db that only receives JSON compatible data.

For example, it doesn’t receive datetime objects, as those are not compatible with JSON.

from fastapi.encoders import jsonable_encoder

@app.put("/items/{id}")
def update_item(id: str, item: Item):
    json_compatible_item_data = jsonable_encoder(item)
    fake_db[id] = json_compatible_item_data
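The problem that jsonable_encoder solves can be reproduced with the standard library. The `to_jsonable` function below is a hypothetical, much-reduced stand-in that only handles datetime values and containers; the real encoder covers many more types, including Pydantic models.

```python
import json
from datetime import datetime

item = {"title": "Foo", "timestamp": datetime(2022, 12, 5, 10, 32, 57)}

# json.dumps cannot serialize datetime objects on its own.
try:
    json.dumps(item)
    dump_failed = False
except TypeError:
    dump_failed = True


def to_jsonable(value):
    """Convert datetimes to ISO 8601 strings, recursing into containers."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, dict):
        return {key: to_jsonable(val) for key, val in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [to_jsonable(val) for val in value]
    return value


encoded = to_jsonable(item)
as_json = json.dumps(encoded)  # works now
print(as_json)
```

The result is still a Python dict, not a JSON string, which matches how jsonable_encoder is used above before storing the data.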

Body Updates (PUT or PATCH)

class Item(BaseModel):
    name: str | None = None
    description: str | None = None
    price: float | None = None
    tax: float = 10.5
    tags: list[str] = []


items = {
    "foo": {"name": "Foo", "price": 50.2},
    "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
    "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
}


@app.get("/items/{item_id}", response_model=Item)
async def read_item(item_id: str):
    return items[item_id]


@app.patch("/items/{item_id}", response_model=Item)
async def update_item(item_id: str, item: Item):
    stored_item_data = items[item_id]
    stored_item_model = Item(**stored_item_data)
    update_data = item.dict(exclude_unset=True)
    updated_item = stored_item_model.copy(update=update_data)
    items[item_id] = jsonable_encoder(updated_item)
    return updated_item
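Why exclude_unset=True matters for PATCH can be sketched with plain dicts. Here `patch` plays the role of item.dict(exclude_unset=True) and `full_body` the role of item.dict() for the same request; both are written out by hand for illustration.

```python
stored = {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2, "tags": []}

# The client only sent {"price": 70}.
patch = {"price": 70}  # only the fields that were actually sent
full_body = {"name": None, "description": None, "price": 70, "tax": 10.5, "tags": []}  # defaults filled in

updated = {**stored, **patch}        # only price changes
clobbered = {**stored, **full_body}  # stored name, description and tax are overwritten by defaults

print(updated)
print(clobbered)
```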

Dependencies

FastAPI has a very powerful but intuitive Dependency Injection system.

“Dependency Injection” means, in programming, that there is a way for your code (in this case, your path operation functions) to declare things that it requires to work and use: “dependencies”.

And then, that system (in this case FastAPI) will take care of doing whatever is needed to provide your code with those needed dependencies (“inject” the dependencies).

This is very useful when you need to:

  • Have shared logic (the same code logic again and again).
  • Share database connections.
  • Enforce security, authentication, role requirements, etc.
  • And many other things…

All these, while minimizing code repetition.

async def common_parameters(q: str | None = None, skip: int = 0, limit: int = 100):
    return {"q": q, "skip": skip, "limit": limit}


@app.get("/items-depends/")
async def read_items(commons: dict = Depends(common_parameters)):
    return commons


@app.get("/users-depends/")
async def read_users(commons: dict = Depends(common_parameters)):
    return commons

Whenever a new request arrives, FastAPI will take care of:

  • Calling your dependency (“dependable”) function with the correct parameters.
  • Getting the result from your function.
  • Assigning that result to the parameter in your path operation function.
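The steps listed above can be imitated in a few lines of plain Python. This is a toy resolver meant only to show the idea; FastAPI's real system also handles async functions, type conversion, validation and caching. The `Depends` class here is a local stand-in, and `call_with_dependencies` is a hypothetical helper.

```python
import inspect


class Depends:
    """Toy marker standing in for fastapi.Depends."""

    def __init__(self, dependency):
        self.dependency = dependency


def call_with_dependencies(func, query_params):
    """Fill func's parameters from Depends markers or query parameters, then call it."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if isinstance(param.default, Depends):
            # Call the dependable (recursively, so it gets its own inputs),
            # take its result, and assign it to this parameter.
            kwargs[name] = call_with_dependencies(param.default.dependency, query_params)
        elif name in query_params:
            kwargs[name] = query_params[name]
        # Otherwise the function's own default value applies.
    return func(**kwargs)


def common_parameters(q=None, skip=0, limit=100):
    return {"q": q, "skip": skip, "limit": limit}


def read_items(commons=Depends(common_parameters)):
    return commons


result = call_with_dependencies(read_items, {"q": "foo", "limit": 10})
print(result)  # {'q': 'foo', 'skip': 0, 'limit': 10}
```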

With the Dependency Injection system, you can also tell FastAPI that your path operation function also “depends” on something else that should be executed before your path operation function, and FastAPI will take care of executing it and “injecting” the results.

Other common terms for this same idea of “dependency injection” are:

  • resources
  • providers
  • services
  • injectables
  • components

For example, let’s say you have 4 API endpoints (path operations):

  • /items/public/
  • /items/private/
  • /users/{user_id}/activate
  • /items/pro/

then you could add different permission requirements for each of them just with dependencies and sub-dependencies.

Classes as Dependencies

The key factor is that a dependency should be a “callable”.

A “callable” in Python is anything that Python can “call” like a function.

class CommonQueryParams:
    def __init__(self, q: str | None = None, skip: int = 0, limit: int = 100):
        self.q = q
        self.skip = skip
        self.limit = limit


@app.get("/items-common-depends/")
async def read_items(commons: CommonQueryParams = Depends()):
    response = {}
    if commons.q:
        response.update({"q": commons.q})
    items = fake_items_db[commons.skip: commons.skip + commons.limit]
    response.update({"items": items})
    return response
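The built-in callable() shows why both a function and a class can serve as a dependency. The two definitions below are reduced copies of the ones used in this section.

```python
class CommonQueryParams:
    def __init__(self, q=None, skip=0, limit=100):
        self.q = q
        self.skip = skip
        self.limit = limit


def common_parameters(q=None, skip=0, limit=100):
    return {"q": q, "skip": skip, "limit": limit}


checks = {
    "function": callable(common_parameters),
    "class": callable(CommonQueryParams),          # calling a class creates an instance
    "instance": callable(CommonQueryParams()),     # no __call__ method, so not callable
}
print(checks)

commons = CommonQueryParams(q="foo", limit=10)  # "calling" the class with its parameters
```

"Calling" the class runs __init__ with the given parameters, so its signature is what determines the query parameters.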

Sub-dependencies

You can create dependencies that have sub-dependencies.

def query_extractor(q: str | None = None):
    return q


def query_or_cookie_extractor(
        q: str = Depends(query_extractor), last_query: str | None = Cookie(default=None)
):
    # If the user didn't provide any query q, we use the last query used, which we saved to a cookie before.
    if not q:
        return last_query
    return q


@app.get("/items12/")
async def read_query(query_or_default: str = Depends(query_or_cookie_extractor)):
    return {"q_or_cookie": query_or_default}


Dependencies in path operation decorators

In some cases you don’t really need the return value of a dependency inside your path operation function.

Or the dependency doesn’t return a value.

But you still need it to be executed/solved.

For those cases, instead of declaring a path operation function parameter with Depends, you can add a list of dependencies to the path operation decorator.

async def verify_token(x_token: str = Header()):
    if x_token != "fake-super-secret-token":
        raise HTTPException(status_code=400, detail="X-Token header invalid")


async def verify_key(x_key: str = Header()):
    if x_key != "fake-super-secret-key":
        raise HTTPException(status_code=400, detail="X-Key header invalid")
    return x_key


@app.get("/items12/", dependencies=[Depends(verify_token), Depends(verify_key)])
async def read_items():
    return [{"item": "Foo"}, {"item": "Bar"}]

Global Dependency

app = FastAPI(dependencies=[Depends(verify_token), Depends(verify_key)])

Dependency with Yield

FastAPI supports dependencies that do some extra steps after finishing.

To do this, use yield instead of return, and write the extra steps after.

Any function that is valid to use with:

  • @contextlib.contextmanager or
  • @contextlib.asynccontextmanager

would be valid to use as a FastAPI dependency.

In fact, FastAPI uses those two decorators internally.

If you use a try block in a dependency with yield, you’ll receive any exception that was thrown when using the dependency.

async def get_db():
    db = DBSession()
    try:
        yield db
    finally:
        db.close()
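The same generator shape can be tried outside FastAPI with contextlib.contextmanager. In this sketch `FakeSession` is a hypothetical stand-in for DBSession, and the `events` list just records the order in which things run.

```python
from contextlib import contextmanager

events = []


class FakeSession:
    """Hypothetical stand-in for a database session."""

    def close(self):
        events.append("closed")


@contextmanager
def get_db():
    db = FakeSession()
    events.append("opened")
    try:
        yield db       # the code using the dependency runs here
    finally:
        db.close()     # always runs afterwards, even on exceptions


with get_db() as db:
    events.append("handling request")

print(events)  # ['opened', 'handling request', 'closed']
```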

Sub Dependency with Yield

FastAPI will make sure that the “exit code” in each dependency with yield is run in the correct order.

For example, dependency_c can have a dependency on dependency_b, and dependency_b on dependency_a:

async def dependency_a():
    dep_a = generate_dep_a()
    try:
        yield dep_a
    finally:
        dep_a.close()


async def dependency_b(dep_a=Depends(dependency_a)):
    dep_b = generate_dep_b()
    try:
        yield dep_b
    finally:
        dep_b.close(dep_a)


async def dependency_c(dep_b=Depends(dependency_b)):
    dep_c = generate_dep_c()
    try:
        yield dep_c
    finally:
        dep_c.close(dep_b)
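The ordering can be illustrated with contextlib.ExitStack, which unwinds context managers in the reverse order of entry. That reverse order is an assumption about what "the correct order" means here, but it is what the example needs, since dep_b still uses dep_a in its exit code. The `dependency` helper is hypothetical.

```python
from contextlib import ExitStack, contextmanager

order = []


@contextmanager
def dependency(name):
    order.append(f"enter {name}")
    try:
        yield name
    finally:
        order.append(f"exit {name}")


with ExitStack() as stack:
    # Entered in dependency order: a first, then b, then c.
    dep_a = stack.enter_context(dependency("a"))
    dep_b = stack.enter_context(dependency("b"))
    dep_c = stack.enter_context(dependency("c"))
    order.append("path operation runs")

# Exited in reverse: c, then b, then a.
print(order)
```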

sequenceDiagram

participant client as Client
participant handler as Exception handler
participant dep as Dep with yield
participant operation as Path Operation
participant tasks as Background tasks

    Note over client,tasks: Can raise exception for dependency, handled after response is sent
    Note over client,operation: Can raise HTTPException and can change the response
    client ->> dep: Start request
    Note over dep: Run code up to yield
    opt raise
        dep -->> handler: Raise HTTPException
        handler -->> client: HTTP error response
        dep -->> dep: Raise other exception
    end
    dep ->> operation: Run dependency, e.g. DB session
    opt raise
        operation -->> dep: Raise HTTPException
        dep -->> handler: Auto forward exception
        handler -->> client: HTTP error response
        operation -->> dep: Raise other exception
        dep -->> handler: Auto forward exception
    end
    operation ->> client: Return response to client
    Note over client,operation: Response is already sent, can't change it anymore
    opt Tasks
        operation -->> tasks: Send background tasks
    end
    opt Raise other exception
        tasks -->> dep: Raise other exception
    end
    Note over dep: After yield
    opt Handle other exception
        dep -->> dep: Handle exception, can't change response. E.g. close DB session.
    end

Context Managers

“Context Managers” are any of those Python objects that you can use in a with statement.

For example, you can use with to read a file:

with open("./somefile.txt") as f:
    contents = f.read()
    print(contents)

Underneath, the open("./somefile.txt") call creates an object that is called a “Context Manager”.

When the with block finishes, it makes sure to close the file, even if there were exceptions.

In Python, you can create Context Managers by creating a class with two methods: __enter__() and __exit__().

class MySuperContextManager:
    def __init__(self):
        self.db = DBSession()

    def __enter__(self):
        return self.db

    def __exit__(self, exc_type, exc_value, traceback):
        self.db.close()


async def get_db():
    with MySuperContextManager() as db:
        yield db
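A short check that __exit__() runs even when the with block raises. The `Tracked` class is a hypothetical example with no database involved.

```python
class Tracked:
    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.closed = True   # cleanup happens here
        return False         # do not swallow the exception


resource = Tracked()
try:
    with resource:
        raise ValueError("boom")
except ValueError as exc:
    seen = str(exc)

print(resource.closed, seen)  # True boom
```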

Security

There are many ways to handle security, authentication and authorization.

To put it simply, authentication is about who you are, while authorization is about what you are allowed to do.

OAuth2

OAuth2 is a specification that defines several ways to handle authentication and authorization.

It is quite an extensive specification and covers several complex use cases.

It includes ways to authenticate using a “third party”.

That’s what all the systems with “login with Facebook, Google, Twitter, GitHub” use underneath.

OpenID Connect is another specification, based on OAuth2.

It just extends OAuth2 specifying some things that are relatively ambiguous in OAuth2, to try to make it more interoperable.

OpenAPI (previously known as Swagger) is the open specification for building APIs (now part of the Linux Foundation).

FastAPI is based on OpenAPI.

OpenAPI defines the following security schemes:

  • apiKey: an application specific key that can come from:
    • A query parameter.
    • A header.
    • A cookie.
  • http: standard HTTP authentication systems, including:
    • bearer: a header Authorization with a value of Bearer plus a token. This is inherited from OAuth2.
    • HTTP Basic authentication.
    • HTTP Digest, etc.
  • oauth2: all the OAuth2 ways to handle security (called “flows”).
    • Several of these flows are appropriate for building an OAuth 2.0 authentication provider (like Google, Facebook, Twitter, GitHub, etc):
      • implicit
      • clientCredentials
      • authorizationCode
    • But there is one specific “flow” that can be perfectly used for handling authentication in the same application directly:
      • password: some next chapters will cover examples of this.
  • openIdConnect: has a way to define how to discover OAuth2 authentication data automatically.
    • This automatic discovery is what is defined in the OpenID Connect specification.

username and password

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


@app.get("/items13/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}


OAuth2 was designed so that the backend or API could be independent of the server that authenticates the user.

But in this case, the same FastAPI application will handle the API and the authentication.

  • A “token” is just a string with some content that we can use later to verify this user.

  • Normally, a token is set to expire after some time.

    • So, the user will have to log in again at some point later.
    • And if the token is stolen, the risk is lower. It is not like a permanent key that will work forever (in most cases).


When we create an instance of the OAuth2PasswordBearer class we pass in the tokenUrl parameter. This parameter contains the URL that the client (the frontend running in the user’s browser) will use to send the username and password in order to get a token.

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


# tokenUrl="token" refers to a relative URL token, it's equivalent to ./token


@app.get("/items13/")
async def read_items(token: str = Depends(oauth2_scheme)):
    # This dependency will provide a str that is assigned to the parameter token of the path operation function.
    return {"token": token}
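What the dependency hands over is the token part of the Authorization header. A rough sketch of that extraction in plain Python follows; `extract_bearer_token` is a hypothetical helper, and where it returns None a real application would respond with a 401 error instead.

```python
def extract_bearer_token(headers):
    """Return the token from an 'Authorization: Bearer <token>' header, or None."""
    authorization = headers.get("Authorization", "")
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token({"Authorization": "Bearer johndoe"}))  # johndoe
print(extract_bearer_token({"Authorization": "Basic abc123"}))    # None
print(extract_bearer_token({}))                                   # None
```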

Get current user

class User4(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


def fake_decode_token(token):
    user = User4(
        username=token + "fake_decoded",
        email="john@example.com",
        full_name="John Doe",
        disabled=False
    )
    print(user)
    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    print(token)
    user = fake_decode_token(token)
    print(user)
    return user


@app.get("/users-token/me")
async def read_users_me(current_user: User4 = Depends(get_current_user)):
    return current_user

Simple OAuth2 with Password and Bearer

OAuth2 specifies that when using the “password flow” (that we are using) the client/user must send username and password fields as form data.

scope

The spec also says that the client can send another form field “scope”.

The form field name is scope (in singular), but it is actually a long string with “scopes” separated by spaces.

Each “scope” is just a string (without spaces).

They are normally used to declare specific security permissions, for example:

  • users:read or users:write are common examples.
  • instagram_basic is used by Facebook / Instagram.
  • https://www.googleapis.com/auth/drive is used by Google.

In OAuth2 a “scope” is just a string that declares a specific permission required.
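Since the scope field is one space-separated string, turning it into individual scopes is a simple split. A minimal sketch; `parse_scopes` and `has_scopes` are hypothetical helpers and the scope names are only examples.

```python
def parse_scopes(scope_field: str) -> list[str]:
    """Split the single 'scope' form field into individual scope strings."""
    return scope_field.split()


def has_scopes(granted: list[str], required: list[str]) -> bool:
    """Check that every required scope is among the granted ones."""
    return set(required) <= set(granted)


granted = parse_scopes("users:read items:read")
print(granted)                               # ['users:read', 'items:read']
print(has_scopes(granted, ["users:read"]))   # True
print(has_scopes(granted, ["users:write"]))  # False
```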

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}


def fake_hash_password(password: str) -> str:
    return "fakehashed" + password


class UserInDB(User4):
    hashed_password: str


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token4(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token4(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User4 = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    print(form_data.username)
    print(form_data.password)
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}


@app.get("/users-secure/me")
async def read_users_me(current_user: User4 = Depends(get_current_active_user)):
    return current_user

“Hashing” means: converting some content (a password in this case) into a sequence of bytes (just a string) that looks like gibberish.

Whenever you pass exactly the same content (exactly the same password) you get exactly the same gibberish.

But you cannot convert from the gibberish back to the password.
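These properties can be tried with the standard library. The sketch below uses PBKDF2 from hashlib purely as an illustration of a salted password hash; it is not the scheme used in this walkthrough, and the iteration count and salt size are arbitrary choices for the example.

```python
import hashlib
import hmac
import os


def hash_password(password: str, salt: bytes) -> bytes:
    """Derive a fixed-length hash from a password and a salt."""
    return hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)


salt = os.urandom(16)                   # random per user, stored next to the hash
stored = hash_password("secret", salt)

# Same password and salt give the same bytes; anything else gives different bytes.
same = hmac.compare_digest(stored, hash_password("secret", salt))
different = hmac.compare_digest(stored, hash_password("Secret", salt))

print(stored.hex())
print(same, different)  # True False
```

Verification never recovers the password: it hashes the candidate again and compares the results.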

UserInDB(**user_dict) means:

Pass the keys and values of the user_dict directly as key-value arguments, equivalent to:

UserInDB(
    username=user_dict["username"],
    email=user_dict["email"],
    full_name=user_dict["full_name"],
    disabled=user_dict["disabled"],
    hashed_password=user_dict["hashed_password"],
)

OAuth2 with Password (and hashing), Bearer with JWT tokens

Let’s make the application actually secure, using JWT tokens and secure password hashing.

JWT means “JSON Web Tokens”.

It’s a standard to codify a JSON object in a long dense string without spaces. It looks like this:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

It is not encrypted, so, anyone could recover the information from the contents.

But it’s signed. So, when you receive a token that you emitted, you can verify that you actually emitted it.
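Both points, readable but signed, can be checked with the standard library alone. The sketch decodes the sample token above without any key, then signs and verifies a token with HMAC-SHA256. The key `demo-key-for-this-sketch` and the helper names are made up for the example; in the application code a JWT library does this work.

```python
import base64
import hashlib
import hmac
import json


def b64url_decode(segment: str) -> bytes:
    # JWT segments drop the base64 padding, so restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


token = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
    ".eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
    ".SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
)
header_b64, payload_b64, signature_b64 = token.split(".")

# Not encrypted: anyone can read the header and payload without a key.
header = json.loads(b64url_decode(header_b64))
payload = json.loads(b64url_decode(payload_b64))
print(header, payload)


def sign(message: str, key: bytes) -> str:
    return b64url_encode(hmac.new(key, message.encode(), hashlib.sha256).digest())


def is_signed_with(candidate: str, key: bytes) -> bool:
    head, body, signature = candidate.split(".")
    return hmac.compare_digest(sign(f"{head}.{body}", key), signature)


# Signed: changing the payload invalidates the signature.
key = b"demo-key-for-this-sketch"
own_signature = sign(f"{header_b64}.{payload_b64}", key)
own_token = f"{header_b64}.{payload_b64}.{own_signature}"
forged_payload = b64url_encode(b'{"sub":"admin"}')
forged_token = f"{header_b64}.{forged_payload}.{own_signature}"

print(is_signed_with(own_token, key), is_signed_with(forged_token, key))  # True False
```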

# JWT signing and password hashing
from datetime import datetime, timedelta

from jose import JWTError, jwt
from passlib.context import CryptContext

# to get a string like this run:
# openssl rand -hex 32

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"

ACCESS_TOKEN_EXPIRE_MINUTES = 30

fake_users_db2 = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User5(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB5(User5):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user5(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB5(**user_dict)


def authenticate_user(fake_db5, username: str, password: str):
    user = get_user5(fake_db5, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user5(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user5(fake_users_db2, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User5 = Depends(get_current_user5)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db2, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users-jwt/me/", response_model=User5)
async def read_users_me(current_user: User5 = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User5 = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]

Middleware

A “middleware” is a function that works with every request before it is processed by any specific path operation, and with every response before it is returned.

from fastapi import FastAPI, Request
import time


@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response
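
The before/after flow around call_next can be tried without a running server. This is a framework-free sketch, assuming a plain dict as a stand-in for the request and response; fake_endpoint and timing_middleware are hypothetical names, not FastAPI APIs.

```python
import asyncio
import time


async def fake_endpoint(request: dict) -> dict:
    # Hypothetical stand-in for a path operation.
    await asyncio.sleep(0.01)
    return {"status": 200, "headers": {}, "body": "ok"}


async def timing_middleware(request: dict, call_next) -> dict:
    # Code before call_next runs before the handler sees the request.
    start = time.perf_counter()
    response = await call_next(request)
    # Code after call_next runs once the handler has produced a response.
    elapsed = time.perf_counter() - start
    response["headers"]["X-Process-Time"] = str(elapsed)
    return response


response = asyncio.run(timing_middleware({"path": "/"}, fake_endpoint))
print(response["headers"]["X-Process-Time"])
```

The middleware never inspects the handler itself; it only wraps the call, which is why the same function can serve every route.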

CORS (Cross-Origin Resource Sharing)

from fastapi.middleware.cors import CORSMiddleware

origins = [
    "http://localhost.tiangolo.com",
    "https://localhost.tiangolo.com",
    "http://localhost",
    "http://localhost:8080",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/new")
async def main1():
    return {"message": "Hello World"}

SQL (Relational) Databases

You should have a directory named my_super_project that contains a sub-directory called sql_app.

sql_app should have the following files:

  • sql_app/__init__.py: is an empty file.
  • sql_app/database.py:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
  • sql_app/models.py:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")
  • sql_app/schemas.py:
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
  • sql_app/crud.py:
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item
  • sql_app/main.py:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
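
create_user in crud.py only appends a suffix to the password, which is fine for a demo but is not hashing. Below is a hedged standard-library sketch of salted hashing with PBKDF2. The names hash_password and verify_password, the iteration count, and the "salt$digest" storage format are assumptions and not part of the tutorial code; in production a maintained password-hashing library is the usual choice.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # assumed work factor; tune for your hardware


def hash_password(password: str) -> str:
    # A fresh random salt per password prevents identical hashes
    # for identical passwords.
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    # Constant-time comparison avoids leaking how many bytes matched.
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


stored = hash_password("s3cret")
print(verify_password("s3cret", stored))  # True
print(verify_password("wrong", stored))   # False
```

The stored string would take the place of fake_hashed_password in models.User.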

Alternative DB session with middleware

from fastapi import Request, Response


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        # request.state is a property of each Request object. It is there to
        # store arbitrary objects attached to the request itself, like the
        # database session in this case. It ensures that a single database
        # session is used for the whole request and then closed afterwards
        # (in the middleware).
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response

def get_db(request: Request):  
    return request.state.db

It’s probably better to use dependencies with yield when they are enough for the use case.
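
The reason a dependency with yield is usually enough: everything before yield runs before the request is handled, and the finally block runs afterwards, even on errors. The sketch below reproduces that lifecycle with the standard library only, using sqlite3 instead of SessionLocal and contextlib.contextmanager to drive the generator roughly the way the framework would. Both substitutions are assumptions for illustration.

```python
import sqlite3
from contextlib import contextmanager


def get_db():
    db = sqlite3.connect(":memory:")
    try:
        yield db  # the path operation would run while we are paused here
    finally:
        db.close()  # always runs, even if the handler raised


# Drive the generator like a context manager.
with contextmanager(get_db)() as db:
    value = db.execute("SELECT 1 + 1").fetchone()[0]

# After the block, the connection has been closed by the finally clause.
try:
    db.execute("SELECT 1")
    closed = False
except sqlite3.ProgrammingError:
    closed = True

print(value, closed)  # 2 True
```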

Bigger Applications - Multiple Files


.
├── app                  # "app" is a Python package
│   ├── __init__.py      # this file makes "app" a "Python package"
│   ├── main.py          # "main" module, e.g. import app.main
│   ├── dependencies.py  # "dependencies" module, e.g. import app.dependencies
│   ├── routers          # "routers" is a "Python subpackage"
│   │   ├── __init__.py  # makes "routers" a "Python subpackage"
│   │   ├── items.py     # "items" submodule, e.g. import app.routers.items
│   │   └── users.py     # "users" submodule, e.g. import app.routers.users
│   └── internal         # "internal" is a "Python subpackage"
│       ├── __init__.py  # makes "internal" a "Python subpackage"
│       └── admin.py     # "admin" submodule, e.g. import app.internal.admin
# app/routers / users.py  
# Created by azat at 8.12.2022  
  
from fastapi import APIRouter  
  
router = APIRouter()  
  
  
@router.get("/users/", tags=["users"])  
async def read_users():  
    return [{"username": "Rick"}, {"username": "Morty"}]  
  
  
@router.get("/users/me", tags=["users"])  
async def read_user_me():  
    return {"username": "fakecurrentuser"}  
  
  
@router.get("/users/{username}", tags=["users"])  
async def read_user(username: str):  
    return {"username": username}
# app / dependencies.py  
# Created by azat at 8.12.2022  
  
from fastapi import Header, HTTPException  
  
  
async def get_token_header(x_token: str = Header()):  
    if x_token != "fake-super-secret-token":  
        raise HTTPException(status_code=400, detail="X-Token header invalid")  
  
  
async def get_query_token(token: str):  
    if token != "jessica":  
        raise HTTPException(status_code=400, detail="No Jessica token provided")
# app/routers / items.py  
# Created by azat at 8.12.2022  
  
from fastapi import APIRouter, Depends, HTTPException  
  
from ..dependencies import get_token_header  
  
router = APIRouter(  
    prefix="/items",  
    tags=["items"],  
    dependencies=[Depends(get_token_header)],  
    responses={404: {"description": "Not found"}},  
)  
  
fake_items_db = {"plumbus": {"name": "Plumbus"}, "gun": {"name": "Portal Gun"}}  
  
  
@router.get("/")  
async def read_items():  
    return fake_items_db  
  
  
@router.get("/{item_id}")  
async def read_item(item_id: str):  
    if item_id not in fake_items_db:  
        raise HTTPException(status_code=404, detail="Item not found")  
    return {"name": fake_items_db[item_id]["name"], "item_id": item_id}  
  
  
# This last path operation will have the combination of tags: ["items", "custom"].
# It will also have both responses in the documentation, one for 404 and one for 403.
@router.put(  
    "/{item_id}",  
    tags=["custom"],  
    responses={403: {"description": "Operation forbidden"}},  
)  
async def update_item(item_id: str):  
    if item_id != "plumbus":  
        raise HTTPException(  
            status_code=403, detail="You can only update the item: plumbus"  
        )  
    return {"item_id": item_id, "name": "The great Plumbus"}
# app/internal / admin.py  
# Created by azat at 8.12.2022  
  
from fastapi import APIRouter  
  
router = APIRouter()  
  
  
@router.post("/")  
async def update_admin():  
    return {"message": "Admin getting schwifty"}
# app / main.py
# Created by azat at 8.12.2022

from fastapi import Depends, FastAPI

from .dependencies import get_query_token, get_token_header
from .internal import admin
from .routers import items, users

app = FastAPI(dependencies=[Depends(get_query_token)])

app.include_router(users.router)
app.include_router(items.router)
app.include_router(
    admin.router,
    prefix="/admin",
    tags=["admin"],
    dependencies=[Depends(get_token_header)],
    responses={418: {"description": "I'm a teapot"}},
)


@app.get("/")
async def root():
    return {"message": "Hello Bigger Applications!"}

uvicorn app.main:app --reload

Background Tasks

# app_bgtasks / main.py
# Created by azat at 8.12.2022

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    # The write operation doesn't use async/await, so the task is a plain def function.
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    # Inside your path operation function,
    # pass your task function to the background tasks object with the method .add_task():
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification2/{email}")
async def send_notification2(
        email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
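
In send_notification2, tasks are added in two places: first by the get_query dependency, then by the path operation itself. Background tasks run in the order they were added, after the response has been produced. The following framework-free sketch mimics that behaviour; TaskQueue is a hypothetical stand-in for BackgroundTasks, and an in-memory list replaces log.txt.

```python
from typing import Optional


class TaskQueue:
    """Hypothetical stand-in for BackgroundTasks: collects calls to run later."""

    def __init__(self):
        self._tasks = []

    def add_task(self, func, *args, **kwargs):
        self._tasks.append((func, args, kwargs))

    def run_all(self):
        # Run the queued calls one by one, in the order they were added.
        for func, args, kwargs in self._tasks:
            func(*args, **kwargs)


log = []  # in-memory replacement for log.txt


def write_log(message: str):
    log.append(message)


def get_query(tasks: TaskQueue, q: Optional[str] = None):
    if q:
        tasks.add_task(write_log, f"found query: {q}\n")
    return q


def send_notification2(email: str, tasks: TaskQueue, q: Optional[str] = None):
    get_query(tasks, q)  # dependencies are resolved before the handler body
    tasks.add_task(write_log, f"message to {email}\n")
    return {"message": "Message sent"}


tasks = TaskQueue()
response = send_notification2("foo@example.com", tasks, q="hi")
print(log)  # [] : the response exists, but nothing has been written yet
tasks.run_all()
print(log)
```

After run_all(), the dependency's entry comes first in the log, followed by the handler's entry.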

Metadata and Docs URLs

# app_metadata_docs / main.py
# Created by azat at 8.12.2022

from fastapi import FastAPI

description = """
ChimichangApp API helps you do awesome stuff. 🚀

## Items

You can **read items**.

## Users

You will be able to:

* **Create users** (_not implemented_).
* **Read users** (_not implemented_).
"""

tags_metadata = [
    {
        "name": "users",
        "description": "Operations with users. The **login** logic is also here.",
    },
    {
        "name": "items",
        "description": "Manage items. So _fancy_ they have their own docs.",
        "externalDocs": {
            "description": "Items external docs",
            "url": "https://fastapi.tiangolo.com/",
        },
    },
]

app = FastAPI(
    title="ChimichangApp",
    description=description,
    version="0.0.1",
    terms_of_service="http://example.com/terms/",
    contact={
        "name": "Deadpoolio the Amazing",
        "url": "http://x-force.example.com/contact/",
        "email": "dp@x-force.example.com",
    },
    license_info={
        "name": "Apache 2.0",
        "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
    },
    openapi_tags=tags_metadata,
    docs_url="/docs",
    redoc_url=None,
)


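# Untagged route. Routes are matched in registration order, so this handler
# serves GET /items/ and the tagged get_items below is never called.
@app.get("/items/")
async def read_items():
    return [{"name": "Katana"}]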


@app.get("/users/", tags=["users"])
async def get_users():
    return [{"name": "Harry"}, {"name": "Ron"}]


@app.get("/items/", tags=["items"])
async def get_items():
    return [{"name": "wand"}, {"name": "flying broom"}]


Static Files

# app_static / main.py  
# Created by azat at 8.12.2022  
  
from fastapi import FastAPI  
from fastapi.staticfiles import StaticFiles  
  
app = FastAPI()  
  
app.mount("/static", StaticFiles(directory="static"), name="static")


SQLModel

# sqlm / main.py
# Created by azat at 9.12.2022

from typing import Optional
from fastapi import FastAPI, HTTPException, Query, Depends
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select


# =========== Team Models ============

class TeamBase(SQLModel):
    name: str = Field(index=True)
    headquarters: str


class Team(TeamBase, table=True):
    id: int | None = Field(default=None, primary_key=True)
    heroes: list["Hero"] = Relationship(back_populates="team")


class TeamCreate(TeamBase):
    pass


class TeamRead(TeamBase):
    id: int


class TeamUpdate(SQLModel):
    name: str | None = None
    headquarters: str | None = None


# =========== Hero Models ============

class HeroBase(SQLModel):
    name: str = Field(index=True, unique=True)
    secret_name: str
    age: int | None = Field(default=None, index=True)

    team_id: int | None = Field(default=None, foreign_key="team.id")


class Hero(HeroBase, table=True):
    id: int | None = Field(default=None, primary_key=True)

    team: Team | None = Relationship(back_populates="heroes")


# class Hero(SQLModel, table=True):
#     #  because this same SQLModel object is not only a Pydantic model instance but also a SQLAlchemy model instance,
#     #  we can use it directly in a session to create the row in the database.
#     id: int | None = Field(default=None, primary_key=True)
#     name: str = Field(index=True)
#     secret_name: str
#     age: int | None = Field(default=None, index=True)


# class HeroCreate(SQLModel):
#     name: str
#     secret_name: str
#     age: int | None = None

class HeroCreate(HeroBase):
    pass


# class HeroRead(SQLModel):
#     id: int
#     name: str
#     secret_name: str
#     age: int | None = None

class HeroRead(HeroBase):
    id: int


class HeroUpdate(SQLModel):
    name: str | None = None
    secret_name: str | None = None
    age: int | None = None
    team_id: int | None = None


# =========== Models with Relationship ===========
class HeroReadWithTeam(HeroRead):
    # The HeroReadWithTeam inherits from HeroRead, which means that it will have the normal fields for reading,
    # including the required id that was declared in HeroRead.
    # And then it adds the new field team, which could be None, and is declared with the type TeamRead
    # with the base fields for reading a team.
    team: TeamRead | None = None


class TeamReadWithHeroes(TeamRead):
    # Then we do the same for the TeamReadWithHeroes, it inherits from TeamRead, and declares the new field heroes,
    # which is a list of HeroRead.
    heroes: list["HeroRead"] = []


sqlite_file_name = "simple_hero.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"

connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)


def create_db_and_tables():
    SQLModel.metadata.create_all(engine)


def get_session():
    with Session(engine) as session:
        yield session


app = FastAPI()


@app.on_event("startup")
def on_startup():
    create_db_and_tables()


# =========== Team Endpoints ============
@app.post("/teams/", response_model=TeamRead)
def create_team(*, session: Session = Depends(get_session), team: TeamCreate):
    db_team = Team.from_orm(team)
    session.add(db_team)
    session.commit()
    session.refresh(db_team)
    return db_team


@app.get("/teams/", response_model=list[TeamRead])
def read_teams(
        *,
        session: Session = Depends(get_session),
        offset: int = 0,
        limit: int = Query(default=100, le=100),
):
    teams = session.exec(select(Team).offset(offset).limit(limit)).all()
    return teams


@app.get("/teams/{team_id}", response_model=TeamReadWithHeroes)
def read_team(*, team_id: int, session: Session = Depends(get_session)):
    team = session.get(Team, team_id)
    if not team:
        raise HTTPException(status_code=404, detail="Team not found")
    return team


@app.patch("/teams/{team_id}", response_model=TeamRead)
def update_team(
        *,
        session: Session = Depends(get_session),
        team_id: int,
        team: TeamUpdate,
):
    db_team = session.get(Team, team_id)
    if not db_team:
        raise HTTPException(status_code=404, detail="Team not found")
    team_data = team.dict(exclude_unset=True)
    for key, value in team_data.items():
        setattr(db_team, key, value)
    session.add(db_team)
    session.commit()
    session.refresh(db_team)
    return db_team


@app.delete("/teams/{team_id}")
def delete_team(*, session: Session = Depends(get_session), team_id: int):
    team = session.get(Team, team_id)
    if not team:
        raise HTTPException(status_code=404, detail="Team not found")
    session.delete(team)
    session.commit()
    return {"ok": True}


# @app.post("/heroes/", response_model=HeroRead)
# def create_hero(hero: HeroCreate):
#     with Session(engine) as session:
#         db_hero = Hero.from_orm(hero)
#         session.add(db_hero)
#         session.commit()
#         session.refresh(db_hero)
#         return db_hero

@app.post("/heroes/", response_model=HeroRead, status_code=201)
def create_hero(*, session: Session = Depends(get_session), hero: HeroCreate):
    # session has a default value and hero does not. Python normally rejects a
    # parameter without a default after one with a default, but the leading *
    # makes all parameters keyword-only, which avoids the problem.
    db_hero = Hero.from_orm(hero)
    db_team_id = db_hero.team_id
    if db_team_id:
        db_team = session.get(Team, db_team_id)
        if not db_team:
            raise HTTPException(status_code=404, detail="Team not found")
    session.add(db_hero)
    session.commit()
    session.refresh(db_hero)
    return db_hero


@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(
        *,
        session: Session = Depends(get_session),
        offset: int = 0, limit: int = Query(default=100, le=100)):
    heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
    return heroes


@app.get("/heroes/{hero_id}", response_model=HeroReadWithTeam)
def read_hero(
        *,
        session: Session = Depends(get_session),
        hero_id: int):
    hero = session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    return hero


@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(*,
                session: Session = Depends(get_session),
                hero_id: int, hero: HeroUpdate):
    # hero_id is the ID of the hero to update.
    # Read the hero from the database by the provided ID.
    hero_db = session.get(Hero, hero_id)
    if not hero_db:
        # If it doesn't exist, return an error to the client.
        raise HTTPException(status_code=404, detail="Hero not found")
    # Keep only the fields the client actually sent.
    hero_data = hero.dict(exclude_unset=True)
    # Iterate over the keys and values sent by the client and set each one
    # on the database hero model hero_db using setattr().
    for key, value in hero_data.items():
        setattr(hero_db, key, value)
    session.add(hero_db)
    session.commit()
    session.refresh(hero_db)
    return hero_db


@app.delete("/heroes/{hero_id}")
def delete_hero(
        *,
        session: Session = Depends(get_session),
        hero_id: int):
    hero_db = session.get(Hero, hero_id)
    if not hero_db:
        raise HTTPException(status_code=404, detail="Hero not found")
    session.delete(hero_db)
    session.commit()
    return {"status": "success"}
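
The PATCH endpoints above rely on exclude_unset=True to tell "field omitted" apart from "field sent as null". The sketch below shows the same idea with the standard library only, assuming a dataclass in place of the SQLModel table and a plain dict in place of hero.dict(exclude_unset=True); apply_partial_update is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Hero:
    name: str
    secret_name: str
    age: Optional[int] = None


def apply_partial_update(hero: Hero, sent_fields: dict) -> Hero:
    # sent_fields plays the role of hero.dict(exclude_unset=True): it holds
    # only the keys the client included in the request body.
    for key, value in sent_fields.items():
        setattr(hero, key, value)
    return hero


hero = Hero(name="Deadpond", secret_name="Dive Wilson", age=30)

apply_partial_update(hero, {"name": "Deadpuddle"})  # age omitted
print(hero.age)  # 30

apply_partial_update(hero, {"age": None})  # age explicitly sent as null
print(hero.age)  # None
```

Without exclude_unset, every optional field of HeroUpdate would appear in the dict as None and overwrite existing values.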