mirror of
https://github.com/AderKonstantin/aderktech-chronark.com-.git
synced 2025-06-08 13:48:42 +03:00
---
title: "@upstash/kafka"
description: A fully typed Kafka client built for Upstash Kafka and HTTP, perfect for serverless and edge runtimes
date: "2022-01-08"
url: https://upstash.com/kafka
repository: upstash/upstash-kafka
published: true
---

An HTTP/REST-based Kafka client built on top of the
[Upstash REST API](https://docs.upstash.com/kafka/rest).

It is the only connectionless (HTTP-based) Kafka client and is designed for:

- Serverless functions (AWS Lambda ...)
- Cloudflare Workers (see the example)
- Fastly Compute@Edge
- Next.js Edge, Remix ...
- Client-side web/mobile applications
- WebAssembly and other environments where HTTP is preferred over TCP
  connections.

# Installation

```bash
npm install @upstash/kafka
```

# Quickstart

## Auth

1. Go to the [Upstash console](https://console.upstash.com/kafka) and select your cluster.
2. Copy the `REST API` secrets at the bottom of the page.

```typescript
import { Kafka } from "@upstash/kafka"

const kafka = new Kafka({
  url: "<UPSTASH_KAFKA_REST_URL>",
  username: "<UPSTASH_KAFKA_REST_USERNAME>",
  password: "<UPSTASH_KAFKA_REST_PASSWORD>",
})
```
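
Hardcoding secrets is rarely desirable, so the credentials are often read from the environment instead. The following is a minimal sketch, not part of the library: `readKafkaConfig` is a hypothetical helper, and the environment variable names are assumed to match the placeholders used above.

```typescript
// Hypothetical helper (not part of @upstash/kafka).
// The variable names below are assumptions based on the placeholders above.
interface KafkaRestConfig {
  url: string
  username: string
  password: string
}

function readKafkaConfig(env: Record<string, string | undefined>): KafkaRestConfig {
  const required = {
    url: "UPSTASH_KAFKA_REST_URL",
    username: "UPSTASH_KAFKA_REST_USERNAME",
    password: "UPSTASH_KAFKA_REST_PASSWORD",
  } as const

  // Fail early with one message that lists every missing variable.
  const missing = Object.values(required).filter((name) => !env[name])
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`)
  }

  return {
    url: env[required.url] as string,
    username: env[required.username] as string,
    password: env[required.password] as string,
  }
}
```

In Node.js this could be used as `new Kafka(readKafkaConfig(process.env))`. Other runtimes expose secrets differently, so the `env` argument is passed in explicitly rather than read from a global.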

## Produce a single message

```typescript
const p = kafka.producer()
const message = { hello: "world" } // Objects will get serialized using `JSON.stringify`

// Produce with default options
const res = await p.produce("<my.topic>", message)

// Produce with explicit options
const resWithOptions = await p.produce("<my.topic>", message, {
  partition: 1,
  timestamp: 12345,
  key: "<custom key>",
  headers: [{ key: "traceId", value: "85a9f12" }],
})
```

## Produce multiple messages

The same options from the example above can be set for every message.

```typescript
const p = kafka.producer()
const res = await p.produceMany([
  {
    topic: "my.topic",
    value: { hello: "world" },
    // ...options
  },
  {
    topic: "another.topic",
    value: "another message",
    // ...options
  },
])
```

## Consume

The first time a consumer is created, it has to find the group coordinator by
asking the Kafka brokers and then join the consumer group. This process takes
some time to complete. That is why a newly created consumer instance may return
empty messages until consumer group coordination has completed.

```typescript
const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId: "group_1",
  instanceId: "instance_1",
  topics: ["test.topic"],
  autoOffsetReset: "earliest",
})
```
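
Because the first calls may come back empty while the group is being coordinated, one option is to retry a few times before concluding that there is nothing to read. The helper below is a sketch under that assumption: `consumeWithRetry`, its parameter names, and its default values are hypothetical and not part of the library.

```typescript
// Hypothetical helper (not part of @upstash/kafka).
// Calls `consumeOnce` until it returns at least one message or the
// attempts are used up, waiting `delayMs` between attempts.
async function consumeWithRetry<T>(
  consumeOnce: () => Promise<T[]>,
  maxAttempts = 5,
  delayMs = 500,
): Promise<T[]> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const messages = await consumeOnce()
    if (messages.length > 0) {
      return messages
    }
    if (attempt < maxAttempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs))
    }
  }
  // Still empty: the topic may simply have no new messages.
  return []
}
```

It would wrap the call from the example above, for instance `await consumeWithRetry(() => c.consume({ ... }))`. An empty result after all attempts does not necessarily indicate a problem, since the topic may have no unread messages.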

More examples can be found in the
[docstring](https://github.com/upstash/upstash-kafka/blob/main/pkg/consumer.ts#L265).

## Commit manually

While `consume` can handle committing automatically, you can also use
`Consumer.commit` to manually commit.

```typescript
const consumerGroupId = "mygroup"
const instanceId = "myinstance"
const topic = "my.topic"

const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId,
  instanceId,
  topics: [topic],
  autoCommit: false,
})

for (const message of messages) {
  // message handling logic

  await c.commit({
    consumerGroupId,
    instanceId,
    offset: {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
    },
  })
}
```
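
Committing after every message costs one HTTP request per message. If a whole batch is handled before committing, it may be enough to commit only the highest offset seen for each topic and partition. The sketch below assumes that messages carry the `topic`, `partition` and `offset` fields used in the example above, and that committing an offset also covers the earlier offsets of that partition, as is usual for Kafka. `latestOffsets` is a hypothetical helper, not a library function.

```typescript
// Hypothetical helper (not part of @upstash/kafka).
interface TopicPartitionOffset {
  topic: string
  partition: number
  offset: number
}

// Keeps only the highest offset for each topic/partition pair.
function latestOffsets(messages: TopicPartitionOffset[]): TopicPartitionOffset[] {
  const latest = new Map<string, TopicPartitionOffset>()
  for (const { topic, partition, offset } of messages) {
    const key = `${topic}:${partition}`
    const seen = latest.get(key)
    if (seen === undefined || offset > seen.offset) {
      latest.set(key, { topic, partition, offset })
    }
  }
  return Array.from(latest.values())
}
```

Each returned entry could then be passed as the `offset` of one `c.commit` call. This trades fewer requests for coarser recovery: if handling fails midway through a batch, nothing from that batch has been committed yet.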

## Fetch

You can also manage offsets manually by using `Consumer.fetch`.

```typescript
const c = kafka.consumer()
const messages = await c.fetch({
  topic: "greeting",
  partition: 3,
  offset: 42,
  timeout: 1000,
})
```
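
Managing offsets manually means the caller has to remember where to continue. A minimal sketch of that bookkeeping follows, assuming each fetched message exposes a numeric `offset` field as in the commit example. `nextFetchOffset` is a hypothetical helper, and storing the result between invocations is left to the application.

```typescript
// Hypothetical helper (not part of @upstash/kafka).
// Returns the offset to request on the next fetch: one past the highest
// offset received, or the current offset if nothing was received.
function nextFetchOffset(current: number, messages: { offset: number }[]): number {
  let next = current
  for (const message of messages) {
    if (message.offset + 1 > next) {
      next = message.offset + 1
    }
  }
  return next
}
```

With the example above, `nextFetchOffset(42, messages)` would give the value to pass as `offset` in the following `c.fetch` call for the same topic and partition.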

## Examples

See [/examples](https://github.com/upstash/upstash-kafka/tree/main/examples) as
well as various examples in the docstrings of each method.

# Contributing

## Requirements

- [nodejs](https://nodejs.org) v14.x or higher
- [pnpm](https://pnpm.io/installation)

## Setup

0. Install dependencies using `pnpm install`.
1. Create a Kafka instance on Upstash.
   [docs](https://docs.upstash.com/kafka#create-a-kafka-cluster)
2. Create the following topics: `blue`, `red`, `green`.
   [docs](https://docs.upstash.com/kafka#create-a-topic)

   The partitions or retention settings don't matter at this time.

3. Create a `.env` file with your Kafka secrets: `cp .env.example .env`

## Running tests

```bash
pnpm test
```