如何使用消息队列生成/检索数据湖(Data Lake)中的文件?
How to generate/retrieve file in datalake using message queue?
我有一个 Azure 函数 QueueTrigger1,它执行函数 executeTemplateProcess,以便在 Google 云端硬盘上上传 tsv 文件并更新 Jira 票证。
我需要创建一个消息队列,用于在数据湖上生成一个 tsv 文件、运行一段 Python 代码,然后从数据湖中检索该 tsv 文件(的位置)并将其添加到队列中。
我今天对队列有了基本的了解,但我不确定如何在数据湖上生成文件并检索它的位置。我们需要将文件作为输入传递到 python 代码中,这就是为什么我认为我们需要将数据湖上的文件位置排队,但我不确定如何执行此操作。
这是 QueueTrigger1 和 executeTemplateProcess() 所在的命名空间:
namespace DI
{
    /// <summary>
    /// Template-processing step that runs for a Jira issue picked up from the queue.
    /// </summary>
    public class DIProcess
    {
        /// <summary>
        /// Copies the component's template and attaches a Google Sheets link to the Jira issue,
        /// or marks the issue "Abandoned" when <c>rowCount</c> is not positive.
        /// </summary>
        /// <param name="jiraKey">Key of the Jira issue (not read in this excerpt).</param>
        /// <param name="jiraIssueType">Issue type name (not read in this excerpt).</param>
        /// <param name="jiraSummary">Issue summary (not read in this excerpt).</param>
        /// <param name="component">Component whose <c>FileId</c> identifies the template to copy.</param>
        /// <param name="jiraDescription">Issue description (not read in this excerpt).</param>
        public static void executeTemplateProcess(string jiraKey, string jiraIssueType, string jiraSummary, Component component, string jiraDescription)
        {
            // NOTE(review): rowCount, sheetName, jira, webLink and CopyTemplate are not declared in
            // this excerpt; presumably they are members defined elsewhere in the class — confirm.
            if (!(rowCount > 0))
            {
                // Nothing to process: close the issue out.
                jira.UpdateStatus("Abandoned");
                jira.AddComment("Jira status updated to Abandoned.");
                return;
            }

            // The queue-driven Python step is meant to run here, before the Jira updates below.
            string copiedFileId = CopyTemplate(component.FileId, sheetName);

            // An empty id means the template copy failed; stop without touching Jira.
            if (string.IsNullOrEmpty(copiedFileId))
            {
                return;
            }

            jira.AddComment("Google File copied.");

            // Link the copied spreadsheet from the Jira issue.
            webLink = $"https://docs.google.com/spreadsheets/d/{copiedFileId}";
            jira.AddWebLink(webLink, sheetName);
            jira.AddComment("Jira weblink added.");
        }
    }
}
namespace companyxyzjira.QueueTrigger1
{
    /// <summary>
    /// Queue-triggered entry point that reads Jira payloads from the
    /// "companyxyz-jira-dev-am" queue and runs DIProcess.executeTemplateProcess for each one.
    /// </summary>
    public static class JiraQueueTrigger
    {
        /// <summary>
        /// Deserializes one queued Jira payload and hands its fields to the template process.
        /// </summary>
        /// <param name="myQueueItem">Raw queue message; expected to be JSON with an <c>issue</c> object.</param>
        /// <param name="log">Function logger.</param>
        /// <param name="context">Function execution context (not used here).</param>
        [FunctionName("QueueTrigger1")]
        public static void Run([QueueTrigger("companyxyz-jira-dev-am", Connection = "storageaccountcompanyxyzji42f6_STORAGE")] string myQueueItem,
            ILogger log, ExecutionContext context)
        {
            log.LogInformation("Queue trigger function processing");

            // Parsing stays outside the try block, exactly as before: a payload that cannot be
            // parsed makes the invocation fail rather than being logged and dropped.
            dynamic jira = JsonConvert.DeserializeObject(myQueueItem);
            string jiraKey = jira.issue.key;
            string jiraIssueType = jira.issue.fields.issuetype.name;
            string jiraSummary = jira.issue.fields.summary;
            string jiraDescription = jira.issue.fields.description;

            // The original declared this local without ever assigning it (compile error CS0165).
            // TODO(review): resolve the Component from the issue's component names
            // (presumably jira.issue.fields.components) before calling the process.
            Component jiraComponent = default(Component);

            try
            {
                DIProcess.executeTemplateProcess(jiraKey, jiraIssueType, jiraSummary, jiraComponent, jiraDescription);
            }
            catch (System.Exception e)
            {
                // One structured entry: the exception object carries message and stack trace.
                // NOTE(review): the exception is swallowed, so the runtime treats the message as
                // processed and will not retry it — confirm that is intended.
                log.LogError(e, "executeTemplateProcess failed for Jira issue {JiraKey}", jiraKey);
            }
        }
    }
}
这是我目前的思路,但我不确定如何与数据湖通信……
/// <summary>
/// HTTP-triggered function that forwards the request body to the
/// "companyxyz-jira-dev-pm-mapping-done" queue and echoes back a name taken from the
/// query string or from the JSON body.
/// </summary>
/// <param name="req">Incoming GET or POST request.</param>
/// <param name="QueueItem">Queue output binding; each <c>Add</c> call enqueues one message.</param>
/// <param name="log">Function logger.</param>
/// <returns>200 with the resolved name, or 400 when no name was supplied.</returns>
[FunctionName("HttpTriggerCSharp")]
public static async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
    [Queue("companyxyz-jira-dev-pm-mapping-done")] ICollector<string> QueueItem,
    ILogger log)
{
    log.LogInformation("HTTP trigger function processed a request.");

    string name = req.Query["name"];

    string requestBody;
    using (var streamReader = new StreamReader(req.Body))
    {
        requestBody = await streamReader.ReadToEndAsync();
    }

    // A malformed body is treated as "no name in body" (400) instead of escaping as a 500.
    dynamic data = null;
    if (!string.IsNullOrWhiteSpace(requestBody))
    {
        try
        {
            data = JsonConvert.DeserializeObject(requestBody);
        }
        catch (JsonReaderException e)
        {
            log.LogWarning(e, "Request body is not valid JSON.");
        }
    }

    name = name ?? data?.name;
    if (name == null)
    {
        return new BadRequestObjectResult("Please pass a name on the query string or in the request body");
    }

    // Enqueue only for accepted requests with a payload. The original enqueued before
    // validation, so rejected or empty requests still produced queue messages.
    if (!string.IsNullOrWhiteSpace(requestBody))
    {
        QueueItem.Add(requestBody);
    }

    return new OkObjectResult(name);
}
带有输入/输出文件的数据湖快照(这些文件目前是手动上传的,而这正是我们今后要自动化的部分,因此我们需要如上所述,通过消息队列生成/检索这些工件):
# Upload a small DataFrame to Azure Data Lake Storage as a TSV file, then read it back.
import os

