Itential adapter to connect to kafka



Kafka Adapter
This adapter is used to integrate the Itential Automation Platform (IAP) with Kafka. The adapter utilizes the Kafka API to provide the integrations that are deemed pertinent to IAP. This README is intended to provide information on this adapter.
>Note: It is possible that some integrations will be supported through the Kafka adapter while other integrations will not.
Itential provides information on all of its product adapters in the Customer Knowledge Base. Information in the Customer Knowledge Base is consistently maintained and goes through documentation reviews. As a result, it should be the first place to go for information.
For custom-built adapters, this README is a starting point for understanding what you have built. It provides the information you need to update the adapter and helps you deploy the adapter into IAP.


Itential Product adapters utilize SemVer for versioning. The current version of the adapter can be found in the package.json file or viewed in the IAP GUI on the System page. For Open Source Adapters, the versions available can be found in the Itential OpenSource Repository.

Release History

Any release prior to 1.0.0 is a pre-release. Initial builds of adapters are generally set up as pre-releases as there is often work that needs to be done to configure the adapter and make sure the authentication process to Kafka works appropriately.
Release notes can be viewed in CHANGELOG.md or in the Customer Knowledge Base for Itential adapters.

Getting Started

These instructions will help you get a copy of the project on your local machine for development and testing. Reading this section is also helpful for deployments as it provides you with pertinent information on prerequisites and properties.

Environment Prerequisites

The following is a list of required packages for an adapter.

Adapter Prerequisites

The following packages are required for Itential product adapters or custom adapters that have been built utilizing the Itential Adapter Builder.
| Package | Description |
| ------- | ------- |
| @itentialopensource/adapter-utils | Runtime library classes for all adapters; includes request handling, connection, throttling, and translation. |
| ajv | Required for validation of adapter properties to integrate with Kafka. |
| fs-extra | Utilized by the node scripts that are included with the adapter; helps to build and extend the functionality. |
| readline-sync | Utilized by the testRunner script that comes with the adapter; helps to test unit and integration functionality. |

Additional Prerequisites for Development and Testing

If you are developing and testing a custom adapter, or have testing capabilities on an Itential product adapter, you will need to install these packages as well.

Specific Prerequisites

At the current time, the Kafka adapter does not utilize the adapter utilities, as it makes use of the following library instead.
| Package | Description |
| ------- | ------- |
| kafka-node | Library that provides Kafka connectivity through Node.js. |

Creating a Workspace

The following provides a local copy of the repository along with adapter dependencies.
git clone git@gitlab.com:\@itentialopensource/adapters/adapter-Kafka
npm install

Adapter Properties and Descriptions

This section defines all the properties that are available for the adapter, including detailed information on what each property is for. If you are not using certain capabilities with this adapter, you do not need to define all of the properties. An example of how the properties for this adapter can be used with tests or IAP is provided in the Installation section.
{
  "host": "localhost",
  "port": 9092,
  "interval_time": 5000,
  "stub": false,
  "client": {
    "connectTimeout": 10000,
    "requestTimeout": 30000,
    "autoConnect": true,
    "connectRetryOptions": {},
    "idleConnection": 300000,
    "reconnectOnIdle": true,
    "maxAsyncRequests": 10,
    "sslOptions": {},
    "sasl": {}
  },
  "producer": {
    "requireAcks": 1,
    "ackTimeoutMs": 100,
    "partitionerType": 0
  },
  "consumer": {
    "groupId": "kafka-node-group",
    "autoCommit": false,
    "autoCommitIntervalMs": 5000,
    "fetchMaxWaitMs": 100,
    "fetchMinBytes": 1,
    "fetchMaxBytes": 1048576,
    "fromOffset": true,
    "encoding": "utf8",
    "keyEncoding": "utf8"
  }
}

Topic Properties

The operator can configure message filtering per topic by providing filters in the topics[*].subscriberInfo[*].filters property. Regular expressions are accepted; see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/RegularExpressions for a guide on how to build them. The operator should stick to simple regular expressions, because complex ones can cause catastrophic backtracking (https://www.regular-expressions.info/catastrophic.html), which leaves the adapter unresponsive.
Example of adapter's configuration with filtering applied:
"topics": [
  {
    "name": "t1",
    "always": true,
    "subscriberInfo": [
      {
        "subname": "default",
        "filters": [
          "F: (\\w+), L: (\\w+)"
        ],
        "rabbit": "kafka",
        "throttle": {}
      }
    ]
  }
]

When the operator omits the filters property, or the property contains no filters, all messages are passed.
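The filtering behaviour described above can be sketched as follows. This is a minimal illustration, not the adapter's actual code: the helper name `passesFilters` is hypothetical, and the sketch assumes a message is passed when it matches at least one of the configured filters.

```javascript
// Hypothetical helper (not part of the adapter): decides whether a message
// would be passed on to a subscriber for a given filters array.
function passesFilters(message, filters) {
  // No filters property, or an empty list: every message is passed.
  if (!Array.isArray(filters) || filters.length === 0) {
    return true;
  }
  // Assumption: matching any one filter is enough to pass the message.
  return filters.some((pattern) => new RegExp(pattern).test(message));
}

const filters = ['F: (\\w+), L: (\\w+)'];

console.log(passesFilters('F: Jane, L: Doe', filters)); // true
console.log(passesFilters('unrelated payload', filters)); // false
console.log(passesFilters('unrelated payload', [])); // true
```

Note that the pattern is written with doubled backslashes, as in the JSON configuration, so that the regular expression engine receives `\w`.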

Connection Properties

These base properties are used to connect to Kafka when the adapter initially comes up. It is important to set these properties appropriately. The Kafka server location is set either by providing a (host, port) tuple or by providing the hostList property.
| Property | Description |
| ------- | ------- |
| host | Optional. A fully qualified domain name or IP address. |
| port | Required if host is set. Used to connect to the server. |
| hostList | Optional. A comma-delimited string of Kafka broker/host combinations. |
| interval_time | Optional. The Kafka adapter keeps information about topics and offsets in memory in order to be more efficient. To work across restarts, the adapter must persist this data, so it is written to the .topics.json file. This property defines how often the file is written. |
| stub | Optional. Slightly different meaning than normal: this only tells the adapter whether to run the write-topics interval. This should always be false unless you are running standalone tests. |
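As a rough sketch of how the two ways of locating the Kafka server could be reconciled into a single broker string (the form that kafka-node's `KafkaClient` accepts in its `kafkaHost` option), consider the helper below. The function name `resolveKafkaHost` is hypothetical, and giving hostList precedence over host and port is an assumption made for this example, not documented adapter behaviour.

```javascript
// Hypothetical helper (not part of the adapter): builds a broker string
// from the connection properties described in the table above.
function resolveKafkaHost(props) {
  // Assumption: hostList wins when both forms are supplied.
  if (props.hostList) {
    return props.hostList;
  }
  if (props.host) {
    if (props.port === undefined) {
      throw new Error('port is required when host is set');
    }
    return `${props.host}:${props.port}`;
  }
  throw new Error('either host and port, or hostList, must be provided');
}

console.log(resolveKafkaHost({ host: 'localhost', port: 9092 }));
// localhost:9092
console.log(resolveKafkaHost({ hostList: 'broker1:9092,broker2:9092' }));
// broker1:9092,broker2:9092
```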


When the adapter runs with the consumer configuration autoCommit: false and fromOffset: true, the current offset for each (topic:partition) is loaded from the .topics.json file on adapter startup. The interval_time property therefore affects the adapter's behaviour after a restart. If the adapter is restarted before .topics.json has been updated with the latest offset for a given (topic:partition), then once the adapter comes back up, consumer offsets are set based on the .topics.json content or on outOfRange resolution. Depending on the offset that is set, all messages present on the Kafka server with offset=(consumer offset + 1) will be read and emitted to the subscriber (e.g. OperationManager) after the restart, which can cause OperationManager to run duplicate jobs. To avoid this, the operator should wait at least interval_time after the consumer reads its last message before restarting the adapter.
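The duplicate-delivery window can be illustrated with a small model. This is only a sketch under stated assumptions: the function name is hypothetical, and it assumes the persisted value is the last offset that had been consumed when .topics.json was last written. It does not reproduce the adapter's actual persistence code or file format.

```javascript
// Hypothetical model (not adapter code): which offsets would be emitted a
// second time if the adapter restarted right now?
function offsetsReplayedAfterRestart(lastPersistedOffset, lastConsumedOffset) {
  const replayed = [];
  // Everything consumed after the last file write is unknown after restart,
  // so the consumer resumes just past the persisted offset.
  for (let offset = lastPersistedOffset + 1; offset <= lastConsumedOffset; offset += 1) {
    replayed.push(offset);
  }
  return replayed;
}

// Restart before the file caught up: three messages are emitted again.
console.log(offsetsReplayedAfterRestart(41, 44)); // [ 42, 43, 44 ]

// Restart after waiting interval_time: the file is current, nothing repeats.
console.log(offsetsReplayedAfterRestart(44, 44)); // []
```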

Client Properties

The following properties are used to define the Kafka Client. These properties all have default values in the adapter and in Kafka. Definitions are taken from kafka-node page as these properties are directly passed to Kafka.
| Property | Description |
| ------- | ------- |
| connectTimeout | ms it takes to wait for a successful connection before moving to the next host. |
| requestTimeout | ms for a Kafka request to time out. |
| autoConnect | automatically connect when KafkaClient is instantiated; otherwise you need to manually call connect. |
| connectRetryOptions | object hash that applies to the initial connection. |
| idleConnection | ms before allowing the broker to disconnect an idle connection from a client. |
| reconnectOnIdle | when the connection is closed due to client idling, client will attempt to auto-reconnect. |
| maxAsyncRequests | maximum async operations at a time toward the Kafka cluster. |
| sslOptions | Object, options to be passed to the tls broker sockets, ex. { rejectUnauthorized: false }. |
| sasl | Object, SASL authentication configuration (only SASL/PLAIN is currently supported), ex. { mechanism: 'plain', username: 'foo', password: 'bar' }. |
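The sketch below shows one way the client section of the properties could be merged with defaults before being handed to kafka-node. The default values are copied from the sample properties in this README; the helper name `buildClientOptions` is hypothetical and the merge order is an assumption, not the adapter's confirmed behaviour.

```javascript
// Defaults taken from the sample adapter properties in this README.
const clientDefaults = {
  connectTimeout: 10000,
  requestTimeout: 30000,
  autoConnect: true,
  idleConnection: 300000,
  reconnectOnIdle: true,
  maxAsyncRequests: 10
};

// Hypothetical helper (not part of the adapter): user-supplied client
// properties override the defaults, and the broker string is added last.
function buildClientOptions(kafkaHost, clientProps = {}) {
  return { ...clientDefaults, ...clientProps, kafkaHost };
}

const options = buildClientOptions('localhost:9092', {
  requestTimeout: 60000,
  sslOptions: { rejectUnauthorized: false }
});

console.log(options.requestTimeout); // 60000
console.log(options.connectTimeout); // 10000

// With kafka-node installed, the object could then be passed along:
// const kafka = require('kafka-node');
// const client = new kafka.KafkaClient(options);
```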

Producer Properties

The following properties are used to define the Kafka Producer. These properties all have default values in Kafka. Definitions are taken from kafka-node page as these properties are directly passed to Kafka.
| Property | Description |
| ------- | ------- |
| requireAcks | Configuration for when to consider a message as acknowledged, default 1. |
| ackTimeoutMs | The amount of time in milliseconds to wait for all acks before considered, default 100ms. |
| partitionerType | Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0. |
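Because partitionerType is configured as a number, a small lookup can make producer settings easier to read. The numeric values below come from the table above; the names `PARTITIONER_TYPES` and `producerOptions` are hypothetical and exist only for this illustration.

```javascript
// Numeric codes as listed in the producer properties table.
const PARTITIONER_TYPES = Object.freeze({
  default: 0,
  random: 1,
  cyclic: 2,
  keyed: 3,
  custom: 4
});

// Hypothetical helper (not part of the adapter): builds a producer section
// from a readable partitioner name, starting from the documented defaults.
function producerOptions(partitionerName = 'default', overrides = {}) {
  if (!Object.prototype.hasOwnProperty.call(PARTITIONER_TYPES, partitionerName)) {
    throw new Error(`unknown partitioner: ${partitionerName}`);
  }
  return {
    requireAcks: 1,
    ackTimeoutMs: 100,
    ...overrides,
    partitionerType: PARTITIONER_TYPES[partitionerName]
  };
}

console.log(producerOptions('keyed'));
// { requireAcks: 1, ackTimeoutMs: 100, partitionerType: 3 }
```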

Consumer Properties

The following properties are used to define the Kafka Consumer. These properties all have default values in Kafka. Definitions are taken from kafka-node page as these properties are directly passed to Kafka.
| Property | Description |
| ------- | ------- |
| groupId | consumer group id, default kafka-node-group. |
| autoCommit | Auto commit config. |
| autoCommitIntervalMs | |
| fetchMaxWaitMs | max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued, default 100ms. |
| fetchMinBytes | minimum number of bytes of messages that must be available to give a response, default 1 byte. |
| fetchMaxBytes | maximum bytes to include in the message set for this partition. This helps bound the size of the response. |
| fromOffset | If set true, consumer will fetch message from the given offset in the payloads. |
| encoding | If set to 'buffer', values will be returned as raw buffer objects. |
| keyEncoding | |

Testing an Itential Product Adapter

Mocha is generally used to test all Itential Product Adapters. Both unit tests and integration tests are performed. Integration tests can generally be run standalone, using mock data and running the adapter in stub mode, or integrated. When running integrated, every effort is made to prevent environmental failures; however, such failures are still possible.

Unit Testing

Unit Testing includes testing basic adapter functionality as well as error conditions that are triggered in the adapter prior to any integration. There are two ways to run unit tests. The preferred method is to use the testRunner script; however, both methods are provided here.
node utils/testRunner --unit

npm run test:unit

To add new unit tests, edit the test/unit/adapterTestUnit.js file. The tests that are already in this file should provide guidance for adding additional tests.

Integration Testing - Standalone

Standalone Integration Testing requires mock data to be provided with the entities. If this data is not provided, standalone integration testing will fail. When the adapter is set to run in stub mode (setting the stub property to true), the adapter will run through its code up to the point of making the request. It will then retrieve the mock data and return that as if it had received that data as the response from Kafka. It will then translate the data so that the adapter can return the expected response to the rest of the Itential software. Standalone is the default integration test.
Similar to unit testing, there are two ways to run integration tests. Using the testRunner script is better because it prevents you from having to edit the test script; it also resets information after testing is complete so that credentials are not saved in the file.
node utils/testRunner
  answer no at the first prompt

npm run test:integration

To add new integration tests, edit the test/integration/adapterTestIntegration.js file. The tests that are already in this file should provide guidance for adding additional tests.

Integration Testing

Integration Testing requires connectivity to Kafka. Using the testRunner script prevents you from having to edit the integration test. It also resets the integration test after the test is complete so that credentials are not saved in the file.
Note: These tests have been written as a best effort to make them work in most environments. However, the Adapter Builder often does not have the necessary information that is required to set up valid integration tests. For example, the order of the requests can be very important and data is often required for creates and updates. Hence, integration tests may have to be enhanced before they will work (integrate) with Kafka. Even after tests have been set up properly, it is possible there are environmental constraints that could result in test failures. Some examples of possible environmental issues are customizations that have been made within Kafka which change order dependencies or required data.

node utils/testRunner
answer yes at the first prompt
answer all other questions on connectivity and credentials

Tests should also be written to clean up after themselves. However, it is important to understand that in some cases this may not be possible. In addition, whenever exceptions occur, test execution may be stopped, which will prevent cleanup actions from running. It is recommended that tests be utilized in dev and test labs only.
Reminder: Do not check in code with actual credentials to systems.

Installing an Itential Product Adapter

  1. Set up the name space location in your IAP node_modules.

cd /opt/pronghorn/current/node_modules
if the @itentialopensource directory does not exist, create it:
   mkdir @itentialopensource

  1. Clone the adapter into your IAP environment.

cd \@itentialopensource
git clone git@gitlab.com:\@itentialopensource/adapters/adapter-Kafka

  1. Install the dependencies for the adapter.

cd adapter-Kafka
npm install

  1. Add the adapter properties for Kafka (created from Adapter Builder) to the properties.json file for your Itential build. You will need to change the credentials and possibly the host information below.
Kafka sample properties
  1. Restart IAP

systemctl restart pronghorn

Installing a Custom Adapter

If you built this as a custom adapter through the Adapter Builder, it is recommended you go through setting up a development environment and testing the adapter before installing it. There is often configuration and authentication work that is required before the adapter will work in IAP.
  1. Move the adapter into the IAP node_modules directory.

Depending on where your code is located, this process is different.
    Could be a tar, move, untar
    Could be a git clone of a repository
    Could also be a cp -R from a coding directory
Adapter should be placed into: /opt/pronghorn/current/node_modules/\@itentialopensource

  1. Follow Steps 3-5 (above) to install an Itential adapter to load your properties, dependencies and restart IAP.

Using this Adapter

The adapter.js file contains the calls the adapter makes available to the rest of the Itential Platform. The API detailed for these calls should be available through JSDOC. The following is a brief summary of the calls.

Generic Adapter Calls

The connect call is run when the adapter is first loaded by the Itential Platform. It validates that the properties have been provided correctly.

Ensures that the adapter can communicate with Kafka. The actual call that is used is defined in the adapter properties.

Specific Adapter Calls

Specific adapter calls are built based on the Kafka API. The Adapter Builder creates the proper method comments for generating JSDOC for the adapter. This is the best way to get information on the calls.

Troubleshooting the Adapter

Connectivity Issues

  1. Verify the adapter properties are set up correctly.

Go into the Itential Platform GUI and verify/update the properties

  1. Verify there is connectivity between the Itential Platform Server and Kafka Server.

ping the ip address of Kafka server
try telnet to the ip address port of Kafka

  1. Verify the credentials provided for Kafka.

login to Kafka using the provided credentials

  1. Verify the API of the call utilized for Kafka Healthcheck.

Go into the Itential Platform GUI and verify/update the properties
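Where telnet is not available on the Itential Platform server, the connectivity check in the steps above can be approximated with a short Node.js script. This is a minimal sketch: the function name `canConnect` is hypothetical, and the host and port shown are the sample values from this README, so substitute those of your Kafka broker. A successful TCP connection only shows that the port is reachable; it does not verify credentials or the Kafka protocol.

```javascript
const net = require('net');

// Hypothetical helper (not part of the adapter): resolves to true when a TCP
// connection to host:port can be opened within timeoutMs, otherwise false.
function canConnect(host, port, timeoutMs = 3000) {
  return new Promise((resolve) => {
    const socket = net.connect({ host, port });
    const finish = (reachable) => {
      socket.destroy();
      resolve(reachable);
    };
    socket.setTimeout(timeoutMs);
    socket.once('connect', () => finish(true));
    socket.once('timeout', () => finish(false));
    socket.once('error', () => finish(false));
  });
}

canConnect('localhost', 9092).then((reachable) => {
  console.log(reachable ? 'Kafka port is reachable' : 'Kafka port is not reachable');
});
```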

Functional Issues

Adapter logs are located in /var/log/pronghorn. In older releases of the Itential Platform, there is a pronghorn.log file which contains logs for all of the Itential Platform. In newer versions, adapters are logging into their own files.

Contributing to Kafka

Please check out the Contributing Guidelines.

License & Maintainers

Maintained By

Itential Product Adapters are maintained by the Itential Adapter Team.
Itential OpenSource Adapters are maintained by the community at large.
Custom Adapters are maintained by other sources.

Product License

Apache 2.0