Flink SQL Runner

Introduction

This project simplifies the process of running SQL queries using Apache Flink on Kubernetes. Currently, to run SQL queries with Flink, you must either run the Flink SQL Client CLI or submit queries via REST requests to a Flink SQL Gateway instance.

This project provides a convenient wrapper application and container image for use with the Flink Kubernetes Operator’s FlinkDeployment custom resource, allowing you to specify your SQL queries as arguments.

Installation

In order to deploy Flink SQL jobs, the Flink Kubernetes Operator must be installed.

CertManager Installation

The operator installs a webhook that requires CertManager to function, so CertManager must be installed first:

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.15.2/cert-manager.yaml
kubectl wait deployment --all  --for=condition=Available=True --timeout=300s -n cert-manager

Operator Installation

First, add the Helm repository if you haven’t already:

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/

Then install the Helm chart for version 1.10.0 of the operator:

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--set podSecurityContext=null \
--set defaultConfiguration."log4j-operator\.properties"=monitorInterval\=30 \
--set defaultConfiguration."log4j-console\.properties"=monitorInterval\=30 \
--set defaultConfiguration."flink-conf\.yaml"="kubernetes.operator.metrics.reporter.prom.factory.class\:\ org.apache.flink.metrics.prometheus.PrometheusReporterFactory
 kubernetes.operator.metrics.reporter.prom.port\:\ 9249 " \
-n <operator-namespace>

The helm installation will create the required service, role, and role-binding to run Flink jobs in the defined operator namespace. The Flink Kubernetes Operator, by default, watches all namespaces in the Kubernetes cluster. To run Flink jobs in a namespace other than the operator’s namespace, create a service account with the required permissions in that namespace.

The Flink documentation covers the RBAC design and requirements for the Flink Kubernetes Operator in detail. However, you can find an example service, role, and role-binding in the install/namespace-rbac directory which you can use to set up a new application namespace:

kubectl apply -n <application-namespace> -f install/namespace-rbac

This creates a flink service account that must be referenced in the FlinkDeployment resource using the spec.serviceAccount property.
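As an abbreviated sketch (the deployment name and namespace here are hypothetical), the service account is referenced from the FlinkDeployment like this:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-sql-job              # hypothetical name
  namespace: my-app-namespace   # the namespace where the RBAC was applied
spec:
  serviceAccount: flink         # the service account created above
  # ... remaining spec fields omitted; see the full example below
```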

Writing Queries

For information on how Flink SQL supports writing queries, please refer to the upstream documentation.

Using secrets

You can use Kubernetes Secrets with the Flink SQL Runner to provide security credentials to the Flink job for connecting to source or target systems. Secrets can be templated directly in the SQL statements with the following pattern:

{{secret:<NAMESPACE>/<SECRET NAME>/<DATA KEY>}}
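For illustration, suppose a Secret named db-credentials in the flink namespace holds a password key (the secret name, key, and connector settings here are hypothetical); the placeholder is substituted before the statement is executed:

```sql
-- Hypothetical example: '{{secret:flink/db-credentials/password}}' is replaced
-- with the value of the 'password' key from the 'db-credentials' Secret
-- in the 'flink' namespace before the query runs.
CREATE TABLE target (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://db:5432/orders',
  'username' = 'flink-writer',
  'password' = '{{secret:flink/db-credentials/password}}'
);
```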

Special Characters

Note that the semicolon (;) is a special character used as a statement delimiter. If it is part of your SQL statements, make sure it is escaped with \\. For example, an escaped semicolon may be needed when specifying a properties.sasl.jaas.config value for a Kafka connector configuration. In that case, it would look something like this:

'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"test-user\" password=\"{{secret:flink/test-user/user.password}}\"\\;'

Your SQL queries can be submitted via the spec.job.args field of the FlinkDeployment custom resource. They should be formed as a single string within an array literal ([ ]). Multi-line YAML strings (using the | or > characters) are not currently supported. However, newlines, tabs, and other whitespace characters within a single string are ignored, so queries can still be well formatted. See the example below for an illustration of the formatting.

Example FlinkDeployment
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: standalone-etl
spec:
  image: quay.io/streamshub/flink-sql-runner:0.1.0
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/streamshub/flink-sql-runner.jar
    args: ["
        CREATE TABLE orders (
          order_number BIGINT,
          price DECIMAL(32,2),
          buyer ROW<first_name STRING,
          last_name STRING>,
          order_time TIMESTAMP(3)
        ) WITH (
          'connector' = 'datagen'
        );
        CREATE TABLE print_table
        WITH (
          'connector' = 'print'
        )
        LIKE orders
        ;
        INSERT INTO print_table
          SELECT *
          FROM orders;
        "]
    parallelism: 1
    upgradeMode: stateless