Below is the complete set of type definitions for the Kafka Magic Automation Script objects.
/** Global object exposing the cluster-level API described by `KafkaMagic.Cluster`. */
declare var Magic: KafkaMagic.Cluster;
/** Type definitions for the Kafka Magic Automation Script objects. */
declare namespace KafkaMagic {
/**
 * A message together with its metadata.
 * Passed to the filter and processor callbacks, returned by `Topic.search`,
 * and accepted by `Topic.publishMessageContext` / `Topic.publishMessageContextArray`.
 */
interface MessageContext {
    /** Message timestamp. */
    Timestamp: Date;
    /** Topic name. */
    Topic: string;
    /** Partition Id. */
    Partition: number;
    /** Message offset. */
    Offset: number;
    /** Schema Id (presumably of the registered schema used for the message - confirm). */
    SchemaId: number;
    /** Schema type (presumably 'Avro' or 'Json', as accepted by `Cluster.registerSchema` - confirm). */
    SchemaType: string;
    /** Message key as an array of numbers (presumably raw bytes - confirm). */
    Key: number[];
    /** Message headers. */
    Headers: any;
    /** Message content. */
    Message: any;
    /** Error text (presumably set when the message could not be read or deserialized - confirm). */
    Error: string;
}
/**
 * Cluster-level operations available to Automation Scripts.
 * The global `Magic` variable is declared with this type.
 */
interface Cluster {
    /**
     * `true` if the script runs in validation mode.
     */
    validatingOnly: boolean;

    /**
     * Creates a cluster configuration object `KafkaClusterConfiguration` by reading data from KafkaMagic configuration store (file or memory).
     * Sensitive info removed, can't be used for connecting to cluster.
     * @param clusterName string name of the cluster registered in KafkaMagic configuration store.
     * @returns `KafkaClusterConfiguration` object retrieved from the configuration store.
     */
    getClusterConfig(clusterName: string): KafkaClusterConfiguration;

    /**
     * Helper method to make cluster configuration object `KafkaClusterConfiguration` with `bootstrapServers` property set.
     * You can set other configuration parameters and use this object in other commands in place of cluster name.
     * @param bootstrapServers A list of brokers as a CSV list of `host` or `host:port` values, e.g. `'localhost:9092'`
     * @returns `KafkaClusterConfiguration` object with `bootstrapServers` property set.
     */
    makeClusterConfig(bootstrapServers: string): KafkaClusterConfiguration;

    /**
     * Retrieves brokers, topics metadata for all topics in the cluster.
     * @param cluster string name of the cluster registered in KafkaMagic configuration store, or `KafkaClusterConfiguration` object defining all connection parameters for the cluster.
     * @returns `Metadata` object
     */
    getClusterMetadata(cluster: string | KafkaClusterConfiguration): Metadata;

    /**
     * Makes `Topic` object representing existing topic, to be used in searching and publishing operations.
     * @param cluster string name of the cluster registered in KafkaMagic configuration store, or `KafkaClusterConfiguration` object defining all connection parameters for the cluster.
     * @param topicName Name of the topic.
     * @returns `Topic` object
     */
    getTopic(cluster: string | KafkaClusterConfiguration, topicName: string): Topic;

    /**
     * Creates new topic in the cluster, returns `Topic` object representing created topic, to be used in searching and publishing operations.
     * @param cluster string name of the cluster registered in KafkaMagic configuration store, or `KafkaClusterConfiguration` object defining all connection parameters for the cluster.
     * @param topicName Name of the topic.
     * @param partitions Number of partitions for this topic, optional, defaults to 1
     * @param replicationFactor Number of replicas for this topic, optional, defaults to 1
     * @param avroSchema JSON string or object representation of Avro schema, defaults to `null`
     * @returns `Topic` object
     */
    createTopic(
        cluster: string | KafkaClusterConfiguration,
        topicName: string,
        partitions?: number,
        replicationFactor?: number,
        avroSchema?: any): Topic;

