Provided with Professional license.
Write JavaScript code (ECMAScript 5.1) to manipulate topics and messages across Kafka clusters. See Script Reference for exact definitions of all objects.
After entering your script into the editor window, it is recommended to run it in validation mode first by clicking the ‘Validate Script’ button.
In validation mode no changes are made in the cluster: only read-only metadata operations are executed, and for all other operations only the parameters are validated.
Script execution is stopped when the timeout expires. The default timeout is 1800 seconds (30 minutes). You can configure a different timeout value for script execution:
For the Docker container, use the environment variable KMAGIC_SCRIPT_TIMEOUT_SEC.
For the desktop app, use the configuration parameter SCRIPT_TIMEOUT_SEC in the appsettings.json file.
To perform an operation on a Kafka cluster you need to configure a connection to the cluster. Many Automation Script functions accept cluster connection configuration in one of two forms:
Cluster Name example:
var metadata = Magic.getClusterMetadata('My cluster');
Explicit Configuration Object example:
var config = {
    "BootstrapServers": "localhost:9092",
    "SchemaRegistry": {
        "SchemaRegistryUrl": "http://localhost:8081"
    }
};
var metadata = Magic.getClusterMetadata(config);
Any object returned from the script will be displayed in the Results window in JSON format.
Use the global Magic object to perform cluster-wide operations and to get a topic reference.
Magic.validatingOnly is a Boolean property that returns true if the script is running in validation mode.
You can inspect this property, for example, when you need to bypass a part of the script that would otherwise throw an exception in read-only validation.
if (Magic.validatingOnly) {
    Magic.reportProgress('Validating.');
}
else {
    Magic.reportProgress('Executing for real.');
}
return 'Done!';
Magic.makeClusterConfig() is a helper function that makes a KafkaClusterConfiguration cluster configuration object, optionally with the bootstrapServers property set.
This convenience function performs no operation on the cluster; it just returns an object that works with IntelliSense autocomplete suggestions in the editor.
You can set other configuration properties and use this object in other commands where cluster configuration is required.
Use this function if you want to manage a cluster that is not registered in the Kafka Magic app’s configuration store.
// make and populate configuration object:
var config = Magic.makeClusterConfig('localhost:9092');
config.SchemaRegistry.SchemaRegistryUrl = 'http://localhost:8081';
// use it in other commands:
var meta = Magic.getClusterMetadata(config);
return meta;
Magic.getClusterConfig() returns a KafkaClusterConfiguration cluster configuration object created by reading data from the Kafka Magic app’s configuration store (file or memory).
Because sensitive info (secrets) is removed, the resulting object can’t be used “as-is” for connecting to the cluster.
var config = Magic.getClusterConfig('My cluster');
return config;
Magic.getClusterMetadata() retrieves metadata for the brokers and for all topics in the cluster.
var metadata = Magic.getClusterMetadata('My cluster');
return metadata;
Magic.getTopic() gets a Topic reference object representing an existing topic, to be used in topic-level operations.
var topic = Magic.getTopic('My cluster', 'my-topic');
Magic.createTopic() creates a new topic in the cluster, optionally registers an Avro schema for the topic, and returns a Topic reference object representing the created topic, to be used in topic-level operations. If Avro schema registration is requested, the cluster configuration must contain Schema Registry connection parameters. Can take several seconds to complete.
var topic = Magic.createTopic('My cluster', 'new-topic', 4);
return 'Topic created!';
Magic.createTopicClone() creates a new topic in the cluster, copying the number of partitions and the Avro schema from a source topic, optionally from a different cluster.
Returns a Topic reference object representing the created topic.
The source topic is passed as a Topic reference object containing the topicName and cluster configuration that define the topic to be cloned.
var sourceTopic = Magic.getTopic('My cluster', 'old-topic');
var topic = Magic.createTopicClone('My cluster', 'new-topic', sourceTopic);
return 'Topic created: ' + topic.topicName;
Magic.getAvroSchema() retrieves an Avro schema object from an Avro schema registry by subject (topic name).
The registry is identified either by a cluster name or by a SchemaRegistryConfiguration object defining the source Avro schema registry connection.
var schema = Magic.getAvroSchema('My cluster', 'my-avro-topic');
return schema;
var registryConfig = {
    "SchemaRegistryUrl": "http://localhost:8081"
};
var schema = Magic.getAvroSchema(registryConfig, 'my-avro-topic');
return schema;
Magic.registerAvroSchema() registers an Avro schema in the Avro schema registry.
The registry is identified either by a cluster name or by a SchemaRegistryConfiguration object defining the Avro schema registry connection.
var schema = Magic.getAvroSchema('My cluster', 'some-avro-topic'); // reading existing schema
var newField = {
    name: 'MyNewField',
    type: 'int',
    doc: 'Some random number'
};
schema.fields.push(newField); // modifying schema by adding new field
var newSubject = 'new-topic'; // target topic
Magic.registerAvroSchema('My cluster', newSubject, schema); // registering schema for target topic
Magic.reportProgress('Registered new schema, trying to read it.');
var newSchema = Magic.getAvroSchema('My cluster', newSubject);
return newSchema; // displaying new schema in Results window
Use Magic.reportProgress() to display arbitrary data in the Results window during script execution.
Magic.reportProgress('Display this string in the Results window');
Use Magic.delay() to delay script execution for the specified number of milliseconds, for example, when you have to wait for a change to take effect.
Magic.reportProgress(new Date());
Magic.delay(5000);
return new Date();
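When a change takes an unknown amount of time to become visible, a fixed delay can be replaced by polling. The following is a minimal sketch, not part of the Kafka Magic API: waitUntil is a hypothetical helper that takes the check and the delay function as parameters, so inside a script you would pass a wrapper around Magic.delay.

```javascript
// Hypothetical helper (not a Kafka Magic function): calls `check` until it
// returns true or `maxAttempts` is reached, pausing between attempts.
// Returns the attempt number that succeeded, or 0 if none did.
function waitUntil(check, delayFn, maxAttempts, delayMs) {
    for (var attempt = 1; attempt <= maxAttempts; attempt++) {
        if (check()) {
            return attempt;
        }
        if (attempt < maxAttempts) {
            delayFn(delayMs); // no pause after the final attempt
        }
    }
    return 0;
}

// Possible use inside a Kafka Magic script (assumes the topic is being created):
// var attempts = waitUntil(
//     function () {
//         try { Magic.getTopic('My cluster', 'new-topic').getMetadata(); return true; }
//         catch (e) { return false; }
//     },
//     function (ms) { Magic.delay(ms); },
//     10, 1000);
```

Passing the delay function in keeps the helper independent of the Magic object, which also makes it easy to try outside the app.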
The Topic reference object represents an existing topic and is used in topic-level operations. You get this object as a result of some methods of the Magic global object, such as getTopic(), createTopic(), and createTopicClone().
var topic = Magic.getTopic('My cluster', 'new-topic');
try {
    topic.getMetadata(); // throws exception if topic doesn't exist
    Magic.reportProgress("Topic exists, delete it.");
    topic.delete();
    return "Topic 'new-topic' deleted.";
}
catch (e) {
    Magic.reportProgress("Topic not found, create it.");
    topic = Magic.createTopic('My cluster', 'new-topic');
    return "Topic 'new-topic' created.";
}
topic.getMetadata() returns the topic metadata object and throws an exception if the topic doesn’t exist.
var topic = Magic.getTopic('My cluster', 'some-topic');
var meta = topic.getMetadata();
return meta;
You can publish (produce) JSON or Avro serialized messages to a Kafka topic using a topic reference object. You can provide the message content only, or put the message in a Context object containing the content, headers, a key, and optionally a partition id. You can publish a single message or an array of messages in a single step, and you can also set the message compression type and level.
Publish a single message without providing a key or headers. If Avro serialization is requested, the message properties must conform to the schema.
var topic = Magic.getTopic('My cluster', 'my-topic');
var message = {
    "Field1": 123.45,
    "Field2": "test message"
};
// publish
topic.publishMessage(message);
return message;
Publish a single message with an optional key, headers, and partition id in a Context object. A partition provided in the message context overrides the PartitionId provided in the options parameter. If Avro serialization is requested, the message must conform to the schema.
var topic = Magic.getTopic('My cluster', 'my-topic');
var messageContext = {
    "Message": {
        "Field1": 123.45,
        "Field2": "test message"
    },
    "Headers": {
        "MyHeader1": "string1",
        "MyHeader2": "string2"
    },
    "Key": [77, 78, 79],
    "Partition": null
};
// publish
topic.publishMessageContext(messageContext);
return messageContext;
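The Key in the example above is written as an array of numbers. Assuming these are the byte values of the key (the example does not say so explicitly), a text key can be converted with a small helper. asciiKeyBytes is a hypothetical name, and this sketch deliberately handles ASCII text only.

```javascript
// Hypothetical helper: converts an ASCII string into an array of byte values.
// Assumption: the message context Key is an array of byte values.
function asciiKeyBytes(text) {
    var bytes = [];
    for (var i = 0; i < text.length; i++) {
        var code = text.charCodeAt(i);
        if (code > 127) {
            throw new Error('Non-ASCII character at position ' + i);
        }
        bytes.push(code);
    }
    return bytes;
}

var messageContext = {
    "Message": { "Field1": 123.45, "Field2": "test message" },
    "Headers": { "MyHeader1": "string1" },
    "Key": asciiKeyBytes('MNO'), // [77, 78, 79], the same key as in the example above
    "Partition": null
};
```

The resulting messageContext can then be passed to topic.publishMessageContext() as shown earlier.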
Publish multiple messages without providing a key or headers. If Avro serialization is requested, the message properties must conform to the schema.
var topic = Magic.getTopic('My cluster', 'my-topic');
var messages = [
    {
        "Field1": 123.45,
        "Field2": "test message 1"
    },
    {
        "Field1": 234.56,
        "Field2": "test message 2"
    }
];
topic.publishMessageArray(messages);
return messages;
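For load or smoke tests it can be convenient to generate the message array instead of typing it out. A minimal sketch, where makeTestMessages is a hypothetical helper and Field1 and Field2 simply mirror the sample fields used above:

```javascript
// Hypothetical helper: builds `count` sample messages with the same
// shape as the hand-written array above.
function makeTestMessages(count) {
    var messages = [];
    for (var i = 1; i <= count; i++) {
        messages.push({
            "Field1": i * 1.5,
            "Field2": "test message " + i
        });
    }
    return messages;
}

var messages = makeTestMessages(3);
```

The generated array can be passed to topic.publishMessageArray(). If Avro serialization is requested, the generated fields still have to conform to the schema.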
Publish multiple messages, each with a key, headers, and partition id in a Context object. A partition provided in a message context overrides the PartitionId provided in the options parameter. If Avro serialization is requested, each message must conform to the schema.
var topic = Magic.getTopic('My cluster', 'my-topic');
var messageContextArr = [
    {
        "Message": {
            "Field1": 123.45,
            "Field2": "test message 1"
        },
        "Headers": {
            "MyHeader1": "string1-1",
            "MyHeader2": "string2-1"
        },
        "Key": [77, 78, 79],
        "Partition": null
    },
    {
        "Message": {
            "Field1": 234.56,
            "Field2": "test message 2"
        },
        "Headers": {
            "MyHeader1": "string1-2",
            "MyHeader2": "string2-2"
        },
        "Key": [77, 78, 79],
        "Partition": null
    }
];
// publish
topic.publishMessageContextArray(messageContextArr);
return messageContextArr;
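When many messages share the same headers and key, the context array can be derived from a plain message array. The sketch below uses a hypothetical helper, toContexts; the property names Message, Headers, Key, and Partition are taken from the example above.

```javascript
// Hypothetical helper: wraps each plain message in a context object
// that carries the same headers and key.
function toContexts(messages, headers, key) {
    var contexts = [];
    for (var i = 0; i < messages.length; i++) {
        contexts.push({
            "Message": messages[i],
            "Headers": headers, // shared by reference across all contexts
            "Key": key,
            "Partition": null
        });
    }
    return contexts;
}

var messageContextArr = toContexts(
    [
        { "Field1": 123.45, "Field2": "test message 1" },
        { "Field1": 234.56, "Field2": "test message 2" }
    ],
    { "MyHeader1": "batch-1" },
    [77, 78, 79]);
```

The result can be passed to topic.publishMessageContextArray(). Because the headers object is shared, copy it per message if individual contexts need different header values.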
Whether a message is JSON or Avro serialized, you can search for it using a JavaScript function that references any combination of the message fields, headers, and metadata.
topic.search() searches for messages in the topic and returns an array of MessageContext objects.
The search filter is a JavaScript function accepting a MessageContext object and returning a boolean value.
An optional array of PartitionOffset objects limits the search scope by partitions and offsets.
PartitionOffset type:
interface PartitionOffset {
    PartitionId: number;
    MinOffset: number;
    MaxOffset: number;
}
Search using JavaScript query function:
var topic = Magic.getTopic('My cluster', 'my-topic');
var searchFilter = function (context) {
    return context.Headers.MyHeader1 == 'string1' && context.Message.Field1 > 100;
};
var foundMessages = topic.search(false, 10, searchFilter);
return foundMessages;
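The filter above reads context.Headers.MyHeader1 directly, which would throw if a message had no headers object. Whether that can happen is not stated here, so the following is a defensive sketch under that assumption. makeFilter is a hypothetical helper that builds a filter function from a header name, a header value, a message field name, and a minimum value.

```javascript
// Hypothetical helper: returns a filter function that tolerates
// missing Headers or Message objects on the context.
function makeFilter(headerName, headerValue, fieldName, minValue) {
    return function (context) {
        var headers = context.Headers || {};
        var message = context.Message || {};
        return headers[headerName] === headerValue && message[fieldName] > minValue;
    };
}

var searchFilter = makeFilter('MyHeader1', 'string1', 'Field1', 100);
```

The resulting searchFilter can be passed to topic.search() in place of the inline function.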
Search using the timestamp range parameters minTimestamp and maxTimestamp:
var topic = Magic.getTopic('My cluster', 'my-topic');
var foundMessages = topic.search(
    false,
    10,
    function (context) { return true; },
    '2020-02-20T13:14:32',
    '2020-02-20T14:00:00');
return foundMessages;
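To search a relative window such as "the last 30 minutes", the timestamp strings can be computed instead of hard-coded. This sketch produces strings in the same format as the example above. It formats in UTC, which is an assumption: how Kafka Magic interprets a timestamp without a time zone is not stated here. toSearchTimestamp and lastMinutesRange are hypothetical helper names.

```javascript
// Hypothetical helper: formats a Date as 'YYYY-MM-DDTHH:mm:ss' (UTC).
function toSearchTimestamp(date) {
    return date.toISOString().slice(0, 19);
}

// Hypothetical helper: returns the min/max timestamp strings for the
// `minutes` preceding `now`.
function lastMinutesRange(now, minutes) {
    var from = new Date(now.getTime() - minutes * 60 * 1000);
    return { min: toSearchTimestamp(from), max: toSearchTimestamp(now) };
}

// Fixed date used so the result is reproducible; in a script use new Date().
var range = lastMinutesRange(new Date(Date.UTC(2020, 1, 20, 14, 0, 0)), 30);
// range.min is '2020-02-20T13:30:00', range.max is '2020-02-20T14:00:00'
```

The values range.min and range.max can then be passed as the timestamp arguments of topic.search().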
Search in a single partition while limiting search scope by minimum offset:
var topic = Magic.getTopic('My cluster', 'my-topic');
var foundMessages = topic.search(false, 10, null, null, null, [{ PartitionId: 2, MinOffset: 3 }]);
return foundMessages;
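To apply the same offset limits to several partitions, the PartitionOffset array can be generated. A minimal sketch, where partitionScope is a hypothetical helper; it omits MaxOffset when none is given, as the single-partition example above does.

```javascript
// Hypothetical helper: builds PartitionOffset objects for partitions
// firstPartition..lastPartition (inclusive) with the same offset limits.
function partitionScope(firstPartition, lastPartition, minOffset, maxOffset) {
    var scope = [];
    for (var id = firstPartition; id <= lastPartition; id++) {
        var entry = { PartitionId: id, MinOffset: minOffset };
        if (maxOffset !== undefined) {
            entry.MaxOffset = maxOffset;
        }
        scope.push(entry);
    }
    return scope;
}

var scope = partitionScope(0, 2, 3);
```

The scope array can be passed as the last argument of topic.search().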
topic.getStats() gets topic statistics: min/max offsets and timestamps for each partition. Returns an array of PartitionStats objects.
var topic = Magic.getTopic('My cluster', 'my-topic');
var stats = topic.getStats();
return stats;
topic.delete() deletes the topic and all data in it. Can take several seconds to complete.
var topic = Magic.getTopic('My cluster', 'my-topic');
// delete topic
topic.delete();