diff --git a/notNeededPackages.json b/notNeededPackages.json index 359c65c9fa..9972063e13 100644 --- a/notNeededPackages.json +++ b/notNeededPackages.json @@ -1842,6 +1842,12 @@ "sourceRepoURL": "https://github.com/sindresorhus/junk", "asOfVersion": "3.0.0" }, + { + "libraryName": "kafkajs", + "typingsPackageName": "kafkajs", + "sourceRepoURL": "https://github.com/tulios/kafkajs", + "asOfVersion": "1.9.0" + }, { "libraryName": "keycloak-js", "typingsPackageName": "keycloak-js", diff --git a/types/kafkajs/index.d.ts b/types/kafkajs/index.d.ts deleted file mode 100644 index 96a05a527a..0000000000 --- a/types/kafkajs/index.d.ts +++ /dev/null @@ -1,564 +0,0 @@ -// Type definitions for kafkajs 1.8 -// Project: https://github.com/tulios/kafkajs, https://kafka.js.org -// Definitions by: Michal Kaminski -// Definitions: https://github.com/DefinitelyTyped/DefinitelyTyped -// TypeScript Version: 2.9 - -/// - -import * as tls from "tls"; - -export class Kafka { - constructor(options: KafkaOptions); - - producer(options?: ProducerOptions): Producer; - consumer(options?: ConsumerOptions): Consumer; - admin(options?: AdminOptions): Admin; -} - -export const PartitionAssigners: { - roundRobin: PartitionAssigner; -}; - -export namespace AssignerProtocol { - interface MemberMetadataOptions { - version: number; - topics: string[]; - userData?: Buffer; - } - - interface MemberMetadata { - encode(options: MemberMetadataOptions): Buffer; - decode(buffer: Buffer): MemberMetadataOptions; - } - - interface MemberAssignmentOptions { - version: number; - assignment: { [key: string]: number[] }; - userData?: Buffer; - } - - interface MemberAssignment { - encode(options: MemberAssignmentOptions): Buffer; - decode(buffer: Buffer): MemberAssignmentOptions; - } - - interface AssignerProtocolStatic { - MemberMetadata: MemberMetadata; - MemberAssignment: MemberAssignment; - } -} - -export const AssignerProtocol: AssignerProtocol.AssignerProtocolStatic; - -export enum CompressionTypes { - None = 0, - GZIP = 1, - Snappy = 2, - LZ4 = 3, - ZSTD = 4 -} - -export const CompressionCodecs: { [key in CompressionTypes]: () => any }; - -export enum ResourceTypes { - UNKNOWN = 0, - ANY = 1, - TOPIC = 2, - GROUP = 3, - CLUSTER = 4, - TRANSACTIONAL_ID = 5, - DELEGATION_TOKEN = 6 -} - -export interface LoggerMessage { - /** @var namespace Context from which the logger was called. */ - readonly namespace: string; - - /** @var level Logger level. */ - readonly level: logLevel; - - /** @var label Logger level label. */ - readonly label: string; - - /** @var log Content of the logger entry. */ - readonly log: LoggerMessageContent; -} - -export interface LoggerMessageContent { - /** @var timestamp Message timestamp. */ - readonly timestamp: Date; - - /** @var message Message sent to the logger. */ - readonly message: string; - - // Other possible fields in the content, that depend on the context. - [key: string]: any; -} - -export interface KafkaOptions { - clientId?: string; - brokers: string[]; - ssl?: tls.ConnectionOptions; - sasl?: SASLOptions; - connectionTimeout?: number; - requestTimeout?: number; - retry?: RetryOptions; - logLevel?: logLevel; - logCreator?: () => (message: LoggerMessage) => void; -} - -export interface SASLOptions { - mechanism: "plain" | "scram-sha-256" | "scram-sha-512"; - username: string; - password: string; -} - -export interface RetryOptions { - maxRetryTime?: number; - initialRetryTime?: number; - factor?: number; - multiplier?: number; - retries?: number; - maxInFlightRequests?: number | null; -} - -export enum logLevel { - NOTHING = 0, - ERROR = 1, - WARN = 2, - INFO = 4, - DEBUG = 5 -} - -export interface Producer { - connect(): Promise; - disconnect(): Promise; - - send(payload: MessagePayload): Promise; - sendBatch(payload: MessageBatchPayload): Promise; - - transaction(): Promise; - - events: ProducerEvents; - on( - event: ProducerEvents[keyof ProducerEvents], - cb: (e: InstrumentationEvent) => void - ): () => Producer; -} - -export interface ProducerOptions { - createPartitioner?: () => (options: { - topic: string; - partitionMetadata: PartitionMetadata[]; - message: ProducerMessage; - }) => number; - retry?: RetryOptions; - metadataMaxAge?: number; - allowAutoTopicCreation?: boolean; - transactionTimeout?: number; - idempotent?: boolean; -} - -export interface PartitionerPartitionMetadata { - partitionId: number; - leader: number; -} - -export interface PartitionMetadata { - partitionId: number; - leader: number; - partitionErrorCode?: number; - replicas?: number[]; - isr?: number[]; -} - -export interface MessagePayloadBase { - acks?: AcksBehaviour; - timeout?: number; - compression?: CompressionTypes; -} - -export interface MessagePayload extends MessagePayloadBase { - topic: string; - messages: ProducerMessage[]; - transactionTimeout?: number; - idempotent?: boolean; -} - -export interface MessageBatchPayload extends MessagePayloadBase { - topicMessages: ProducerTopicMessage[]; -} - -export interface ProducerMessage { - partition?: number; - key?: string; - value: string | Buffer | ArrayBuffer; - headers?: { [key: string]: string }; -} - -export interface ProducerTopicMessage { - topic: string; - messages: ProducerMessage[]; -} - -export enum AcksBehaviour { - All = -1, - No = 0, - Leader = 1 -} - -export interface Transaction { - send(payload: MessagePayload): Promise; - sendBatch(payload: MessageBatchPayload): Promise; - - sendOffsets(offsets: TransactionSendOffsets): Promise; - - commit(): Promise; - abort(): Promise; -} - -export interface TransactionSendOffsets { - consumerGroupId: string; - topics: TransactionSendOffsetsTopic[]; -} - -export interface TransactionSendOffsetsTopic { - topic: string; - partitions: TransactionSendOffsetsTopicPartitions[]; -} - -export interface TransactionSendOffsetsTopicPartitions { - partition: number; - offset: string; -} - -export interface Consumer { - connect(): Promise; - disconnect(): Promise; - - subscribe(options: ConsumerSubscribeOptions): Promise; - - run(options: ConsumerRunOptions): Promise; - - pause(topics: Array<{ topic: string }>): void; - resume(topics: Array<{ topic: string }>): void; - seek(options: ConsumerSeekOptions): void; - - describeGroup(): Promise; - - events: ConsumerEvents; - on( - event: ConsumerEvents[keyof ConsumerEvents], - cb: (e: InstrumentationEvent) => void - ): () => Consumer; -} - -export interface ConsumerOptions { - groupId: string; - partitionAssigners?: PartitionAssigner[]; - sessionTimeout?: number; - heartbeatInterval?: number; - metadataMaxAge?: number; - allowAutoTopicCreation?: boolean; - maxBytesPerPartition?: number; - minBytes?: number; - maxBytes?: number; - maxWaitTimeInMs?: number; - retry?: RetryOptions; - readUncommitted?: boolean; -} - -export interface PartitionAssigner { - ({ cluster }: { cluster: any /* TODO */ }): { - name: string; - version: number; - assign: (options: { - members: Array<{ memberId: string }>; - topics: any[]; - userData?: Buffer; - }) => Promise< - Array<{ - memberId: number; - memberAssignment: Buffer; - }> - >; - protocol?: (options: { - topics: any /* TODO */; - }) => { name: string; metadata: Buffer }; - }; -} - -export interface ConsumerRunOptions { - eachMessage?: (payload: ConsumerEachMessagePayload) => Promise; - eachBatch?: (payload: ConsumerEachBatchPayload) => Promise; - eachBatchAutoResolve?: boolean; - autoCommitInterval?: number; - autoCommitThreshold?: number; - autoCommit?: boolean; -} - -export interface ConsumerSubscribeOptions { - topic: string; - fromBeginning?: boolean; -} - -export interface ConsumerMessage { - timestamp: number; - key: string; - value: Buffer; - headers: { [key: string]: string }; - offset: number; -} - -export interface ConsumerBatch { - topic: string; - partition: number; - highWatermark: number; - messages: ConsumerMessage[]; -} - -export interface ConsumerEachMessagePayload { - topic: string; - partition: number; - message: ConsumerMessage; -} - -export interface ConsumerEachBatchPayload { - batch: ConsumerBatch; - resolveOffset: (offset: number) => Promise; - heartbeat: () => Promise; - isRunning: () => boolean; - commitOffsetsIfNecessary: ( - offsets?: OffsetsByTopicPartition - ) => Promise; - uncommittedOffsets: () => OffsetsByTopicPartition; -} - -export interface OffsetsByTopicPartition { - topics: TopicOffsets[]; -} - -export interface TopicOffsets { - partitions: PartitionOffset[]; -} - -export interface PartitionOffset { - partition: string; - offset: string; -} - -export interface ConsumerSeekOptions { - topic: string; - partition: number; - offset: number; -} - -export interface GroupMemberMetadata { - memberId: string; - clientId: string; - clientHost: string; - memberMetadata: Buffer; - memberAssignment: Buffer; -} - -export interface GroupMetadata { - errorCode: number; - groupId: string; - protocolType: string; - protocol: string; - members: GroupMemberMetadata[]; - state: string; -} - -export interface Admin { - connect(): Promise; - disconnect(): Promise; - - createTopics(options: AdminCreateTopicsOptions): Promise; - deleteTopics(options: AdminDeleteTopicsOptions): Promise; - getTopicMetadata(options: { - topics?: string[]; - }): Promise<{ topics: TopicMetadata[] }>; - - fetchOffsets( - options: AdminFetchOffsetsOptions - ): Promise; - resetOffsets(options: AdminResetOffsetsOptions): Promise; - setOffsets(options: AdminSetOffsetsOptions): Promise; - - describeConfigs( - options: AdminDescribeConfigsOptions - ): Promise; - alterConfigs(options: AdminAlterConfigsOptions): Promise; - - events: AdminEvents; - on( - event: AdminEvents[keyof AdminEvents], - cb: (e: InstrumentationEvent) => void - ): () => Admin; -} - -export interface AdminOptions { - retry?: RetryOptions; -} - -export interface AdminCreateTopicsOptions { - validateOnly?: boolean; - waitForLeaders?: boolean; - timeout?: number; - topics: AdminTopic[]; -} - -export interface AdminTopic { - topic: string; - numPartitions?: number; - replicationFactor?: number; - replicaAssignment?: AdminTopicReplicaAssignment[]; - configEntries?: AdminTopicConfigEntry[]; -} - -export interface AdminTopicReplicaAssignment { - partition: number; - replicas: number[]; -} - -export interface AdminTopicConfigEntry { - name: string; - value: string; -} - -export interface AdminDeleteTopicsOptions { - timeout?: number; - topics: string[]; -} - -export interface AdminFetchOffsetsOptions { - groupId: string; - topic: string; -} - -export interface AdminResetOffsetsOptions { - groupId: string; - topic: string; - earliest?: boolean; -} - -export interface TopicMetadata { - name: string; - partitions: PartitionMetadata[]; -} - -export interface AdminDescribeConfigsOptions { - resources: ResourceConfigQuery[]; -} - -export interface ResourceConfigQuery { - type: ResourceTypes; - name: string; - configNames?: string[]; -} - -export interface AdminConfigDescription { - resources: AdminConfigDescriptionResource[]; - throttleTime: number; -} - -export interface AdminConfigDescriptionResource { - configEntries: AdminConfigDescriptionResourceConfigEntry[]; - errorCode: number; - errorMessage: string; - resourceName: string; - resourceType: ResourceTypes; -} - -export interface AdminConfigDescriptionResourceConfigEntry { - configName: string; - configValue: string; - isDefault: boolean; - isSensitive: boolean; - readOnly: boolean; -} - -export interface AdminAlterConfigsOptions { - validateOnly: boolean; - resources: ResourceConfigQuery[]; -} - -export interface ResourceConfigQuery { - type: ResourceTypes; - name: string; - configEntries: ResourceConfigEntry[]; -} - -export interface ResourceConfigEntry { - name: string; - value: string; -} - -export interface AdminAlterConfigReturn { - resources: AdminAlterConfigResource[]; - throttleTime: number; -} - -export interface AdminAlterConfigResource { - errorCode: number; - errorMessage: string; - resourceName: string; - resourceType: ResourceTypes; -} - -export interface AdminTopicOffset { - partition: number; - offset: string; -} - -export interface AdminSetOffsetsSeekEntry { - partition: number; - offset: string; -} - -export interface AdminSetOffsetsOptions { - groupId: string; - topic: string; - partitions: AdminSetOffsetsSeekEntry[]; -} - -export interface InstrumentationEvent { - id: number; - type: string; - timestamp: number; - payload: { [key: string]: any }; -} - -export interface ConsumerEvents { - HEARTBEAT: "consumer.heartbeat"; - COMMIT_OFFSETS: "consumer.commit_offsets"; - GROUP_JOIN: "consumer.group_join"; - FETCH: "consumer.fetch"; - START_BATCH_PROCESS: "consumer.start_batch_process"; - END_BATCH_PROCESS: "consumner.end_batch_process"; - CONNECT: "consumer.connect"; - DISCONNECT: "consumer.disconnect"; - STOP: "consumer.stop"; - CRASH: "consumer.crash"; - REQUEST: "consumer.request"; - REQUEST_TIMEOUT: "consumer.request_timeout"; - REQUEST_QUEUE_SIZE: "consumer.request_queue_size"; -} - -export interface ProducerEvents { - CONNECT: "producer.connect"; - DISCONNECT: "producer.disconnect"; - REQUEST: "producer.request"; - REQUEST_TIMEOUT: "producer.request_timeout"; - REQUEST_QUEUE_SIZE: "producer.request_queue_size"; -} - -export interface AdminEvents { - CONNECT: "admin.connect"; - DISCONNECT: "admin.disconnect"; - REQUEST: "admin.request"; - REQUEST_TIMEOUT: "admin.request_timeout"; - REQUEST_QUEUE_SIZE: "admin.request_queue_size"; -} diff --git a/types/kafkajs/kafkajs-tests.ts b/types/kafkajs/kafkajs-tests.ts deleted file mode 100644 index 15fe47deba..0000000000 --- a/types/kafkajs/kafkajs-tests.ts +++ /dev/null @@ -1,151 +0,0 @@ -import * as fs from "fs"; - -import { - Kafka, - AssignerProtocol, - PartitionAssigners, - logLevel, - CompressionTypes, - CompressionCodecs, - ResourceTypes, - PartitionAssigner, - LoggerMessage -} from "kafkajs"; - -const { MemberMetadata, MemberAssignment } = AssignerProtocol; -const { roundRobin } = PartitionAssigners; - -// COMMON -const host = "localhost"; -const topic = "topic-test"; - -const logger = (loggerMessage: LoggerMessage): void => { - console.log(`[${loggerMessage.namespace}] ${loggerMessage.log.message}`); -}; - -const kafka = new Kafka({ - logLevel: logLevel.INFO, - brokers: [`${host}:9094`, `${host}:9097`, `${host}:9100`], - clientId: "example-consumer", - ssl: { - servername: "localhost", - rejectUnauthorized: false, - ca: [fs.readFileSync("./testHelpers/certs/cert-signed", "utf-8")] - }, - sasl: { - mechanism: "plain", - username: "test", - password: "testtest" - }, - logCreator: () => logger -}); - -// CONSUMER -const consumer = kafka.consumer({ groupId: "test-group" }); - -const runConsumer = async () => { - await consumer.connect(); - await consumer.subscribe({ topic }); - await consumer.run({ - // eachBatch: async ({ batch }) => { - // console.log(batch) - // }, - eachMessage: async ({ topic, partition, message }) => { - const prefix = `${topic}[${partition} | ${message.offset}] / ${ - message.timestamp - }`; - console.log(`- ${prefix} ${message.key}#${message.value}`); - } - }); - await consumer.disconnect(); -}; - -runConsumer().catch(e => console.error(`[example/consumer] ${e.message}`, e)); - -// PRODUCER -const producer = kafka.producer({ allowAutoTopicCreation: true }); - -const getRandomNumber = () => Math.round(Math.random() * 1000); -const createMessage = (num: number) => ({ - key: `key-${num}`, - value: `value-${num}-${new Date().toISOString()}` -}); - -const sendMessage = () => { - return producer - .send({ - topic, - compression: CompressionTypes.GZIP, - messages: Array(getRandomNumber()) - .fill(0) - .map(_ => createMessage(getRandomNumber())) - }) - .then(console.log) - .catch(e => console.error(`[example/producer] ${e.message}`, e)); -}; - -const runProducer = async () => { - await producer.connect(); - setInterval(sendMessage, 3000); - await producer.disconnect(); -}; - -runProducer().catch(e => console.error(`[example/producer] ${e.message}`, e)); - -// ADMIN -const admin = kafka.admin({ retry: { retries: 10 } }); - -const runAdmin = async () => { - await admin.connect(); - const { topics } = await admin.getTopicMetadata({}); - await admin.createTopics({ topics: [{ topic, numPartitions: 10, replicationFactor: 1}], timeout: 30000, waitForLeaders: true }); - await admin.disconnect(); -}; - -runAdmin().catch(e => console.error(`[example/admin] ${e.message}`, e)); - -// OTHERS -async () => { - await producer.send({ - topic: "topic-name", - compression: CompressionTypes.GZIP, - messages: [{ key: "key1", value: "hello world!" }] - }); -}; - -// import SnappyCodec from "kafkajs-snappy"; -const SnappyCodec: any = undefined; -CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec; - -const myCustomAssignmentArray = [0]; -const assignment: { [key: number]: { [key: string]: number[] } } = { - 0: { a: [0] } -}; -const MyPartitionAssigner: PartitionAssigner = ({ cluster: any }) => ({ - name: "MyPartitionAssigner", - version: 1, - async assign({ members, topics }) { - // perform assignment - return myCustomAssignmentArray.map(memberId => ({ - memberId, - memberAssignment: MemberAssignment.encode({ - version: this.version, - assignment: assignment[memberId] - }) - })); - }, - protocol({ topics }) { - return { - name: this.name, - metadata: MemberMetadata.encode({ - version: this.version, - topics - }) - }; - } -}); - -kafka.consumer({ - groupId: "my-group", - partitionAssigners: [MyPartitionAssigner, roundRobin] -}); diff --git a/types/kafkajs/tsconfig.json b/types/kafkajs/tsconfig.json deleted file mode 100644 index 1cab3080d1..0000000000 --- a/types/kafkajs/tsconfig.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "compilerOptions": { - "module": "commonjs", - "lib": [ - "es6" - ], - "noImplicitAny": true, - "noImplicitThis": true, - "strictNullChecks": true, - "baseUrl": "../", - "typeRoots": [ - "../" - ], - "types": [], - "noEmit": true, - "forceConsistentCasingInFileNames": true, - "strictFunctionTypes": true - }, - "files": [ - "index.d.ts", - "kafkajs-tests.ts" - ] -} diff --git a/types/kafkajs/tslint.json b/types/kafkajs/tslint.json deleted file mode 100644 index 3db14f85ea..0000000000 --- a/types/kafkajs/tslint.json +++ /dev/null @@ -1 +0,0 @@ -{ "extends": "dtslint/dt.json" }