    /**
     * Retrieves schema object from Schema Registry by subjectName.
     * @param config string name of the cluster registered in KafkaMagic configuration store,
     * or `SchemaRegistryConfiguration` object defining source schema registry connection.
     * @param subjectName Subject the schema is registered against.
     * @param version Optional version number of the schema, if not provided - gets latest version
     * @returns object representation of the schema
     */
    getSchema(config: string | SchemaRegistryConfiguration, subjectName: string, version?: number): any;

    /**
     * Retrieves schema object from Schema Registry by schemaId.
     * @param config string name of the cluster registered in KafkaMagic configuration store,
     * or `SchemaRegistryConfiguration` object defining source schema registry connection.
     * @param schemaId Id of the schema.
     * @returns object representation of the schema
     */
    getSchemaById(config: string | SchemaRegistryConfiguration, schemaId: number): any;

    /**
     * Registers schema in the Schema Registry.
     * @param config string name of the cluster registered in KafkaMagic configuration store,
     * or `SchemaRegistryConfiguration` object defining target schema registry connection.
     * @param subjectName Subject the schema should be registered against.
     * @param schema JSON string or object representation of the schema
     * @param schemaType Allowed values: 'Avro', 'Json'
     * @param compatibility Allowed values: 'NONE', 'FORWARD', 'BACKWARD', 'FULL', 'BACKWARD_TRANSITIVE', 'FORWARD_TRANSITIVE', 'FULL_TRANSITIVE'
     * @returns schemaId
     */
    registerSchema(config: string | SchemaRegistryConfiguration, subjectName: string, schema: string | object, schemaType: string, compatibility: string): number;

    /**
     * Deletes schema version from Schema Registry by subjectName and version. If version is not provided - deletes all versions.
     * @param config string name of the cluster registered in KafkaMagic configuration store,
     * or `SchemaRegistryConfiguration` object defining source schema registry connection.
     * @param subjectName Subject the schema is registered against.
     * @param version Optional version number of the schema, if not provided - deletes all versions
     * @returns array of deleted schema Ids
     */
    deleteSchemaVersion(config: string | SchemaRegistryConfiguration, subjectName: string, version?: number): number[];

    /**
     * Permanently deletes all schema versions from Schema Registry by subjectName.
     * @param config string name of the cluster registered in KafkaMagic configuration store,
     * or `SchemaRegistryConfiguration` object defining source schema registry connection.
     * @param subjectName Subject the schema is registered against.
     * @returns array of deleted schema Ids
     */
    deleteSchema(config: string | SchemaRegistryConfiguration, subjectName: string): number[];

    /**
     * Adds arbitrary data to JSON array displayed in results window
     * @param data Any object to be displayed in the results window
     */
    reportProgress(data: any): void;

    /**
     * Delay execution
     * @param time Number of milliseconds
     */
    delay(time: number): void;
}
/**
 * `Topic` object representing Topic configuration and related operations
 */
interface Topic {
    /**
     * Topic name
     */
    topicName: string;

    /**
     * Name of the cluster registered in KafkaMagic configuration store,
     * or `KafkaClusterConfiguration` object defining all connection parameters for the cluster
     */
    cluster: string | KafkaClusterConfiguration;

    /**
     * Get topic metadata
     * @returns `TopicMetadata` object
     */
    getMetadata(): TopicMetadata;

    /**
     * Publish single message without providing a key or headers.
     * If Avro serialization is requested, the message properties must conform to the schema.
     * @param message object content of the message
     * @param useSchema `true` if message should be serialized using registered schema selected by provided subject or by default subject name strategy
     * @param options PublishingOptions object: SchemaId, SchemaSubject, PartitionId, CompressionType, CompressionLevel
     */
    publishMessage(message: any, useSchema: boolean, options?: PublishingOptions): void;

    /**
     * Publish single message with optional key, headers, and partition Id in a Context object.
     * If Avro serialization is requested, the message must conform to the schema.
     * @param message `MessageContext` object containing message, key, headers, and partitionId
     * @param useSchema `true` if message should be serialized using registered schema selected by provided subject or by default subject name strategy
     * @param options PublishingOptions object: SchemaId, SchemaSubject, PartitionId, CompressionType, CompressionLevel
     */
    publishMessageContext(message: MessageContext, useSchema: boolean, options?: PublishingOptions): void;

