From 3c6a112694adad85e3ca4e14465ec0069a9f73b9 Mon Sep 17 00:00:00 2001 From: Michal Kaminski <32871390+michal-b-kaminski@users.noreply.github.com> Date: Mon, 25 Feb 2019 01:08:47 +0100 Subject: [PATCH] [kafkajs] new type definitions --- types/kafkajs/index.d.ts | 538 +++++++++++++++++++++++++++++++++ types/kafkajs/kafkajs-tests.ts | 145 +++++++++ types/kafkajs/tsconfig.json | 23 ++ types/kafkajs/tslint.json | 1 + 4 files changed, 707 insertions(+) create mode 100644 types/kafkajs/index.d.ts create mode 100644 types/kafkajs/kafkajs-tests.ts create mode 100644 types/kafkajs/tsconfig.json create mode 100644 types/kafkajs/tslint.json diff --git a/types/kafkajs/index.d.ts b/types/kafkajs/index.d.ts new file mode 100644 index 0000000000..a28377e369 --- /dev/null +++ b/types/kafkajs/index.d.ts @@ -0,0 +1,538 @@ +// Type definitions for kafkajs 1.4 +// Project: https://github.com/tulios/kafkajs +// Definitions by: Michal Kaminski +// Definitions: https://github.com/DefinitelyTyped/DefinitelyTyped +// TypeScript Version: 2.9 + +/// + +import * as tls from "tls"; + +export class Kafka { + constructor(options: KafkaOptions); + + producer(options?: ProducerOptions): Producer; + consumer(options?: ConsumerOptions): Consumer; + admin(options?: AdminOptions): Admin; +} + +export const PartitionAssigners: { + roundRobin: PartitionAssigner; +}; + +export namespace AssignerProtocol { + interface MemberMetadataOptions { + version: number; + topics: string[]; + userData?: Buffer; + } + + interface MemberMetadata { + encode(options: MemberMetadataOptions): Buffer; + decode(buffer: Buffer): MemberMetadataOptions; + } + + interface MemberAssignmentOptions { + version: number; + assignment: { [key: string]: number[] }; + userData?: Buffer; + } + + interface MemberAssignment { + encode(options: MemberAssignmentOptions): Buffer; + decode(buffer: Buffer): MemberAssignmentOptions; + } + + interface AssignerProtocolStatic { + MemberMetadata: MemberMetadata; + MemberAssignment: MemberAssignment; + } +} + +export const AssignerProtocol: AssignerProtocol.AssignerProtocolStatic; + +export enum CompressionTypes { + None = 0, + GZIP = 1, + Snappy = 2, + LZ4 = 3, + ZSTD = 4 +} + +export const CompressionCodecs: { [key in CompressionTypes]: () => any }; + +export enum ResourceTypes { + UNKNOWN = 0, + ANY = 1, + TOPIC = 2, + GROUP = 3, + CLUSTER = 4, + TRANSACTIONAL_ID = 5, + DELEGATION_TOKEN = 6 +} + +export interface KafkaOptions { + clientId?: string; + brokers: string[]; + ssl?: tls.ConnectionOptions; + sasl?: SASLOptions; + connectionTimeout?: number; + requestTimeout?: number; + retry?: RetryOptions; + logLevel?: logLevel; +} + +export interface SASLOptions { + mechanism: "plain" | "scram-sha-256" | "scram-sha-512"; + username: string; + password: string; +} + +export interface RetryOptions { + maxRetryTime?: number; + initialRetryTime?: number; + factor?: number; + multiplier?: number; + retries?: number; + maxInFlightRequests?: number | null; +} + +export enum logLevel { + NOTHING = 0, + ERROR = 1, + WARN = 2, + INFO = 4, + DEBUG = 5 +} + +export interface Producer { + connect(): Promise; + disconnect(): Promise; + + send(payload: MessagePayload): Promise; + sendBatch(payload: MessageBatchPayload): Promise; + + transaction(): Promise; + + events: ProducerEvents; + on( + event: ProducerEvents[keyof ProducerEvents], + cb: (e: InstrumentationEvent) => void + ): () => Producer; +} + +export interface ProducerOptions { + createPartitioner?: () => (options: { + topic: string; + partitionMetadata: PartitionMetadata[]; + message: ProducerMessage; + }) => number; + retry?: RetryOptions; + metadataMaxAge?: number; + allowAutoTopicCreation?: boolean; + transactionTimeout?: number; + idempotent?: boolean; +} + +export interface PartitionerPartitionMetadata { + partitionId: number; + leader: number; +} + +export interface PartitionMetadata { + partitionId: number; + leader: number; + partitionErrorCode?: number; + replicas?: number[]; + isr?: number[]; +} + +export interface MessagePayloadBase { + acks?: AcksBehaviour; + timeout?: number; + compression?: CompressionTypes; +} + +export interface MessagePayload extends MessagePayloadBase { + topic: string; + messages: ProducerMessage[]; + transactionTimeout?: number; + idempotent?: boolean; +} + +export interface MessageBatchPayload extends MessagePayloadBase { + topicMessages: ProducerTopicMessage[]; +} + +export interface ProducerMessage { + partition?: number; + key?: string; + value: string | Buffer | ArrayBuffer; + headers?: { [key: string]: string }; +} + +export interface ProducerTopicMessage { + topic: string; + messages: ProducerMessage[]; +} + +export enum AcksBehaviour { + All = -1, + No = 0, + Leader = 1 +} + +export interface Transaction { + send(payload: MessagePayload): Promise; + sendBatch(payload: MessageBatchPayload): Promise; + + sendOffsets(offsets: TransactionSendOffsets): Promise; + + commit(): Promise; + abort(): Promise; +} + +export interface TransactionSendOffsets { + consumerGroupId: string; + topics: TransactionSendOffsetsTopic[]; +} + +export interface TransactionSendOffsetsTopic { + topic: string; + partitions: TransactionSendOffsetsTopicPartitions[]; +} + +export interface TransactionSendOffsetsTopicPartitions { + partition: number; + offset: string; +} + +export interface Consumer { + connect(): Promise; + disconnect(): Promise; + + subscribe(options: ConsumerSubscribeOptions): Promise; + + run(options: ConsumerRunOptions): Promise; + + pause(topics: Array<{ topic: string }>): void; + resume(topics: Array<{ topic: string }>): void; + seek(options: ConsumerSeekOptions): void; + + describeGroup(): Promise; + + events: ConsumerEvents; + on( + event: ConsumerEvents[keyof ConsumerEvents], + cb: (e: InstrumentationEvent) => void + ): () => Consumer; +} + +export interface ConsumerOptions { + groupId: string; + partitionAssigners?: PartitionAssigner[]; + sessionTimeout?: number; + heartbeatInterval?: number; + metadataMaxAge?: number; + allowAutoTopicCreation?: boolean; + maxBytesPerPartition?: number; + minBytes?: number; + maxBytes?: number; + maxWaitTimeInMs?: number; + retry?: RetryOptions; + readUncommitted?: boolean; +} + +export interface PartitionAssigner { + ({ cluster }: { cluster: any /* TODO */ }): { + name: string; + version: number; + assign: (options: { + members: Array<{ memberId: string }>; + topics: any[]; + userData?: Buffer; + }) => Promise< + Array<{ + memberId: number; + memberAssignment: Buffer; + }> + >; + protocol?: (options: { + topics: any /* TODO */; + }) => { name: string; metadata: Buffer }; + }; +} + +export interface ConsumerRunOptions { + eachMessage?: (payload: ConsumerEachMessagePayload) => Promise; + eachBatch?: (payload: ConsumerEachBatchPayload) => Promise; + eachBatchAutoResolve?: boolean; + autoCommitInterval?: number; + autoCommitThreshold?: number; + autoCommit?: boolean; +} + +export interface ConsumerSubscribeOptions { + topic: string; + fromBeginning?: boolean; +} + +export interface ConsumerMessage { + timestamp: number; + key: string; + value: Buffer; + headers: { [key: string]: string }; + offset: number; +} + +export interface ConsumerBatch { + topic: string; + partition: number; + highWatermark: number; + messages: ConsumerMessage[]; +} + +export interface ConsumerEachMessagePayload { + topic: string; + partition: number; + message: ConsumerMessage; +} + +export interface ConsumerEachBatchPayload { + batch: ConsumerBatch; + resolveOffset: (offset: number) => Promise; + heartbeat: () => Promise; + isRunning: () => boolean; + commitOffsetsIfNecessary: ( + offsets?: OffsetsByTopicPartition + ) => Promise; + uncommittedOffsets: () => OffsetsByTopicPartition; +} + +export interface OffsetsByTopicPartition { + topics: TopicOffsets[]; +} + +export interface TopicOffsets { + partitions: PartitionOffset[]; +} + +export interface PartitionOffset { + partition: string; + offset: string; +} + +export interface ConsumerSeekOptions { + topic: string; + partition: number; + offset: number; +} + +export interface GroupMemberMetadata { + memberId: string; + clientId: string; + clientHost: string; + memberMetadata: Buffer; + memberAssignment: Buffer; +} + +export interface GroupMetadata { + errorCode: number; + groupId: string; + protocolType: string; + protocol: string; + members: GroupMemberMetadata[]; + state: string; +} + +export interface Admin { + connect(): Promise; + disconnect(): Promise; + + createTopics(options: AdminCreateTopicsOptions): Promise; + deleteTopics(options: AdminDeleteTopicsOptions): Promise; + getTopicMetadata(options: { + topics?: string[]; + }): Promise<{ topics: TopicMetadata[] }>; + + fetchOffsets( + options: AdminFetchOffsetsOptions + ): Promise; + resetOffsets(options: AdminResetOffsetsOptions): Promise; + setOffsets(options: AdminSetOffsetsOptions): Promise; + + describeConfigs( + options: AdminDescribeConfigsOptions + ): Promise; + alterConfigs(options: AdminAlterConfigsOptions): Promise; + + events: AdminEvents; + on( + event: AdminEvents[keyof AdminEvents], + cb: (e: InstrumentationEvent) => void + ): () => Admin; +} + +export interface AdminOptions { + retry?: RetryOptions; +} + +export interface AdminCreateTopicsOptions { + validateOnly?: boolean; + waitForLeaders?: boolean; + timeout?: number; + topics: AdminTopic[]; +} + +export interface AdminTopic { + topic: string; + numPartitions?: number; + replicationFactor?: number; + replicaAssignment?: AdminTopicReplicaAssignment[]; + configEntries?: AdminTopicConfigEntry[]; +} + +export interface AdminTopicReplicaAssignment { + partition: number; + replicas: number[]; +} + +export interface AdminTopicConfigEntry { + name: string; + value: string; +} + +export interface AdminDeleteTopicsOptions { + timeout?: number; + topics: string[]; +} + +export interface AdminFetchOffsetsOptions { + groupId: string; + topic: string; +} + +export interface AdminResetOffsetsOptions { + groupId: string; + topic: string; + earliest?: boolean; +} + +export interface TopicMetadata { + name: string; + partitions: PartitionMetadata[]; +} + +export interface AdminDescribeConfigsOptions { + resources: ResourceConfigQuery[]; +} + +export interface ResourceConfigQuery { + type: ResourceTypes; + name: string; + configNames?: string[]; +} + +export interface AdminConfigDescription { + resources: AdminConfigDescriptionResource[]; + throttleTime: number; +} + +export interface AdminConfigDescriptionResource { + configEntries: AdminConfigDescriptionResourceConfigEntry[]; + errorCode: number; + errorMessage: string; + resourceName: string; + resourceType: ResourceTypes; +} + +export interface AdminConfigDescriptionResourceConfigEntry { + configName: string; + configValue: string; + isDefault: boolean; + isSensitive: boolean; + readOnly: boolean; +} + +export interface AdminAlterConfigsOptions { + validateOnly: boolean; + resources: ResourceConfigQuery[]; +} + +export interface ResourceConfigQuery { + type: ResourceTypes; + name: string; + configEntries: ResourceConfigEntry[]; +} + +export interface ResourceConfigEntry { + name: string; + value: string; +} + +export interface AdminAlterConfigReturn { + resources: AdminAlterConfigResource[]; + throttleTime: number; +} + +export interface AdminAlterConfigResource { + errorCode: number; + errorMessage: string; + resourceName: string; + resourceType: ResourceTypes; +} + +export interface AdminTopicOffset { + partition: number; + offset: string; +} + +export interface AdminSetOffsetsSeekEntry { + partition: number; + offset: string; +} + +export interface AdminSetOffsetsOptions { + groupId: string; + topic: string; + partitions: AdminSetOffsetsSeekEntry[]; +} + +export interface InstrumentationEvent { + id: number; + type: string; + timestamp: number; + payload: { [key: string]: any }; +} + +export interface ConsumerEvents { + HEARTBEAT: "consumer.heartbeat"; + COMMIT_OFFSETS: "consumer.commit_offsets"; + GROUP_JOIN: "consumer.group_join"; + FETCH: "consumer.fetch"; + START_BATCH_PROCESS: "consumer.start_batch_process"; + END_BATCH_PROCESS: "consumner.end_batch_process"; + CONNECT: "consumer.connect"; + DISCONNECT: "consumer.disconnect"; + STOP: "consumer.stop"; + CRASH: "consumer.crash"; + REQUEST: "consumer.request"; + REQUEST_TIMEOUT: "consumer.request_timeout"; + REQUEST_QUEUE_SIZE: "consumer.request_queue_size"; +} + +export interface ProducerEvents { + CONNECT: "producer.connect"; + DISCONNECT: "producer.disconnect"; + REQUEST: "producer.request"; + REQUEST_TIMEOUT: "producer.request_timeout"; + REQUEST_QUEUE_SIZE: "producer.request_queue_size"; +} + +export interface AdminEvents { + CONNECT: "admin.connect"; + DISCONNECT: "admin.disconnect"; + REQUEST: "admin.request"; + REQUEST_TIMEOUT: "admin.request_timeout"; + REQUEST_QUEUE_SIZE: "admin.request_queue_size"; +} diff --git a/types/kafkajs/kafkajs-tests.ts b/types/kafkajs/kafkajs-tests.ts new file mode 100644 index 0000000000..3ae0f37c2c --- /dev/null +++ b/types/kafkajs/kafkajs-tests.ts @@ -0,0 +1,145 @@ +import * as fs from "fs"; + +import { + Kafka, + AssignerProtocol, + PartitionAssigners, + logLevel, + CompressionTypes, + CompressionCodecs, + ResourceTypes, + PartitionAssigner +} from "kafkajs"; + +const { MemberMetadata, MemberAssignment } = AssignerProtocol; +const { roundRobin } = PartitionAssigners; + +// COMMON +const host = "localhost"; +const topic = "topic-test"; + +const kafka = new Kafka({ + logLevel: logLevel.INFO, + brokers: [`${host}:9094`, `${host}:9097`, `${host}:9100`], + clientId: "example-consumer", + ssl: { + servername: "localhost", + rejectUnauthorized: false, + ca: [fs.readFileSync("./testHelpers/certs/cert-signed", "utf-8")] + }, + sasl: { + mechanism: "plain", + username: "test", + password: "testtest" + } +}); + +// CONSUMER +const consumer = kafka.consumer({ groupId: "test-group" }); + +const runConsumer = async () => { + await consumer.connect(); + await consumer.subscribe({ topic }); + await consumer.run({ + // eachBatch: async ({ batch }) => { + // console.log(batch) + // }, + eachMessage: async ({ topic, partition, message }) => { + const prefix = `${topic}[${partition} | ${message.offset}] / ${ + message.timestamp + }`; + console.log(`- ${prefix} ${message.key}#${message.value}`); + } + }); + await consumer.disconnect(); +}; + +runConsumer().catch(e => console.error(`[example/consumer] ${e.message}`, e)); + +// PRODUCER +const producer = kafka.producer({ allowAutoTopicCreation: true }); + +const getRandomNumber = () => Math.round(Math.random() * 1000); +const createMessage = (num: number) => ({ + key: `key-${num}`, + value: `value-${num}-${new Date().toISOString()}` +}); + +const sendMessage = () => { + return producer + .send({ + topic, + compression: CompressionTypes.GZIP, + messages: Array(getRandomNumber()) + .fill(0) + .map(_ => createMessage(getRandomNumber())) + }) + .then(console.log) + .catch(e => console.error(`[example/producer] ${e.message}`, e)); +}; + +const runProducer = async () => { + await producer.connect(); + setInterval(sendMessage, 3000); + await producer.disconnect(); +}; + +runProducer().catch(e => console.error(`[example/producer] ${e.message}`, e)); + +// ADMIN +const admin = kafka.admin({ retry: { retries: 10 } }); + +const runAdmin = async () => { + await admin.connect(); + const { topics } = await admin.getTopicMetadata({}); + await admin.createTopics({ topics: [{ topic, numPartitions: 10, replicationFactor: 1}], timeout: 30000, waitForLeaders: true }); + await admin.disconnect(); +}; + +runAdmin().catch(e => console.error(`[example/admin] ${e.message}`, e)); + +// OTHERS +async () => { + await producer.send({ + topic: "topic-name", + compression: CompressionTypes.GZIP, + messages: [{ key: "key1", value: "hello world!" }] + }); +}; + +// import SnappyCodec from "kafkajs-snappy"; +const SnappyCodec: any = undefined; +CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec; + +const myCustomAssignmentArray = [0]; +const assignment: { [key: number]: { [key: string]: number[] } } = { + 0: { a: [0] } +}; +const MyPartitionAssigner: PartitionAssigner = ({ cluster: any }) => ({ + name: "MyPartitionAssigner", + version: 1, + async assign({ members, topics }) { + // perform assignment + return myCustomAssignmentArray.map(memberId => ({ + memberId, + memberAssignment: MemberAssignment.encode({ + version: this.version, + assignment: assignment[memberId] + }) + })); + }, + protocol({ topics }) { + return { + name: this.name, + metadata: MemberMetadata.encode({ + version: this.version, + topics + }) + }; + } +}); + +kafka.consumer({ + groupId: "my-group", + partitionAssigners: [MyPartitionAssigner, roundRobin] +}); diff --git a/types/kafkajs/tsconfig.json b/types/kafkajs/tsconfig.json new file mode 100644 index 0000000000..1cab3080d1 --- /dev/null +++ b/types/kafkajs/tsconfig.json @@ -0,0 +1,23 @@ +{ + "compilerOptions": { + "module": "commonjs", + "lib": [ + "es6" + ], + "noImplicitAny": true, + "noImplicitThis": true, + "strictNullChecks": true, + "baseUrl": "../", + "typeRoots": [ + "../" + ], + "types": [], + "noEmit": true, + "forceConsistentCasingInFileNames": true, + "strictFunctionTypes": true + }, + "files": [ + "index.d.ts", + "kafkajs-tests.ts" + ] +} diff --git a/types/kafkajs/tslint.json b/types/kafkajs/tslint.json new file mode 100644 index 0000000000..3db14f85ea --- /dev/null +++ b/types/kafkajs/tslint.json @@ -0,0 +1 @@ +{ "extends": "dtslint/dt.json" }