Flink SQL Runner
Introduction
This project aims to simplify the process of running SQL queries with Apache Flink on Kubernetes. Currently, to run SQL queries with Flink, you would need to run the Flink SQL Client CLI or submit queries via a REST request to a Flink SQL Gateway instance.
This project provides a convenient wrapper application and container image for use with the Flink Kubernetes Operator's FlinkDeployment custom resource, allowing you to specify your SQL queries as arguments.
Installation
In order to deploy Flink SQL jobs, the Flink Kubernetes Operator must be installed.
CertManager Installation
The operator installs a webhook that requires CertManager to function, so CertManager must be installed first:
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.15.2/cert-manager.yaml
kubectl wait deployment --all --for=condition=Available=True --timeout=300s -n cert-manager
Flink Kubernetes Operator Installation
First, add the helm repository if you haven't already:
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
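If the repository was added some time ago, you may want to refresh the local chart index before installing:
helm repo update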
Then install the helm chart for operator 1.10:
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--set podSecurityContext=null \
--set defaultConfiguration."log4j-operator\.properties"=monitorInterval\=30 \
--set defaultConfiguration."log4j-console\.properties"=monitorInterval\=30 \
--set defaultConfiguration."flink-conf\.yaml"="kubernetes.operator.metrics.reporter.prom.factory.class\:\ org.apache.flink.metrics.prometheus.PrometheusReporterFactory
kubernetes.operator.metrics.reporter.prom.port\:\ 9249 " \
-n <operator-namespace>
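To confirm the operator came up successfully, you can wait for its deployment to become available (the deployment name below assumes the release name used in the command above):
kubectl wait deployment/flink-kubernetes-operator --for=condition=Available=True --timeout=300s -n <operator-namespace>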
Deploying Flink jobs in other namespaces
The helm installation will create the required service account, role, and role-binding to run Flink jobs in the operator's namespace. By default, the Flink Kubernetes Operator watches all namespaces in the Kubernetes cluster. To run Flink jobs in a namespace other than the operator's namespace, create a service account with the required permissions in that namespace.
The Flink documentation covers the RBAC design and requirements for the Flink Kubernetes Operator in detail.
However, you can find an example service account, role, and role-binding in the install/namespace-rbac directory, which you can use to set up a new application namespace:
kubectl -n <application-namespace> apply -f install/namespace-rbac
This creates a flink service account that must be referenced in the FlinkDeployment resource using the spec.serviceAccount property.
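For illustration, the manifests in that directory will look something like the sketch below. The resource names and the exact permission set shown here are assumptions; the files in install/namespace-rbac and the Flink Kubernetes Operator RBAC documentation are authoritative.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink
rules:
  # Flink job/task manager pods, HA metadata, and deployments live in these resources
  - apiGroups: [""]
    resources: ["pods", "configmaps"]
    verbs: ["*"]
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink
subjects:
  - kind: ServiceAccount
    name: flink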
Writing Queries
For information on how Flink SQL supports writing queries, please refer to the upstream documentation.
Using secrets
You can use Kubernetes secrets with the Flink SQL Runner to provide security credentials to the Flink job for connecting to source or target systems. Secrets can be templated directly into the SQL statements with the following pattern:
{{secret:<NAMESPACE>/<SECRET NAME>/<DATA KEY>}}
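For example, assuming a Secret named kafka-credentials in the flink namespace with a data key named password (hypothetical names), a connector option could reference it like this:
'password' = '{{secret:flink/kafka-credentials/password}}'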
Special Characters
Note that the semicolon (;) is a special character used as a statement delimiter. If it is part of your SQL statements, make sure it is escaped with \\.
For example, it might be used when specifying a properties.sasl.jaas.config value for a Kafka connector configuration. In this case, it would look something like this:
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"test-user\" password=\"{{secret:flink/test-user/user.password}}\"\\;'
Deploying a Flink SQL Query
Your SQL query can be submitted via the spec.job.args field of the FlinkDeployment custom resource. It should be formed of a single string within an array literal ([ ]).
Multi-line YAML strings (using the | or > characters) are not currently supported. However, newlines, tabs, and other whitespace characters within a single string are ignored, so queries can still be well-formatted.
See the example below for an illustration of the formatting.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: standalone-etl
spec:
  image: quay.io/streamshub/flink-sql-runner:0.1.0
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/streamshub/flink-sql-runner.jar
    args: ["
        CREATE TABLE orders (
          order_number BIGINT,
          price        DECIMAL(32,2),
          buyer        ROW<first_name STRING, last_name STRING>,
          order_time   TIMESTAMP(3)
        ) WITH (
          'connector' = 'datagen'
        );
        CREATE TABLE print_table
          WITH (
            'connector' = 'print'
          )
          LIKE orders;
        INSERT INTO print_table
          SELECT *
          FROM orders;
        "]
    parallelism: 1
    upgradeMode: stateless
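Assuming the example above is saved as standalone-etl.yaml (a hypothetical file name), it can then be deployed to an application namespace and its status checked:
kubectl -n <application-namespace> apply -f standalone-etl.yaml
kubectl -n <application-namespace> get flinkdeployment standalone-etl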