diff --git a/types/kafkajs/index.d.ts b/types/kafkajs/index.d.ts index 13bb1dceed..96a05a527a 100644 --- a/types/kafkajs/index.d.ts +++ b/types/kafkajs/index.d.ts @@ -7,484 +7,528 @@ /// import * as tls from "tls"; -import * as net from "net"; export class Kafka { - constructor(config: KafkaConfig); - producer(config?: ProducerConfig): Producer; - consumer(config?: ConsumerConfig): Consumer; - admin(config?: AdminConfig): Admin; - logger(): Logger; + constructor(options: KafkaOptions); + + producer(options?: ProducerOptions): Producer; + consumer(options?: ConsumerOptions): Consumer; + admin(options?: AdminOptions): Admin; } -export interface KafkaConfig { - brokers: string[]; - ssl?: tls.SecureContextOptions; - sasl?: SASLOptions; +export const PartitionAssigners: { + roundRobin: PartitionAssigner; +}; + +export namespace AssignerProtocol { + interface MemberMetadataOptions { + version: number; + topics: string[]; + userData?: Buffer; + } + + interface MemberMetadata { + encode(options: MemberMetadataOptions): Buffer; + decode(buffer: Buffer): MemberMetadataOptions; + } + + interface MemberAssignmentOptions { + version: number; + assignment: { [key: string]: number[] }; + userData?: Buffer; + } + + interface MemberAssignment { + encode(options: MemberAssignmentOptions): Buffer; + decode(buffer: Buffer): MemberAssignmentOptions; + } + + interface AssignerProtocolStatic { + MemberMetadata: MemberMetadata; + MemberAssignment: MemberAssignment; + } +} + +export const AssignerProtocol: AssignerProtocol.AssignerProtocolStatic; + +export enum CompressionTypes { + None = 0, + GZIP = 1, + Snappy = 2, + LZ4 = 3, + ZSTD = 4 +} + +export const CompressionCodecs: { [key in CompressionTypes]: () => any }; + +export enum ResourceTypes { + UNKNOWN = 0, + ANY = 1, + TOPIC = 2, + GROUP = 3, + CLUSTER = 4, + TRANSACTIONAL_ID = 5, + DELEGATION_TOKEN = 6 +} + +export interface LoggerMessage { + /** @var namespace Context from which the logger was called. */ + readonly namespace: string; + + /** @var level Logger level. */ + readonly level: logLevel; + + /** @var label Logger level label. */ + readonly label: string; + + /** @var log Content of the logger entry. */ + readonly log: LoggerMessageContent; +} + +export interface LoggerMessageContent { + /** @var timestamp Message timestamp. */ + readonly timestamp: Date; + + /** @var message Message sent to the logger. */ + readonly message: string; + + // Other possible fields in the content, that depend on the context. + [key: string]: any; +} + +export interface KafkaOptions { clientId?: string; + brokers: string[]; + ssl?: tls.ConnectionOptions; + sasl?: SASLOptions; connectionTimeout?: number; - authenticationTimeout?: number; requestTimeout?: number; - enforceRequestTimeout?: boolean; retry?: RetryOptions; - socketFactory?: ISocketFactory; logLevel?: logLevel; - logCreator?: logCreator; + logCreator?: () => (message: LoggerMessage) => void; } -export type ISocketFactory = (host: string, port: number, ssl: tls.SecureContextOptions, onConnect: () => void) => net.Socket; - export interface SASLOptions { mechanism: "plain" | "scram-sha-256" | "scram-sha-512"; username: string; password: string; } -export interface ProducerConfig { - createPartitioner?: ICustomPartitioner; - retry?: RetryOptions; - metadataMaxAge?: number; - allowAutoTopicCreation?: boolean; - idempotent?: boolean; - transactionalId?: string; - transactionTimeout?: number; - maxInFlightRequests?: number; -} - -export type ICustomPartitioner = () => ( - message: { topic: string; partitionMetadata: PartitionMetadata[]; message: Message }, -) => number; - -export interface Message { - key?: string | Buffer; - value: string | Buffer | null; - partition?: string | number; - headers?: IHeaders; - timestamp?: number | string; -} - -export interface PartitionMetadata { - partitionErrorCode: number; - partitionId: number; - leader: number; - replicas: number[]; - isr: number[]; -} - -// tslint:disable-next-line:interface-name -export interface IHeaders { - [key: string]: string; -} - -export interface ConsumerConfig { - groupId: string; - partitionAssigners?: PartitionAssigner[]; - metadataMaxAge?: number; - sessionTimeout?: number; - rebalanceTimeout?: number; - heartbeatInterval?: number; - maxBytesPerPartition?: number; - minBytes?: number; - maxBytes?: number; - maxWaitTimeInMs?: number; - retry?: RetryOptions; - allowAutoTopicCreation?: boolean; - maxInFlightRequests?: number; - readUncommitted?: boolean; -} - -export interface PartitionAssigner { - new (config: { cluster: Cluster }): Assigner; -} - -export interface Cluster { - isConnected(): void; - connect(): Promise; - disconnect(): Promise; - refreshMetadata(): Promise; - refreshMetadataIfNecessary(): Promise; - addTargetTopic(topic: string): Promise; - findBroker(node: { nodeId: string }): Promise; - findControllerBroker(): Promise; - findTopicPartitionMetadata(topic: string): PartitionMetadata[]; - findLeaderForPartitions(topic: string, partitions: number[]): { [leader: string]: number[] }; - findGroupCoordinator(group: { groupId: string }): Promise; - findGroupCoordinatorMetadata(group: { - groupId: string; - }): Promise<{ errorCode: number; coordinator: { nodeId: number; host: string; port: number } }>; - defaultOffset(config: { fromBeginning: boolean }): number; - fetchTopicsOffset( - topics: Array<{ topic: string; partitions: Array<{ partition: number }>; fromBeginning: boolean }>, - ): Promise<{ topic: string; partitions: Array<{ partition: number; offset: string }> }>; -} - -export interface Assignment { [topic: string]: number[]; } - -export interface GroupMember { memberId: string; } - -export interface GroupMemberAssignment { memberId: string; memberAssignment: Buffer; } - -export interface GroupState { name: string; metadata: Buffer; } - -export interface Assigner { - name: string; - version: number; - assign(group: { - members: GroupMember[]; - topics: string[], - userData: Buffer, - }): Promise; - protocol(subscription: { topics: string[], userData: Buffer }): GroupState; -} - export interface RetryOptions { maxRetryTime?: number; initialRetryTime?: number; factor?: number; multiplier?: number; retries?: number; + maxInFlightRequests?: number | null; } -export interface AdminConfig { - retry?: RetryOptions; -} - -// tslint:disable-next-line:interface-name -export interface ITopicConfig { - topic: string; - numPartitions?: number; - replicationFactor?: number; - replicaAssignment?: object[]; - configEntries?: object[]; -} - -// tslint:disable-next-line:interface-name -export interface ITopicMetadata { - topic: string; - partitions: PartitionMetadata[]; -} - -export enum ResourceType { - UNKNOWN = 0, - ANY = 1, - TOPIC = 2, - GROUP = 3, - CLUSTER = 4, - TRANSACTIONAL_ID = 5, - DELEGATION_TOKEN = 6, -} - -export interface ResourceConfigQuery { - type: ResourceType; - name: string; - configNames: string[]; -} - -export interface ConfigEntries { - configName: string; - configValue: string; - isDefault: boolean; - isSensitive: boolean; - readOnly: boolean; - configSynonyms: ConfigSynonyms[]; -} - -export interface ConfigSynonyms { - configName: string; - configValue: string; - configSource: number; -} - -export interface DescribeConfigResponse { - resources: Array<{ - configEntries: ConfigEntries[], - errorCode: number, - errorMessage: string, - resourceName: string, - resourceType: ResourceType, - }>; - throttleTime: number; -} - -// tslint:disable-next-line:interface-name -export interface IResourceConfig { - type: ResourceType; - name: string; - configEntries: Array<{ name: string, value: string }>; -} - -export type ValueOf = T[keyof T]; - -export interface AdminEvents { - CONNECT: "admin.connect"; - DISCONNECT: "admin.disconnect"; - REQUEST: "admin.network.request"; - REQUEST_TIMEOUT: "admin.network.request_timeout"; - REQUEST_QUEUE_SIZE: "admin.network.request_queue_size"; -} - -export interface InstrumentationEvent { - id: string; - type: string; - timestamp: number; - payload: T; -} - -export type ConnectEvent = InstrumentationEvent; -export type DisconnectEvent = InstrumentationEvent; -export type RequestEvent = InstrumentationEvent<{ - apiKey: number, - apiName: string, - apiVersion: number, - broker: string, - clientId: string, - correlationId: number, - createdAt: number, - duration: number, - pendingDuration: number, - sentAt: number, - size: number, -}>; -export type RequestTimeoutEvent = InstrumentationEvent<{ - apiKey: number, - apiName: string, - apiVersion: number, - broker: string, - clientId: string, - correlationId: number, - createdAt: number, - pendingDuration: number, - sentAt: number, -}>; -export type RequestQueueSizeEvent = InstrumentationEvent<{ - broker: string, - clientId: string, - queueSize: number, -}>; - -export interface SeekEntry { - partition: number; - offset: string; -} - -export interface Admin { - connect(): Promise; - disconnect(): Promise; - createTopics(options: { - validateOnly?: boolean; - waitForLeaders?: boolean; - timeout?: number; - topics: ITopicConfig[]; - }): Promise; - deleteTopics(topics: { topics: string[], timeout: number}): Promise; - fetchTopicMetadata(topicMetadata: ITopicMetadata): Promise; - fetchOffsets(topic: { groupId: string; topic: string }): Promise>; - fetchTopicOffsets(topic: string): Promise; - setOffsets(topic: { groupId: string; topic: string; partitions: SeekEntry[] }): Promise; - resetOffsets(topic: { groupId: string; topic: string; earliest: boolean }): Promise; - describeConfigs(configs: { - resources: ResourceConfigQuery[], - includeSynonyms: boolean, - }): Promise; - alterConfigs(configs: { - validateOnly: boolean, - resources: IResourceConfig[], - }): Promise; - logger(): Logger; - on(eventName: ValueOf, listener: (...args: any[]) => void): void; - events: AdminEvents; -} - -export const PartitionAssigners: { roundRobin: PartitionAssigner }; - -// tslint:disable-next-line:interface-name -export interface ISerializer { - encode(value: T): Buffer; - decode(buffer: Buffer): T; -} - -export interface MemberMetadata { - version: number; - topics: string[]; - userData: Buffer; -} - -export interface MemberAssignment { - version: number; - assignment: Assignment; - userData: Buffer; -} - -export const AssignerProtocol: { - MemberMetadata: ISerializer; - MemberAssignment: ISerializer; -}; - -export type DefaultPartitioner = ( - message: { topic: string; partitionMetadata: PartitionMetadata[]; message: Message }, -) => number; - -export type JavaCompatiblePartitioner = ( - message: { topic: string; partitionMetadata: PartitionMetadata[]; message: Message }, -) => number; - -export const Partitioners: { - DefaultPartitioner: DefaultPartitioner, - JavaCompatiblePartitioner: JavaCompatiblePartitioner, -}; - export enum logLevel { NOTHING = 0, ERROR = 1, WARN = 2, INFO = 4, - DEBUG = 5, + DEBUG = 5 } -export interface LogEntry { namespace: string; level: logLevel; label: string; log: string; } - -export type Logger = (entry: LogEntry) => void; - -export type logCreator = (logLevel: string) => ( - namespace: string, level: string, label: string, log: string, -) => void; - -export interface Broker { - isConnected(): boolean; +export interface Producer { connect(): Promise; disconnect(): Promise; - apiVersions(): Promise<{ [apiKey: number]: { minVersion: number; maxVersion: number } }>; - metadata( - topics: string[], - ): Promise<{ - brokers: Array<{ nodeId: number; host: string; port: number }>; - topicMetadata: Array<{ topicErrorCode: number; topic: number; partitionMetadata: PartitionMetadata[] }>; - }>; - offsetCommit(request: { - groupId: string; - groupGenerationId: number; - memberId: string; - retentionTime?: number; - topics: Array<{ topic: string; partitions: Array<{ partition: number; offset: string }> }>; - }): Promise; + + send(payload: MessagePayload): Promise; + sendBatch(payload: MessageBatchPayload): Promise; + + transaction(): Promise; + + events: ProducerEvents; + on( + event: ProducerEvents[keyof ProducerEvents], + cb: (e: InstrumentationEvent) => void + ): () => Producer; } -export interface KafkaMessage { - key: Buffer; - value: Buffer; - timestamp: string; - size: number; - attributes: number; - offset: string; - headers?: IHeaders; +export interface ProducerOptions { + createPartitioner?: () => (options: { + topic: string; + partitionMetadata: PartitionMetadata[]; + message: ProducerMessage; + }) => number; + retry?: RetryOptions; + metadataMaxAge?: number; + allowAutoTopicCreation?: boolean; + transactionTimeout?: number; + idempotent?: boolean; } -export interface ProducerRecord { - topic: string; - messages: Message[]; - acks?: number; +export interface PartitionerPartitionMetadata { + partitionId: number; + leader: number; +} + +export interface PartitionMetadata { + partitionId: number; + leader: number; + partitionErrorCode?: number; + replicas?: number[]; + isr?: number[]; +} + +export interface MessagePayloadBase { + acks?: AcksBehaviour; timeout?: number; compression?: CompressionTypes; } -export interface RecordMetadata { - topicName: string; - partition: number; - errorCode: number; - offset: string; - timestamp: string; -} - -export interface TopicMessages { +export interface MessagePayload extends MessagePayloadBase { topic: string; - messages: Message[]; + messages: ProducerMessage[]; + transactionTimeout?: number; + idempotent?: boolean; } -export interface ProducerBatch { - acks: number; - timeout: number; - compression: CompressionTypes; - topicMessages: TopicMessages[]; +export interface MessageBatchPayload extends MessagePayloadBase { + topicMessages: ProducerTopicMessage[]; } -export interface PartitionOffset { +export interface ProducerMessage { + partition?: number; + key?: string; + value: string | Buffer | ArrayBuffer; + headers?: { [key: string]: string }; +} + +export interface ProducerTopicMessage { + topic: string; + messages: ProducerMessage[]; +} + +export enum AcksBehaviour { + All = -1, + No = 0, + Leader = 1 +} + +export interface Transaction { + send(payload: MessagePayload): Promise; + sendBatch(payload: MessageBatchPayload): Promise; + + sendOffsets(offsets: TransactionSendOffsets): Promise; + + commit(): Promise; + abort(): Promise; +} + +export interface TransactionSendOffsets { + consumerGroupId: string; + topics: TransactionSendOffsetsTopic[]; +} + +export interface TransactionSendOffsetsTopic { + topic: string; + partitions: TransactionSendOffsetsTopicPartitions[]; +} + +export interface TransactionSendOffsetsTopicPartitions { partition: number; offset: string; } -export interface TopicOffsets { - topic: string; - partitions: PartitionOffset[]; +export interface Consumer { + connect(): Promise; + disconnect(): Promise; + + subscribe(options: ConsumerSubscribeOptions): Promise; + + run(options: ConsumerRunOptions): Promise; + + pause(topics: Array<{ topic: string }>): void; + resume(topics: Array<{ topic: string }>): void; + seek(options: ConsumerSeekOptions): void; + + describeGroup(): Promise; + + events: ConsumerEvents; + on( + event: ConsumerEvents[keyof ConsumerEvents], + cb: (e: InstrumentationEvent) => void + ): () => Consumer; } -export interface Offsets { +export interface ConsumerOptions { + groupId: string; + partitionAssigners?: PartitionAssigner[]; + sessionTimeout?: number; + heartbeatInterval?: number; + metadataMaxAge?: number; + allowAutoTopicCreation?: boolean; + maxBytesPerPartition?: number; + minBytes?: number; + maxBytes?: number; + maxWaitTimeInMs?: number; + retry?: RetryOptions; + readUncommitted?: boolean; +} + +export interface PartitionAssigner { + ({ cluster }: { cluster: any /* TODO */ }): { + name: string; + version: number; + assign: (options: { + members: Array<{ memberId: string }>; + topics: any[]; + userData?: Buffer; + }) => Promise< + Array<{ + memberId: number; + memberAssignment: Buffer; + }> + >; + protocol?: (options: { + topics: any /* TODO */; + }) => { name: string; metadata: Buffer }; + }; +} + +export interface ConsumerRunOptions { + eachMessage?: (payload: ConsumerEachMessagePayload) => Promise; + eachBatch?: (payload: ConsumerEachBatchPayload) => Promise; + eachBatchAutoResolve?: boolean; + autoCommitInterval?: number; + autoCommitThreshold?: number; + autoCommit?: boolean; +} + +export interface ConsumerSubscribeOptions { + topic: string; + fromBeginning?: boolean; +} + +export interface ConsumerMessage { + timestamp: number; + key: string; + value: Buffer; + headers: { [key: string]: string }; + offset: number; +} + +export interface ConsumerBatch { + topic: string; + partition: number; + highWatermark: number; + messages: ConsumerMessage[]; +} + +export interface ConsumerEachMessagePayload { + topic: string; + partition: number; + message: ConsumerMessage; +} + +export interface ConsumerEachBatchPayload { + batch: ConsumerBatch; + resolveOffset: (offset: number) => Promise; + heartbeat: () => Promise; + isRunning: () => boolean; + commitOffsetsIfNecessary: ( + offsets?: OffsetsByTopicPartition + ) => Promise; + uncommittedOffsets: () => OffsetsByTopicPartition; +} + +export interface OffsetsByTopicPartition { topics: TopicOffsets[]; } -export interface Sender { - send(record: ProducerRecord): Promise; - sendBatch(batch: ProducerBatch): Promise; +export interface TopicOffsets { + partitions: PartitionOffset[]; } -export interface ProducerEvents { - CONNECT: "producer.connect"; - DISCONNECT: "producer.disconnect"; - REQUEST: "producer.network.request"; - REQUEST_TIMEOUT: "producer.network.request_timeout"; - REQUEST_QUEUE_SIZE: "producer.network.request_queue_size"; +export interface PartitionOffset { + partition: string; + offset: string; } -export type Producer = Sender & { - connect(): Promise; - disconnect(): Promise; - isIdempotent(): boolean; - events: ProducerEvents; - on(eventName: ValueOf, listener: (...args: any[]) => void): void; - transaction(): Promise; - logger(): Logger; -}; +export interface ConsumerSeekOptions { + topic: string; + partition: number; + offset: number; +} -export type Transaction = Sender & { - sendOffsets(offsets: Offsets & { consumerGroupId: string }): Promise; - commit(): Promise; - abort(): Promise; - isActive(): boolean; -}; - -export interface ConsumerGroup { - groupId: string; - generationId: number; +export interface GroupMemberMetadata { memberId: string; - coordinator: Broker; -} - -export interface MemberDescription { - clientHost: string; clientId: string; - memberId: string; - memberAssignment: Buffer; + clientHost: string; memberMetadata: Buffer; + memberAssignment: Buffer; } -export interface GroupDescription { +export interface GroupMetadata { + errorCode: number; groupId: string; - members: MemberDescription[]; - protocol: string; protocolType: string; + protocol: string; + members: GroupMemberMetadata[]; state: string; } -export interface TopicPartitions { topic: string; partitions: number[]; } +export interface Admin { + connect(): Promise; + disconnect(): Promise; -export interface Batch { + createTopics(options: AdminCreateTopicsOptions): Promise; + deleteTopics(options: AdminDeleteTopicsOptions): Promise; + getTopicMetadata(options: { + topics?: string[]; + }): Promise<{ topics: TopicMetadata[] }>; + + fetchOffsets( + options: AdminFetchOffsetsOptions + ): Promise; + resetOffsets(options: AdminResetOffsetsOptions): Promise; + setOffsets(options: AdminSetOffsetsOptions): Promise; + + describeConfigs( + options: AdminDescribeConfigsOptions + ): Promise; + alterConfigs(options: AdminAlterConfigsOptions): Promise; + + events: AdminEvents; + on( + event: AdminEvents[keyof AdminEvents], + cb: (e: InstrumentationEvent) => void + ): () => Admin; +} + +export interface AdminOptions { + retry?: RetryOptions; +} + +export interface AdminCreateTopicsOptions { + validateOnly?: boolean; + waitForLeaders?: boolean; + timeout?: number; + topics: AdminTopic[]; +} + +export interface AdminTopic { topic: string; + numPartitions?: number; + replicationFactor?: number; + replicaAssignment?: AdminTopicReplicaAssignment[]; + configEntries?: AdminTopicConfigEntry[]; +} + +export interface AdminTopicReplicaAssignment { partition: number; - highWatermark: string; - messages: KafkaMessage[]; - isEmpty(): boolean; - firstOffset(): string | null; - lastOffset(): string; - offsetLag(): string; + replicas: number[]; +} + +export interface AdminTopicConfigEntry { + name: string; + value: string; +} + +export interface AdminDeleteTopicsOptions { + timeout?: number; + topics: string[]; +} + +export interface AdminFetchOffsetsOptions { + groupId: string; + topic: string; +} + +export interface AdminResetOffsetsOptions { + groupId: string; + topic: string; + earliest?: boolean; +} + +export interface TopicMetadata { + name: string; + partitions: PartitionMetadata[]; +} + +export interface AdminDescribeConfigsOptions { + resources: ResourceConfigQuery[]; +} + +export interface ResourceConfigQuery { + type: ResourceTypes; + name: string; + configNames?: string[]; +} + +export interface AdminConfigDescription { + resources: AdminConfigDescriptionResource[]; + throttleTime: number; +} + +export interface AdminConfigDescriptionResource { + configEntries: AdminConfigDescriptionResourceConfigEntry[]; + errorCode: number; + errorMessage: string; + resourceName: string; + resourceType: ResourceTypes; +} + +export interface AdminConfigDescriptionResourceConfigEntry { + configName: string; + configValue: string; + isDefault: boolean; + isSensitive: boolean; + readOnly: boolean; +} + +export interface AdminAlterConfigsOptions { + validateOnly: boolean; + resources: ResourceConfigQuery[]; +} + +export interface ResourceConfigQuery { + type: ResourceTypes; + name: string; + configEntries: ResourceConfigEntry[]; +} + +export interface ResourceConfigEntry { + name: string; + value: string; +} + +export interface AdminAlterConfigReturn { + resources: AdminAlterConfigResource[]; + throttleTime: number; +} + +export interface AdminAlterConfigResource { + errorCode: number; + errorMessage: string; + resourceName: string; + resourceType: ResourceTypes; +} + +export interface AdminTopicOffset { + partition: number; + offset: string; +} + +export interface AdminSetOffsetsSeekEntry { + partition: number; + offset: string; +} + +export interface AdminSetOffsetsOptions { + groupId: string; + topic: string; + partitions: AdminSetOffsetsSeekEntry[]; +} + +export interface InstrumentationEvent { + id: number; + type: string; + timestamp: number; + payload: { [key: string]: any }; } export interface ConsumerEvents { @@ -493,110 +537,28 @@ export interface ConsumerEvents { GROUP_JOIN: "consumer.group_join"; FETCH: "consumer.fetch"; START_BATCH_PROCESS: "consumer.start_batch_process"; - END_BATCH_PROCESS: "consumer.end_batch_process"; + END_BATCH_PROCESS: "consumner.end_batch_process"; CONNECT: "consumer.connect"; DISCONNECT: "consumer.disconnect"; STOP: "consumer.stop"; CRASH: "consumer.crash"; - REQUEST: "consumer.network.request"; - REQUEST_TIMEOUT: "consumer.network.request_timeout"; - REQUEST_QUEUE_SIZE: "consumer.network.request_queue_size"; -} -export type ConsumerHeartbeatEvent = InstrumentationEvent<{ - groupId: string, - memberId: string, - groupGenerationId: number, -}>; -export type ConsumerCommitOffsetsEvent = InstrumentationEvent<{ - groupId: string, - memberId: string, - groupGenerationId: number, - topics: Array<{ - topic: string, - partitions: Array<{ - offset: string, - partition: string, - }> - }>, -}>; -// tslint:disable-next-line:interface-name -export interface IMemberAssignment { - [key: string]: number[]; -} -export type ConsumerGroupJoinEvent = InstrumentationEvent<{ - duration: number, - groupId: string, - isLeader: boolean, - leaderId: string, - groupProtocol: string, - memberId: string, - memberAssignment: IMemberAssignment; -}>; -export type ConsumerFetchEvent = InstrumentationEvent<{ - numberOfBatches: number, - duration: number, -}>; - -// tslint:disable-next-line:interface-name -export interface IBatchProcessEvent { - topic: string; - partition: number; - highWatermark: string; - offsetLag: string; - batchSize: number; - firstOffset: string; - lastOffset: string; -} -export type ConsumerStartBatchProcessEvent = InstrumentationEvent; -export type ConsumerEndBatchProcessEvent = InstrumentationEvent; -export type ConsumerCrashEvent = InstrumentationEvent<{ - error: Error, - groupId: string, -}>; - -export interface Consumer { - connect(): Promise; - disconnect(): Promise; - subscribe(topic: { topic: string; fromBeginning?: boolean }): Promise; - stop(): Promise; - run(config?: { - autoCommit?: boolean; - autoCommitInterval?: number | null; - autoCommitThreshold?: number | null; - eachBatchAutoResolve?: boolean; - partitionsConsumedConcurrently?: number; - eachBatch?: ( - batch: { - batch: Batch; - resolveOffset(offset: string): void; - heartbeat(): Promise; - commitOffsetsIfNecessary(offsets?: Offsets): Promise; - uncommittedOffsets(): Promise; - isRunning(): boolean; - }, - ) => Promise; - eachMessage?: (message: { topic: string; partition: number; message: KafkaMessage }) => Promise; - }): Promise; - seek(topicPartition: { topic: string; partition: number; offset: string }): void; - describeGroup(): Promise; - pause(topicPartitions: TopicPartitions[]): void; - resume(topicPartitions: TopicPartitions[]): void; - on(eventName: ValueOf, listener: (...args: any[]) => void): void; - logger(): Logger; - events: ConsumerEvents; + REQUEST: "consumer.request"; + REQUEST_TIMEOUT: "consumer.request_timeout"; + REQUEST_QUEUE_SIZE: "consumer.request_queue_size"; } -export enum CompressionTypes { - None = 0, - GZIP = 1, - Snappy = 2, - LZ4 = 3, - ZSTD = 4, +export interface ProducerEvents { + CONNECT: "producer.connect"; + DISCONNECT: "producer.disconnect"; + REQUEST: "producer.request"; + REQUEST_TIMEOUT: "producer.request_timeout"; + REQUEST_QUEUE_SIZE: "producer.request_queue_size"; } -export const CompressionCodecs: { - [CompressionTypes.GZIP]: () => any; - [CompressionTypes.Snappy]: () => any; - [CompressionTypes.LZ4]: () => any; - [CompressionTypes.ZSTD]: () => any; -}; +export interface AdminEvents { + CONNECT: "admin.connect"; + DISCONNECT: "admin.disconnect"; + REQUEST: "admin.request"; + REQUEST_TIMEOUT: "admin.request_timeout"; + REQUEST_QUEUE_SIZE: "admin.request_queue_size"; +} diff --git a/types/kafkajs/kafkajs-tests.ts b/types/kafkajs/kafkajs-tests.ts index 092cc887d8..15fe47deba 100644 --- a/types/kafkajs/kafkajs-tests.ts +++ b/types/kafkajs/kafkajs-tests.ts @@ -2,30 +2,42 @@ import * as fs from "fs"; import { Kafka, + AssignerProtocol, PartitionAssigners, logLevel, CompressionTypes, - CompressionCodecs + CompressionCodecs, + ResourceTypes, + PartitionAssigner, + LoggerMessage } from "kafkajs"; +const { MemberMetadata, MemberAssignment } = AssignerProtocol; const { roundRobin } = PartitionAssigners; // COMMON const host = "localhost"; const topic = "topic-test"; +const logger = (loggerMessage: LoggerMessage): void => { + console.log(`[${loggerMessage.namespace}] ${loggerMessage.log.message}`); +}; + const kafka = new Kafka({ logLevel: logLevel.INFO, brokers: [`${host}:9094`, `${host}:9097`, `${host}:9100`], clientId: "example-consumer", ssl: { + servername: "localhost", + rejectUnauthorized: false, ca: [fs.readFileSync("./testHelpers/certs/cert-signed", "utf-8")] }, sasl: { mechanism: "plain", username: "test", password: "testtest" - } + }, + logCreator: () => logger }); // CONSUMER @@ -35,20 +47,13 @@ const runConsumer = async () => { await consumer.connect(); await consumer.subscribe({ topic }); await consumer.run({ - eachBatch: async ({ commitOffsetsIfNecessary }) => { - commitOffsetsIfNecessary({ - topics: [{ - topic: 'topic-name', - partitions: [ - { partition: 0, offset: '500' } - ] - }] - }); - }, + // eachBatch: async ({ batch }) => { + // console.log(batch) + // }, eachMessage: async ({ topic, partition, message }) => { const prefix = `${topic}[${partition} | ${message.offset}] / ${ message.timestamp - }`; + }`; console.log(`- ${prefix} ${message.key}#${message.value}`); } }); @@ -91,17 +96,10 @@ runProducer().catch(e => console.error(`[example/producer] ${e.message}`, e)); const admin = kafka.admin({ retry: { retries: 10 } }); const runAdmin = async () => { - await admin.connect(); - await admin.fetchTopicMetadata({ - topic: 'string', - partitions: [] - }); - await admin.createTopics({ - topics: [{ topic, numPartitions: 10, replicationFactor: 1 }], - timeout: 30000, - waitForLeaders: true - }); - await admin.disconnect(); + await admin.connect(); + const { topics } = await admin.getTopicMetadata({}); + await admin.createTopics({ topics: [{ topic, numPartitions: 10, replicationFactor: 1}], timeout: 30000, waitForLeaders: true }); + await admin.disconnect(); }; runAdmin().catch(e => console.error(`[example/admin] ${e.message}`, e)); @@ -115,10 +113,39 @@ async () => { }); }; +// import SnappyCodec from "kafkajs-snappy"; const SnappyCodec: any = undefined; CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec; +const myCustomAssignmentArray = [0]; +const assignment: { [key: number]: { [key: string]: number[] } } = { + 0: { a: [0] } +}; +const MyPartitionAssigner: PartitionAssigner = ({ cluster: any }) => ({ + name: "MyPartitionAssigner", + version: 1, + async assign({ members, topics }) { + // perform assignment + return myCustomAssignmentArray.map(memberId => ({ + memberId, + memberAssignment: MemberAssignment.encode({ + version: this.version, + assignment: assignment[memberId] + }) + })); + }, + protocol({ topics }) { + return { + name: this.name, + metadata: MemberMetadata.encode({ + version: this.version, + topics + }) + }; + } +}); + kafka.consumer({ groupId: "my-group", - partitionAssigners: [roundRobin] + partitionAssigners: [MyPartitionAssigner, roundRobin] });