---
title: "@upstash/kafka"
description: A fully typed Kafka client built for Upstash Kafka and HTTP, perfect for serverless and edge runtimes
date: "2022-01-08"
url: https://upstash.com/kafka
repository: upstash/upstash-kafka
published: true
---
An HTTP/REST based Kafka client built on top of the
[Upstash REST API](https://docs.upstash.com/kafka/rest).
It is the only connectionless (HTTP based) Kafka client and is designed for:
- Serverless functions (AWS Lambda ...)
- Cloudflare Workers (see the example)
- Fastly Compute@Edge
- Next.js Edge, Remix ...
- Client side web/mobile applications
- WebAssembly and other environments where HTTP is preferred over TCP connections
# Installation
```bash
npm install @upstash/kafka
```
# Quickstart
## Auth
1. Go to the [Upstash console](https://console.upstash.com/kafka) and select your database.
2. Copy the `REST API` secrets at the bottom of the page.
```typescript
import { Kafka } from "@upstash/kafka"
const kafka = new Kafka({
  url: "<UPSTASH_KAFKA_REST_URL>",
  username: "<UPSTASH_KAFKA_REST_USERNAME>",
  password: "<UPSTASH_KAFKA_REST_PASSWORD>",
})
```
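In practice you would usually not hard-code the secrets. The following is a minimal sketch, assuming the three values are exposed as environment variables named after the placeholders above. `readKafkaConfig` is a hypothetical helper and not part of `@upstash/kafka`.

```typescript
// Hypothetical helper (not part of @upstash/kafka): collects the three REST
// credentials from an environment-like object and fails fast if one is missing.
type KafkaConfig = { url: string; username: string; password: string }

function readKafkaConfig(env: Record<string, string | undefined>): KafkaConfig {
  const read = (name: string): string => {
    const value = env[name]
    if (!value) {
      throw new Error(`Missing environment variable ${name}`)
    }
    return value
  }
  return {
    url: read("UPSTASH_KAFKA_REST_URL"),
    username: read("UPSTASH_KAFKA_REST_USERNAME"),
    password: read("UPSTASH_KAFKA_REST_PASSWORD"),
  }
}
```

The returned object has the same shape as the one passed to `new Kafka(...)` above, so in Node.js it could be used as `new Kafka(readKafkaConfig(process.env))`.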
## Produce a single message
```typescript
const p = kafka.producer()
const message = { hello: "world" } // Objects will get serialized using `JSON.stringify`
// Produce with default options
const res = await p.produce("<my.topic>", message)
// Or set the options explicitly
const resWithOptions = await p.produce("<my.topic>", message, {
  partition: 1,
  timestamp: 12345,
  key: "<custom key>",
  headers: [{ key: "traceId", value: "85a9f12" }],
})
```
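The `headers` option takes a list of key/value pairs, as in the example above. If your metadata lives in a plain object, a small conversion is enough. This is only a sketch: `toHeaders` and the `Header` type are hypothetical names introduced here, modeled on the shape shown above.

```typescript
// Hypothetical helper: turns a plain object into the
// [{ key, value }] list used by the `headers` option above.
type Header = { key: string; value: string }

function toHeaders(fields: Record<string, string>): Header[] {
  return Object.keys(fields).map((key) => ({ key, value: fields[key] }))
}
```

For example, `toHeaders({ traceId: "85a9f12" })` yields the same list as the literal in the example above.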
## Produce multiple messages
The same options from the example above can be set for every message.
```typescript
const p = kafka.producer()
const res = await p.produceMany([
  {
    topic: "my.topic",
    value: { hello: "world" },
    // ...options
  },
  {
    topic: "another.topic",
    value: "another message",
    // ...options
  },
])
```
## Consume
The first time a consumer is created, it needs to find the group
coordinator by asking the Kafka brokers and then join the consumer group. This
process takes some time to complete. As a result, a newly created consumer
instance may return empty results until consumer group coordination is
completed.
```typescript
const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId: "group_1",
  instanceId: "instance_1",
  topics: ["test.topic"],
  autoOffsetReset: "earliest",
})
```
More examples can be found in the
[docstring](https://github.com/upstash/upstash-kafka/blob/main/pkg/consumer.ts#L265).
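Because the first calls may come back empty while the group is still being coordinated, one way to handle this is to retry a few times before concluding there is nothing to read. The following is a sketch under that assumption. `consumeWithRetry` is a hypothetical helper, not part of `@upstash/kafka`, and it takes the actual consume call as a callback so it makes no assumptions about the client.

```typescript
// Hypothetical helper: calls `consumeOnce` until it yields at least one message
// or `maxAttempts` is reached, waiting `delayMs` between attempts.
async function consumeWithRetry<T>(
  consumeOnce: () => Promise<T[]>,
  options: { maxAttempts: number; delayMs: number },
): Promise<T[]> {
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    const messages = await consumeOnce()
    if (messages.length > 0) {
      return messages
    }
    if (attempt < options.maxAttempts) {
      // Give the consumer group time to finish coordination before retrying.
      await new Promise<void>((resolve) => setTimeout(resolve, options.delayMs))
    }
  }
  // Still empty after all attempts: the topic may simply have no new messages.
  return []
}
```

With the consumer above, this could be called as `consumeWithRetry(() => c.consume({ ... }), { maxAttempts: 5, delayMs: 500 })`. The attempt count and delay are arbitrary illustration values, not recommendations.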
## Commit manually
While `consume` can handle committing automatically, you can also use
`Consumer.commit` to manually commit.
```typescript
const consumerGroupId = "mygroup"
const instanceId = "myinstance"
const topic = "my.topic"
const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId,
  instanceId,
  topics: [topic],
  autoCommit: false,
})
for (const message of messages) {
  // message handling logic
  await c.commit({
    consumerGroupId,
    instanceId,
    offset: {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
    },
  })
}
```
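Committing after every message, as above, issues one HTTP request per message. A possible alternative, sketched here under the assumption that committing only the highest processed offset per partition fits your delivery guarantees, is to reduce the batch first and commit once per partition. `latestOffsets` is a hypothetical helper. It assumes `offset` is a number and keeps the convention of the example above of committing `message.offset` unchanged.

```typescript
// Hypothetical helper: keeps only the highest offset seen for each
// topic/partition pair in a batch of consumed messages.
type TopicPartitionOffset = { topic: string; partition: number; offset: number }

function latestOffsets(messages: TopicPartitionOffset[]): TopicPartitionOffset[] {
  const latest = new Map<string, TopicPartitionOffset>()
  for (const { topic, partition, offset } of messages) {
    const id = `${topic}:${partition}`
    const seen = latest.get(id)
    if (seen === undefined || offset > seen.offset) {
      latest.set(id, { topic, partition, offset })
    }
  }
  return Array.from(latest.values())
}
```

After all messages in the batch are handled, each entry of `latestOffsets(messages)` could be passed as the `offset` field of a `c.commit` call like the one above.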
## Fetch
You can also manage offsets manually by using `Consumer.fetch`.
```typescript
const c = kafka.consumer()
const messages = await c.fetch({
  topic: "greeting",
  partition: 3,
  offset: 42,
  timeout: 1000,
})
```
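When offsets are managed manually, the caller has to decide where the next `fetch` starts. The following is a minimal sketch, assuming the usual Kafka convention that the next offset to read is the last seen offset plus one and that `offset` is a number. `nextFetchOffset` is a hypothetical helper, not part of `@upstash/kafka`.

```typescript
// Hypothetical helper: given the messages returned by one fetch and the offset
// that fetch started from, computes the offset the next fetch should start at.
function nextFetchOffset(messages: { offset: number }[], currentOffset: number): number {
  let next = currentOffset
  for (const message of messages) {
    next = Math.max(next, message.offset + 1)
  }
  // If nothing was returned, the position stays where it was.
  return next
}
```

In the example above, the next call could use `offset: nextFetchOffset(messages, 42)`.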
## Examples
See [/examples](https://github.com/upstash/upstash-kafka/tree/main/examples) as
well as various examples in the docstrings of each method.
# Contributing
## Requirements
- [Node.js](https://nodejs.org) v14.x or higher
- [pnpm](https://pnpm.io/installation)
## Setup
0. Install dependencies using `pnpm install`
1. Create a Kafka instance on Upstash.
[docs](https://docs.upstash.com/kafka#create-a-kafka-cluster)
2. Create the following topics: `blue`, `red`, `green`.
[docs](https://docs.upstash.com/kafka#create-a-topic)
The partitions or retention settings don't matter at this time.
3. Create a `.env` file with your Kafka secrets: `cp .env.example .env`
## Running tests
```bash
pnpm test
```