    /**
     * Publish multiple messages without providing keys or headers.
     * If Avro serialization is requested, the message must conform to the schema.
     * @param messages array of objects
     * @param useSchema `true` if message should be serialized using registered schema selected by provided subject or by default subject name strategy
     * @param options PublishingOptions object: SchemaId, SchemaSubject, PartitionId, CompressionType, CompressionLevel
     */
    publishMessageArray(messages: any[], useSchema: boolean, options?: PublishingOptions): void;

    /**
     * Publish multiple messages, each with a key, headers, and partition Id in a Context object.
     * If Avro serialization is requested, the message must conform to the schema.
     * @param messageContextArray Array of `MessageContext` objects, each containing message, key and headers
     * @param useSchema `true` if message should be serialized using registered schema selected by provided subject or by default subject name strategy
     * @param options PublishingOptions object: SchemaId, SchemaSubject, PartitionId, CompressionType, CompressionLevel
     */
    publishMessageContextArray(messageContextArray: MessageContext[], useSchema: boolean, options?: PublishingOptions): void;

    /**
     * Search for messages in the topic
     * @param useSchema `true` to try deserializing messages using registered schema.
     * @param maxResults Maximum number of messages to return.
     * @param filter Callback function accepting `MessageContext` object and returning boolean value.
     * @param options SearchOptions object defining search direction, timestamp limits, and PartitionOffset limits.
     * For intellisense use helper objects `new KafkaMagic.SearchOptions();` and `new KafkaMagic.PartitionOffset(partitionId, minOffset, maxOffset);`
     * @returns Array of `MessageContext` objects.
     */
    search(useSchema: boolean, maxResults: number, filter?: SearchFilterFunction, options?: SearchOptions): MessageContext[];

    /**
     * Process messages in the topic using `processor` callback function.
     * NOTE(review): `search` takes its limits bundled in a `SearchOptions` object, while this
     * signature takes them as separate arguments - confirm this matches the runtime.
     * @param useSchema `true` to try deserializing messages using registered schema.
     * @param maxResults Maximum number of messages to process.
     * @param processor Callback function accepting `MessageContext` object. Any return value will be ignored
     * @param minTimestamp Minimum message timestamp, limits the scope by time.
     * @param maxTimestamp Maximum message timestamp, limits the scope by time.
     * @param partitionOffsets Array of `PartitionOffset` objects, limits the scope by partitions and offsets.
     * For intellisense use helper object `new KafkaMagic.PartitionOffset(partitionId, minOffset, maxOffset);`
     */
    process(
        useSchema: boolean,
        maxResults: number,
        processor?: ProcessorFunction,
        minTimestamp?: Date,
        maxTimestamp?: Date,
        partitionOffsets?: PartitionOffset[]): void;

    /**
     * Get topic statistics: min/max offsets and timestamps for each partition
     * @returns Array of `PartitionStats` objects
     */
    getStats(): PartitionStats[];

    /**
     * Delete topic and data
     */
    delete(): void;
}
/**
 * Callback deciding whether a message is included in search results.
 * Receives the `MessageContext` of a message and returns a boolean value.
 */
type SearchFilterFunction = (
    Context: MessageContext,
) => boolean;
/**
 * Callback invoked with the `MessageContext` of a message being processed.
 * Declared as returning nothing.
 */
type ProcessorFunction = (
    Context: MessageContext,
) => void;
/**
 * Options limiting the scope of `Topic.search`: search direction, timestamp limits,
 * and partition/offset limits.
 */
class SearchOptions {
    /**
     * Search direction, by default ascending - in order of messages published
     */
    descending: boolean;

    /**
     * Date, string, or number - Minimum message timestamp to limit search scope by time.
     * NOTE(review): declared as `Date` only although string and number are mentioned - confirm.
     */
    minTimestamp: Date;

    /**
     * Date, string, or number - Maximum message timestamp to limit search scope by time.
     * NOTE(review): declared as `Date` only although string and number are mentioned - confirm.
     */
    maxTimestamp: Date;

