Skip to content

Latest commit

 

History

History
327 lines (272 loc) · 7.73 KB

File metadata and controls

327 lines (272 loc) · 7.73 KB

Trigger Notification Event

# Synchronous Example
import novu_py
from novu_py import Novu


with Novu(
    secret_key="YOUR_SECRET_KEY_HERE",
) as novu:

    res = novu.trigger(trigger_event_request_dto=novu_py.TriggerEventRequestDto(
        workflow_id="workflow_identifier",
        payload={
            "comment_id": "string",
            "post": {
                "text": "string",
            },
        },
        overrides=novu_py.Overrides(),
        to="SUBSCRIBER_ID",
        actor="<value>",
        context={
            "key": "org-acme",
        },
    ))

    # Handle response
    print(res)

The same SDK client can also be used to make asynchronous requests by importing asyncio.

# Asynchronous Example
import asyncio
import novu_py
from novu_py import Novu

async def main():

    async with Novu(
        secret_key="YOUR_SECRET_KEY_HERE",
    ) as novu:

        res = await novu.trigger_async(trigger_event_request_dto=novu_py.TriggerEventRequestDto(
            workflow_id="workflow_identifier",
            payload={
                "comment_id": "string",
                "post": {
                    "text": "string",
                },
            },
            overrides=novu_py.Overrides(),
            to="SUBSCRIBER_ID",
            actor="<value>",
            context={
                "key": "org-acme",
            },
        ))

        # Handle response
        print(res)

asyncio.run(main())

Cancel Triggered Event

# Synchronous Example
from novu_py import Novu


with Novu(
    secret_key="YOUR_SECRET_KEY_HERE",
) as novu:

    res = novu.cancel(transaction_id="<id>")

    # Handle response
    print(res)

The same SDK client can also be used to make asynchronous requests by importing asyncio.

# Asynchronous Example
import asyncio
from novu_py import Novu

async def main():

    async with Novu(
        secret_key="YOUR_SECRET_KEY_HERE",
    ) as novu:

        res = await novu.cancel_async(transaction_id="<id>")

        # Handle response
        print(res)

asyncio.run(main())

Broadcast Event to All

# Synchronous Example
import novu_py
from novu_py import Novu


with Novu(
    secret_key="YOUR_SECRET_KEY_HERE",
) as novu:

    res = novu.trigger_broadcast(trigger_event_to_all_request_dto=novu_py.TriggerEventToAllRequestDto(
        name="<value>",
        payload={
            "comment_id": "string",
            "post": {
                "text": "string",
            },
        },
        overrides=novu_py.TriggerEventToAllRequestDtoOverrides(
            **{
                "fcm": {
                    "data": {
                        "key": "value",
                    },
                },
            },
        ),
        actor=novu_py.SubscriberPayloadDto(
            first_name="John",
            last_name="Doe",
            email="john.doe@example.com",
            phone="+1234567890",
            avatar="https://example.com/avatar.jpg",
            locale="en-US",
            timezone="America/New_York",
            subscriber_id="<id>",
        ),
        context={
            "key": "org-acme",
        },
    ))

    # Handle response
    print(res)

The same SDK client can also be used to make asynchronous requests by importing asyncio.

# Asynchronous Example
import asyncio
import novu_py
from novu_py import Novu

async def main():

    async with Novu(
        secret_key="YOUR_SECRET_KEY_HERE",
    ) as novu:

        res = await novu.trigger_broadcast_async(trigger_event_to_all_request_dto=novu_py.TriggerEventToAllRequestDto(
            name="<value>",
            payload={
                "comment_id": "string",
                "post": {
                    "text": "string",
                },
            },
            overrides=novu_py.TriggerEventToAllRequestDtoOverrides(
                **{
                    "fcm": {
                        "data": {
                            "key": "value",
                        },
                    },
                },
            ),
            actor=novu_py.SubscriberPayloadDto(
                first_name="John",
                last_name="Doe",
                email="john.doe@example.com",
                phone="+1234567890",
                avatar="https://example.com/avatar.jpg",
                locale="en-US",
                timezone="America/New_York",
                subscriber_id="<id>",
            ),
            context={
                "key": "org-acme",
            },
        ))

        # Handle response
        print(res)

asyncio.run(main())

Trigger Notification Events in Bulk

# Synchronous Example
import novu_py
from novu_py import Novu


with Novu(
    secret_key="YOUR_SECRET_KEY_HERE",
) as novu:

    res = novu.trigger_bulk(bulk_trigger_event_dto={
        "events": [
            novu_py.TriggerEventRequestDto(
                workflow_id="workflow_identifier",
                payload={
                    "comment_id": "string",
                    "post": {
                        "text": "string",
                    },
                },
                overrides=novu_py.Overrides(),
                to="SUBSCRIBER_ID",
            ),
            novu_py.TriggerEventRequestDto(
                workflow_id="workflow_identifier",
                payload={
                    "comment_id": "string",
                    "post": {
                        "text": "string",
                    },
                },
                overrides=novu_py.Overrides(),
                to="SUBSCRIBER_ID",
            ),
            novu_py.TriggerEventRequestDto(
                workflow_id="workflow_identifier",
                payload={
                    "comment_id": "string",
                    "post": {
                        "text": "string",
                    },
                },
                overrides=novu_py.Overrides(),
                to="SUBSCRIBER_ID",
            ),
        ],
    })

    # Handle response
    print(res)

The same SDK client can also be used to make asynchronous requests by importing asyncio.

# Asynchronous Example
import asyncio
import novu_py
from novu_py import Novu

async def main():

    async with Novu(
        secret_key="YOUR_SECRET_KEY_HERE",
    ) as novu:

        res = await novu.trigger_bulk_async(bulk_trigger_event_dto={
            "events": [
                novu_py.TriggerEventRequestDto(
                    workflow_id="workflow_identifier",
                    payload={
                        "comment_id": "string",
                        "post": {
                            "text": "string",
                        },
                    },
                    overrides=novu_py.Overrides(),
                    to="SUBSCRIBER_ID",
                ),
                novu_py.TriggerEventRequestDto(
                    workflow_id="workflow_identifier",
                    payload={
                        "comment_id": "string",
                        "post": {
                            "text": "string",
                        },
                    },
                    overrides=novu_py.Overrides(),
                    to="SUBSCRIBER_ID",
                ),
                novu_py.TriggerEventRequestDto(
                    workflow_id="workflow_identifier",
                    payload={
                        "comment_id": "string",
                        "post": {
                            "text": "string",
                        },
                    },
                    overrides=novu_py.Overrides(),
                    to="SUBSCRIBER_ID",
                ),
            ],
        })

        # Handle response
        print(res)

asyncio.run(main())