Google BigQuery patterns
The samples on this page show common patterns for using BigQueryIO.
- Java SDK
- Python SDK
BigQueryIO deadletter pattern
In production systems, it is useful to implement the deadletter pattern with BigQueryIO, outputting any elements that had an error during processing into another PCollection for further processing. The samples below print the errors, but in a production system they could be sent to a deadletter table for later correction.
When using STREAMING_INSERTS, you can use the WriteResult object to access a PCollection with the TableRows that failed to be inserted into BigQuery. If you also set the withExtendedErrorInfo property, you will be able to access a PCollection<BigQueryInsertError> from the WriteResult. The PCollection will then include a reference to the table, the data row, and the InsertErrors. Which errors are added to the deadletter queue is determined via the InsertRetryPolicy.
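For instance, the built-in InsertRetryPolicy factory methods shown below control how aggressively failed inserts are retried before an element is handed to the failed-inserts output (a brief sketch; the comments paraphrase the documented behavior):

// Every failed insert is sent straight to the failed-inserts output.
InsertRetryPolicy never = InsertRetryPolicy.neverRetry();

// Transient errors (e.g. timeouts) are retried; persistent errors such as
// schema violations go to the failed-inserts output.
InsertRetryPolicy transientOnly = InsertRetryPolicy.retryTransientErrors();

// Every failed insert is retried; nothing reaches the failed-inserts output.
InsertRetryPolicy always = InsertRetryPolicy.alwaysRetry();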
In the result tuple, you can access FailedRows to retrieve the failed inserts.
PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

Pipeline p = Pipeline.create(options);

// Create a bug by writing the 2nd value as null. The API will correctly
// throw an error when trying to insert a null value into a REQUIRED field.
WriteResult result =
    p.apply(Create.of(1, 2))
        .apply(
            BigQueryIO.<Integer>write()
                .withSchema(
                    new TableSchema()
                        .setFields(
                            ImmutableList.of(
                                new TableFieldSchema()
                                    .setName("num")
                                    .setType("INTEGER")
                                    .setMode("REQUIRED"))))
                .to("Test.dummyTable")
                .withFormatFunction(x -> new TableRow().set("num", (x == 2) ? null : x))
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                // Forcing the bounded pipeline to use streaming inserts
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                // Set the withExtendedErrorInfo property.
                .withExtendedErrorInfo()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

result
    .getFailedInsertsWithErr()
    .apply(
        MapElements.into(TypeDescriptors.strings())
            .via(
                x -> {
                  System.out.println(" The table was " + x.getTable());
                  System.out.println(" The row was " + x.getRow());
                  System.out.println(" The error was " + x.getError());
                  return "";
                }));

p.run();

/* Sample output from the pipeline:
The table was GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=Test, projectId=<>, tableId=dummyTable}}
The row was GenericData{classInfo=[f], {num=null}}
The error was GenericData{classInfo=[errors, index], {errors=[GenericData{classInfo=[debugInfo, location, message, reason], {debugInfo=, location=, message=Missing required field: Msg_0_CLOUD_QUERY_TABLE.num., reason=invalid}}], index=0}}
*/
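In a production pipeline, the same getFailedInsertsWithErr() output could be appended to a deadletter table instead of (or in addition to) being printed. The following is a minimal sketch of such a branch, added before the p.run() call above; the deadletter table name and its extra error field are assumptions for illustration, not part of the sample:

result
    .getFailedInsertsWithErr()
    .apply(
        "WriteToDeadletterTable",
        BigQueryIO.<BigQueryInsertError>write()
            // Hypothetical deadletter table; replace with your own.
            .to("Test.dummyTableDeadletter")
            // Keep the original row and attach the error text for later correction.
            .withFormatFunction(
                err -> err.getRow().clone().set("error", err.getError().toString()))
            .withSchema(
                new TableSchema()
                    .setFields(
                        ImmutableList.of(
                            new TableFieldSchema().setName("num").setType("INTEGER"),
                            new TableFieldSchema().setName("error").setType("STRING"))))
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));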
import apache_beam as beam

# Create pipeline.
schema = ({'fields': [{'name': 'a', 'type': 'STRING', 'mode': 'REQUIRED'}]})

pipeline = beam.Pipeline()

errors = (
    pipeline | 'Data' >> beam.Create([1, 2])
    # Create a bug: the value 1 is written as None, which the API will
    # correctly reject for a REQUIRED field.
    | 'CreateBrokenData' >>
    beam.Map(lambda src: {'a': src} if src == 2 else {'a': None})
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        "<Your Project>:Test.dummy_a_table",
        schema=schema,
        insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND'))

result = (
    errors['FailedRows']
    | 'PrintErrors' >>
    beam.FlatMap(lambda err: print("Error Found {}".format(err))))

pipeline.run()
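As in the Java sample, in a production pipeline the failed rows could be appended to a deadletter table instead of being printed. A minimal sketch of such an extra branch (added before the pipeline.run() call above), assuming each FailedRows element is a (destination, row) tuple; the deadletter table name and its schema are illustrative assumptions:

_ = (
    errors['FailedRows']
    # Flatten each (destination, row) tuple into a single deadletter record.
    | 'FormatDeadletter' >> beam.Map(
        lambda err: {'destination': str(err[0]), 'row': str(err[1])})
    | 'WriteDeadletter' >> beam.io.WriteToBigQuery(
        "<Your Project>:Test.dummy_a_table_deadletter",  # hypothetical table
        schema={'fields': [
            {'name': 'destination', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'row', 'type': 'STRING', 'mode': 'REQUIRED'}]},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND'))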