    /**
     * Array of `PartitionOffset` objects to limit search scope by partitions and offsets
     */
    partitionOffsets: PartitionOffset[];
}
/**
 * Optional settings for the `Topic` publishing methods:
 * schema selection, target partition, and compression.
 */
class PublishingOptions {
    /**
     * Id of the registered schema used to serialize message
     */
    SchemaId: number;

    /**
     * Subject of the registered schema used to serialize message
     */
    SchemaSubject: string;

    /**
     * SubjectNameStrategy for the producer. Available values: `None`, `Topic`, `Record`, `TopicRecord`. Any other value considered `None`.
     */
    SubjectNameStrategy: string;

    /**
     * Id of the partition to which the message should be published
     */
    PartitionId: number;

    /**
     * Compression type. Allowed values: 'None', 'Gzip', 'Snappy', 'Lz4', 'Zstd'
     */
    CompressionType: string;

    /**
     * Compression level. Allowed values depend on the Compression type
     */
    CompressionLevel: number;
}
/**
 * Offset range within a single partition, used to limit search and processing scope.
 * NOTE(review): the `Topic.search` documentation constructs this helper as
 * `new KafkaMagic.PartitionOffset(partitionId, minOffset, maxOffset)`, but no constructor
 * is declared here - confirm the supported constructor arguments.
 */
class PartitionOffset {
    /** Id of the partition. */
    PartitionId: number;
    /** Minimum offset of the range. */
    MinOffset: number;
    /** Maximum offset of the range. */
    MaxOffset: number;
}
/**
 * Brokers and topics metadata for a cluster, as returned by `Cluster.getClusterMetadata`.
 */
interface Metadata {
    /** Metadata for the brokers. */
    Brokers: BrokerMetadata[];
    /** Metadata for the topics. */
    Topics: TopicMetadata[];
    /** Id of the originating broker (presumably the broker that served the metadata request - confirm). */
    OriginatingBrokerId: number;
    /** Name of the originating broker. */
    OriginatingBrokerName: string;
}
/**
 * Metadata for a single broker; element type of `Metadata.Brokers`.
 */
interface BrokerMetadata {
    /** Broker Id. */
    BrokerId: number;
    /** Broker host. */
    Host: string;
    /** Broker port. */
    Port: number;
}
/**
 * Metadata for a single topic; returned by `Topic.getMetadata` and listed in `Metadata.Topics`.
 */
interface TopicMetadata {
    /** Topic name. */
    Topic: string;
    /** Metadata for the partitions of the topic. */
    Partitions: PartitionMetadata[];
    /** Error information reported for the topic. */
    Error: MetadataError;
}
/**
 * Statistics for a single partition, as returned by `Topic.getStats`:
 * min/max offsets and timestamps.
 */
interface PartitionStats {
    /** Partition Id. */
    PartitionId: number;
    /** Minimum offset in the partition. */
    MinOffset: number;
    /** Minimum timestamp in the partition. */
    MinTimestamp: Date;
    /** Maximum offset in the partition. */
    MaxOffset: number;
    /** Maximum timestamp in the partition. */
    MaxTimestamp: Date;
}
/**
 * Metadata for a single partition; element type of `TopicMetadata.Partitions`.
 */
interface PartitionMetadata {
    /** Partition Id. */
    PartitionId: number;
    /** Partition leader (presumably a broker Id - confirm). */
    Leader: number;
    /** Replicas of the partition (presumably broker Ids - confirm). */
    Replicas: number[];
    /** In-sync replicas of the partition (presumably broker Ids - confirm). */
    InSyncReplicas: number[];
    /** Error information reported for the partition. */
    Error: MetadataError;
}
/**
 * Error information attached to topic and partition metadata.
 */
interface MetadataError {
    /** Numeric error code. */
    ErrorCode: number;
    /** Whether the error is fatal. */
    IsFatal: boolean;
    /** Textual reason for the error. */
    Reason: string;
}
/**
 * Schema registry connection settings.
 * The schema methods on `Cluster` accept an object of this type in place of a registered cluster name.
 */
class SchemaRegistryConfiguration {
    /** Source of the basic-auth credentials. */
    SchemaRegistryBasicAuthCredentialsSource: string;
    /** Schema registry URL. */
    SchemaRegistryUrl: string;
    /** Request timeout, in milliseconds. */
    SchemaRegistryRequestTimeoutMs: number;
    /** Maximum number of cached schemas. */
    SchemaRegistryMaxCachedSchemas: number;
    /** Basic-auth user info. */
    SchemaRegistryBasicAuthUserInfo: string;
    /** Whether schemas are registered automatically. */
    AutoRegisterSchemas: boolean;
}
/**
 * Connection parameters for a Kafka cluster.
 * Methods taking a `cluster` argument accept either a registered cluster name or an object of this type.
 * Objects returned by `Cluster.getClusterConfig` have sensitive info removed and can't be used for connecting.
 */
class KafkaClusterConfiguration {
ClusterId: string;
/** Cluster name (presumably the name registered in the KafkaMagic configuration store - confirm). */
ClusterName: string;
/** Schema registry connection settings for this cluster. */
SchemaRegistry: SchemaRegistryConfiguration;
LogConnectionClose: boolean;
InternalTerminationSignal: number;
ApiVersionRequest: boolean;
ApiVersionRequestTimeoutMs: number;
ApiVersionFallbackMs: number;
BrokerVersionFallback: string;
SecurityProtocol: string;
SslCipherSuites: string;
SslCurvesList: string;
SslSigalgsList: string;
SslKeyLocation: string;
SslKeyPassword: string;
SslCertificateLocation: string;
SslCaLocation: string;
SslCrlLocation: string;
SslKeystoreLocation: string;
SslKeystorePassword: string;
SaslKerberosServiceName: string;
SaslKerberosPrincipal: string;
SaslKerberosKinitCmd: string;
SaslKerberosKeytab: string;
SaslKerberosMinTimeBeforeRelogin: number;
SaslUsername: string;
LogThreadName: boolean;
LogQueue: boolean;
StatisticsIntervalMs: number;
ReconnectBackoffMaxMs: number;
SaslMechanism: string;
Acks: string;
ClientId: string;
/**
 * Presumably the CSV list of `host` or `host:port` brokers set by `Cluster.makeClusterConfig` - confirm
 * (that method's documentation spells the property `bootstrapServers`).
 */
BootstrapServers: string;
MessageMaxBytes: number;
MessageCopyMaxBytes: number;
ReceiveMessageMaxBytes: number;
MaxInFlight: number;
MetadataRequestTimeoutMs: number;
TopicMetadataRefreshIntervalMs: number;
MetadataMaxAgeMs: number;
SaslPassword: string;
TopicMetadataRefreshFastIntervalMs: number;
TopicBlacklist: string;
Debug: string;
SocketTimeoutMs: number;
SocketSendBufferBytes: number;
SocketReceiveBufferBytes: number;
SocketKeepaliveEnable: boolean;
SocketNagleDisable: boolean;
SocketMaxFails: number;
BrokerAddressTtl: number;
BrokerAddressFamily: string;
ReconnectBackoffMs: number;
TopicMetadataRefreshSparse: boolean;
PluginLibraryPaths: string;
FetchErrorBackoffMs: number;
FetchMinBytes: number;
FetchMaxBytes: number;
MaxPartitionFetchBytes: number;
FetchWaitMaxMs: number;
QueuedMaxMessagesKbytes: number;
QueuedMinMessages: number;
EnableAutoOffsetStore: boolean;
AutoCommitIntervalMs: number;
MaxPollIntervalMs: number;
EnablePartitionEof: boolean;
CoordinatorQueryIntervalMs: number;
GroupProtocolType: string;
HeartbeatIntervalMs: number;
SessionTimeoutMs: number;
PartitionAssignmentStrategy: string;
GroupId: string;
AutoOffsetReset: string;
ConsumeResultFields: string;
EnableAutoCommit: boolean;
CheckCrcs: boolean;
ConsumeBufferLength: number;