GCP: What is the best option to set up a periodic data pipeline from Spanner to BigQuery?

  • Task: We have to set up a periodic sync of records from Spanner to BigQuery. Our Spanner database has a relational table hierarchy.

    Options considered: I was thinking of using Dataflow templates to set up this data pipeline.

    • Option 1: Set up a job with the Dataflow template 'Cloud Spanner to Cloud Storage Text' and then another with the template 'Cloud Storage Text to BigQuery'. Con: the first template works on only a single table, and we have many tables to export.

    • Option 2: Use the 'Cloud Spanner to Cloud Storage Avro' template, which exports the entire database. Con: I only need to export selected tables within the database, and I don't see a template to import Avro into BigQuery.

    Question: Please suggest the best option for setting up this pipeline.

      September 16, 2021 2:16 PM IST
  • There is currently no off-the-shelf parameterized direct export from Cloud Spanner to BigQuery.

    To meet your requirements, a custom Dataflow job (built with the Spanner Dataflow connector and deployed as a Dataflow template) scheduled to run periodically would be the best bet. Incremental exports would require implementing change tracking in your database, which can be done with commit timestamps.
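    If you go the custom-pipeline route, the sketch below shows what the incremental read step could look like. It assumes a commit-timestamp column named LastUpdateTime on a Singers table (a column created with OPTIONS (allow_commit_timestamp = true)) and a high-water mark persisted by your scheduler; the column name, instance and database IDs, and timestamp value are placeholders rather than anything prescribed by Spanner or Beam. It would slot into a pipeline like the full example in the answer further down.

    // Extra imports on top of the full pipeline example further down:
    //   com.google.cloud.Timestamp
    //   com.google.cloud.spanner.Statement

    // High-water mark persisted after the previous successful run (placeholder value).
    Timestamp lastSync = Timestamp.parseTimestamp("2021-09-01T00:00:00Z");

    // Read only rows whose commit timestamp is newer than the last sync.
    PCollection<Struct> changedRows = pipeline.apply("read_changed_rows",
            SpannerIO.read()
                    .withInstanceId("your-instance")   // placeholder instance ID
                    .withDatabaseId("your-database")   // placeholder database ID
                    .withQuery(Statement.newBuilder(
                                    "SELECT * FROM Singers WHERE LastUpdateTime > @since")
                            .bind("since").to(lastSync)
                            .build()));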

    For a no-code solution, you would have to relax your requirements and bulk export all tables periodically to Cloud Storage and bulk import them periodically into BigQuery. You could use a combination of a periodic trigger of an export from Cloud Spanner to Cloud Storage and schedule a periodic import from Cloud Storage to BigQuery.
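    If you later decide to script the import half (the question notes there is no template for importing Avro into BigQuery), a plain BigQuery load job handles Avro directly. Below is a minimal sketch using the BigQuery Java client library (com.google.cloud:google-cloud-bigquery); the bucket path, dataset, and table names are placeholders.

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.FormatOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.JobInfo;
    import com.google.cloud.bigquery.LoadJobConfiguration;
    import com.google.cloud.bigquery.TableId;

    public class LoadSpannerAvroExport {
        public static void main(String[] args) throws InterruptedException {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

            // Placeholder destination table and export path -- adjust to your layout.
            TableId table = TableId.of("spanner_export", "singers");
            String sourceUri = "gs://your-export-bucket/singers/*.avro";

            // Avro files carry their own schema, so none needs to be specified here.
            LoadJobConfiguration config = LoadJobConfiguration.newBuilder(table, sourceUri)
                    .setFormatOptions(FormatOptions.avro())
                    .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE) // full refresh per run
                    .build();

            Job job = bigquery.create(JobInfo.of(config)).waitFor();
            if (job == null || job.getStatus().getError() != null) {
                throw new RuntimeException("Load job failed: "
                        + (job == null ? "job no longer exists" : job.getStatus().getError()));
            }
        }
    }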

      September 27, 2021 1:54 PM IST
  • Use a single Dataflow pipeline to do it in one shot/pass. Here's an example I wrote using the Java SDK to help get you started. It reads from Spanner, transforms it to a BigQuery TableRow using a ParDo, and then writes to BigQuery at the end. Under the hood it's using GCS, but that's all abstracted away from you as a user.

    package org.polleyg;
    
    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.spanner.Struct;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
    import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE;
    
    /**
     * Reads rows from a Cloud Spanner table and writes them to a BigQuery table
     * in a single Dataflow (Apache Beam) pipeline.
     */
    public class TemplatePipeline {
        public static void main(String[] args) {
            PipelineOptionsFactory.register(DataflowPipelineOptions.class);
            DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
            Pipeline pipeline = Pipeline.create(options);
            PCollection<Struct> records = pipeline.apply("read_from_spanner",
                    SpannerIO.read()
                            .withInstanceId("spanner-to-dataflow-to-bq")
                            .withDatabaseId("the-dude")
                            .withQuery("SELECT * FROM Singers"));
            records.apply("convert-2-bq-row", ParDo.of(new DoFn<Struct, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    TableRow row = new TableRow();
                    row.set("id", c.element().getLong("SingerId"));
                    row.set("first", c.element().getString("FirstName"));
                    row.set("last", c.element().getString("LastName"));
                    c.output(row);
                }
            })).apply("write-to-bq", BigQueryIO.writeTableRows()
                    .to(String.format("%s:spanner_to_bigquery.singers", options.getProject()))
                    .withCreateDisposition(CREATE_IF_NEEDED)
                    .withWriteDisposition(WRITE_TRUNCATE)
                    .withSchema(getTableSchema()));
            pipeline.run();
        }
    
        private static TableSchema getTableSchema() {
            List<TableFieldSchema> fields = new ArrayList<>();
            fields.add(new TableFieldSchema().setName("id").setType("INTEGER"));
            fields.add(new TableFieldSchema().setName("first").setType("STRING"));
            fields.add(new TableFieldSchema().setName("last").setType("STRING"));
            return new TableSchema().setFields(fields);
        }
    }

     

    Output logs:

    00:10:54,011 0    [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://spanner-dataflow-bq/tmp/BigQueryWriteTemp/beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12/ before loading them.
    00:10:59,332 5321 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://spanner-dataflow-bq/tmp/BigQueryWriteTemp/beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12/c374d44a-a7db-407e-aaa4-fe6aa5f6a9ef.
    00:11:01,178 7167 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.WriteTables - Loading 1 files into {datasetId=spanner_to_bigquery, projectId=grey-sort-challenge, tableId=singers} using job {jobId=beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, location=australia-southeast1, projectId=grey-sort-challenge}, attempt 0
    00:11:02,495 8484 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - Started BigQuery job: {jobId=beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, location=australia-southeast1, projectId=grey-sort-challenge}.
    bq show -j --format=prettyjson --project_id=grey-sort-challenge beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0
    00:11:02,495 8484 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.WriteTables - Load job {jobId=beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, location=australia-southeast1, projectId=grey-sort-challenge} started
    00:11:03,183 9172 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - Still waiting for BigQuery job beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, currently in status {"state":"RUNNING"}
    bq show -j --format=prettyjson --project_id=grey-sort-challenge beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0
    00:11:05,043 11032 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - BigQuery job {jobId=beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, location=australia-southeast1, projectId=grey-sort-challenge} completed in state DONE
    00:11:05,044 11033 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.WriteTables - Load job {jobId=beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, location=australia-southeast1, projectId=grey-sort-challenge} succeeded. Statistics: {"completionRatio":1.0,"creationTime":"1559311861461","endTime":"1559311863323","load":{"badRecords":"0","inputFileBytes":"81","inputFiles":"1","outputBytes":"45","outputRows":"2"},"startTime":"1559311862043","totalSlotMs":"218","reservationUsage":[{"name":"default-pipeline","slotMs":"218"}]}

     


      September 29, 2021 1:56 PM IST
  • There are several ways to ingest data into BigQuery:

    • Batch load a set of data records.
    • Stream individual records or batches of records.
    • Use queries to generate new data and append or overwrite the results to a table.
    • Use a third-party application or service.

    Batch loading

    With batch loading, you load the source data into a BigQuery table in a single batch operation. For example, the data source could be a CSV file, an external database, or a set of log files. Traditional extract, transform, and load (ETL) jobs fall into this category.

    Options for batch loading in BigQuery include the following:

    • Load jobs. Load data from Cloud Storage or from a local file by creating a load job. The records can be in Avro, CSV, JSON, ORC, or Parquet format.
    • BigQuery Data Transfer Service. Use BigQuery Data Transfer Service to automate loading data from Google Software as a Service (SaaS) apps or from third-party applications and services.
    • BigQuery Storage Write API. The Storage Write API lets you batch-process an arbitrarily large number of records and commit them in a single atomic operation (see the sketch after this list). If the commit operation fails, you can safely retry the operation. Unlike BigQuery load jobs, the Storage Write API does not require staging the data to intermediate storage such as Cloud Storage.
    • Other managed services. Use other managed services to export data from an external data store and import it into BigQuery. For example, you can load data from Firestore exports.
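    To make the Storage Write API option above concrete, here is a minimal sketch of the batch (PENDING stream) flow using the Java client library (com.google.cloud:google-cloud-bigquerystorage). The project, dataset, and table names are placeholders and the destination table is assumed to already exist; appended rows stay invisible until the final commit, which applies atomically.

    import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
    import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
    import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
    import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
    import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
    import com.google.cloud.bigquery.storage.v1.TableName;
    import com.google.cloud.bigquery.storage.v1.WriteStream;
    import org.json.JSONArray;
    import org.json.JSONObject;

    public class StorageWriteApiBatchLoad {
        public static void main(String[] args) throws Exception {
            // Placeholder identifiers -- the table must already exist with this schema.
            TableName table = TableName.of("your-project", "spanner_to_bigquery", "singers");

            try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
                // A PENDING stream buffers appended rows until the stream is committed.
                WriteStream stream = client.createWriteStream(
                        CreateWriteStreamRequest.newBuilder()
                                .setParent(table.toString())
                                .setWriteStream(WriteStream.newBuilder()
                                        .setType(WriteStream.Type.PENDING).build())
                                .build());

                try (JsonStreamWriter writer = JsonStreamWriter
                        .newBuilder(stream.getName(), stream.getTableSchema(), client)
                        .build()) {
                    JSONArray rows = new JSONArray();
                    rows.put(new JSONObject().put("id", 1).put("first", "Marc").put("last", "Richards"));
                    writer.append(rows).get(); // wait for the append to be acknowledged
                }

                // Finalize, then commit: either all buffered rows become visible or none do.
                client.finalizeWriteStream(stream.getName());
                BatchCommitWriteStreamsResponse commit = client.batchCommitWriteStreams(
                        BatchCommitWriteStreamsRequest.newBuilder()
                                .setParent(table.toString())
                                .addWriteStreams(stream.getName())
                                .build());
                if (!commit.hasCommitTime()) {
                    throw new RuntimeException("Commit failed: " + commit.getStreamErrorsList());
                }
            }
        }
    }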

    Batch loading can be done as a one-time operation or on a recurring schedule. For example, you can do the following:

    • Run BigQuery Data Transfer Service transfers on a schedule.
    • Use an orchestration service such as Cloud Composer to schedule load jobs.
    • Use a cron job to load data on a schedule.

    Streaming

    With streaming, you continually send smaller batches of data in real time, so the data is available for querying as it arrives. Options for streaming in BigQuery include the following:

    • Storage Write API. The Storage Write API supports high-throughput streaming ingestion with exactly-once delivery semantics.
    • Dataflow. Use Dataflow with the Apache Beam SDK to set up a streaming pipeline that writes to BigQuery.
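    To illustrate the Dataflow option, here is a minimal sketch of a streaming Beam pipeline that writes to BigQuery. It uses GenerateSequence as a stand-in unbounded source (in practice this would be Pub/Sub, Kafka, or similar), and the project, dataset, and table names are placeholders.

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.StreamingOptions;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.joda.time.Duration;

    import java.util.Collections;

    import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
    import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;

    public class StreamingToBigQuery {
        public static void main(String[] args) {
            StreamingOptions options = PipelineOptionsFactory.fromArgs(args)
                    .withValidation().as(StreamingOptions.class);
            options.setStreaming(true);
            Pipeline pipeline = Pipeline.create(options);

            pipeline
                    // Stand-in unbounded source: emits one element per second.
                    .apply("generate", GenerateSequence.from(0)
                            .withRate(1, Duration.standardSeconds(1)))
                    .apply("to_tablerow", MapElements
                            .into(TypeDescriptor.of(TableRow.class))
                            .via((Long n) -> new TableRow().set("id", n)))
                    // With an unbounded input, BigQueryIO streams rows in as they arrive,
                    // so they are queryable shortly after being written.
                    .apply("write_to_bq", BigQueryIO.writeTableRows()
                            .to("your-project:spanner_to_bigquery.streaming_demo") // placeholder table
                            .withCreateDisposition(CREATE_IF_NEEDED)
                            .withWriteDisposition(WRITE_APPEND)
                            .withSchema(new TableSchema().setFields(Collections.singletonList(
                                    new TableFieldSchema().setName("id").setType("INTEGER")))));

            pipeline.run();
        }
    }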

    Generated data

    You can use SQL to generate data and store the results in BigQuery. Options for generating data include:

    • Use data manipulation language (DML) statements to perform bulk inserts into an existing table or store query results in a new table.

    • Use a CREATE TABLE ... AS statement to create a new table from a query result.

    • Run a query and save the results to a table. You can append the results to an existing table or write to a new table. For more information, see Writing query results.
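    As a concrete example of that last option, here is a minimal sketch that runs a query and materializes the result into a destination table with the BigQuery Java client; the dataset and table names are placeholders reusing the naming from the pipeline example above.

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.JobInfo;
    import com.google.cloud.bigquery.QueryJobConfiguration;
    import com.google.cloud.bigquery.TableId;

    public class QueryResultsToTable {
        public static void main(String[] args) throws InterruptedException {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

            QueryJobConfiguration config = QueryJobConfiguration
                    .newBuilder("SELECT id, first, last FROM `spanner_to_bigquery.singers` "
                            + "WHERE last IS NOT NULL")
                    .setDestinationTable(TableId.of("spanner_to_bigquery", "singers_filtered"))
                    .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE) // overwrite each run
                    .setUseLegacySql(false) // standard SQL
                    .build();

            // Runs the query and writes the result set into the destination table.
            bigquery.query(config);
        }
    }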

      December 10, 2021 11:15 AM IST