# Synchronous Example
# Sends one trigger request with the synchronous client and prints the result.
# Every string below ("YOUR_SECRET_KEY_HERE", "workflow_identifier",
# "SUBSCRIBER_ID", "<value>", ...) is a placeholder to replace before running.
import novu_py
from novu_py import Novu


# The client is opened as a context manager for the duration of the request.
with Novu(
    secret_key="YOUR_SECRET_KEY_HERE",
) as novu:

    res = novu.trigger(trigger_event_request_dto=novu_py.TriggerEventRequestDto(
        workflow_id="workflow_identifier",
        payload={
            "comment_id": "string",
            "post": {
                "text": "string",
            },
        },
        overrides=novu_py.Overrides(),
        to="SUBSCRIBER_ID",
        actor="<value>",
        context={
            "key": "org-acme",
        },
    ))

    # Handle response
    print(res)


# The same SDK client can also be used to make asynchronous requests by
# importing asyncio.
# Asynchronous Example
# Same request as the synchronous trigger example, issued through the
# awaitable `trigger_async` method from inside a coroutine.
import asyncio
import novu_py
from novu_py import Novu


async def main():
    """Send one trigger request with the async client and print the result."""
    # The async variant uses `async with` to open the client.
    async with Novu(
        secret_key="YOUR_SECRET_KEY_HERE",
    ) as novu:

        res = await novu.trigger_async(trigger_event_request_dto=novu_py.TriggerEventRequestDto(
            workflow_id="workflow_identifier",
            payload={
                "comment_id": "string",
                "post": {
                    "text": "string",
                },
            },
            overrides=novu_py.Overrides(),
            to="SUBSCRIBER_ID",
            actor="<value>",
            context={
                "key": "org-acme",
            },
        ))

        # Handle response
        print(res)


# Run the coroutine to completion on a fresh event loop.
asyncio.run(main())


# Synchronous Example
# Issues a cancel request for a transaction id with the synchronous client
# and prints the result. "<id>" and the secret key are placeholders.
from novu_py import Novu


# The client is opened as a context manager for the duration of the request.
with Novu(
    secret_key="YOUR_SECRET_KEY_HERE",
) as novu:

    res = novu.cancel(transaction_id="<id>")

    # Handle response
    print(res)


# The same SDK client can also be used to make asynchronous requests by
# importing asyncio.
# Asynchronous Example
# Same cancel request as the synchronous example, issued through the
# awaitable `cancel_async` method from inside a coroutine.
import asyncio
from novu_py import Novu


async def main():
    """Issue one cancel request with the async client and print the result."""
    # The async variant uses `async with` to open the client.
    async with Novu(
        secret_key="YOUR_SECRET_KEY_HERE",
    ) as novu:

        res = await novu.cancel_async(transaction_id="<id>")

        # Handle response
        print(res)


# Run the coroutine to completion on a fresh event loop.
asyncio.run(main())


# Synchronous Example
# Sends one broadcast trigger request (TriggerEventToAllRequestDto) with the
# synchronous client and prints the result. All string values below are
# placeholders to replace before running.
import novu_py
from novu_py import Novu


# The client is opened as a context manager for the duration of the request.
with Novu(
    secret_key="YOUR_SECRET_KEY_HERE",
) as novu:

    res = novu.trigger_broadcast(trigger_event_to_all_request_dto=novu_py.TriggerEventToAllRequestDto(
        name="<value>",
        payload={
            "comment_id": "string",
            "post": {
                "text": "string",
            },
        },
        # The overrides are supplied by unpacking a dict into keyword
        # arguments, so this is the same as passing fcm={...} directly.
        overrides=novu_py.TriggerEventToAllRequestDtoOverrides(
            **{
                "fcm": {
                    "data": {
                        "key": "value",
                    },
                },
            },
        ),
        # Here the actor is a full subscriber payload object.
        actor=novu_py.SubscriberPayloadDto(
            first_name="John",
            last_name="Doe",
            email="john.doe@example.com",
            phone="+1234567890",
            avatar="https://example.com/avatar.jpg",
            locale="en-US",
            timezone="America/New_York",
            subscriber_id="<id>",
        ),
        context={
            "key": "org-acme",
        },
    ))

    # Handle response
    print(res)


# The same SDK client can also be used to make asynchronous requests by
# importing asyncio.
# Asynchronous Example
# Same broadcast request as the synchronous example, issued through the
# awaitable `trigger_broadcast_async` method from inside a coroutine.
import asyncio
import novu_py
from novu_py import Novu