from azure.storage.filedatalake import DataLakeServiceClient
import pandas as pd

# Build the Data Lake clients at module level, outside any function body.
connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
if not connect_str:
    # Fail with a clear message instead of an obscure error inside from_connection_string.
    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is not set")
service_client = DataLakeServiceClient.from_connection_string(connect_str)
file_system_client = service_client.get_file_system_client(file_system="test")
directory_client = file_system_client.get_directory_client("test")
file_client = directory_client.create_file("uploaded-file0316.txt")

# Upload to datalake
head = ["col1", "col2", "col3"]
rows = [[1, 2, 3], [4, 5, 6], [8, 7, 9]]
df = pd.DataFrame(rows, columns=head)
# Let pandas emit real TSV. Replacing ',' after the fact would also rewrite commas
# inside field values. (to_csv ignores `encoding` when it returns a string.)
output = df.to_csv(sep="\t", index_label="idx")
print(output)
# append_data/flush_data work in bytes: encode first so the length is the byte count,
# not the character count (they differ for non-ASCII text).
payload = output.encode("utf-8")
file_client.append_data(data=payload, offset=0, length=len(payload))
file_client.flush_data(len(payload))

# Download from datalake
download = file_client.download_file()
content = download.readall()
print(content)
我有一个 Azure 函数 QueueTrigger1,它执行函数 executeTemplateProcess,以便在 Google 云端硬盘上上传 tsv 文件并更新 Jira 票证。
我需要创建一个消息队列,用于在数据湖上生成一个 tsv 文件、运行一段 Python 代码,然后从数据湖中检索该 tsv 文件(的位置)并将其添加到队列中。
我今天对队列有了基本的了解,但我不确定如何在数据湖上生成文件并检索它的位置。我们需要将文件作为输入传递到 python 代码中,这就是为什么我认为我们需要将数据湖上的文件位置排队,但我不确定如何执行此操作。
这是 QueueTrigger1 和 executeTemplateProcess() 所在的命名空间:
namespace DI
{
    /// <summary>
    /// Template-processing step that runs for a Jira issue picked up from the queue.
    /// </summary>
    public class DIProcess
    {
        /// <summary>
        /// Copies the component's template and attaches a Google Sheets link to the Jira issue,
        /// or marks the issue "Abandoned" when <c>rowCount</c> is not positive.
        /// </summary>
        /// <param name="jiraKey">Key of the Jira issue (not read in this excerpt).</param>
        /// <param name="jiraIssueType">Issue type name (not read in this excerpt).</param>
        /// <param name="jiraSummary">Issue summary (not read in this excerpt).</param>
        /// <param name="component">Component whose <c>FileId</c> identifies the template to copy.</param>
        /// <param name="jiraDescription">Issue description (not read in this excerpt).</param>
        public static void executeTemplateProcess(string jiraKey, string jiraIssueType, string jiraSummary, Component component, string jiraDescription)
        {
            // NOTE(review): rowCount, sheetName, jira, webLink and CopyTemplate are not declared in
            // this excerpt; presumably they are members defined elsewhere in the class — confirm.
            if (!(rowCount > 0))
            {
                // Nothing to process: close the issue out.
                jira.UpdateStatus("Abandoned");
                jira.AddComment("Jira status updated to Abandoned.");
                return;
            }

            // The queue-driven Python step is meant to run here, before the Jira updates below.
            string copiedFileId = CopyTemplate(component.FileId, sheetName);

            // An empty id means the template copy failed; stop without touching Jira.
            if (string.IsNullOrEmpty(copiedFileId))
            {
                return;
            }

            jira.AddComment("Google File copied.");

            // Link the copied spreadsheet from the Jira issue.
            webLink = $"https://docs.google.com/spreadsheets/d/{copiedFileId}";
            jira.AddWebLink(webLink, sheetName);
            jira.AddComment("Jira weblink added.");
        }
    }
}
namespace companyxyzjira.QueueTrigger1
{
    /// <summary>
    /// Queue-triggered entry point that reads Jira payloads from the
    /// "companyxyz-jira-dev-am" queue and runs DIProcess.executeTemplateProcess for each one.
    /// </summary>
    public static class JiraQueueTrigger
    {
        /// <summary>
        /// Deserializes one queued Jira payload and hands its fields to the template process.
        /// </summary>
        /// <param name="myQueueItem">Raw queue message; expected to be JSON with an <c>issue</c> object.</param>
        /// <param name="log">Function logger.</param>
        /// <param name="context">Function execution context (not used here).</param>
        [FunctionName("QueueTrigger1")]
        public static void Run([QueueTrigger("companyxyz-jira-dev-am", Connection = "storageaccountcompanyxyzji42f6_STORAGE")] string myQueueItem,
            ILogger log, ExecutionContext context)
        {
            log.LogInformation("Queue trigger function processing");

            // Parsing stays outside the try block, exactly as before: a payload that cannot be
            // parsed makes the invocation fail rather than being logged and dropped.
            dynamic jira = JsonConvert.DeserializeObject(myQueueItem);
            string jiraKey = jira.issue.key;
            string jiraIssueType = jira.issue.fields.issuetype.name;
            string jiraSummary = jira.issue.fields.summary;
            string jiraDescription = jira.issue.fields.description;

            // The original declared this local without ever assigning it (compile error CS0165).
            // TODO(review): resolve the Component from the issue's component names
            // (presumably jira.issue.fields.components) before calling the process.
            Component jiraComponent = default(Component);

            try
            {
                DIProcess.executeTemplateProcess(jiraKey, jiraIssueType, jiraSummary, jiraComponent, jiraDescription);
            }
            catch (System.Exception e)
            {
                // One structured entry: the exception object carries message and stack trace.
                // NOTE(review): the exception is swallowed, so the runtime treats the message as
                // processed and will not retry it — confirm that is intended.
                log.LogError(e, "executeTemplateProcess failed for Jira issue {JiraKey}", jiraKey);
            }
        }
    }
}
这是我目前的思路,但我不确定如何与数据湖通信……
/// <summary>
/// HTTP-triggered function that forwards the request body to the
/// "companyxyz-jira-dev-pm-mapping-done" queue and echoes back a name taken from the
/// query string or from the JSON body.
/// </summary>
/// <param name="req">Incoming GET or POST request.</param>
/// <param name="QueueItem">Queue output binding; each <c>Add</c> call enqueues one message.</param>
/// <param name="log">Function logger.</param>
/// <returns>200 with the resolved name, or 400 when no name was supplied.</returns>
[FunctionName("HttpTriggerCSharp")]
public static async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
    [Queue("companyxyz-jira-dev-pm-mapping-done")] ICollector<string> QueueItem,
    ILogger log)
{
    log.LogInformation("HTTP trigger function processed a request.");

    string name = req.Query["name"];

    string requestBody;
    using (var streamReader = new StreamReader(req.Body))
    {
        requestBody = await streamReader.ReadToEndAsync();
    }

    // A malformed body is treated as "no name in body" (400) instead of escaping as a 500.
    dynamic data = null;
    if (!string.IsNullOrWhiteSpace(requestBody))
    {
        try
        {
            data = JsonConvert.DeserializeObject(requestBody);
        }
        catch (JsonReaderException e)
        {
            log.LogWarning(e, "Request body is not valid JSON.");
        }
    }

    name = name ?? data?.name;
    if (name == null)
    {
        return new BadRequestObjectResult("Please pass a name on the query string or in the request body");
    }

    // Enqueue only for accepted requests with a payload. The original enqueued before
    // validation, so rejected or empty requests still produced queue messages.
    if (!string.IsNullOrWhiteSpace(requestBody))
    {
        QueueItem.Add(requestBody);
    }

    return new OkObjectResult(name);
}
带有输入/输出文件的数据湖快照(这些文件目前是手动上传的,而这正是我们今后要自动化的部分,因此我们需要如上所述,通过消息队列生成/检索这些工件):
# Upload a small DataFrame to Azure Data Lake Storage as a TSV file, then read it back.
import os

from azure.storage.filedatalake import DataLakeServiceClient
import pandas as pd

# Build the Data Lake clients at module level, outside any function body.
connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
if not connect_str:
    # Fail with a clear message instead of an obscure error inside from_connection_string.
    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is not set")
service_client = DataLakeServiceClient.from_connection_string(connect_str)
file_system_client = service_client.get_file_system_client(file_system="test")
directory_client = file_system_client.get_directory_client("test")
file_client = directory_client.create_file("uploaded-file0316.txt")

# Upload to datalake
head = ["col1", "col2", "col3"]
rows = [[1, 2, 3], [4, 5, 6], [8, 7, 9]]
df = pd.DataFrame(rows, columns=head)
# Let pandas emit real TSV. Replacing ',' after the fact would also rewrite commas
# inside field values. (to_csv ignores `encoding` when it returns a string.)
output = df.to_csv(sep="\t", index_label="idx")
print(output)
# append_data/flush_data work in bytes: encode first so the length is the byte count,
# not the character count (they differ for non-ASCII text).
payload = output.encode("utf-8")
file_client.append_data(data=payload, offset=0, length=len(payload))
file_client.flush_data(len(payload))

# Download from datalake
download = file_client.download_file()
content = download.readall()
print(content)