async def main():
    """Send one broadcast trigger request with the async client and print the result."""
    # The async variant uses `async with` to open the client.
    async with Novu(
        secret_key="YOUR_SECRET_KEY_HERE",
    ) as novu:

        res = await novu.trigger_broadcast_async(trigger_event_to_all_request_dto=novu_py.TriggerEventToAllRequestDto(
            name="<value>",
            payload={
                "comment_id": "string",
                "post": {
                    "text": "string",
                },
            },
            # The overrides are supplied by unpacking a dict into keyword
            # arguments, so this is the same as passing fcm={...} directly.
            overrides=novu_py.TriggerEventToAllRequestDtoOverrides(
                **{
                    "fcm": {
                        "data": {
                            "key": "value",
                        },
                    },
                },
            ),
            # Here the actor is a full subscriber payload object.
            actor=novu_py.SubscriberPayloadDto(
                first_name="John",
                last_name="Doe",
                email="john.doe@example.com",
                phone="+1234567890",
                avatar="https://example.com/avatar.jpg",
                locale="en-US",
                timezone="America/New_York",
                subscriber_id="<id>",
            ),
            context={
                "key": "org-acme",
            },
        ))

        # Handle response
        print(res)


# Run the coroutine to completion on a fresh event loop.
asyncio.run(main())


# Synchronous Example
# Sends several trigger requests in one bulk call with the synchronous client
# and prints the result. All string values below are placeholders to replace
# before running.
import novu_py
from novu_py import Novu


# The client is opened as a context manager for the duration of the request.
with Novu(
    secret_key="YOUR_SECRET_KEY_HERE",
) as novu:

    # The bulk DTO is given as a plain dict whose "events" list holds one
    # TriggerEventRequestDto per event.
    res = novu.trigger_bulk(bulk_trigger_event_dto={
        "events": [
            novu_py.TriggerEventRequestDto(
                workflow_id="workflow_identifier",
                payload={
                    "comment_id": "string",
                    "post": {
                        "text": "string",
                    },
                },
                overrides=novu_py.Overrides(),
                to="SUBSCRIBER_ID",
            ),
            novu_py.TriggerEventRequestDto(
                workflow_id="workflow_identifier",
                payload={
                    "comment_id": "string",
                    "post": {
                        "text": "string",
                    },
                },
                overrides=novu_py.Overrides(),
                to="SUBSCRIBER_ID",
            ),
            novu_py.TriggerEventRequestDto(
                workflow_id="workflow_identifier",
                payload={
                    "comment_id": "string",
                    "post": {
                        "text": "string",
                    },
                },
                overrides=novu_py.Overrides(),
                to="SUBSCRIBER_ID",
            ),
        ],
    })

    # Handle response
    print(res)


# The same SDK client can also be used to make asynchronous requests by
# importing asyncio.
# Asynchronous Example
# Same bulk request as the synchronous example, issued through the awaitable
# `trigger_bulk_async` method from inside a coroutine.
import asyncio
import novu_py
from novu_py import Novu


async def main():
    """Send one bulk trigger request with the async client and print the result."""
    # The async variant uses `async with` to open the client.
    async with Novu(
        secret_key="YOUR_SECRET_KEY_HERE",
    ) as novu:

        # The bulk DTO is given as a plain dict whose "events" list holds one
        # TriggerEventRequestDto per event.
        res = await novu.trigger_bulk_async(bulk_trigger_event_dto={
            "events": [
                novu_py.TriggerEventRequestDto(
                    workflow_id="workflow_identifier",
                    payload={
                        "comment_id": "string",
                        "post": {
                            "text": "string",
                        },
                    },
                    overrides=novu_py.Overrides(),
                    to="SUBSCRIBER_ID",
                ),
                novu_py.TriggerEventRequestDto(
                    workflow_id="workflow_identifier",
                    payload={
                        "comment_id": "string",
                        "post": {
                            "text": "string",
                        },
                    },
                    overrides=novu_py.Overrides(),
                    to="SUBSCRIBER_ID",
                ),
                novu_py.TriggerEventRequestDto(
                    workflow_id="workflow_identifier",
                    payload={
                        "comment_id": "string",
                        "post": {
                            "text": "string",
                        },
                    },
                    overrides=novu_py.Overrides(),
                    to="SUBSCRIBER_ID",
                ),
            ],
        })

        # Handle response
        print(res)


# Run the coroutine to completion on a fresh event loop.
asyncio.run(main())